/*
 * Copyright (c) 2012-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33070 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
027 * 028 */ 029 030package org.kepler.hadoop.mapreduce; 031 032import java.io.IOException; 033import java.util.List; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.io.IOUtils; 040import org.apache.hadoop.io.SequenceFile; 041import org.apache.hadoop.io.Writable; 042import org.apache.hadoop.io.WritableComparable; 043import org.apache.hadoop.mapreduce.Mapper; 044import org.apache.hadoop.util.ReflectionUtils; 045import org.apache.log4j.Logger; 046import org.kepler.ddp.Utilities; 047import org.kepler.hadoop.execution.KeplerApp4Cross; 048import org.kepler.hadoop.io.KeyValuePair; 049import org.kepler.hadoop.util.StubUtilities; 050 051////////////////////////////////////////////////////////////////////////// 052////Mapper4Cross 053 054/** 055* This class include a mapper for Cross actor. 056* It gets data from hadoop, executes sub-workflow in the cross actor via Kepler engine, 057* and sends outputs back to hadoop. 
058* 059* @author Jianwu Wang (jianwu@sdsc.edu) 060* @version $Id: Mapper4Cross.java 33070 2014-11-12 23:21:09Z crawl $ 061*/ 062 063public class Mapper4Cross extends 064 Mapper<WritableComparable, Writable, WritableComparable, Writable> { 065 066 static Logger logger = Logger.getLogger(Mapper4Cross.class.getName()); 067 068 SequenceFile.Reader reader = null; 069 String strPath; 070 KeplerApp4Cross _kepler; 071 FileStatus[] status; 072 073 @Override 074 public void setup(Context context) { 075 strPath = context.getConfiguration() 076 .get(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH); 077 logger.debug(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH + ":" + strPath); 078 //Date now = new Date(); 079 //SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 080 try { 081 FileSystem fs = FileSystem.get(context.getConfiguration()); 082 status = fs.listStatus(new Path(strPath)); 083 //logger.debug("entering into setup() of Mapper4Cross:" 084 // + sdf.format(now)); 085 logger.debug("thread info:" + Thread.currentThread().getId()); 086 final Configuration hadoopConf = context.getConfiguration(); 087 // initialize kepler to run the workflow in the stub 088 StubUtilities.initializeKepler(hadoopConf); 089 // synchronize because concurrent MoMLPaser threads sometimes have 090 // exceptions. 
091// synchronized (MoMLParser.class) { 092// MoMLParser parser = new MoMLParser(); 093// parser.resetAll(); 094// MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters()); 095// MoMLParser.addMoMLFilter(new RemoveGraphicalClasses()); 096// // CompositeActor reduceSubWf = 097// // (CompositeActor)parser.parseFile(localFiles[0] + 098// // "/reduceSubFlow.xml"); 099// subWf = (CompositeActor) parser.parse(wfString); 100// } 101 _kepler = new KeplerApp4Cross(hadoopConf); 102 } catch (Exception e) { 103 //System.err 104 // .println("entering into excepion catch() of Reducer4Kepler:" 105 // + sdf.format(now)); 106 System.err.println("thread info:" + Thread.currentThread().getId()); 107 e.printStackTrace(); 108 } 109 } 110 111 @Override 112 public void map(WritableComparable key, Writable value, Context context) throws IOException, 113 InterruptedException { 114 List output = null; 115 logger.debug("key in map of Mapper4Cross:" + key); 116 logger.debug("value in map of Mapper4Cross:" + value); 117 118 for (int i=0; i<status.length; i++){ 119 try { 120 logger.debug("one path from reader:" + status[i].getPath()); 121 reader = new SequenceFile.Reader(context.getConfiguration(), 122 SequenceFile.Reader.file(status[i].getPath())); 123 WritableComparable key2 = (WritableComparable) ReflectionUtils.newInstance( 124 reader.getKeyClass(), context.getConfiguration()); 125 Writable value2 = (Writable) ReflectionUtils.newInstance( 126 reader.getValueClass(), context.getConfiguration()); 127 long position = reader.getPosition(); 128 while (reader.next(key2, value2)) { 129 String syncSeen = reader.syncSeen() ? "*" : ""; 130 position = reader.getPosition(); 131 // cross function can be done here. 132 logger.debug("parameters of a cross function. 
key1:" 133 + key + ", value1:" + value + ", key2:" + key2 134 + ", value2:" + value2); 135 output = _kepler.executeWf(key, value, key2, value2); 136 if (output != null) { 137 java.util.Iterator outputIt = output.iterator(); 138 while (outputIt.hasNext()) { 139 KeyValuePair oneEle = (KeyValuePair) outputIt 140 .next(); 141 context.write(oneEle.getKey(), oneEle.getValue()); 142 } 143 } 144 } 145 } finally { 146 IOUtils.closeStream(reader); 147 } 148 } 149 150 } 151 152 @Override 153 protected void cleanup(Context context 154 ) throws IOException, InterruptedException { 155 _kepler.cleanup(); 156 _kepler = null; 157 } 158 159}