001/* 002 * Copyright (c) 2012-2013 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $' 007 * '$Revision: 33070 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.hadoop.mapreduce; 031 032import java.io.IOException; 033import java.util.LinkedList; 034import java.util.List; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.io.Writable; 038import org.apache.hadoop.io.WritableComparable; 039import org.apache.hadoop.mapreduce.Reducer; 040import org.apache.log4j.Logger; 041import org.kepler.hadoop.execution.KeplerApp4Match; 042import org.kepler.hadoop.io.KeyValuePair; 043import org.kepler.hadoop.io.TaggedValue; 044import org.kepler.hadoop.util.StubUtilities; 045 046////////////////////////////////////////////////////////////////////////// 047////Reducer4Match 048 049/** 050* This class include a reducer for Match actor. 051* It gets data from hadoop, executes sub-workflow in the match actor via Kepler engine, 052* and sends outputs back to hadoop. 053* 054* This class is not used in HadoopDirector? 055* 056* @author Jianwu Wang (jianwu@sdsc.edu) 057* @version $Id: Reducer4Match.java 33070 2014-11-12 23:21:09Z crawl $ 058*/ 059 060public class Reducer4Match extends 061 Reducer<WritableComparable, TaggedValue, WritableComparable, Writable> { 062 063 static Logger logger = Logger.getLogger(Reducer4Match.class.getName()); 064 065 private KeplerApp4Match _kepler; 066 067 @Override 068 public void setup(Context context) { 069 // Get the cached archives/files 070// Date now = new Date(); 071// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 072 try { 073// logger.debug("entering into setup() of Reducer4Match:" 074// + sdf.format(now)); 075 logger.debug("thread info:" + Thread.currentThread().getId()); 076 final Configuration hadoopConf = context.getConfiguration(); 077 // initialize kepler to run the workflow in the stub 078 StubUtilities.initializeKepler(hadoopConf); 079// String wfString = context.getConfiguration().get(Utilities.CONFIGURATION_KEPLER_MODEL); 080// logger.debug(Utilities.CONFIGURATION_KEPLER_MODEL + ":" + wfString); 081// CompositeActor reduceSubWf; 082// // synchronize because concurrent MoMLPaser threads sometimes have 083// // exceptions. 084// synchronized (MoMLParser.class) { 085// MoMLParser parser = new MoMLParser(); 086// parser.resetAll(); 087// MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters()); 088// MoMLParser.addMoMLFilter(new RemoveGraphicalClasses()); 089// // CompositeActor reduceSubWf = 090// // (CompositeActor)parser.parseFile(localFiles[0] + 091// // "/reduceSubFlow.xml"); 092// reduceSubWf = (CompositeActor) parser.parse(wfString); 093// } 094 _kepler = new KeplerApp4Match(hadoopConf); 095 } catch (Exception e) { 096// System.err 097// .println("entering into excepion catch() of Reducer4Kepler:" 098// + sdf.format(now)); 099 System.err.println("thread info:" + Thread.currentThread().getId()); 100 e.printStackTrace(); 101 } 102 } 103 104 @Override 105 public void reduce(WritableComparable key, Iterable<TaggedValue> values, 106 Context context) throws IOException, InterruptedException { 107 108 LinkedList<Writable> values1 = new LinkedList<Writable>(); 109 LinkedList<Writable> values2 = new LinkedList<Writable>(); 110 111 for (TaggedValue val: values) { 112 logger.debug("In Reducer4Match reducer, key:" + key + "; outputValue:" 113 + val.getTag() + "," + val.getData()); 114 if (val.getTag().get()) 115 values1.add(val.getData()); 116 else 117 values2.add(val.getData()); 118 } 119 120 List output = null; 121 for (Writable value1: values1) { 122 for (Writable value2: values2) { 123 output = _kepler.executeWf(key, value1, value2); 124 if (output != null) { 125 java.util.Iterator outputIt = output.iterator(); 126 while (outputIt.hasNext()) { 127 KeyValuePair oneEle = (KeyValuePair) outputIt 128 .next(); 129 context.write(oneEle.getKey(), oneEle.getValue()); 130 } 131 } 132 } 133 } 134 } 135 136 @Override 137 protected void cleanup(Context context 138 ) throws IOException, InterruptedException { 139 _kepler.cleanup(); 140 _kepler = null; 141 } 142}