/*
 * Copyright (c) 2012-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33070 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.mapreduce;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.kepler.hadoop.execution.KeplerApp4CoGroup;
import org.kepler.hadoop.io.KeyValuePair;
import org.kepler.hadoop.io.TaggedValue;
import org.kepler.hadoop.util.StubUtilities;

//////////////////////////////////////////////////////////////////////////
////Reducer4CoGroup

/**
 * This class includes a reducer for the CoGroup actor.
 * It gets data from hadoop, executes the sub-workflow in the CoGroup actor
 * via the Kepler engine, and sends the outputs back to hadoop.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: Reducer4CoGroup.java 33070 2014-11-12 23:21:09Z crawl $
 */

public class Reducer4CoGroup extends
        Reducer<WritableComparable, TaggedValue, WritableComparable, Writable> {

    static Logger logger = Logger.getLogger(Reducer4CoGroup.class.getName());

    /** Kepler runner created in {@link #setup} and released in {@link #cleanup}. */
    private KeplerApp4CoGroup _kepler;

    /**
     * Initializes Kepler from the job configuration and creates the
     * {@link KeplerApp4CoGroup} instance used by {@link #reduce}.
     *
     * @param context the reducer context supplying the Hadoop configuration.
     * @throws IllegalStateException if Kepler cannot be initialized or the
     *   CoGroup application cannot be created; the original failure is
     *   attached as the cause.
     */
    @Override
    public void setup(Context context) {
        try {
            logger.debug("thread info:" + Thread.currentThread().getId());
            final Configuration hadoopConf = context.getConfiguration();
            // initialize kepler to run the workflow in the stub
            StubUtilities.initializeKepler(hadoopConf);
            _kepler = new KeplerApp4CoGroup(hadoopConf);
        } catch (Exception e) {
            // Do not swallow the failure: with _kepler left null, reduce()
            // and cleanup() would only fail later with a NullPointerException
            // that hides the real cause.
            logger.error("Reducer4CoGroup setup failed; thread info:"
                    + Thread.currentThread().getId(), e);
            throw new IllegalStateException(
                    "Failed to initialize Kepler for Reducer4CoGroup", e);
        }
    }

    /**
     * Splits the tagged values of one key into two groups according to their
     * boolean tag, runs the workflow on both groups, and writes every
     * returned {@link KeyValuePair} to the context.
     *
     * @param key the key shared by all the given values.
     * @param values the tagged values for the key; values whose tag is true
     *   go to the first group, all others to the second group.
     * @param context the reducer context the results are written to.
     * @throws IOException if writing a result to the context fails.
     * @throws InterruptedException if writing a result is interrupted.
     */
    @Override
    public void reduce(WritableComparable key, Iterable<TaggedValue> values,
            Context context) throws IOException, InterruptedException {

        final List<Writable> values1 = new LinkedList<Writable>();
        final List<Writable> values2 = new LinkedList<Writable>();
        // Checked once so the message is not built per value when debug is off.
        final boolean debugEnabled = logger.isDebugEnabled();

        // NOTE(review): Hadoop may reuse the TaggedValue instance between
        // iterations; buffering here assumes getData() yields a distinct
        // object for each value -- confirm against TaggedValue.
        for (TaggedValue val : values) {
            if (debugEnabled) {
                logger.debug("In Reducer4CoGroup reducer, key:" + key
                        + "; outputValue:" + val.getTag() + ","
                        + val.getData());
            }
            if (val.getTag().get()) {
                values1.add(val.getData());
            } else {
                values2.add(val.getData());
            }
        }

        // The workflow is executed even when one (or both) of the groups is
        // empty: the empty set must also be processed.
        final List<?> output = _kepler.executeWf(key, values1.iterator(),
                values2.iterator());
        if (output == null) {
            return;
        }
        for (Object element : output) {
            final KeyValuePair pair = (KeyValuePair) element;
            context.write(pair.getKey(), pair.getValue());
        }
    }

    /**
     * Releases the Kepler application created in {@link #setup}, if any.
     *
     * @param context the reducer context (not used).
     * @throws IOException declared by the overridden method.
     * @throws InterruptedException declared by the overridden method.
     */
    @Override
    protected void cleanup(Context context
            ) throws IOException, InterruptedException {
        // _kepler is null when setup() did not complete; nothing to release.
        if (_kepler == null) {
            return;
        }
        try {
            _kepler.cleanup();
        } finally {
            _kepler = null;
        }
    }
}