001/* 
002 * Copyright (c) 2012-2013 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $' 
007 * '$Revision: 33070 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.hadoop.mapreduce;
031
032import java.io.IOException;
033import java.util.List;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.io.IOUtils;
040import org.apache.hadoop.io.SequenceFile;
041import org.apache.hadoop.io.Writable;
042import org.apache.hadoop.io.WritableComparable;
043import org.apache.hadoop.mapreduce.Mapper;
044import org.apache.hadoop.util.ReflectionUtils;
045import org.apache.log4j.Logger;
046import org.kepler.ddp.Utilities;
047import org.kepler.hadoop.execution.KeplerApp4Cross;
048import org.kepler.hadoop.io.KeyValuePair;
049import org.kepler.hadoop.util.StubUtilities;
050
051//////////////////////////////////////////////////////////////////////////
052////Mapper4Cross
053
054/**
055* This class provides a mapper for the Cross actor.
056* It gets data from Hadoop, executes the sub-workflow in the Cross actor via the Kepler engine,
057* and sends the outputs back to Hadoop.
058* 
059* @author Jianwu Wang (jianwu@sdsc.edu)
060* @version $Id: Mapper4Cross.java 33070 2014-11-12 23:21:09Z crawl $
061*/
062
063public class Mapper4Cross extends
064                Mapper<WritableComparable, Writable, WritableComparable, Writable> {
065        
066        static Logger logger = Logger.getLogger(Mapper4Cross.class.getName());
067
068        SequenceFile.Reader reader = null;
069        String strPath;
070        KeplerApp4Cross _kepler;
071        FileStatus[] status;
072
073        @Override
074    public void setup(Context context) {
075                strPath = context.getConfiguration()
076                                .get(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH);
077                logger.debug(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH + ":" + strPath);
078                //Date now = new Date();
079                //SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
080                try {
081                FileSystem fs = FileSystem.get(context.getConfiguration());
082                status = fs.listStatus(new Path(strPath));
083                //logger.debug("entering into setup() of Mapper4Cross:"
084                //              + sdf.format(now));
085                        logger.debug("thread info:" + Thread.currentThread().getId());
086                        final Configuration hadoopConf = context.getConfiguration();
087                // initialize kepler to run the workflow in the stub
088                StubUtilities.initializeKepler(hadoopConf);     
089                        // synchronize because concurrent MoMLPaser threads sometimes have
090                        // exceptions.
091//                      synchronized (MoMLParser.class) {
092//                              MoMLParser parser = new MoMLParser();
093//                              parser.resetAll();
094//                              MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters());
095//                              MoMLParser.addMoMLFilter(new RemoveGraphicalClasses());
096//                              // CompositeActor reduceSubWf =
097//                              // (CompositeActor)parser.parseFile(localFiles[0] +
098//                              // "/reduceSubFlow.xml");
099//                              subWf = (CompositeActor) parser.parse(wfString);
100//                      }
101                        _kepler = new KeplerApp4Cross(hadoopConf);
102                } catch (Exception e) {
103                        //System.err
104                        //              .println("entering into excepion catch() of Reducer4Kepler:"
105                        //                              + sdf.format(now));
106                        System.err.println("thread info:" + Thread.currentThread().getId());
107                        e.printStackTrace();
108                }
109        }
110
111        @Override
112    public void map(WritableComparable key, Writable value, Context context) throws IOException,
113                        InterruptedException {
114                List output = null;
115                logger.debug("key in map of Mapper4Cross:" + key);
116                logger.debug("value in map of Mapper4Cross:" + value);
117
118                for (int i=0; i<status.length; i++){
119                        try {
120                                logger.debug("one path from reader:" + status[i].getPath());
121                                reader = new SequenceFile.Reader(context.getConfiguration(),
122                                                SequenceFile.Reader.file(status[i].getPath()));
123                                WritableComparable key2 = (WritableComparable) ReflectionUtils.newInstance(
124                                                reader.getKeyClass(), context.getConfiguration());
125                                Writable value2 = (Writable) ReflectionUtils.newInstance(
126                                                reader.getValueClass(), context.getConfiguration());
127                                long position = reader.getPosition();
128                                while (reader.next(key2, value2)) {
129                                        String syncSeen = reader.syncSeen() ? "*" : "";
130                                        position = reader.getPosition();
131                                        // cross function can be done here.
132                                        logger.debug("parameters of a cross function. key1:"
133                                                        + key + ", value1:" + value + ", key2:" + key2
134                                                        + ", value2:" + value2);
135                                        output = _kepler.executeWf(key, value, key2, value2);
136                                        if (output != null) {
137                                                java.util.Iterator outputIt = output.iterator();
138                                                while (outputIt.hasNext()) {
139                                                        KeyValuePair oneEle = (KeyValuePair) outputIt
140                                                                        .next();
141                                                        context.write(oneEle.getKey(), oneEle.getValue());
142                                                }
143                                        }
144                                }
145                        } finally {
146                                IOUtils.closeStream(reader);
147                        }
148                }
149
150        }
151        
152    @Override
153    protected void cleanup(Context context
154            ) throws IOException, InterruptedException {
155        _kepler.cleanup();
156        _kepler = null;
157    }
158
159}