如何将csv文件转换为镶木地板
我是BigData的新手。我需要将csv / txt文件转换为Parquet格式。 我搜索了很多,但找不到任何直接的方法。 有没有办法实现这一目标?
这是一段代码,它可以两种方式完成。
您可以使用Apache Drill ,如将CSV文件转换为带有Drill的Apache Parquet中所述 。
简单来说:
启动Apache Drill:
$ cd / opt / drill / bin $ sqlline -u jdbc:drill:zk = local
创建Parquet文件:
- 将默认表格格式设置为镶木地板 ALTER SESSION SET`store.format` ='parquet'; - 创建包含CSV表中所有数据的镶木桌 CREATE TABLE dfs.tmp ./ stats / airport_data /`AS 选择 CAST(SUBSTR(列[0],1,4)AS INT)`YEAR`, CAST(SUBSTR(列[0],5,2)AS INT)`MONTH`, 列[1]为“AIRLINE”, 列[2]为“IATA_CODE”, 列[3]为“AIRLINE_2”, 列[4]为“IATA_CODE_2”, 列[5]为`GEO_SUMMARY`, 列[6]为`GEO_REGION`, 列[7]为“ACTIVITY_CODE”, 列[8]为“PRICE_CODE”, 列[9]为“TERMINAL”, 列[10]为`BOARDING_AREA`, CAST(列[11] AS DOUBLE)为“PASSENGER_COUNT” FROM dfs。/ opendata / Passenger / SFO_Passenger_Data / * .csv`;
尝试从新的Parquet文件中选择数据:
- 从镶木地板表中选择数据 选择 * 来自dfs.tmp ./ stats / airport_data / *`
您可以通过转到http://localhost:8047/storage/dfs
(来源: CSV和Parquet )来更改dfs.tmp
位置。
以下代码是使用spark2.0的示例。 读取比inferSchema选项快得多。 Spark 2.0转换为镶木地板文件比spark1.6更有效。
import org.apache.spark.sql.types._ var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) )) df = spark.read .schema(df) .option("header", "true") .option("delimiter", "\t") .csv("/user/hduser/wikipedia/pageviews-by-second-tsv") df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
我已经使用Apache Drill发布了关于如何执行此操作的答案 。 但是,如果您熟悉Python,现在可以使用Pandas和PyArrow执行此操作!
安装依赖项
使用pip
:
pip install pandas pyarrow
或使用conda
:
conda install pandas pyarrow -c conda-forge
将CSV转换为Parquet的块
# csv_to_parquet.py import pandas as pd import pyarrow as pa import pyarrow.parquet as pq csv_file = '/path/to/my.tsv' chunksize = 100_000 csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False) for i, chunk in enumerate(csv_stream): print("Chunk", i) if i == 0: # Guess the schema of the CSV file from the first chunk parquet_schema = pa.Table.from_pandas(df=chunk).schema # Open a Parquet file for writing parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy') # Write CSV chunk to the parquet file table = pa.Table.from_pandas(chunk, schema=parquet_schema) parquet_writer.write_table(table) parquet_writer.close()
我没有针对Apache Drill版本对此代码进行基准测试,但根据我的经验,它很快,每秒转换成数万行(当然这取决于CSV文件!)。
1)您可以创建外部配置单元表
create external table emp(name string,job_title string,department string,salary_per_year int) row format delimited fields terminated by ',' location '.. hdfs location of csv file '
2)另一个存储镶木地板文件的蜂巢表
create external table emp_par(name string,job_title string,department string,salary_per_year int) row format delimited stored as PARQUET location 'hdfs location were you want the save parquet file'
将表中的一个数据插入表二:
insert overwrite table emp_par select * from emp
使用spark-csv包 在Apache Spark中将 csv文件读取为Dataframe 。 将数据加载到Dataframe后将dataframe保存到parquetfile。
val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("inferSchema", "true") .option("mode", "DROPMALFORMED") .load("/home/myuser/data/log/*.csv") df.saveAsParquetFile("/home/myuser/data.parquet")
from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * import sys sc = SparkContext(appName="CSV2Parquet") sqlContext = SQLContext(sc) schema = StructType([ StructField("col1", StringType(), True), StructField("col2", StringType(), True), StructField("col3", StringType(), True), StructField("col4", StringType(), True), StructField("col5", StringType(), True)]) rdd = sc.textFile('/input.csv').map(lambda line: line.split(",")) df = sqlContext.createDataFrame(rdd, schema) df.write.parquet('/output.parquet')