如何使用纯Java生成Parquet文件(包括日期和小数类型)并上传到S3 (无HDFS)

我最近有一个要求,我需要生成Parquet文件,Apache Spark只能使用Java读取(不使用其他软件安装,如:Apache Drill,Hive,Spark等)。 这些文件需要保存到S3,因此我将分享有关如何执行这两项操作的详细信息。

关于如何做到这一点没有简单的指南。 我也不是Java程序员,因此使用Maven,Hadoop等的概念对我来说都是陌生的。 所以我花了将近两周的时间来完成这项工作。 我想在下面分享我的个人指南,了解我是如何实现这一目标的

免责声明:以下代码示例绝不代表最佳做法,仅作为粗略的操作方法呈现。

依赖关系:

  • parquet-avro(1.9.0): https ://mvnrepository.com/artifact/org.apache.parquet/parquet-avro/1.9.0(我们使用1.9.0,因为此版本使用Avro 1.8+支持Decimals和Dates )
  • hadoop-aws(2.8.2)[如果你不打算写入S3,你将不需要这个,但你需要添加其他几个通常会被添加的依赖项,这要归功于此。 我不会介绍这种情况。 因此,即使您只在本地磁盘上生成Parquet文件,您仍然可以将其作为依赖项添加到项目中: https : //mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/ 2.8.2 (我们使用它是因为它是当时的最新版本)
  • Hadoop 2.8.1: https : //github.com/steveloughran/winutils/tree/master/hadoop-2.8.1 (我们使用2.8.X,因为它需要匹配镶木地板和hadoop-aws中使用的hadoop库依赖)

我将使用NetBeans作为我的IDE。

关于Java中的镶木地板的一些信息(对于像我这样的新手):

  • 为了将数据序列化为镶木地板,您必须选择一个流行的Java数据序列化框架:Avro,Protocol Buffers或Thrift(我将使用Avro(1.8.0),从我们的镶木地板avro依赖项中可以看出)
  • 您将需要使用支持Maven的IDE。 这是因为上面的依赖关系有很多自己的依赖关系。 Maven将自动为您下载(如NuGet for VisualStudio)

先决条件:

您必须在将运行Java代码的Windows机器上安装hadoop。 好消息是你不需要安装整个hadoop软件,而只需要两个文件:

  • hadoop.dll
  • winutils.exe

这些可以在这里下载。 这个例子你需要2.8.1版本(由于parquet-avro 1.9.0)。

  1. 将这些文件复制到目标计算机上的C:\ hadoop-2.8.1 \ bin
  2. 添加一个名为HADOOP_HOME的新系统变量(非用户变量),其值为C:\ hadoop-2.8.1

    systemvariable

  3. 修改系统路径变量(不是用户变量)并将以下内容添加到结尾: %HADOOP_HOME%\ bin

  4. 重新启动计算机以使更改生效。

如果此配置未正确完成,您将在运行时收到以下错误: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

编码入门:

  • 首先创建一个新的空Maven项目,并添加parquet-avro 1.9.0和hadoop-aws 2.8.2作为依赖项: 依赖
  • 创建您的主类,您可以在其中编写一些代码
  • 首先,您需要生成一个Schema 。 现在据我所知,你无法在运行时以编程方式生成模式。 Schema.Parser类的’ parse()方法仅将文件或字符串文字作为参数,并且在创建模式后不允许您修改模式。 为了避免这种情况,我在运行时生成我的Schema JSON并解析它。 下面是一个示例Schema:

     String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything + "\"type\": \"record\"," //Must be set as record + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything + "\"fields\": [" + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field + " {\"name\": \"myString\", \"type\": [\"string\", \"null\"]}," + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]}," + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}" + " ]}"; Parser parser = new Schema.Parser().setValidate(true); Schema avroSchema = parser.parse(schema); 

    有关Avro架构的详细信息,请访问: https : //avro.apache.org/docs/1.8.0/spec.html

  • 接下来我们可以开始生成记录(Avro原始类型很简单):

     GenericData.Record record = new GenericData.Record(avroSchema); record.put("myInteger", 1); record.put("myString", "string value 1"); 
    • 为了生成十进制逻辑类型,必须使用固定字节基元类型作为存储的实际数据类型。 当前的Parquet格式仅支持固定长度字节数组(aka: fixed_len_byte_array )。 所以我们也必须在我们的案例中使用fixed (在模式中可以看到)。 在Java中,我们必须使用BigDecimal才能真正处理小数。 我已经确定,无论值如何, Decimal(32,4)都不会超过16个字节。 因此,我们将在下面的序列化中使用标准字节数组大小16(并在上面的模式中):
     BigDecimal myDecimalValue = new BigDecimal("99.9999"); //First we need to make sure the BigDecimal matches our schema scale: myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP); //Next we get the decimal value as one BigInteger (like there was no decimal point) BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue(); //Finally we serialize the integer byte[] decimalBytes = myUnscaledDecimalValue.toByteArray(); //We need to create an Avro 'Fixed' type and pass the decimal schema once more here: GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}")); byte[] myDecimalBuffer = new byte[16]; if (myDecimalBuffer.length >= decimalBytes.length) { //Because we set our fixed byte array size as 16 bytes, we need to //pad-left our original value's bytes with zeros int myDecimalBufferIndex = myDecimalBuffer.length - 1; for(int i = decimalBytes.length - 1; i >= 0; i--){ myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i]; myDecimalBufferIndex--; } //Save result fixed.bytes(myDecimalBuffer); } else { throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length)); } //We can finally write our decimal to our record record.put("myDecimal", fixed); 
  • 对于Date值,Avro指定我们需要将自EPOCH以来的天数保存为整数。 (如果您还需要时间组件,例如实际的DateTime类型,则需要将日期和时间分别写为两个字段。对于时间组件,您可以使用Timestamp Avro类型,我将不会介绍)。 我发现获得自纪元以来的天数的最简单方法是使用joda-time库。 如果您将hadoop-aws依赖项添加到项目中,那么您应该已经拥有此库。 如果不是,您需要自己添加:

     //Get epoch value MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC); DateTime currentDate = new DateTime(); //Can take Java Date in constructor Days days = Days.daysBetween(epoch, currentDate); //We can write number of days since epoch into the record record.put("myDate", days.getDays()); 
  • 我们终于可以开始编写我们的镶木地板文件了

     try { Configuration conf = new Configuration(); conf.set("fs.s3a.access.key", "ACCESSKEY"); conf.set("fs.s3a.secret.key", "SECRETKEY"); //Below are some other helpful settings //conf.set("fs.s3a.endpoint", "s3.amazonaws.com"); //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library. //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet"); //Use path below to save to local file system instead //Path path = new Path("data.parquet"); try (ParquetWriter writer = AvroParquetWriter.builder(path) .withSchema(avroSchema) .withCompressionCodec(CompressionCodecName.GZIP) .withConf(conf) .withPageSize(4 * 1024 * 1024) //For compression .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size) .build()) { //We only have one record to write in our example writer.write(record); } } catch (Exception ex) { ex.printStackTrace(System.out); } 
  • 这是加载到Apache Spark(2.2.0)中的数据: 火花

为了您的方便,整个源代码:

 package com.mycompany.stackoverflow; import java.math.BigDecimal; import java.math.BigInteger; import java.math.RoundingMode; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Days; import org.joda.time.MutableDateTime; public class Main { public static void main(String[] args) { System.out.println("Start"); String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything + "\"type\": \"record\"," //Must be set as record + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything + "\"fields\": [" + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field + " {\"name\": \"myString\", \"type\": [\"string\", \"null\"]}," + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]}," + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}" + " ]}"; Schema.Parser parser = new Schema.Parser().setValidate(true); Schema avroSchema = parser.parse(schema); GenericData.Record record = new GenericData.Record(avroSchema); record.put("myInteger", 1); record.put("myString", "string value 1"); BigDecimal myDecimalValue = new BigDecimal("99.9999"); //First we need to make sure the huge decimal matches our schema scale: myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP); //Next we get the decimal value as one BigInteger (like there was no decimal point) BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue(); //Finally we serialize the integer byte[] decimalBytes = myUnscaledDecimalValue.toByteArray(); //We need to create an Avro 'Fixed' type and pass the decimal schema once more here: GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}")); byte[] myDecimalBuffer = new byte[16]; if (myDecimalBuffer.length >= decimalBytes.length) { //Because we set our fixed byte array size as 16 bytes, we need to //pad-left our original value's bytes with zeros int myDecimalBufferIndex = myDecimalBuffer.length - 1; for(int i = decimalBytes.length - 1; i >= 0; i--){ myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i]; myDecimalBufferIndex--; } //Save result fixed.bytes(myDecimalBuffer); } else { throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length)); } //We can finally write our decimal to our record record.put("myDecimal", fixed); //Get epoch value MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC); DateTime currentDate = new DateTime(); //Can take Java Date in constructor Days days = Days.daysBetween(epoch, currentDate); //We can write number of days since epoch into the record record.put("myDate", days.getDays()); try { Configuration conf = new Configuration(); conf.set("fs.s3a.access.key", "ACCESSKEY"); conf.set("fs.s3a.secret.key", "SECRETKEY"); //Below are some other helpful settings //conf.set("fs.s3a.endpoint", "s3.amazonaws.com"); //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library. //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors. Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet"); //Use path below to save to local file system instead //Path path = new Path("data.parquet"); try (ParquetWriter writer = AvroParquetWriter.builder(path) .withSchema(avroSchema) .withCompressionCodec(CompressionCodecName.GZIP) .withConf(conf) .withPageSize(4 * 1024 * 1024) //For compression .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size) .build()) { //We only have one record to write in our example writer.write(record); } } catch (Exception ex) { ex.printStackTrace(System.out); } } }