如何将csv文件转换为镶木地板

我是BigData的新手。我需要将csv / txt文件转换为Parquet格式。 我搜索了很多,但找不到任何直接的方法。 有没有办法实现这一目标?

这是一段代码,它可以两种方式完成。

您可以使用Apache Drill ,如将CSV文件转换为带有Drill的Apache Parquet中所述 。

简单来说:

启动Apache Drill:

 $ cd / opt / drill / bin
 $ sqlline -u jdbc:drill:zk = local

创建Parquet文件:

 - 将默认表格格式设置为镶木地板
 ALTER SESSION SET`store.format` ='parquet';

 - 创建包含CSV表中所有数据的镶木桌
 CREATE TABLE dfs.tmp ./ stats / airport_data /`AS
选择
 CAST(SUBSTR(列[0],1,4)AS INT)`YEAR`,
 CAST(SUBSTR(列[0],5,2)AS INT)`MONTH`,
列[1]为“AIRLINE”,
列[2]为“IATA_CODE”,
列[3]为“AIRLINE_2”,
列[4]为“IATA_CODE_2”,
列[5]为`GEO_SUMMARY`,
列[6]为`GEO_REGION`,
列[7]为“ACTIVITY_CODE”,
列[8]为“PRICE_CODE”,
列[9]为“TERMINAL”,
列[10]为`BOARDING_AREA`,
 CAST(列[11] AS DOUBLE)为“PASSENGER_COUNT”
 FROM dfs。/ opendata / Passenger / SFO_Passenger_Data / * .csv`;

尝试从新的Parquet文件中选择数据:

 - 从镶木地板表中选择数据
选择 *
来自dfs.tmp ./ stats / airport_data / *`

您可以通过转到http://localhost:8047/storage/dfs (来源: CSV和Parquet )来更改dfs.tmp位置。

以下代码是使用spark2.0的示例。 读取比inferSchema选项快得多。 Spark 2.0转换为镶木地板文件比spark1.6更有效。

 import org.apache.spark.sql.types._ var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) )) df = spark.read .schema(df) .option("header", "true") .option("delimiter", "\t") .csv("/user/hduser/wikipedia/pageviews-by-second-tsv") df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet") 

我已经使用Apache Drill发布了关于如何执行此操作的答案 。 但是,如果您熟悉Python,现在可以使用Pandas和PyArrow执行此操作!

安装依赖项

使用pip

 pip install pandas pyarrow 

或使用conda

 conda install pandas pyarrow -c conda-forge 

将CSV转换为Parquet的块

 # csv_to_parquet.py import pandas as pd import pyarrow as pa import pyarrow.parquet as pq csv_file = '/path/to/my.tsv' chunksize = 100_000 csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False) for i, chunk in enumerate(csv_stream): print("Chunk", i) if i == 0: # Guess the schema of the CSV file from the first chunk parquet_schema = pa.Table.from_pandas(df=chunk).schema # Open a Parquet file for writing parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy') # Write CSV chunk to the parquet file table = pa.Table.from_pandas(chunk, schema=parquet_schema) parquet_writer.write_table(table) parquet_writer.close() 

我没有针对Apache Drill版本对此代码进行基准测试,但根据我的经验,它很快,每秒转换成数万行(当然这取决于CSV文件!)。

1)您可以创建外部配置单元表

 create external table emp(name string,job_title string,department string,salary_per_year int) row format delimited fields terminated by ',' location '.. hdfs location of csv file ' 

2)另一个存储镶木地板文件的蜂巢表

 create external table emp_par(name string,job_title string,department string,salary_per_year int) row format delimited stored as PARQUET location 'hdfs location were you want the save parquet file' 

将表中的一个数据插入表二:

 insert overwrite table emp_par select * from emp 

使用spark-csv包 在Apache Spark中将 csv文件读取为Dataframe 。 将数据加载到Dataframe后将dataframe保存到parquetfile。

 val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("inferSchema", "true") .option("mode", "DROPMALFORMED") .load("/home/myuser/data/log/*.csv") df.saveAsParquetFile("/home/myuser/data.parquet") 
 from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * import sys sc = SparkContext(appName="CSV2Parquet") sqlContext = SQLContext(sc) schema = StructType([ StructField("col1", StringType(), True), StructField("col2", StringType(), True), StructField("col3", StringType(), True), StructField("col4", StringType(), True), StructField("col5", StringType(), True)]) rdd = sc.textFile('/input.csv').map(lambda line: line.split(",")) df = sqlContext.createDataFrame(rdd, schema) df.write.parquet('/output.parquet')