001/* A base class for DDP pattern actor stubs that read data from the workflow.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-07-15 00:24:21 +0000 (Wed, 15 Jul 2015) $' 
008 * '$Revision: 33544 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern.stub;
031
032import java.util.LinkedList;
033import java.util.List;
034import java.util.concurrent.LinkedBlockingQueue;
035
036import org.kepler.ddp.actor.pattern.Types;
037
038import ptolemy.actor.TypedIOPort;
039import ptolemy.data.ArrayToken;
040import ptolemy.data.RecordToken;
041import ptolemy.data.StringToken;
042import ptolemy.data.Token;
043import ptolemy.data.expr.Parameter;
044import ptolemy.data.expr.SingletonParameter;
045import ptolemy.data.type.BaseType;
046import ptolemy.kernel.CompositeEntity;
047import ptolemy.kernel.util.IllegalActionException;
048import ptolemy.kernel.util.NameDuplicationException;
049import ptolemy.kernel.util.Settable;
050import ptolemy.kernel.util.Workspace;
051
052/** A base class for DDP pattern actor stubs that read data from the workflow.
053 * 
054 *  @author Daniel Crawl
055 *  @version $Id: StubSinkActor.java 33544 2015-07-15 00:24:21Z crawl $
056 */
057public class StubSinkActor extends StubBaseActor {
058
059    /** Construct a new StubSinkActor in a container with a given name. */
060    public StubSinkActor(CompositeEntity container, String name)
061            throws IllegalActionException, NameDuplicationException {
062        
063        super(container, name);
064
065        keysvalues = new TypedIOPort(this, "keysvalues", true, false);
066        keysvalues.setTypeAtMost(Types.keyValueArrayType);
067        new SingletonParameter(keysvalues, "_showName");
068        
069        lines = new TypedIOPort(this, "lines", true, false);
070        lines.setTypeEquals(BaseType.STRING);
071        new SingletonParameter(lines, "_showName");
072        
073        out = new TypedIOPort(this, "out", false, true);
074        
075        // set the production rate to 0 since no tokens are written.
076        // the out port is connected to the out port of the pattern
077        // actor, which does not transfer tokens.
078        // NOTE: if the production rate is not 0, SDF sometimes throws
079        // an exception since it expects a token to be written when
080        // this actor is fired.
081        Parameter tokenProductionRate = new Parameter(out, "tokenProductionRate");
082        tokenProductionRate.setVisibility(Settable.NOT_EDITABLE);
083        tokenProductionRate.setTypeEquals(BaseType.INT);
084        tokenProductionRate.setToken("0");
085        
086        _emptyKeyValue = new ArrayToken(new Token[] { new StringToken(), new StringToken() });
087
088    }
089
090    /** Clone the actor into the specified workspace. */
091    @Override
092    public Object clone(Workspace workspace) throws CloneNotSupportedException {
093        StubSinkActor newObject = (StubSinkActor)super.clone(workspace);
094        newObject._blockingList = new LinkedBlockingQueue<List<Token>>();
095        try {
096            newObject._emptyKeyValue = new ArrayToken(new Token[] { new StringToken(), new StringToken() });
097        } catch (IllegalActionException e) {
098            throw new CloneNotSupportedException("Error creating _emptyKeyValue for clone: " + e.getMessage());
099        }
100        newObject._nonBlockingList = new LinkedList<Token>();
101        return newObject;
102    }
103    
104    /** Read the data the input port. */
105    @Override
106    public void fire() throws IllegalActionException {
107        
108        
109        Token keysValuesToken = null;
110        
111        if(_keysvaluesConnected) {
112            keysValuesToken = keysvalues.get(0);                    
113        }
114        
115        if(_linesConnected) {
116            final String linesStr = ((StringToken)lines.get(0)).stringValue();
117            if(linesStr.trim().isEmpty()) {
118                keysValuesToken = _emptyKeyValue;                
119            } else {
120                final String[] lineArray = linesStr.split("\n");
121                final Token[] tokenArray = new Token[lineArray.length];
122                for(int i = 0; i < lineArray.length; i++) {
123                    final String line = lineArray[i];
124                    tokenArray[i] = new RecordToken(_recordLabels,
125                            new Token[] {new StringToken(line.substring(0, line.indexOf("\t"))),
126                            new StringToken(line)});
127                }
128                keysValuesToken = new ArrayToken(tokenArray);
129            }   
130            
131        }
132        
133        if(_runWorkflowLifecyclePerInput) {
134            _nonBlockingList.add(keysValuesToken);
135        } else {
136            List<Token> list = new LinkedList<Token>();
137            list.add(keysValuesToken);
138            try {
139                _blockingList.put(list);
140            } catch(InterruptedException e) {
141                throw new IllegalActionException(this, e, "Error waiting for token.");
142            }
143        }
144        
145    }
146    
147    /** Get the data read from the workflow by this actor. */
148    public List<Token> getOutput() throws IllegalActionException {
149                
150        List<Token> retval;
151        
152        if(_runWorkflowLifecyclePerInput) {
153            retval = _nonBlockingList;
154            _nonBlockingList = new LinkedList<Token>();
155        } else {
156            try {
157                retval = _blockingList.take();
158            } catch(InterruptedException e) {
159                // do not rethrow since this exception can occur when stopping a DDP
160                // job. instead just print to stderr.
161                //throw new IllegalActionException(this, e, "Error waiting for token lists.");
162                System.err.println("Got InterruptedException.");
163                return null;
164            }
165        }
166        
167        return retval;
168    }
169    
170    @Override
171    public void preinitialize() throws IllegalActionException {
172        
173        super.preinitialize();
174     
175        // see if keysvalues port is connected
176        if(keysvalues.numberOfSources() > 0) {
177            _keysvaluesConnected = true;
178            
179            // set the out port type to be based on keysvalues
180            out.setTypeAtLeast(keysvalues);
181            
182        } else {
183            _keysvaluesConnected = false;
184        }
185        
186        // see if lines port is connected
187        if(lines.numberOfSources() > 0) {
188            _linesConnected = true;
189            
190            // set the out port type to be strings
191            out.setTypeEquals(Types.createKeyValueArrayType(BaseType.STRING, BaseType.STRING));
192            
193            // FIXME fix for problem in hadoop
194            if(!_keysvaluesConnected) {
195                keysvalues.setTypeEquals(Types.createKeyValueArrayType(BaseType.STRING, BaseType.STRING));
196            }
197            
198        } else {
199            _linesConnected = false;
200        }
201        
202        if(!_keysvaluesConnected && !_linesConnected) {
203            throw new IllegalActionException(this, "No input connected.");
204        } 
205        
206    }
207            
208    /** An array of records with key-values. */
209    public TypedIOPort keysvalues;
210    
211    /** Line(s) of text that will be converted into key-value pairs.
212     *  For each line, the key is the part before the first tab,
213     *  and the value is the entire line.
214     */
215    public TypedIOPort lines;
216    
217    /** Output port of key-value pairs. */
218    public TypedIOPort out;
219    
220    /** A blocking list of tokens used when executing one iteration of the workflow per input. */
221    private LinkedBlockingQueue<List<Token>> _blockingList = new LinkedBlockingQueue<List<Token>>();
222    
223    /** A non-blocking list of tokens used when execution the full lifecycle
224     *  of the workflow per input.
225     */
226    private List<Token> _nonBlockingList = new LinkedList<Token>();
227    
228    /** True if the keysvalues port is connected. */
229    private boolean _keysvaluesConnected;
230    
231    /** True if the lines port is connected. */
232    private boolean _linesConnected;
233    
234    // TODO move to Types, find other instances
235    /** Labels for key-value record token. */
236    private static final String[] _recordLabels = new String[] {"key", "value"};
237    
238    /** Array of one record token with an empty key value. Cannot make static final
239     *  since ArrayToken constructor throws exception.
240     */
241    private ArrayToken _emptyKeyValue;
242    
243}