001/* 
002 * Copyright (c) 2012-2013 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $' 
007 * '$Revision: 33070 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.hadoop.mapreduce;
031
032import java.io.IOException;
033import java.util.LinkedList;
034import java.util.List;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.io.Writable;
038import org.apache.hadoop.io.WritableComparable;
039import org.apache.hadoop.mapreduce.Reducer;
040import org.apache.log4j.Logger;
041import org.kepler.hadoop.execution.KeplerApp4CoGroup;
042import org.kepler.hadoop.io.KeyValuePair;
043import org.kepler.hadoop.io.TaggedValue;
044import org.kepler.hadoop.util.StubUtilities;
045
046//////////////////////////////////////////////////////////////////////////
047////Reducer4CoGroup
048
049/**
* This class includes a reducer for the CoGroup actor.
* It gets data from Hadoop, executes the sub-workflow in the CoGroup actor via the Kepler engine,
* and sends the outputs back to Hadoop.
053* 
054* @author Jianwu Wang (jianwu@sdsc.edu)
055* @version $Id: Reducer4CoGroup.java 33070 2014-11-12 23:21:09Z crawl $
056*/
057
058public class Reducer4CoGroup extends
059                Reducer<WritableComparable, TaggedValue, WritableComparable, Writable> {
060        
061        static Logger logger = Logger.getLogger(Reducer4CoGroup.class.getName());
062
063        private KeplerApp4CoGroup _kepler;
064
065        @Override
066    public void setup(Context context) {
067//              Date now = new Date();
068//              SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
069                try {
070                        // Path[] localFiles =
071                        // DistributedCache.getLocalCacheFiles(context.getConfiguration());
072//                      logger.debug("entering into setup() of CoGroupReducer:"
073//                                      + sdf.format(now));
074                        logger.debug("thread info:" + Thread.currentThread().getId());
075                        final Configuration hadoopConf = context.getConfiguration();
076                // initialize kepler to run the workflow in the stub
077                StubUtilities.initializeKepler(hadoopConf);
078//                      String wfString = context.getConfiguration().get(Utilities.CONFIGURATION_KEPLER_MODEL);
079//                      logger.debug(Utilities.CONFIGURATION_KEPLER_MODEL + ":" + wfString);
080//                      CompositeActor reduceSubWf;
081//                      // synchronize because concurrent MoMLPaser threads sometimes
082//                      // have exceptions.
083//                      synchronized (MoMLParser.class) {
084//                              MoMLParser parser = new MoMLParser();
085//                              parser.resetAll();
086//                              MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters());
087//                              MoMLParser.addMoMLFilter(new RemoveGraphicalClasses());
088//                              // CompositeActor reduceSubWf =
089//                              // (CompositeActor)parser.parseFile(localFiles[0] +
090//                              // "/reduceSubFlow.xml");
091//                              reduceSubWf = (CompositeActor) parser.parse(wfString);
092//                      }
093                        _kepler = new KeplerApp4CoGroup(hadoopConf);
094                } catch (Exception e) {
095//                      System.err
096//                                      .println("entering into excepion catch() of CoGroupReducer:"
097//                                                      + sdf.format(now));
098                        System.err.println("thread info:" + Thread.currentThread().getId());
099                        e.printStackTrace();
100                }
101        }
102
103        @Override
104    public void reduce(WritableComparable key, Iterable<TaggedValue> values,
105                        Context context) throws IOException, InterruptedException {
106                
107                LinkedList<Writable> values1 = new LinkedList<Writable>();
108                LinkedList<Writable> values2 = new LinkedList<Writable>();
109                
110                for (TaggedValue val: values) {
111                        logger.debug("In Reducer4CoGroup reducer, key:" + key + "; outputValue:"
112                                        + val.getTag() + "," + val.getData());
113                        if (val.getTag().get())
114                                values1.add(val.getData());
115                        else
116                                values2.add(val.getData());
117                }
118                List output = null;
119//              if (values1.size() != 0 && values2.size() != 0) //only process non-empty set, wrong! the empty set should also be processed.
120//              {
121                output = _kepler.executeWf(key, values1.iterator(), values2.iterator());
122                if (output != null) {
123                        java.util.Iterator outputIt = output.iterator();
124                        while (outputIt.hasNext()) {
125                                KeyValuePair oneEle = (KeyValuePair) outputIt.next();
126                                context.write(oneEle.getKey(), oneEle.getValue());
127                        }
128                }
129//              }
130        }
131        
132    @Override
133    protected void cleanup(Context context
134            ) throws IOException, InterruptedException {
135        _kepler.cleanup();
136        _kepler = null;
137    }
138}