/*
 * Copyright (c) 2012-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33070 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.mapreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.kepler.ddp.Utilities;
import org.kepler.hadoop.execution.KeplerApp4Match;
import org.kepler.hadoop.io.KeyValuePair;
import org.kepler.hadoop.util.StubUtilities;

import ptolemy.moml.MoMLFilter;
import ptolemy.moml.MoMLParser;

//////////////////////////////////////////////////////////////////////////
//// Mapper4Match

/**
 * This class provides a mapper for the Match actor.
 * It reads data from Hadoop, executes the sub-workflow of the Match actor
 * via the Kepler engine, and sends the outputs back to Hadoop.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: Mapper4Match.java 33070 2014-11-12 23:21:09Z crawl $
 */

public class Mapper4Match extends
        Mapper<WritableComparable, Writable, WritableComparable, Writable> {

    static Logger logger = Logger.getLogger(Mapper4Match.class.getName());

    /** One reader per sorted sequence file to match against. */
    SequenceFile.Reader[] readers = null;
    /** Path of the directory holding the sequence files to match. */
    String strPath;
    /** Embedded Kepler engine that executes the Match sub-workflow. */
    KeplerApp4Match _kepler;
    /** Status of each sequence file found under strPath. */
    FileStatus[] status;
    /** Reusable key/value instances for reading the side files. */
    WritableComparable key2 = null;
    Writable value2 = null;

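    /**
     * Read the distributed-cache path from the job configuration, list the
     * sequence files to match against, and start an embedded Kepler engine
     * for this mapper. Any exception here is logged rather than rethrown.
     */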
    @Override
    public void setup(Context context) {
        strPath = context.getConfiguration()
                .get(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH);
        logger.debug(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH
                + ":" + strPath);
        // Get the cached archives/files.
        Date now = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            status = fs.listStatus(new Path(strPath));
            // Path[] localFiles =
            // DistributedCache.getLocalCacheFiles(context.getConfiguration());
            logger.debug("entering into setup() of Mapper4Match:"
                    + sdf.format(now));
            logger.debug("thread info:" + Thread.currentThread().getId());
            final Configuration hadoopConf = context.getConfiguration();
            // Initialize Kepler to run the workflow in the stub.
            StubUtilities.initializeKepler(hadoopConf);
//            String wfString = context.getConfiguration().get(Utilities.CONFIGURATION_KEPLER_MODEL);
//            System.err.println(Utilities.CONFIGURATION_KEPLER_MODEL + ":" + wfString);
//            CompositeActor subWf;
//            // Synchronize because concurrent MoMLParser threads sometimes
//            // have exceptions.
//            synchronized (MoMLParser.class) {
//                MoMLParser parser = new MoMLParser();
//                parser.resetAll();
//                MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters());
//                MoMLParser.addMoMLFilter(new RemoveGraphicalClasses());
//                // CompositeActor reduceSubWf =
//                // (CompositeActor) parser.parseFile(localFiles[0] +
//                // "/reduceSubFlow.xml");
//                subWf = (CompositeActor) parser.parse(wfString);
//            }
            _kepler = new KeplerApp4Match(hadoopConf);
        } catch (Exception e) {
            System.err.println("entering into exception catch() of Mapper4Match:"
                    + sdf.format(now));
            System.err.println("thread info:" + Thread.currentThread().getId());
            for (Object filter : MoMLParser.getMoMLFilters()) {
                System.err.println("one filter: " + ((MoMLFilter) filter));
            }
            e.printStackTrace();
        }
    }

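    /**
     * Match the current (key, value) pair against every sorted sequence file
     * under strPath. For each side-file record whose key equals the map key,
     * the Match sub-workflow is executed via the embedded Kepler engine and
     * its (key, value) outputs are written to the job context.
     */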
    @Override
    public void map(WritableComparable key, Writable value, Context context)
            throws IOException, InterruptedException {
        List output = null;
//        System.out.println("key in map of Mapper4Match:" + key);
//        System.out.println("value in map of Mapper4Match:" + value);

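        // Lazily open one reader per sequence file on the first map() call;
        // the readers keep their positions across calls, so each side file
        // is scanned only once per mapper.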
        if (readers == null) {
            readers = new SequenceFile.Reader[status.length];
            for (int i = 0; i < status.length; i++) {
//                System.out.println("one input path for match reader:" + status[i].getPath());
                readers[i] = new SequenceFile.Reader(context.getConfiguration(),
                        SequenceFile.Reader.file(status[i].getPath()));
            }
        }

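        // Both the map input and the side files are sorted by key, so a
        // single forward scan per side file finds all matches: read ahead
        // while key2 <= key, run the sub-workflow on equal keys, and seek
        // back to the last position where key2 < key once key2 overtakes key.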
        for (int i = 0; i < status.length; i++) {
//            System.out.println("one input path for match reader:" + status[i].getPath());
            key2 = (WritableComparable) ReflectionUtils.newInstance(
                    readers[i].getKeyClass(), context.getConfiguration());
            value2 = (Writable) ReflectionUtils.newInstance(
                    readers[i].getValueClass(), context.getConfiguration());
            long position = readers[i].getPosition();
//            logger.debug("data before start reading. key1:"
//                    + key + ", value1:" + value + ", key2:" + key2
//                    + ", value2:" + value2 + " at position: " + position);
            while (readers[i].next(key2, value2)) {
//                logger.debug("new record. key1:"
//                        + key + ", value1:" + value + ", key2:" + key2
//                        + ", value2:" + value2 + " at position: " + position);
                if (key.compareTo(key2) < 0) {
                    // Since the data read by the reader is sorted, we do not
                    // need to compare any further once the current key2 is
                    // bigger than key1.
//                    logger.debug("key1 is smaller than key2. key1:"
//                            + key + ", value1:" + value + ", key2:" + key2
//                            + ", value2:" + value2 + " at position: " + position);
                    readers[i].seek(position);
                    break;
                } else {
                    //System.out.println("position where key >= key2: " + position);
                    if (key.compareTo(key2) == 0) {
                        // If key == key2, do not remember the position
                        // because the next key could be the same.
//                        logger.debug("found same key. key1:"
//                                + key + ", value1:" + value + ", key2:" + key2
//                                + ", value2:" + value2 + " at position: " + position);
                        output = _kepler.executeWf(key, value, value2);
                        if (output != null) {
                            for (Object obj : output) {
                                KeyValuePair oneEle = (KeyValuePair) obj;
                                context.write(oneEle.getKey(), oneEle.getValue());
                            }
                        }
                    } else { // Remember the position if key > key2.
                        position = readers[i].getPosition();
                    }
                }
            }
        }
    }

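    /**
     * Shut down the embedded Kepler engine, if one was started, and close
     * all side-file readers.
     */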
    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // Guard against a setup() failure that left _kepler null.
        if (_kepler != null) {
            _kepler.cleanup();
            _kepler = null;
        }
        if (readers != null)
            for (SequenceFile.Reader reader : readers)
                IOUtils.closeStream(reader);
    }
}
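
// A minimal sketch of how a driver might wire this mapper into a Hadoop job.
// The driver code below is an illustrative assumption, not part of Kepler;
// only Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH and
// Mapper4Match come from this file, and the actual output key/value classes
// depend on what the Match sub-workflow emits:
//
//     Configuration conf = new Configuration();
//     conf.set(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH,
//             "/path/to/sorted/sequence/files");
//     Job job = Job.getInstance(conf, "kepler-match");
//     job.setMapperClass(Mapper4Match.class);
//     job.setOutputKeyClass(Text.class);
//     job.setOutputValueClass(Text.class);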