使用RabbitMQ源的Spark结构化流式传输

我正在尝试编写一个Structured Streaming的自定义接收器,它将使用来自RabbitMQ消息。 Spark 最近发布了 DataSource V2 API,这似乎非常有前景。 由于它抽象了很多细节,我想在简单性和性能方面使用这个API。 但是,由于它很新,因此可用的资源不多。 我需要经验丰富的Spark人员做一些澄清,因为他们会更容易掌握关键点。 开始了:

我的出发点是博客文章系列,第一部分在这里 。 它显示了如何在没有流function的情况下实现数据源。 为了制作流媒体源,我略微改变了它们,因为我需要实现MicroBatchReadSupport而不是(或除了) DataSourceV2 。

为了提高效率,让多个spark执行器同时使用RabbitMQ是明智的,即从同一队列中消耗RabbitMQ 。 如果我没有感到困惑,输入的每个分区-in Spark的术语 – 对应于来自队列的消费者-in RabbitMQ术语。 因此,我们需要为输入流分配多个分区,对吧?

与本系列的第4部分类似,我实现了MicroBatchReader ,如下所示:

 @Override public List<DataReaderFactory> createDataReaderFactories() { int partition = options.getInt(RMQ.PARTITICN, 5); List<DataReaderFactory> factories = new LinkedList(); for (int i = 0; i < partition; i++) { factories.add(new RMQDataReaderFactory(options)); } return factories; } 

我将返回一个工厂列表,并希望列表中的每个实例都将用于创建一个读者,这也将是一个消费者。 这种方法是否正确?

我希望我的接收器是可靠的,即在每个处理完的消息之后(或至少写入chekpoint目录进行进一步处理),我需要将它恢复到RabbitMQ 。 问题在这之后开始:这些工厂是在驱动程序中创建的,实际的读取过程是通过DataReader在执行程序中进行的。 但是, commit方法是MicroBatchReader的一部分,而不是DataReader 。 由于每个MicroBatchReader有很多DataReader ,我应该如何将这些消息发送回RabbitMQ ? 或者我应该在DataReader上调用下一个方法时确认? 安全吗? 如果是这样,那么commitfunction的目的是什么呢?

澄清: OBFUSCATION:关于重命名某些类/函数的答案中提供的链接(除了那里的解释)使得一切 更清楚 比以往更糟糕 。 引自那里 :

重命名:

  • DataReaderFactoryInputPartition

  • DataReaderInputPartitionReader

InputPartition的目的是管理相关阅读器的生命周期,现在称为InputPartitionReader ,并使用显式创建操作来镜像关闭操作。 API中不再清楚这一点,因为DataReaderFactory似乎比它更通用,并且不清楚为什么会为读取生成一组它们。

编辑:然而, 文档清楚地说“阅读器工厂将被序列化并发送给执行者,然后数据阅读器将在执行器上创建并进行实际阅读。”

为了使消费者可靠,我必须在Spark方面提交之后才确认特定消息。 请注意,必须在通过它传递的同一连接上确认消息,但在驱动程序节点调用commit函数。 我怎样才能在worker / executor节点上提交?

>我将返回一个工厂列表,并希望列表中的每个实例都将用于创建一个读者,这也将是一个消费者。 这种方法是否正确? 源[socket] [1]源实现有一个线程将消息推送到内部ListBuffer。 换句话说,有一个消费者(线程)填充内部ListBuffer,然后**由`planInputPartitions`(`createDataReaderFactories`得到[重命名] [2]到`planInputPartitions`)划分成分区。 另外,根据[MicroBatchReadSupport]的Javadoc [3]>执行引擎将在流式查询开始时创建一个微批量阅读器,为每个要处理的批处理调用setOffsetRange和createDataReaderFactories,然后调用stop()执行完成时 请注意,由于重新启动或故障恢复,单个查询可能会有多次执行。 换句话说,`createDataReaderFactories`应该被**多次**调用,据我所知,每个`DataReader`负责一个静态输入分区,这意味着DataReader不应该是一个消费者。 ———->但是,commit方法是MicroBatchReader的一部分,而不是DataReader ……如果是这样,那么commit函数的目的是什么呢? 也许提交函数的部分原理是防止MicroBatchReader的内部缓冲区变大。 通过提交Offset,您可以有效地从缓冲区中删除小于Offset的元素,因为您承诺不再处理它们。 您可以使用`batches.trimStart(offsetDiff)`在套接字源代码中看到这种情况


我不确定如何实现一个可靠的接收器,所以我希望有一个更有经验的Spark人来找我并且抓住你的问题,因为我也很感兴趣! 希望这可以帮助!

编辑

我只研究了socket和wiki编辑源代码。 这些来源不是生产准备好的,这是问题所不能解决的问题。 相反, kafka来源是一个更好的起点,与上述来源不同,像作者一样寻找多个消费者。

但是,也许如果您正在寻找不可靠的来源,上面的socket和wikiedit源提供了一个不那么复杂的解决方案。