How to convert a .txt / .csv file to ORC format

For a certain requirement I want to convert a delimited text file into ORC (Optimized Row Columnar) format. Since I have to run this regularly, I want to write a Java program to do it. I don't want to use the Hive temporary-table workaround. Can anyone help me? Below is my attempt:

    /* ORCMapper.java */
    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.hive.ql.io.orc.*;
    import org.apache.hadoop.io.*;

    public class ORCMapper extends MapReduceBase implements Mapper {
        OrcSerde serde;

        @Override
        public void configure(JobConf job) {
            serde = new OrcSerde();
        }

        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector output, Reporter reporter) throws IOException {
            output.collect(NullWritable.get(), serde.serialize(value, null));
        }
    }

    /* ORCReducer.java */
    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class ORCReducer extends MapReduceBase implements Reducer {
        @Override
        public void reduce(NullWritable key, Iterator values,
                           OutputCollector output, Reporter reporter) throws IOException {
            Writable value = values.next();
            output.collect(key, value);
        }
    }

    /* ORCDriver.java */
    import java.io.*;

    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.hive.ql.io.orc.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;

    public class ORCDriver {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            JobClient client = new JobClient();
            JobConf conf = new JobConf("ORC_Generator");
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputKeyClass(NullWritable.class);
            conf.setOutputValueClass(Writable.class);
            conf.setOutputFormat(OrcOutputFormat.class);
            FileInputFormat.addInputPath(conf, new Path("hdfs://localhost:9000/path/to/ipdir/textfile"));
            OrcOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/path/to/opdir/orcfile"));
            conf.setMapperClass(ORCMapper.class);
            System.out.println(OrcOutputFormat.getWorkOutputPath(conf));
            conf.setNumReduceTasks(0);
            client.setConf(conf);
            try {
                JobClient.runJob(conf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Running this shows the errors below and produces a file named part-00000 on my local filesystem:

    java.io.IOException: File already exists:part-00000
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:249)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:241)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:335)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:381)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:364)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:564)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:545)
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl.ensureWriter(WriterImpl.java:1672)
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushStripe(WriterImpl.java:1688)
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:1868)
        at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:95)
        at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:80)
        at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.close(MapTask.java:833)
        at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1763)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:439)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    14/09/02 11:23:26 INFO mapred.LocalJobRunner: Map task executor complete.
    14/09/02 11:23:26 WARN mapred.LocalJobRunner: job_local688970064_0001
    java.lang.Exception: java.lang.NullPointerException
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
    Caused by: java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl.createTreeWriter(WriterImpl.java:1515)
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:154)
        at org.apache.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:258)
        at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:63)
        at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:46)
        at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:847)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:591)
        at ORCMapper.map(ORCMapper.java:42)
        at ORCMapper.map(ORCMapper.java:1)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    14/09/02 11:23:26 INFO mapred.JobClient:  map 0% reduce 0%
    14/09/02 11:23:26 INFO mapred.JobClient: Job complete: job_local688970064_0001
    14/09/02 11:23:26 INFO mapred.JobClient: Counters: 0
    14/09/02 11:23:26 INFO mapred.JobClient: Job Failed: NA
    java.io.IOException: Job failed!
        at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1357)
        at ORCDriver.main(ORCDriver.java:53)
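For what it's worth, the NullPointerException in the second trace appears to originate from serde.serialize(value, null) in the mapper: the second argument is the ObjectInspector that tells the ORC writer what schema to use, and passing null leaves WriterImpl.createTreeWriter with nothing to walk. Below is a minimal sketch of a mapper that supplies an inspector; the OrcInspectorMapper and OrcRow names, the two-column schema, and the tab delimiter are all illustrative assumptions, not part of the original code:

    // Sketch only: row class, field names, and delimiter are assumptions.
    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class OrcInspectorMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, NullWritable, Writable> {

        // Hypothetical row layout; adjust the fields to match your file.
        public static class OrcRow {
            String name;
            String city;
            OrcRow(String name, String city) { this.name = name; this.city = city; }
        }

        private OrcSerde serde;
        private ObjectInspector inspector;

        @Override
        public void configure(JobConf job) {
            serde = new OrcSerde();
            // Build a struct ObjectInspector from the row class via reflection,
            // so the ORC writer knows the schema it is writing.
            inspector = ObjectInspectorFactory.getReflectionObjectInspector(
                    OrcRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        }

        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector<NullWritable, Writable> output,
                        Reporter reporter) throws IOException {
            String[] fields = value.toString().split("\t", -1);
            OrcRow row = new OrcRow(fields[0], fields[1]);
            // Passing a real inspector instead of null is what avoids the NPE
            // thrown from WriterImpl.createTreeWriter.
            output.collect(NullWritable.get(), serde.serialize(row, inspector));
        }
    }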

You can insert the text data into an ORC table with the following command:

 insert overwrite table orcTable select * from textTable; 

The first table is orcTable, created by the following command:

 create table orcTable(name string, city string) stored as orc; 

textTable has the same structure as orcTable; for example, it could be created and loaded as sketched below.
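A minimal sketch of such a text table, assuming a comma-delimited file; the delimiter is an assumption, and the load path reuses the input path from the question:

    create table textTable(name string, city string)
        row format delimited
        fields terminated by ','
        stored as textfile;

    load data inpath 'hdfs://localhost:9000/path/to/ipdir/textfile' into table textTable;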

You can convert a delimited file to ORC format very easily using a Spark DataFrame. You can also specify/enforce a schema and filter out specific columns.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.hive.HiveContext;

    public class OrcConvert {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("OrcConvert");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            HiveContext hiveContext = new HiveContext(jsc);

            String inputPath = args[0];
            String outputPath = args[1];

            // Read the delimited file via the spark-csv data source
            // ('\001' is the Hive default field delimiter).
            DataFrame inputDf = hiveContext.read().format("com.databricks.spark.csv")
                    .option("quote", "'")
                    .option("delimiter", "\001")
                    .load(inputPath);

            // Write the DataFrame out in ORC format.
            inputDf.write().orc(outputPath);
        }
    }

Make sure all the dependencies are in place; Hive should also be running in order to use a HiveContext, since at the moment ORC format in Spark is supported only through HiveContext.
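As a usage sketch, assuming the class above is packaged into a hypothetical orc-convert.jar, it might be launched like this (the spark-csv coordinates are an assumption and must match your Spark/Scala build):

    spark-submit --class OrcConvert \
        --packages com.databricks:spark-csv_2.10:1.5.0 \
        orc-convert.jar /path/to/input /path/to/output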