001/* Stratosphere input split for Ptolemy tokens.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.io.input;
031
032import java.io.DataInput;
033import java.io.DataOutput;
034import java.io.IOException;
035
036import org.kepler.ddp.Utilities;
037
038import eu.stratosphere.core.io.GenericInputSplit;
039import ptolemy.data.StringToken;
040import ptolemy.data.Token;
041
042/** An input split for Ptolemy tokens. 
043 * 
044 * @author Daniel Crawl
045 * @version $Id: TokenInputSplit.java 33628 2015-08-24 22:42:20Z crawl $
046 * 
047 */
048public class TokenInputSplit extends GenericInputSplit {
049
050        /** A constructor that takes no arguments is required. */
051        public TokenInputSplit() {
052        }
053        
054        /** Create a new TokenInputSplit.
055         *  @param i the split number.
056         *  @param token the token.
057         */
058        public TokenInputSplit(int i, Token token) {
059                super(i);
060                _token = token;
061                //System.out.println("new split " + i + " for " + token);
062        }
063
064        /** Get the token. */
065        public Token getToken() {
066                return _token;
067        }
068        
069        /** Write the token to the output stream. */
070        @Override
071        public void write(DataOutput out) throws IOException {
072
073                super.write(out);
074                
075                // write the token type
076                final Utilities.TokenType type = Utilities.getTokenTypeForSerialization(_token);
077                int typeVal = type.getValue();
078                
079                while(typeVal >= HIGH_BIT) {
080                    out.write(typeVal | HIGH_BIT);
081                    typeVal >>>= 7;
082                }
083                out.write(typeVal);
084                
085                // XXX copied from StringValue
086                
087                String tokenStr = null;
088                if(_token instanceof StringToken) {
089                    tokenStr = ((StringToken)_token).stringValue();
090                } else {
091                    tokenStr = _token.toString();
092                }
093                final int length = tokenStr.length();
094
095                int len = length;
096                // write the length, variable-length encoded
097                while (len >= HIGH_BIT) {
098                        out.write(len | HIGH_BIT);
099                        len >>>= 7;
100                }
101                out.write(len);
102
103                // write the char data, variable length encoded
104                for (int i = 0; i < len; i++) {
105                        int c = tokenStr.charAt(i);
106
107                        while (c >= HIGH_BIT) {
108                                out.write(c | HIGH_BIT);
109                                c >>>= 7;
110                        }
111                        out.write(c);
112                }
113                
114                //System.out.println("done writing split for " + _token.stringValue());
115                
116        }
117
118        /** Read a token from the input stream. */
119        @Override
120        public void read(DataInput in) throws IOException {
121                
122                super.read(in);
123                
124                // read token type
125                int typeVal = in.readUnsignedByte();
126                
127                if (typeVal >= HIGH_BIT) {
128                    int shift = 7;
129                    int curr;
130                    typeVal = typeVal & 0x7f;
131                    while ((curr = in.readUnsignedByte()) >= HIGH_BIT) {
132                        typeVal |= (curr & 0x7f) << shift;
133                        shift += 7;
134                    }
135                    typeVal |= curr << shift;
136                }
137                
138                Utilities.TokenType type = Utilities.TokenType.getInstance(typeVal);
139
140                // XXX copied from StringValue
141                
142                int len = in.readUnsignedByte();
143
144                if (len >= HIGH_BIT) {
145                        int shift = 7;
146                        int curr;
147                        len = len & 0x7f;
148                        while ((curr = in.readUnsignedByte()) >= HIGH_BIT) {
149                                len |= (curr & 0x7f) << shift;
150                                shift += 7;
151                        }
152                        len |= curr << shift;
153                }
154                
155                //this.len = len;
156                //this.hashCode = 0;
157                //ensureSize(len);
158                //final char[] data = this.value;
159                final char[] data = new char[len];
160
161                for (int i = 0; i < len; i++) {
162                        int c = in.readUnsignedByte();
163                        if (c < HIGH_BIT)
164                                data[i] = (char) c;
165                        else {
166                                int shift = 7;
167                                int curr;
168                                c = c & 0x7f;
169                                while ((curr = in.readUnsignedByte()) >= HIGH_BIT) {
170                                        c |= (curr & 0x7f) << shift;
171                                        shift += 7;
172                                }
173                                c |= curr << shift;
174                                data[i] = (char) c;
175                        }
176                }
177                
178                String tokenStr = String.valueOf(data);
179                _token = Utilities.createTokenFromString(tokenStr, type);
180
181        }
182        
183        /** Get a string representation of this split. */
184        @Override
185        public String toString() {
186            
187            String tokenStr = null;
188        if(_token instanceof StringToken) {
189            tokenStr = ((StringToken)_token).stringValue();
190        } else {
191            tokenStr = _token.toString();
192        }
193
194                return "{ number: " + number + ", token: " + tokenStr + " }";
195        }
196
197        /** The token data. */
198        private Token _token;
199        
200        private static final int HIGH_BIT = 0x1 << 7;
201
202}