001/* Hadoop InputSplit for Ptolemy tokens. 002 * 003 * Copyright (c) 2014 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2014-07-02 15:58:19 +0000 (Wed, 02 Jul 2014) $' 008 * '$Revision: 32804 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.hadoop.io.input; 031 032import java.io.DataInput; 033import java.io.DataOutput; 034import java.io.IOException; 035 036import org.apache.hadoop.io.Text; 037import org.apache.hadoop.io.Writable; 038import org.apache.hadoop.mapreduce.InputSplit; 039import org.kepler.ddp.Utilities; 040import org.kepler.hadoop.io.TokenWritable; 041 042import ptolemy.data.StringToken; 043import ptolemy.data.Token; 044 045/** Hadoop InputSplit for Ptolemy tokens. 046 * 047 * @author Daniel Crawl 048 * @verion $Id: TokenInputSplit.java 32804 2014-07-02 15:58:19Z crawl $ 049 */ 050public class TokenInputSplit extends InputSplit implements Writable { 051 052 /** No argument constructor is required. */ 053 public TokenInputSplit() { 054 } 055 056 /** Create a TokenInputSplit for a token. */ 057 public TokenInputSplit(Token token) { 058 _token = token; 059 } 060 061 /** Get the number of bytes in the split. */ 062 @Override 063 public long getLength() throws IOException, InterruptedException { 064 065 String tokenStr = null; 066 if(_token instanceof StringToken) { 067 tokenStr = ((StringToken)_token).stringValue(); 068 } else { 069 tokenStr = _token.toString(); 070 } 071 return new Text(tokenStr).getLength(); 072 } 073 074 /** Get the host locations of the split. In this class, returns none. */ 075 @Override 076 public String[] getLocations() throws IOException, InterruptedException { 077 return new String[] {}; 078 } 079 080 /** Get a Writable for this split. */ 081 public TokenWritable getTokenWritable() { 082 return new TokenWritable(_token); 083 } 084 085 086 /** Unserialize the fields for this split. */ 087 @Override 088 public void readFields(DataInput in) throws IOException { 089 090 // TODO copied from Stratosphere TokenInputSplit 091 092 // read token type 093 int typeVal = in.readUnsignedByte(); 094 095 if (typeVal >= HIGH_BIT) { 096 int shift = 7; 097 int curr; 098 typeVal = typeVal & 0x7f; 099 while ((curr = in.readUnsignedByte()) >= HIGH_BIT) { 100 typeVal |= (curr & 0x7f) << shift; 101 shift += 7; 102 } 103 typeVal |= curr << shift; 104 } 105 106 Utilities.TokenType type = Utilities.TokenType.getInstance(typeVal); 107 108 // XXX copied from StringValue 109 110 int len = in.readUnsignedByte(); 111 112 if (len >= HIGH_BIT) { 113 int shift = 7; 114 int curr; 115 len = len & 0x7f; 116 while ((curr = in.readUnsignedByte()) >= HIGH_BIT) { 117 len |= (curr & 0x7f) << shift; 118 shift += 7; 119 } 120 len |= curr << shift; 121 } 122 123 //this.len = len; 124 //this.hashCode = 0; 125 //ensureSize(len); 126 //final char[] data = this.value; 127 final char[] data = new char[len]; 128 129 for (int i = 0; i < len; i++) { 130 int c = in.readUnsignedByte(); 131 if (c < HIGH_BIT) 132 data[i] = (char) c; 133 else { 134 int shift = 7; 135 int curr; 136 c = c & 0x7f; 137 while ((curr = in.readUnsignedByte()) >= HIGH_BIT) { 138 c |= (curr & 0x7f) << shift; 139 shift += 7; 140 } 141 c |= curr << shift; 142 data[i] = (char) c; 143 } 144 } 145 146 String tokenStr = String.valueOf(data); 147 _token = Utilities.createTokenFromString(tokenStr, type); 148 149 } 150 151 /** Serialize the fields in this split. */ 152 @Override 153 public void write(DataOutput out) throws IOException { 154 155 // TODO copied from Stratosphere TokenInputSplit 156 157 // write the token type 158 final Utilities.TokenType type = Utilities.getTokenTypeForSerialization(_token); 159 int typeVal = type.getValue(); 160 161 while(typeVal >= HIGH_BIT) { 162 out.write(typeVal | HIGH_BIT); 163 typeVal >>>= 7; 164 } 165 out.write(typeVal); 166 167 // XXX copied from StringValue 168 169 String tokenStr = null; 170 if(_token instanceof StringToken) { 171 tokenStr = ((StringToken)_token).stringValue(); 172 } else { 173 tokenStr = _token.toString(); 174 } 175 final int length = tokenStr.length(); 176 177 int len = length; 178 // write the length, variable-length encoded 179 while (len >= HIGH_BIT) { 180 out.write(len | HIGH_BIT); 181 len >>>= 7; 182 } 183 out.write(len); 184 185 // write the char data, variable length encoded 186 for (int i = 0; i < len; i++) { 187 int c = tokenStr.charAt(i); 188 189 while (c >= HIGH_BIT) { 190 out.write(c | HIGH_BIT); 191 c >>>= 7; 192 } 193 out.write(c); 194 } 195 } 196 197 /** The token for this split. */ 198 private Token _token; 199 200 private static final int HIGH_BIT = 0x1 << 7; 201 202}