001/* An operator for token sinks.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-11-18 00:12:43 +0000 (Tue, 18 Nov 2014) $' 
008 * '$Revision: 33083 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.operator;
031
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Comparator;
035import java.util.List;
036
037import org.apache.spark.api.java.JavaPairRDD;
038import org.kepler.ddp.actor.pattern.DDPDataSink;
039import org.kepler.spark.stub.StubUtilities;
040
041import ptolemy.data.RecordToken;
042import ptolemy.data.Token;
043import ptolemy.kernel.util.IllegalActionException;
044import scala.Tuple2;
045
046/** A data sink that writes to Ptolemy tokens.
047 * 
048 *  @author Daniel Crawl
049 *  @version $Id: TokenSink.java 33083 2014-11-18 00:12:43Z crawl $
050 */
051public class TokenSink extends DataSink {
052
053    /** Create a new TokenSink.
054     *  @param sinkActorFullName the full name of the sink actor
055     *  @param name the name of the operator
056     */
057    public TokenSink(String sinkActorFullName, String name) {
058        super(1, null, name);
059        _sinkActorFullName = sinkActorFullName;
060    }
061
062    /** Execute the operator. */
063    @Override
064    public JavaPairRDD<Object, ?> execute() throws IllegalActionException {
065        
066        List<RecordToken> tokenList = new ArrayList<RecordToken>();
067        
068        // NOTE: sortByKey().collect() was executing the DDP tasks twice,
069        // so we call collect() and sort the results below.
070        //for(Tuple2<Object, ?> tuple : _inputData[0].sortByKey().collect()) {
071        for(Tuple2<Object, ?> tuple : _inputData[0].collect()) {
072            
073            Token keyToken;
074            Token valueToken;
075            
076            // see if there is only one field
077            if(tuple._2 == null) {
078                keyToken = Token.NIL;
079                valueToken = StubUtilities.convertToToken(tuple._1);
080            } else {
081                keyToken = StubUtilities.convertToToken(tuple._1);
082                valueToken = StubUtilities.convertToToken(tuple._2);
083            }
084            
085            RecordToken recordToken = new RecordToken(new String[] {"key", "value"}, new Token[] {keyToken, valueToken});
086            tokenList.add(recordToken);
087        }
088        
089        // NOTE: sort the list for predictable output for the tests
090        
091        Token[] array = tokenList.toArray(new Token[tokenList.size()]);
092        Arrays.sort(array, new Comparator<Token>() {
093            @Override
094            public int compare(Token o1, Token o2) {
095                return ((RecordToken)o1).get("key").toString().compareTo(
096                        ((RecordToken)o2).get("key").toString());
097            }
098        });
099
100        DDPDataSink.addTokens(_sinkActorFullName, Arrays.asList(array));
101
102        return null;
103    }
104
105    /** The full name of the sink actor. */
106    private String _sinkActorFullName;
107}