001/* Stratosphere input split for Ptolemy tokens. 002 * 003 * Copyright (c) 2014 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 008 * '$Revision: 33628 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.stratosphere.io.input; 031 032import java.io.DataInput; 033import java.io.DataOutput; 034import java.io.IOException; 035 036import org.kepler.ddp.Utilities; 037 038import eu.stratosphere.core.io.GenericInputSplit; 039import ptolemy.data.StringToken; 040import ptolemy.data.Token; 041 042/** An input split for Ptolemy tokens. 043 * 044 * @author Daniel Crawl 045 * @version $Id: TokenInputSplit.java 33628 2015-08-24 22:42:20Z crawl $ 046 * 047 */ 048public class TokenInputSplit extends GenericInputSplit { 049 050 /** A constructor that takes no arguments is required. */ 051 public TokenInputSplit() { 052 } 053 054 /** Create a new TokenInputSplit. 055 * @param i the split number. 056 * @param token the token. 057 */ 058 public TokenInputSplit(int i, Token token) { 059 super(i); 060 _token = token; 061 //System.out.println("new split " + i + " for " + token); 062 } 063 064 /** Get the token. */ 065 public Token getToken() { 066 return _token; 067 } 068 069 /** Write the token to the output stream. */ 070 @Override 071 public void write(DataOutput out) throws IOException { 072 073 super.write(out); 074 075 // write the token type 076 final Utilities.TokenType type = Utilities.getTokenTypeForSerialization(_token); 077 int typeVal = type.getValue(); 078 079 while(typeVal >= HIGH_BIT) { 080 out.write(typeVal | HIGH_BIT); 081 typeVal >>>= 7; 082 } 083 out.write(typeVal); 084 085 // XXX copied from StringValue 086 087 String tokenStr = null; 088 if(_token instanceof StringToken) { 089 tokenStr = ((StringToken)_token).stringValue(); 090 } else { 091 tokenStr = _token.toString(); 092 } 093 final int length = tokenStr.length(); 094 095 int len = length; 096 // write the length, variable-length encoded 097 while (len >= HIGH_BIT) { 098 out.write(len | HIGH_BIT); 099 len >>>= 7; 100 } 101 out.write(len); 102 103 // write the char data, variable length encoded 104 for (int i = 0; i < len; i++) { 105 int c = tokenStr.charAt(i); 106 107 while (c >= HIGH_BIT) { 108 out.write(c | HIGH_BIT); 109 c >>>= 7; 110 } 111 out.write(c); 112 } 113 114 //System.out.println("done writing split for " + _token.stringValue()); 115 116 } 117 118 /** Read a token from the input stream. */ 119 @Override 120 public void read(DataInput in) throws IOException { 121 122 super.read(in); 123 124 // read token type 125 int typeVal = in.readUnsignedByte(); 126 127 if (typeVal >= HIGH_BIT) { 128 int shift = 7; 129 int curr; 130 typeVal = typeVal & 0x7f; 131 while ((curr = in.readUnsignedByte()) >= HIGH_BIT) { 132 typeVal |= (curr & 0x7f) << shift; 133 shift += 7; 134 } 135 typeVal |= curr << shift; 136 } 137 138 Utilities.TokenType type = Utilities.TokenType.getInstance(typeVal); 139 140 // XXX copied from StringValue 141 142 int len = in.readUnsignedByte(); 143 144 if (len >= HIGH_BIT) { 145 int shift = 7; 146 int curr; 147 len = len & 0x7f; 148 while ((curr = in.readUnsignedByte()) >= HIGH_BIT) { 149 len |= (curr & 0x7f) << shift; 150 shift += 7; 151 } 152 len |= curr << shift; 153 } 154 155 //this.len = len; 156 //this.hashCode = 0; 157 //ensureSize(len); 158 //final char[] data = this.value; 159 final char[] data = new char[len]; 160 161 for (int i = 0; i < len; i++) { 162 int c = in.readUnsignedByte(); 163 if (c < HIGH_BIT) 164 data[i] = (char) c; 165 else { 166 int shift = 7; 167 int curr; 168 c = c & 0x7f; 169 while ((curr = in.readUnsignedByte()) >= HIGH_BIT) { 170 c |= (curr & 0x7f) << shift; 171 shift += 7; 172 } 173 c |= curr << shift; 174 data[i] = (char) c; 175 } 176 } 177 178 String tokenStr = String.valueOf(data); 179 _token = Utilities.createTokenFromString(tokenStr, type); 180 181 } 182 183 /** Get a string representation of this split. */ 184 @Override 185 public String toString() { 186 187 String tokenStr = null; 188 if(_token instanceof StringToken) { 189 tokenStr = ((StringToken)_token).stringValue(); 190 } else { 191 tokenStr = _token.toString(); 192 } 193 194 return "{ number: " + number + ", token: " + tokenStr + " }"; 195 } 196 197 /** The token data. */ 198 private Token _token; 199 200 private static final int HIGH_BIT = 0x1 << 7; 201 202}