/*
 * Copyright (c) 2012-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33070 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.mapreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.kepler.ddp.Utilities;
import org.kepler.hadoop.execution.KeplerApp4Match;
import org.kepler.hadoop.io.KeyValuePair;
import org.kepler.hadoop.util.StubUtilities;

import ptolemy.moml.MoMLFilter;
import ptolemy.moml.MoMLParser;
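/*
 * Usage sketch (added for illustration, not part of the original file): a
 * minimal driver that wires this mapper into a Hadoop job. The job name and
 * all paths below are hypothetical; the configuration key is the one from
 * org.kepler.ddp.Utilities that setup() reads back to locate the sorted
 * SequenceFiles for the second match input.
 *
 *   Configuration conf = new Configuration();
 *   // HDFS directory holding the sorted SequenceFiles to match against (hypothetical path)
 *   conf.set(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH, "/user/kepler/matchData");
 *   Job job = Job.getInstance(conf, "kepler-match");
 *   job.setMapperClass(Mapper4Match.class);
 *   job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
 *   org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path("/user/kepler/matchInput"));
 *   org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path("/user/kepler/matchOutput"));
 *   job.waitForCompletion(true);
 */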
//////////////////////////////////////////////////////////////////////////
//// Mapper4Match

/**
 * This class includes a mapper for the Match actor. It gets data from Hadoop,
 * executes the sub-workflow of the Match actor via the Kepler engine, and
 * sends the outputs back to Hadoop.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: Mapper4Match.java 33070 2014-11-12 23:21:09Z crawl $
 */

public class Mapper4Match extends
		Mapper<WritableComparable, Writable, WritableComparable, Writable> {

	static Logger logger = Logger.getLogger(Mapper4Match.class.getName());

	SequenceFile.Reader[] readers = null;
	String strPath;
	KeplerApp4Match _kepler;
	FileStatus[] status;
	WritableComparable key2 = null;
	Writable value2 = null;

	@Override
	public void setup(Context context) {
		strPath = context.getConfiguration()
				.get(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH);
		logger.debug(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH
				+ ":" + strPath);
		// Get the cached archives/files.
		Date now = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
		try {
			FileSystem fs = FileSystem.get(context.getConfiguration());
			status = fs.listStatus(new Path(strPath));
			// Path[] localFiles =
			// DistributedCache.getLocalCacheFiles(context.getConfiguration());
			logger.debug("entering into setup() of Mapper4Match:"
					+ sdf.format(now));
			logger.debug("thread info:" + Thread.currentThread().getId());
			final Configuration hadoopConf = context.getConfiguration();
			// Initialize Kepler to run the workflow in the stub.
			StubUtilities.initializeKepler(hadoopConf);
//			String wfString = context.getConfiguration().get(Utilities.CONFIGURATION_KEPLER_MODEL);
//			System.err.println(Utilities.CONFIGURATION_KEPLER_MODEL + ":" + wfString);
//			CompositeActor subWf;
//			// Synchronize because concurrent MoMLParser threads sometimes have
//			// exceptions.
//			synchronized (MoMLParser.class) {
//				MoMLParser parser = new MoMLParser();
//				parser.resetAll();
//				MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters());
//				MoMLParser.addMoMLFilter(new RemoveGraphicalClasses());
//				// CompositeActor reduceSubWf =
//				// (CompositeActor) parser.parseFile(localFiles[0] +
//				// "/reduceSubFlow.xml");
//				subWf = (CompositeActor) parser.parse(wfString);
//			}
			_kepler = new KeplerApp4Match(hadoopConf);
		} catch (Exception e) {
			System.err.println("entering into exception catch of Mapper4Match setup():"
					+ sdf.format(now));
			System.err.println("thread info:" + Thread.currentThread().getId());
			for (Object filter : MoMLParser.getMoMLFilters()) {
				System.err.println("one filter: " + ((MoMLFilter) filter));
			}
			e.printStackTrace();
		}
	}
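	/*
	 * Note (added): map() below performs a map-side merge join. Each incoming
	 * (key, value) pair is matched against every sorted SequenceFile listed in
	 * 'status'. Because both sides arrive in sorted key order, the reader can
	 * stop scanning as soon as key2 > key, and 'position' remembers the
	 * earliest record that may still match the next incoming key, so the
	 * reader seeks back there instead of rescanning the whole file.
	 */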
	@Override
	public void map(WritableComparable key, Writable value, Context context)
			throws IOException, InterruptedException {
		List output = null;
		// System.out.println("key in map of Mapper4Match:" + key);
		// System.out.println("value in map of Mapper4Match:" + value);

		// Lazily open one reader per SequenceFile found under strPath.
		if (readers == null) {
			readers = new SequenceFile.Reader[status.length];
			for (int i = 0; i < status.length; i++) {
				// System.out.println("one input path for match reader:" + status[i].getPath());
				readers[i] = new SequenceFile.Reader(context.getConfiguration(),
						SequenceFile.Reader.file(status[i].getPath()));
			}
		}

		for (int i = 0; i < status.length; i++) {
			// System.out.println("one input path for match reader:" + status[i].getPath());
			key2 = (WritableComparable) ReflectionUtils.newInstance(
					readers[i].getKeyClass(), context.getConfiguration());
			value2 = (Writable) ReflectionUtils.newInstance(
					readers[i].getValueClass(), context.getConfiguration());
			long position = readers[i].getPosition();
//			logger.debug("data before start reading. key1:" + key + ", value1:"
//					+ value + ", key2:" + key2 + ", value2:" + value2
//					+ " at position: " + position);
			while (readers[i].next(key2, value2)) {
//				logger.debug("new record. key1:" + key + ", value1:" + value
//						+ ", key2:" + key2 + ", value2:" + value2
//						+ " at position: " + position);
				if (key.compareTo(key2) < 0) {
					// Since the data read by the reader is sorted, we do not
					// need to compare any further once the current key2 is
					// bigger than key; rewind so the next call can resume here.
//					logger.debug("key1 is smaller than key2. key1:" + key
//							+ ", value1:" + value + ", key2:" + key2
//							+ ", value2:" + value2 + " at position: " + position);
					readers[i].seek(position);
					break;
				} else {
					// System.out.println("position where key >= key2: " + position);
					if (key.compareTo(key2) == 0) {
						// If key == key2, do not remember the position because
						// the next key could be the same.
//						logger.debug("found same key. key1:" + key + ", value1:"
//								+ value + ", key2:" + key2 + ", value2:"
//								+ value2 + " at position: " + position);
						output = _kepler.executeWf(key, value, value2);
						if (output != null) {
							java.util.Iterator outputIt = output.iterator();
							while (outputIt.hasNext()) {
								KeyValuePair oneEle = (KeyValuePair) outputIt.next();
								context.write(oneEle.getKey(), oneEle.getValue());
							}
						}
					} else { // Remember the position if key > key2.
						position = readers[i].getPosition();
					}
				}
			}
		}
	}

	@Override
	protected void cleanup(Context context) throws IOException,
			InterruptedException {
		_kepler.cleanup();
		_kepler = null;
		if (readers != null)
			for (SequenceFile.Reader reader : readers)
				IOUtils.closeStream(reader);
	}
}