001/* Hadoop InputSplit for Ptolemy tokens.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-07-02 15:58:19 +0000 (Wed, 02 Jul 2014) $' 
008 * '$Revision: 32804 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.hadoop.io.input;
031
032import java.io.DataInput;
033import java.io.DataOutput;
034import java.io.IOException;
035
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.io.Writable;
038import org.apache.hadoop.mapreduce.InputSplit;
039import org.kepler.ddp.Utilities;
040import org.kepler.hadoop.io.TokenWritable;
041
042import ptolemy.data.StringToken;
043import ptolemy.data.Token;
044
045/** Hadoop InputSplit for Ptolemy tokens.
046 * 
047 *  @author Daniel Crawl
048 *  @verion $Id: TokenInputSplit.java 32804 2014-07-02 15:58:19Z crawl $
049 */
050public class TokenInputSplit extends InputSplit implements Writable {
051
052    /** No argument constructor is required. */
053    public TokenInputSplit() {
054    }
055    
056    /** Create a TokenInputSplit for a token. */
057    public TokenInputSplit(Token token) {
058        _token = token;
059    }
060
061    /** Get the number of bytes in the split. */
062    @Override
063    public long getLength() throws IOException, InterruptedException {
064        
065        String tokenStr = null;
066        if(_token instanceof StringToken) {
067            tokenStr = ((StringToken)_token).stringValue();
068        } else {
069            tokenStr = _token.toString();
070        }
071        return new Text(tokenStr).getLength();
072    }
073
074    /** Get the host locations of the split. In this class, returns none. */
075    @Override
076    public String[] getLocations() throws IOException, InterruptedException {
077        return new String[] {};
078    }
079
080    /** Get a Writable for this split. */
081    public TokenWritable getTokenWritable() {
082        return new TokenWritable(_token);
083    }
084
085
086    /** Unserialize the fields for this split. */
087    @Override
088    public void readFields(DataInput in) throws IOException {
089        
090        // TODO copied from Stratosphere TokenInputSplit
091
092        // read token type
093        int typeVal = in.readUnsignedByte();
094        
095        if (typeVal >= HIGH_BIT) {
096            int shift = 7;
097            int curr;
098            typeVal = typeVal & 0x7f;
099            while ((curr = in.readUnsignedByte()) >= HIGH_BIT) {
100                typeVal |= (curr & 0x7f) << shift;
101                shift += 7;
102            }
103            typeVal |= curr << shift;
104        }
105        
106        Utilities.TokenType type = Utilities.TokenType.getInstance(typeVal);
107
108        // XXX copied from StringValue
109        
110        int len = in.readUnsignedByte();
111
112        if (len >= HIGH_BIT) {
113            int shift = 7;
114            int curr;
115            len = len & 0x7f;
116            while ((curr = in.readUnsignedByte()) >= HIGH_BIT) {
117                len |= (curr & 0x7f) << shift;
118                shift += 7;
119            }
120            len |= curr << shift;
121        }
122        
123        //this.len = len;
124        //this.hashCode = 0;
125        //ensureSize(len);
126        //final char[] data = this.value;
127        final char[] data = new char[len];
128
129        for (int i = 0; i < len; i++) {
130            int c = in.readUnsignedByte();
131            if (c < HIGH_BIT)
132                data[i] = (char) c;
133            else {
134                int shift = 7;
135                int curr;
136                c = c & 0x7f;
137                while ((curr = in.readUnsignedByte()) >= HIGH_BIT) {
138                    c |= (curr & 0x7f) << shift;
139                    shift += 7;
140                }
141                c |= curr << shift;
142                data[i] = (char) c;
143            }
144        }
145        
146        String tokenStr = String.valueOf(data);
147        _token = Utilities.createTokenFromString(tokenStr, type);
148        
149    }
150
151    /** Serialize the fields in this split. */
152    @Override
153    public void write(DataOutput out) throws IOException {
154        
155        // TODO copied from Stratosphere TokenInputSplit
156        
157        // write the token type
158        final Utilities.TokenType type = Utilities.getTokenTypeForSerialization(_token);
159        int typeVal = type.getValue();
160        
161        while(typeVal >= HIGH_BIT) {
162            out.write(typeVal | HIGH_BIT);
163            typeVal >>>= 7;
164        }
165        out.write(typeVal);
166        
167        // XXX copied from StringValue
168        
169        String tokenStr = null;
170        if(_token instanceof StringToken) {
171            tokenStr = ((StringToken)_token).stringValue();
172        } else {
173            tokenStr = _token.toString();
174        }
175        final int length = tokenStr.length();
176
177        int len = length;
178        // write the length, variable-length encoded
179        while (len >= HIGH_BIT) {
180            out.write(len | HIGH_BIT);
181            len >>>= 7;
182        }
183        out.write(len);
184
185        // write the char data, variable length encoded
186        for (int i = 0; i < len; i++) {
187            int c = tokenStr.charAt(i);
188
189            while (c >= HIGH_BIT) {
190                out.write(c | HIGH_BIT);
191                c >>>= 7;
192            }
193            out.write(c);
194        }
195    }
196
197    /** The token for this split. */
198    private Token _token;
199    
200    private static final int HIGH_BIT = 0x1 << 7;
201
202}