001/* 002 * Copyright (c) 2015 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2015-06-04 22:39:55 +0000 (Thu, 04 Jun 2015) $' 007 * '$Revision: 33462 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029package org.kepler.spark.mllib; 030 031import java.util.regex.Pattern; 032 033import org.apache.spark.api.java.JavaRDD; 034import org.apache.spark.api.java.function.Function; 035import org.apache.spark.mllib.linalg.Vector; 036import org.apache.spark.mllib.linalg.Vectors; 037 038import ptolemy.actor.TypedAtomicActor; 039import ptolemy.actor.TypedIOPort; 040import ptolemy.actor.parameters.PortParameter; 041import ptolemy.data.ArrayToken; 042import ptolemy.data.IntToken; 043import ptolemy.data.ObjectToken; 044import ptolemy.data.StringToken; 045import ptolemy.data.Token; 046import ptolemy.data.type.ArrayType; 047import ptolemy.data.type.BaseType; 048import ptolemy.data.type.ObjectType; 049import ptolemy.kernel.CompositeEntity; 050import ptolemy.kernel.util.IllegalActionException; 051import ptolemy.kernel.util.NameDuplicationException; 052import ptolemy.kernel.util.SingletonAttribute; 053 054/** Takes a JavaRDD<String> dataset, an array of column indices and a 055 * separator string. It extracts specified columns from the passed 056 * dataset and creates an RDD<Vector> of them. 057 * 058 * @author Tahereh Masoumi 059 * @version $Id: ExtractColumns.java 33462 2015-06-04 22:39:55Z crawl $ 060 * 061 */ 062public class ExtractColumns extends TypedAtomicActor { 063 064 public ExtractColumns(CompositeEntity container, String name) 065 throws IllegalActionException, NameDuplicationException { 066 super(container, name); 067 068 in = new TypedIOPort(this, "in", true, false); 069 in.setTypeEquals(new ObjectType(JavaRDD.class)); 070 new SingletonAttribute(in, "_showName"); 071 072 ColumnIndices = new TypedIOPort(this, "ColumnIndices", true, false); 073 ColumnIndices.setTypeAtLeast(ArrayType.ARRAY_BOTTOM); 074 new SingletonAttribute(ColumnIndices, "_showName"); 075 076 sep = new PortParameter(this,"sep"); 077 sep.setTypeEquals(BaseType.STRING); 078 sep.setStringMode(true); 079 new SingletonAttribute(sep.getPort(), "_showName"); 080 081 out = new TypedIOPort(this, "out", false, true); 082 out.setTypeEquals(new ObjectType(JavaRDD.class)); 083 new SingletonAttribute(out, "_showName"); 084 085 test = new TypedIOPort(this, "test", false, true); 086 test.setTypeEquals(new ObjectType(Vector.class)); 087 new SingletonAttribute(test, "_showName"); 088 089 } 090 091 @Override 092 public void fire() throws IllegalActionException { 093 094 super.fire(); 095 sep.update(); 096 String separator = ((StringToken)sep.getToken()).stringValue(); 097 final JavaRDD<String> lines = (JavaRDD<String>) ((ObjectToken)in.get(0)).getValue(); 098 final Token[] idxs; 099 100 //ColumnIndices.update(); 101 if (ColumnIndices.hasToken(0)) { 102 103 ArrayToken token = (ArrayToken) ColumnIndices.get(0); 104 idxs = token.arrayValue(); 105 JavaRDD<Vector> points = lines.map(new ParsePoint(idxs,separator)); 106 out.broadcast(new ObjectToken(points, JavaRDD.class)); 107 test.broadcast(new ObjectToken(points.first(),Vector.class)); 108 109 } else { 110 111 JavaRDD<Vector> points = lines.map(new ParsePoint(separator)); 112 out.broadcast(new ObjectToken(points, JavaRDD.class)); 113 test.broadcast(new ObjectToken(points.first(),Vector.class)); 114 } 115 } 116 117 /** The input JavaRDD <String> dataset. */ 118 public TypedIOPort in; 119 120 /** An RDD of Vectors. */ 121 public TypedIOPort out; 122 123 /** An Array of column indices.*/ 124 public TypedIOPort ColumnIndices; 125 126 /** string column separator.*/ 127 public PortParameter sep; 128 129 /** Outputs the first row of the new dataset only for test purpose*/ 130 public TypedIOPort test; 131 132 133 private static class ParsePoint implements Function<String, Vector> { 134 135 private static Token[] idxs; 136 137 private static Pattern SPACE; 138 139 public ParsePoint(Token[] _idxs,String separator) { 140 idxs = _idxs; 141 SPACE = Pattern.compile("\\"+separator+"+"); 142 } 143 144 public ParsePoint(String separator){ 145 SPACE = Pattern.compile(separator); 146 } 147 148 @Override 149 public Vector call(String line) { 150 String[] tok = SPACE.split(line); 151 152 double[] point; 153 154 /** if no column index is passed it returns the vector of the whole dataset. */ 155 if(idxs.equals(null)){ 156 point = new double[tok.length]; 157 for (int i = 0; i < tok.length; ++i) { 158 point[i] = Double.parseDouble(tok[i]); 159 160 } 161 } else { 162 163 point = new double[idxs.length]; 164 165 for (int i = 0; i < idxs.length; ++i) { 166 point[i] = Double.parseDouble(tok[((IntToken)idxs[i]).intValue()]); 167 } 168 } 169 170 return Vectors.dense(point); 171 } 172 } 173}