/*
 * Copyright (c) 2012-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33070 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.mapreduce;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.kepler.hadoop.execution.KeplerApp4Match;
import org.kepler.hadoop.io.KeyValuePair;
import org.kepler.hadoop.io.TaggedValue;
import org.kepler.hadoop.util.StubUtilities;

//////////////////////////////////////////////////////////////////////////
//// Reducer4Match

/**
 * This class implements the reducer for the Match actor. It receives data
 * from Hadoop, executes the sub-workflow inside the Match actor via the
 * Kepler engine, and sends the outputs back to Hadoop.
 *
 * <p>It is unclear whether this class is used by HadoopDirector.</p>
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: Reducer4Match.java 33070 2014-11-12 23:21:09Z crawl $
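 *
 * <p>A driver might wire this reducer into a MapReduce job roughly as in the
 * sketch below. This is an illustrative sketch only: the mapper class, the
 * map output key type, and the input/output paths are assumptions and are not
 * part of this class.</p>
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf, "kepler match");
 * job.setJarByClass(Reducer4Match.class);
 * job.setMapperClass(Mapper4Match.class);        // assumed companion mapper
 * job.setReducerClass(Reducer4Match.class);
 * job.setMapOutputKeyClass(Text.class);          // assumed key type
 * job.setMapOutputValueClass(TaggedValue.class);
 * FileInputFormat.addInputPath(job, new Path(args[0]));
 * FileOutputFormat.setOutputPath(job, new Path(args[1]));
 * System.exit(job.waitForCompletion(true) ? 0 : 1);
 * }</pre>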
 */

public class Reducer4Match extends
        Reducer<WritableComparable, TaggedValue, WritableComparable, Writable> {

    static Logger logger = Logger.getLogger(Reducer4Match.class.getName());

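    // Kepler application that runs the Match actor's sub-workflow; created in
    // setup() and released in cleanup().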
    private KeplerApp4Match _kepler;

    @Override
    public void setup(Context context) {
        try {
            logger.debug("thread info:" + Thread.currentThread().getId());
            final Configuration hadoopConf = context.getConfiguration();
            // Initialize Kepler so the sub-workflow can run inside the stub.
            StubUtilities.initializeKepler(hadoopConf);
            // Earlier revisions parsed the sub-workflow string out of the job
            // configuration with MoMLParser here; KeplerApp4Match now takes
            // the configuration directly.
            _kepler = new KeplerApp4Match(hadoopConf);
        } catch (Exception e) {
            System.err.println("Exception in setup() of Reducer4Match, thread:"
                    + Thread.currentThread().getId());
            e.printStackTrace();
        }
    }

    @Override
    public void reduce(WritableComparable key, Iterable<TaggedValue> values,
            Context context) throws IOException, InterruptedException {
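
        // Each value carries a boolean tag. Split this key's values into two
        // groups by tag, then run the Match sub-workflow once for every pair
        // drawn from opposite groups (a per-key cross product).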
        LinkedList<Writable> values1 = new LinkedList<Writable>();
        LinkedList<Writable> values2 = new LinkedList<Writable>();

        for (TaggedValue val : values) {
            logger.debug("In Reducer4Match reducer, key:" + key
                    + "; outputValue:" + val.getTag() + "," + val.getData());
            if (val.getTag().get())
                values1.add(val.getData());
            else
                values2.add(val.getData());
        }

        for (Writable value1 : values1) {
            for (Writable value2 : values2) {
                // Run the Match sub-workflow on this pair and emit whatever
                // key/value pairs it produces.
                List<?> output = _kepler.executeWf(key, value1, value2);
                if (output != null) {
                    for (Object element : output) {
                        KeyValuePair pair = (KeyValuePair) element;
                        context.write(pair.getKey(), pair.getValue());
                    }
                }
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Release the Kepler application if setup() managed to create it.
        if (_kepler != null) {
            _kepler.cleanup();
            _kepler = null;
        }
    }
}