001/*
002 * Copyright (c) 2005-2012 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2014-03-21 22:20:31 +0000 (Fri, 21 Mar 2014) $' 
007 * '$Revision: 32633 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029package org.kepler.stratosphere.io.input;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.List;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038
039import eu.stratosphere.api.common.io.statistics.BaseStatistics;
040import eu.stratosphere.api.java.record.io.FileInputFormat;
041import eu.stratosphere.core.fs.BlockLocation;
042import eu.stratosphere.core.fs.FileInputSplit;
043import eu.stratosphere.core.fs.FileStatus;
044import eu.stratosphere.core.fs.FileSystem;
045import eu.stratosphere.core.fs.Path;
046import eu.stratosphere.types.Record;
047import eu.stratosphere.types.StringValue;
048
049/**
050* Read the whole file as one <key, value> pair.
051* key is the file name, value is file content.
052* Built based on eu.stratosphere.pact.common.io.TextInputFormat class by changing readRecord() and createInputSplits().
053* 
054* @author Jianwu Wang
055* @version $Id: DataFileInputFormat.java 32633 2014-03-21 22:20:31Z crawl $
056*/
057public class DataFileInputFormat extends FileInputFormat
058{
059        
060        /**
061         * The log.
062         */
063        private static final Log LOG = LogFactory.getLog(DataFileInputFormat.class);
064        
065        // --------------------------------------------------------------------------------------------
066
067        private boolean end = false;
068        
069        private Path splitPath;
070        
071        
072        // --------------------------------------------------------------------------------------------
073
074        @Override
075        public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics)
076        {
077                return null;
078        }
079
080        @Override
081        public void open(FileInputSplit split) throws IOException
082        {
083                System.out.println("enter into open() with split: " + split);
084                super.open(split);
085                this.splitPath = split.getPath();
086
087                if (this.splitStart != 0) {
088                        System.out.println("this.start:" + this.splitStart + ", which is not 0");
089                        this.stream.seek(this.splitStart);
090                }
091        }
092        
093        @Override
094        public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException
095        {
096                final Path path = this.filePath;
097                final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>();
098
099                // get all the files that are involved in the splits
100                List<FileStatus> files = new ArrayList<FileStatus>();
101
102                final FileSystem fs = path.getFileSystem();
103                final FileStatus pathFile = fs.getFileStatus(path);
104
105                if (!pathFile.isDir()) {
106                        throw new IOException("the path: " + pathFile + " is not a dir, please specify a dir path.");
107                }
108                
109                // input is directory. list all contained files
110                final FileStatus[] dir = fs.listStatus(path);
111                for (int i = 0; i < dir.length; i++) {
112                        if (!dir[i].isDir()) {
113                                files.add(dir[i]);
114                        }
115                }
116                if (dir.length != minNumSplits)
117                        throw new IOException("split number '" + minNumSplits + "; is not equal to file number '" + dir.length + "'. Currently, to use DataFileInputFormat, split number has to be the same with the file number in the " + pathFile + ".");
118                
119                // now that we have the files, generate the splits
120                int splitNum = 0;
121                for (final FileStatus file : files) {
122
123                                // get the block locations and make sure they are in order with respect to their offset
124                                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
125                                Arrays.sort(blocks);
126                                int blockIndex = 0;
127                                // create a new split
128                                FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, file.getLen(),
129                                                blocks[blockIndex].getHosts());
130                                System.out.println("FileInputSplit:" + fis);
131                                inputSplits.add(fis);
132
133                }
134                return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
135                
136        }
137
138        /**
139         * Checks whether the current split is at its end.
140         * 
141         * @return True, if the split is at its end, false otherwise.
142         */
143        @Override
144        public boolean reachedEnd()
145        {
146                System.out.println("enter into reachedEnd() with the value of this.end : " + this.end);
147                return this.end;
148        }
149        
150        @Override
151        public boolean nextRecord(Record record) throws IOException
152        {
153                final StringValue string = new StringValue();
154                System.out.println("enter into nextRecord() with the value of this.end : " + this.end);
155                if (this.end)
156                        return false;
157                byte[] oneToken = this.readToken();
158                //pair.setKey(new PactString (new Long(System.currentTimeMillis()).toString()));
159                record.setField(0, new StringValue(splitPath.getName()));
160                string.setValueAscii(oneToken, 0, oneToken.length);
161                record.setField(1, string);             
162                return true;
163        }
164
165        // --------------------------------------------------------------------------------------------
166
167        private byte[] readToken() throws IOException {
168                if (this.stream == null) {
169                        return null;
170                }
171                byte[] tmp = new byte[(int) this.splitLength];
172                System.out.println("Read one token whose start is :" + this.splitStart + ", and its length is : " + this.splitLength);
173                // FIXME what about read() return value?
174                this.stream.read(tmp, 0, (int) this.splitLength);
175                this.end = true;
176                return tmp;
177        }
178        
179}