/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.hadoop.mapreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.kepler.hadoop.execution.KeplerApp4Map;
import org.kepler.hadoop.execution.KeplerApp4Reduce;
import org.kepler.hadoop.io.KeyValuePair;
import org.kepler.hadoop.util.StubUtilities;

//////////////////////////////////////////////////////////////////////////
//// MapReduce4Kepler

/**
 * This class includes the Mapper4Kepler and Reducer4Kepler classes, which
 * extend the Mapper and Reducer classes provided by Hadoop. Through this
 * class, the map and reduce sub-workflows of the Map and Reduce actors are
 * invoked within the Hadoop infrastructure.
 *
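 * <p>A minimal driver sketch for wiring these classes into a Hadoop job.
 * This is an illustration only: the job name, the Text key/value types, and
 * the input/output paths below are hypothetical, and Kepler's own job setup
 * (including the configuration entries read by StubUtilities and the
 * KeplerApp4* classes) may differ.</p>
 *
 * <pre>
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf, "kepler-mapreduce");
 * job.setJarByClass(MapReduce4Kepler.class);
 * job.setMapperClass(MapReduce4Kepler.Mapper4Kepler.class);
 * job.setReducerClass(MapReduce4Kepler.Reducer4Kepler.class);
 * job.setOutputKeyClass(Text.class);
 * job.setOutputValueClass(Text.class);
 * FileInputFormat.addInputPath(job, new Path(args[0]));
 * FileOutputFormat.setOutputPath(job, new Path(args[1]));
 * System.exit(job.waitForCompletion(true) ? 0 : 1);
 * </pre>
 *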
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: MapReduce4Kepler.java 33070 2014-11-12 23:21:09Z crawl $
 */

public class MapReduce4Kepler {

    private static final Logger logger = Logger.getLogger(MapReduce4Kepler.class.getName());

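    /**
     * Mapper that hands each input (key, value) pair to the Kepler map
     * sub-workflow (via KeplerApp4Map) and writes the pairs produced by the
     * sub-workflow back to the Hadoop context.
     */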
    public static class Mapper4Kepler extends
            Mapper<WritableComparable, Writable, WritableComparable, Writable> {

        private KeplerApp4Map _kepler;
        // Input path handed to the map sub-workflow; not assigned in this class.
        private String inputPath;

        @Override
        public void setup(Context context) {
            try {
                logger.debug("thread info:" + Thread.currentThread().getId());
                final Configuration hadoopConf = context.getConfiguration();
                // Initialize Kepler so the map sub-workflow can run inside this stub.
                StubUtilities.initializeKepler(hadoopConf);
                _kepler = new KeplerApp4Map(hadoopConf);
            } catch (Exception e) {
                System.err.println("thread info:" + Thread.currentThread().getId());
                e.printStackTrace();
            }
        }

        // Placeholder for parameter value updates (useful for the distributed
        // cache); not yet implemented.

        @Override
        public void map(WritableComparable key, Writable value, Context context)
                throws IOException, InterruptedException {
            logger.debug("thread info:" + Thread.currentThread().getId());
            // Run the Kepler map sub-workflow on this (key, value) pair and
            // emit every pair the sub-workflow returns.
            List<KeyValuePair> output = _kepler.executeWf(key, value, inputPath,
                    context.getTaskAttemptID().getTaskID().toString());
            if (output != null) {
                for (KeyValuePair oneEle : output) {
                    context.write(oneEle.getKey(), oneEle.getValue());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Release the Kepler engine created in setup().
            _kepler.cleanup();
            _kepler = null;
        }
    }

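    /**
     * Reducer that hands each key and its values to the Kepler reduce
     * sub-workflow (via KeplerApp4Reduce) and writes the pairs produced by
     * the sub-workflow back to the Hadoop context.
     */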
    public static class Reducer4Kepler extends
            Reducer<WritableComparable, Writable, WritableComparable, Writable> {

        private KeplerApp4Reduce _kepler;

        @Override
        public void setup(Context context) {
            Date now = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            try {
                logger.debug("entering into setup() of Reducer4Kepler:" + sdf.format(now));
                logger.debug("thread info:" + Thread.currentThread().getId());
                final Configuration hadoopConf = context.getConfiguration();
                // Initialize Kepler so the reduce sub-workflow can run inside this stub.
                StubUtilities.initializeKepler(hadoopConf);
                _kepler = new KeplerApp4Reduce(hadoopConf);
            } catch (Exception e) {
                System.err.println("entering into exception catch() of Reducer4Kepler:" + sdf.format(now));
                System.err.println("thread info:" + Thread.currentThread().getId());
                e.printStackTrace();
            }
        }

        @Override
        public void reduce(WritableComparable key, Iterable<Writable> values,
                Context context) throws IOException, InterruptedException {
            try {
                // Run the Kepler reduce sub-workflow on this key and its
                // values, then emit every pair the sub-workflow returns.
                List<?> output = _kepler.executeWf(key, values.iterator());
                if (output != null) {
                    for (Object ele : output) {
                        KeyValuePair oneEle = (KeyValuePair) ele;
                        context.write(oneEle.getKey(), oneEle.getValue());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Release the Kepler engine created in setup().
            _kepler.cleanup();
            _kepler = null;
        }
    }
}