与csv文件相比,将mysql表转换为spark数据集的速度非常慢

我在amazon s3中有csv文件,大小为62mb(114000行)。 我正在将其转换为spark数据集,并从中获取前500行。 代码如下;

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true); Dataset set=df.load("s3n://"+this.accessId.replace("\"", "")+":"+this.accessToken.replace("\"", "")+"@"+this.bucketName.replace("\"", "")+"/"+this.filePath.replace("\"", "")+""); set.take(500) 

整个操作需要20到30秒。

现在我尝试相同但是使用csv我正在使用带有119 000行的mySQL表。 MySQL服务器在亚马逊ec2中。 代码如下;

 String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password; SparkSession spark=StartSpark.getSparkSession(); SQLContext sc = spark.sqlContext(); DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true); Dataset set = sc .read() .option("url", url) .option("dbtable", this.tableName) .option("driver","com.mysql.jdbc.Driver") .format("jdbc") .load(); set.take(500); 

这需要5到10分钟。 我在jvm里面运行火花。 在两种情况下使用相同的配置。

我可以使用partitionColumn,numParttition等但我没有任何数字列,还有一个问题是我不知道该表的模式。

我的问题不是如何减少所需的时间,因为我知道在理想情况下火花将在集群中运行但我无法理解的是为什么在上述两种情况下这个大的时间差异?

StackOverflow上已多次覆盖此问题:

  • 如何使用DataFrame和JDBC连接提高慢速Spark作业的性能?
  • spark jdbc df limit …它在做什么?
  • 如何使用JDBC源在(Py)Spark中写入和读取数据?

在外部来源:

所以重申一下 – 默认情况下, DataFrameReader.jdbc不会分发数据或读取。 它使用单线程,单个exectuor。

分发阅读:

  • 使用lowerBound / upperBound范围:

     Properties properties; Lower Dataset set = sc .read() .option("partitionColumn", "foo") .option("numPartitions", "3") .option("lowerBound", 0) .option("upperBound", 30) .option("url", url) .option("dbtable", this.tableName) .option("driver","com.mysql.jdbc.Driver") .format("jdbc") .load(); 
  • predicates

     Properties properties; Dataset set = sc .read() .jdbc( url, this.tableName, {"foo < 10", "foo BETWWEN 10 and 20", "foo > 20"}, properties ) 

请按照以下步骤操作

1.下载mysql的JDBC连接器副本。 我相信你已经拥有一个。

 wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar 

2.以下面的格式创建db-properties.flat文件

 jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase} user= password= 

3.首先在要加载数据的位置创建一个空表。

用驱动程序类调用spark shell

 spark-shell --driver-class-path  

然后导入所有必需的包

 import java.io.{File, FileInputStream} import java.util.Properties import org.apache.spark.sql.SaveMode import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkConf, SparkContext} 

启动配置单元上下文或sql上下文

 val sQLContext = new HiveContext(sc) import sQLContext.implicits._ import sQLContext.sql 

设置一些属性

 sQLContext.setConf("hive.exec.dynamic.partition", "true") sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") 

从文件加载mysql db属性

 val dbProperties = new Properties() dbProperties.load(new FileInputStream(new File("your_path_to/db- properties.flat"))) val jdbcurl = dbProperties.getProperty("jdbcUrl") 

创建一个查询以从表中读取数据并将其传递给#sqlcontext的读取方法。 这是您可以管理where子句的地方

 val df1 = "(SELECT * FROM your_table_name) as s1" 

传递jdbcurl,选择查询和db属性来读取方法

 val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties) 

把它写在你的桌子上

 df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")