/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.hadoop.mapreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.kepler.hadoop.execution.KeplerApp4Map;
import org.kepler.hadoop.execution.KeplerApp4Reduce;
import org.kepler.hadoop.io.KeyValuePair;
import org.kepler.hadoop.util.StubUtilities;

//////////////////////////////////////////////////////////////////////////
//// MapReduce4Kepler

/**
 * This class contains the Mapper4Kepler and Reducer4Kepler classes, which
 * extend the Mapper and Reducer classes provided by Hadoop. Through these
 * classes, the map and reduce sub-workflows of the Map and Reduce actors
 * are invoked within the Hadoop infrastructure.
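 *
 * <p>A minimal driver sketch showing how these classes might be wired into
 * a Hadoop job. The input/output paths and key/value classes below are
 * illustrative assumptions, not fixed by this class; in practice the
 * serialized Kepler workflow model is also expected in the job
 * configuration (see the CONFIGURATION_KEPLER_MODEL key referenced in
 * commented-out code below), which Kepler's own job launching machinery
 * normally takes care of.</p>
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf, "kepler-mapreduce");
 * job.setJarByClass(MapReduce4Kepler.class);
 * job.setMapperClass(MapReduce4Kepler.Mapper4Kepler.class);
 * job.setReducerClass(MapReduce4Kepler.Reducer4Kepler.class);
 * job.setOutputKeyClass(Text.class);      // assumed key type
 * job.setOutputValueClass(Text.class);    // assumed value type
 * FileInputFormat.addInputPath(job, new Path("/in"));    // placeholder
 * FileOutputFormat.setOutputPath(job, new Path("/out")); // placeholder
 * System.exit(job.waitForCompletion(true) ? 0 : 1);
 * }</pre>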
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: MapReduce4Kepler.java 33070 2014-11-12 23:21:09Z crawl $
 */

public class MapReduce4Kepler {

    //private static final Log LOG = LogFactory.getLog(MapReduce4Kepler.class);
    static Logger logger = Logger.getLogger(MapReduce4Kepler.class.getName());

    /**
     * Mapper that delegates each input key/value pair to the map
     * sub-workflow executed by a KeplerApp4Map instance.
     */
    public static class Mapper4Kepler extends
            Mapper<WritableComparable, Writable, WritableComparable, Writable> {

        private KeplerApp4Map _kepler;
        // Input path of the data being processed. Note: it is never
        // assigned in this class, so executeWf() currently receives null.
        private String inputPath;

        @Override
        public void setup(Context context) {
//            Date now = new Date();
//            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            // Get the cached archives/files.
            try {
//                logger.debug("entering into setup() of Mapper4Kepler:" + sdf.format(now));
                logger.debug("thread info:" + Thread.currentThread().getId());
                final Configuration hadoopConf = context.getConfiguration();
                // Initialize Kepler to run the workflow in the stub.
                StubUtilities.initializeKepler(hadoopConf);
//                String wfString = context.getConfiguration().get(Utilities.CONFIGURATION_KEPLER_MODEL);
//                logger.debug(Utilities.CONFIGURATION_KEPLER_MODEL + ":" + wfString);
//                // Synchronize because concurrent MoMLParser threads sometimes have exceptions.
//                synchronized (MoMLParser.class) {
//                    MoMLParser parser = new MoMLParser();
//                    parser.resetAll();
//                    MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters());
//                    MoMLParser.addMoMLFilter(new RemoveGraphicalClasses());
//                    mapSubWf = (CompositeActor) parser.parse(wfString);
//                }
                _kepler = new KeplerApp4Map(hadoopConf);
            } catch (Exception e) {
//                System.err.println("entering into exception catch() of Mapper4Kepler:" + sdf.format(now));
                System.err.println("thread info:" + Thread.currentThread().getId());
                e.printStackTrace();
            }
        }

        // add parameter value update, useful for distributed cache.
        // Path[] cachedFiles = new Path[0];
        // finish adding parameter value update

        @Override
        public void map(WritableComparable key, Writable value, Context context)
                throws IOException, InterruptedException {
//            System.out.println("Job ID:" + context.getTaskAttemptID().getJobID());
//            System.out.println("Task ID:" + context.getTaskAttemptID().getTaskID());
//            System.out.println("ID:" + context.getTaskAttemptID().getId());
//            Date now = new Date();
//            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//            logger.debug("entering into map() of Mapper4Kepler:" + sdf.format(now));
            logger.debug("thread info:" + Thread.currentThread().getId());
//            System.out.println("key:" + key);
//            System.out.println("value:" + value);
            // Run the map sub-workflow on this key/value pair and write
            // each resulting pair to the Hadoop context.
            List<KeyValuePair> output = _kepler.executeWf(key, value, inputPath,
                    context.getTaskAttemptID().getTaskID().toString());
            if (output != null) {
                // LOG.info("execution result for hadoop map function " + output.size());
                for (KeyValuePair pair : output) {
                    context.write(pair.getKey(), pair.getValue());
                }
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            _kepler.cleanup();
            _kepler = null;
        }
    }

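    /**
     * Reducer that delegates each key and its grouped values to the reduce
     * sub-workflow executed by a KeplerApp4Reduce instance.
     */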
    public static class Reducer4Kepler extends
            Reducer<WritableComparable, Writable, WritableComparable, Writable> {

        private KeplerApp4Reduce _kepler;

        @Override
        public void setup(Context context) {
            // Get the cached archives/files.
            Date now = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            try {
                //Path[] localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                logger.debug("entering into setup() of Reducer4Kepler:" + sdf.format(now));
                logger.debug("thread info:" + Thread.currentThread().getId());
                final Configuration hadoopConf = context.getConfiguration();
                // Initialize Kepler to run the workflow in the stub.
                StubUtilities.initializeKepler(hadoopConf);
//                String wfString = context.getConfiguration().get(Utilities.CONFIGURATION_KEPLER_MODEL);
//                logger.debug(Utilities.CONFIGURATION_KEPLER_MODEL + ":" + wfString);
//                CompositeActor reduceSubWf;
//                // Synchronize because concurrent MoMLParser threads sometimes have exceptions.
//                synchronized (MoMLParser.class) {
//                    MoMLParser parser = new MoMLParser();
//                    parser.resetAll();
//                    MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters());
//                    MoMLParser.addMoMLFilter(new RemoveGraphicalClasses());
//                    //CompositeActor reduceSubWf = (CompositeActor) parser.parseFile(localFiles[0] + "/reduceSubFlow.xml");
//                    reduceSubWf = (CompositeActor) parser.parse(wfString);
//                }
                _kepler = new KeplerApp4Reduce(hadoopConf);
            } catch (Exception e) {
                System.err.println("entering into exception catch() of Reducer4Kepler:" + sdf.format(now));
                System.err.println("thread info:" + Thread.currentThread().getId());
                e.printStackTrace();
            }
        }

        @Override
        public void reduce(WritableComparable key, Iterable<Writable> values,
                Context context) throws IOException, InterruptedException {
            try {
                // Run the reduce sub-workflow on this key and its grouped
                // values, then write each resulting pair to the context.
                List output = _kepler.executeWf(key, values.iterator());
                if (output != null) {
                    // LOG.info("execution result for hadoop reduce function " + output.size());
                    for (Object element : output) {
                        KeyValuePair pair = (KeyValuePair) element;
                        context.write(pair.getKey(), pair.getValue());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            _kepler.cleanup();
            _kepler = null;
        }
    }
}