/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.kepler.spark.examples;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/** Example java classes. Some of these classes are used in the test workflows.
 *
 *  <p>The nested classes are Spark function objects operating on
 *  (word, count) pairs; {@link #main(String[])} wires them together into a
 *  small word-count job.
 *
 *  @author Daniel Crawl, copied from org.apache.spark.example.JavaWordCount
 *  @version $Id: JavaWordCount.java 34532 2016-10-17 19:14:15Z crawl $
 */
public final class JavaWordCount {

    /** Word separator: a single space character. */
    private static final Pattern SPACE = Pattern.compile(" ");

    /** Runs the word count example.
     *
     *  @param args {@code args[0]} is the Spark master, {@code args[1]} is
     *   the path of the text file whose words are counted.
     *  @throws Exception if the Spark job fails.
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JavaWordCount <master> <file>");
            System.exit(1);
        }

        JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));

        try {
            // Read the input file and key every line by its index so that the
            // records have the (Long, line) shape TokenizeLine consumes.
            // (Previously the input RDD was left null, so the next statement
            // always threw a NullPointerException.)
            JavaPairRDD<Long, String> lines =
                    ctx.textFile(args[1], 1).zipWithIndex().mapToPair(new LineIndexAsKey());

            // TokenizeLine is declared over Tuple2<Long,?> so that it accepts
            // both Text and String values. Java generics are invariant, so the
            // call goes through the raw type, as the original code did.
            @SuppressWarnings({"unchecked", "rawtypes"})
            JavaPairRDD<String, Integer> ones =
                    ((JavaPairRDD) lines).mapPartitionsToPair(new TokenizeLine());

            // Two equivalent ways of computing the per-word totals.
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(new CountWords());
            JavaPairRDD<String, Integer> counts2 =
                    ones.groupByKey().mapPartitionsToPair(new CountWordsMap());

            printPairs(counts.collect());

            JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, Integer>> pairs =
                    counts.cartesian(counts2);
            JavaPairRDD<String, Integer> compare =
                    pairs.mapPartitionsToPair(new ComparePairs());

            printPairs(compare.collect());

            // No action is invoked on the next two RDDs, so these statements
            // only build the transformations; the functions are not executed.
            counts.cogroup(counts2).mapPartitionsToPair(new CoGroupCountWords());

            counts.join(counts2).mapPartitionsToPair(new JoinFunction());
        } finally {
            // Release the Spark context even when the job fails.
            ctx.stop();
        }

        System.exit(0);
    }

    /** Prints every pair as {@code key: value}, one pair per line. */
    private static void printPairs(List<Tuple2<String, Integer>> pairs) {
        for (Tuple2<?, ?> tuple : pairs) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
    }

    /** Turns a (line, index) pair, as produced by {@code zipWithIndex()},
     *  into an (index, line) pair.
     */
    public static class LineIndexAsKey
            implements PairFunction<Tuple2<String, Long>, Long, String> {

        @Override
        public Tuple2<Long, String> call(Tuple2<String, Long> lineAndIndex) {
            return new Tuple2<Long, String>(lineAndIndex._2, lineAndIndex._1);
        }
    }

    /** Splits every input line on single spaces and emits (word, 1) for each
     *  resulting token. The key of the input pair is ignored; the value must
     *  be a {@link Text} or a {@link String}.
     */
    public static class TokenizeLine
            implements PairFlatMapFunction<Iterator<Tuple2<Long,?>>, String, Integer> {

        /** @throws Exception if a value is neither a Text nor a String. */
        @Override
        public Iterator<Tuple2<String, Integer>> call(Iterator<Tuple2<Long,?>> iterator)
                throws Exception {
            List<Tuple2<String,Integer>> out = new LinkedList<Tuple2<String,Integer>>();
            while (iterator.hasNext()) {
                Tuple2<Long,?> tuple = iterator.next();
                Object value = tuple._2;
                String line;
                if (value instanceof Text) {
                    line = value.toString();
                } else if (value instanceof String) {
                    line = (String) value;
                } else {
                    // Guard against null so the intended error is reported
                    // instead of a NullPointerException from getClass().
                    throw new Exception("Unknown type of line : "
                            + (value == null ? null : value.getClass()));
                }
                // Splitting on a single space means consecutive spaces
                // yield empty-string words.
                for (String word : SPACE.split(line)) {
                    out.add(new Tuple2<String,Integer>(word, Integer.valueOf(1)));
                }
            }
            return out.iterator();
        }

    }

    /** Reduce function that adds two counts. */
    public static class CountWords implements Function2<Integer, Integer, Integer> {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return Integer.valueOf(i1.intValue() + i2.intValue());
        }
    }

    /** Sums the grouped counts of each word, emitting one (word, total)
     *  pair per input group.
     */
    public static class CountWordsMap
            implements PairFlatMapFunction<Iterator<Tuple2<String,Iterable<Integer>>>, String, Integer> {

        @Override
        public Iterator<Tuple2<String, Integer>> call(
                Iterator<Tuple2<String, Iterable<Integer>>> iterator)
                throws Exception {
            List<Tuple2<String,Integer>> out = new LinkedList<Tuple2<String,Integer>>();
            while (iterator.hasNext()) {
                Tuple2<String, Iterable<Integer>> input = iterator.next();
                int count = 0;
                for (Integer val : input._2) {
                    count += val.intValue();
                }
                out.add(new Tuple2<String,Integer>(input._1, Integer.valueOf(count)));
            }
            return out.iterator();
        }
    }

    /** For every pair of (word, count) tuples, prints the two words and emits
     *  (first word, first count - second count).
     */
    public static class ComparePairs
            implements PairFlatMapFunction<Iterator<Tuple2<Tuple2<String,Integer>,Tuple2<String,Integer>>>, String, Integer> {

        @Override
        public Iterator<Tuple2<String, Integer>> call(
                Iterator<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> iterator)
                throws Exception {
            List<Tuple2<String,Integer>> out = new LinkedList<Tuple2<String,Integer>>();
            while (iterator.hasNext()) {
                Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>> input = iterator.next();
                // Written without a line terminator, as in the original example.
                System.out.print(input._1._1 + " " + input._2._1);
                out.add(new Tuple2<String,Integer>(input._1._1, (input._1._2 - input._2._2)));
            }
            return out.iterator();
        }
    }

    /** For every co-grouped word, emits (word, sum of all counts from both
     *  sides of the co-group).
     */
    public static class CoGroupCountWords
            implements PairFlatMapFunction<Iterator<Tuple2<String,Tuple2<Iterable<Integer>,Iterable<Integer>>>>,String,Integer> {

        @Override
        public Iterator<Tuple2<String, Integer>> call(
                Iterator<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> iterator)
                throws Exception {
            List<Tuple2<String,Integer>> out = new LinkedList<Tuple2<String,Integer>>();
            while (iterator.hasNext()) {
                Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> input = iterator.next();
                String word = input._1;
                int sum = 0;
                for (Integer val : input._2._1) {
                    sum += val.intValue();
                }
                for (Integer val : input._2._2) {
                    sum += val.intValue();
                }
                out.add(new Tuple2<String,Integer>(word, Integer.valueOf(sum)));
            }
            return out.iterator();
        }

    }

    /** Placeholder function for joined (word, (count, count)) pairs.
     *
     *  <p>The transformation is not implemented yet: the input is not
     *  consumed and no pairs are emitted.
     */
    public static class JoinFunction
            implements PairFlatMapFunction<Iterator<Tuple2<String,Tuple2<Integer,Integer>>>, String, Integer> {

        @Override
        public Iterator<Tuple2<String, Integer>> call(
                Iterator<Tuple2<String, Tuple2<Integer, Integer>>> arg0) throws Exception {
            // TODO implement the join logic. An empty iterator is returned
            // rather than null so that consuming the result does not fail
            // with a NullPointerException.
            return Collections.<Tuple2<String, Integer>>emptyIterator();
        }

    }

}