/* A CoGroupStub that sums the values in both inputs.
 *
 * Copyright (c) 2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-03-17 23:36:57 +0000 (Mon, 17 Mar 2014) $'
 * '$Revision: 32617 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.stratosphere.stub;

import java.util.Iterator;

import org.kepler.stratosphere.type.TypeUtilities;

import eu.stratosphere.api.java.record.functions.CoGroupFunction;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;

/** A CoGroupStub that sums the values in both inputs. This
 *  class is used for word count using cogroup.
 *
 *  @author Daniel Crawl
 *  @version $Id: SumValuesCoGroup.java 32617 2014-03-17 23:36:57Z crawl $
 */

public class SumValuesCoGroup extends CoGroupFunction {

    /** Add up the integer value field of every record in both inputs
     *  and emit a single record carrying the total.
     *
     *  <p>The emitted record is the last record read (from the second
     *  input if it has any records, otherwise from the first), with
     *  its field at index 1 overwritten by the sum. If neither input
     *  yields a record, nothing is emitted.</p>
     *
     *  @param records1 The records of the first input.
     *  @param records2 The records of the second input.
     *  @param out The collector that receives the summed record.
     *  @exception Exception Declared by the overridden method; any
     *   exception raised while reading fields or collecting propagates.
     */
    @Override
    public void coGroup(Iterator<Record> records1,
            Iterator<Record> records2, Collector<Record> out) throws Exception {

        // The most recently read record; reused as the output record.
        Record last = null;
        int sum = 0;

        while (records1.hasNext()) {
            last = records1.next();
            sum += last.getField(TypeUtilities.VALUE_FIELD, IntValue.class).getValue();
        }

        while (records2.hasNext()) {
            last = records2.next();
            sum += last.getField(TypeUtilities.VALUE_FIELD, IntValue.class).getValue();
        }

        // Both inputs were empty: there is no record to carry the sum,
        // so emit nothing instead of dereferencing null.
        if (last == null) {
            return;
        }

        _count.setValue(sum);
        // NOTE(review): the sum is written to the literal index 1 while the
        // inputs are read from TypeUtilities.VALUE_FIELD; presumably these
        // are the same field -- confirm against TypeUtilities.
        last.setField(1, _count);
        out.collect(last);
    }

    /** Holder for the sum, reused across invocations of coGroup(). */
    private final IntValue _count = new IntValue();
}