001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.kepler.spark.examples;
019
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
032
033/** Example java classes. Some of these classes are used in the test workflows.
034 * 
035 * @author Daniel Crawl, copied from org.apache.spark.example.JavaWordCount
036 * @version $Id: JavaWordCount.java 34532 2016-10-17 19:14:15Z crawl $
037 */
038public final class JavaWordCount {
039
040    private static final Pattern SPACE = Pattern.compile(" ");
041
042    public static void main(String[] args) throws Exception {
043        if (args.length < 2) {
044            System.err.println("Usage: JavaWordCount <master> <file>");
045            System.exit(1);
046        }
047
048        JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
049                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
050        //JavaRDD<String> lines = ctx.textFile(args[1], 1);
051        
052        JavaPairRDD lines = null;
053
054        JavaPairRDD<String, Integer> ones = lines.mapPartitionsToPair(new TokenizeLine());
055
056        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new CountWords());
057        JavaPairRDD<String, Integer> counts2 = ones.groupByKey().mapPartitionsToPair(new CountWordsMap());
058
059        List<Tuple2<String, Integer>> output = counts.collect();
060        for (Tuple2<?,?> tuple : output) {
061            System.out.println(tuple._1() + ": " + tuple._2());
062        }
063        
064        JavaPairRDD<Tuple2<String,Integer>,Tuple2<String,Integer>> pairs = counts.cartesian(counts2);
065        JavaPairRDD<String,Integer> compare = pairs.mapPartitionsToPair(new ComparePairs());
066        
067        output = compare.collect();
068        for (Tuple2<?,?> tuple : output) {
069            System.out.println(tuple._1() + ": " + tuple._2());
070        }
071        
072        counts.cogroup(counts2).mapPartitionsToPair(new CoGroupCountWords());
073        
074        counts.join(counts2).mapPartitionsToPair(new JoinFunction());
075        
076        System.exit(0);
077    }
078
079    public static class TokenizeLine implements PairFlatMapFunction<Iterator<Tuple2<Long,?>>, String, Integer> {
080
081        @Override
082        public Iterator<Tuple2<String, Integer>> call(Iterator<Tuple2<Long,?>> iterator)
083                throws Exception {
084            List<Tuple2<String,Integer>> out = new LinkedList<Tuple2<String,Integer>>();
085            while(iterator.hasNext()) {
086                Tuple2<Long,?> tuple = iterator.next();
087                String line;
088                if(tuple._2 instanceof Text) {
089                    line = tuple._2.toString();
090                } else if(tuple._2 instanceof String) {
091                    line = (String) tuple._2;
092                } else {
093                    throw new Exception("Unknown type of line : " + tuple._2.getClass());
094                }
095                for(String word : SPACE.split(line)) {
096                    out.add(new Tuple2<String,Integer>(word, Integer.valueOf(1)));
097                }
098            }
099            return out.iterator();
100        }
101
102    }
103    
104    public static class CountWords implements Function2<Integer, Integer, Integer> {
105        @Override
106        public Integer call(Integer i1, Integer i2) {
107            return Integer.valueOf(i1.intValue() + i2.intValue());
108        }
109    }
110    
111    public static class CountWordsMap
112        implements PairFlatMapFunction<Iterator<Tuple2<String,Iterable<Integer>>>, String, Integer> {
113
114        @Override
115        public Iterator<Tuple2<String, Integer>> call(Iterator<Tuple2<String, Iterable<Integer>>> iterator)
116                throws Exception {
117            List<Tuple2<String,Integer>> out = new LinkedList<Tuple2<String,Integer>>();
118            while(iterator.hasNext()) {
119                int count = 0;
120                Tuple2<String, Iterable<Integer>> input = iterator.next();
121                for(Integer val : input._2) {
122                    count += val.intValue();
123                }
124                out.add(new Tuple2<String,Integer>(input._1, Integer.valueOf(count)));
125            }
126            return out.iterator();
127        }
128    }
129   
130    public static class ComparePairs
131        implements PairFlatMapFunction<Iterator<Tuple2<Tuple2<String,Integer>,Tuple2<String,Integer>>>, String, Integer> {
132
133        @Override
134        public Iterator<Tuple2<String, Integer>> call(
135                Iterator<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> iterator)
136                throws Exception {
137            List<Tuple2<String,Integer>> out = new LinkedList<Tuple2<String,Integer>>();
138            while(iterator.hasNext()) {
139                Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>> input = iterator.next();
140                System.out.print(input._1._1 + " " + input._2._1);
141                out.add(new Tuple2<String,Integer>(input._1._1, (input._1._2 - input._2._2)));
142            }
143            return out.iterator();
144        }
145    }
146
147    public static class CoGroupCountWords
148        implements PairFlatMapFunction<Iterator<Tuple2<String,Tuple2<Iterable<Integer>,Iterable<Integer>>>>,String,Integer> {
149
150        @Override
151        public Iterator<Tuple2<String, Integer>> call(
152                Iterator<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> iterator)
153                throws Exception {
154            List<Tuple2<String,Integer>> out = new LinkedList<Tuple2<String,Integer>>();
155            while(iterator.hasNext()) {
156                Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> input = iterator.next();
157                String word = input._1;
158                int sum = 0;
159                for(Integer val : input._2._1) {
160                    sum += val.intValue();
161                }
162                for(Integer val : input._2._2) {
163                    sum += val.intValue();
164                }
165                out.add(new Tuple2<String,Integer>(word, Integer.valueOf(sum)));
166            }
167            return out.iterator();
168        }
169
170    }
171    
172    public static class JoinFunction
173        implements PairFlatMapFunction<Iterator<Tuple2<String,Tuple2<Integer,Integer>>>, String, Integer> {
174
175        @Override
176        public Iterator<Tuple2<String, Integer>> call(
177                Iterator<Tuple2<String, Tuple2<Integer, Integer>>> arg0) throws Exception {
178            // TODO Auto-generated method stub
179            return null;
180        }
181
182    }
183
184
185
186}