将分析数据从Spark插入Postgres

我有Cassandra数据库,我通过Apache Spark使用SparkSQL分析数据。 现在我想将这些分析的数据插入到PostgreSQL中。 有没有办法直接实现这一点,除了使用PostgreSQL驱动程序(我使用postREST和驱动程序实现它我想知道是否有任何方法,如saveToCassandra() )?

目前没有将RDD写入任何DBMS的本机实现。 以下是Spark用户列表中相关讨论的链接: 一 , 二

一般来说,性能最佳的方法如下:

  1. validationRDD中的分区数,它不应该太低和太高。 20-50个分区应该没问题,如果数量较少 – 使用20个分区调用repartition分区,如果更高 – 调用coalesce到50个分区
  2. 调用mapPartition转换,在其内部调用函数,使用JDBC将记录插入到DBMS中。 在此函数中,您打开与数据库的连接并使用带有此API的COPY命令,这将允许您消除对每条记录单独命令的需要 – 这样可以更快地处理插入

这样,您可以使用多达50个并行连接以并行方式将数据插入Postgres(取决于您的Spark群集大小及其配置)。 整个方法可以实现为接受RDD和连接字符串的Java / Scala函数

回答0x0FFF是好的。 这是一个有用的附加点。

我使用foreachPartition来持久存储到外部存储。 这也与Design Patterns for using foreachRDD Spark文档中提供的Design Patterns for using foreachRDD的设计模式Design Patterns for using foreachRDD https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams

例:

 dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } 

你可以使用Postgres copy api来编写它,它的速度要快得多。 请参阅以下两种方法 – 一种迭代RDD以填充可由copy api保存的缓冲区。 您唯一需要注意的是以csv格式创建正确的语句,该语句将由copy api使用。

 def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = { val sb = mutable.StringBuilder.newBuilder val now = System.currentTimeMillis() rdd.collect().foreach(itr => { itr.foreach(_.createCSV(sb, now).append("\n")) }) copyIn("myTable", new StringReader(sb.toString), "statement") sb.clear } def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = { val conn = connectionPool.getConnection() try { conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader) } catch { case se: SQLException => logWarning(se.getMessage) case t: Throwable => logWarning(t.getMessage) } finally { conn.close() } }