Apache Spark Kinesis示例不起作用

我正在尝试运行JavaKinesisWordCountASL示例。

该示例似乎连接到我的Kinesis Stream并从流中获取数据(如下面的日志所示)。 但是,Sparks不会调用示例中传递给unionStreams.flatMap方法的调用函数,也不会打印任何wordcount。

我尝试使用Java 8和Java 7运行。我在ubuntu实例上运行它。 同样的例子适用于我的macbook。

14/11/15 01:59:42 INFO scheduler.ReceiverTracker:Stream 1收到0个块14/11/15 01:59:42 INFO storage.MemoryStore:ensureFreeSpace(264)调用curMem = 3512,maxMem = 938244833 14 / 11/15 01:59:42 INFO storage.MemoryStore:阻止输入-0-1416016781800存储为内存中的值(估计大小264.0 B,免费894.8 MB)14/11/15 01:59:42 INFO storage.BlockManagerInfo:已添加ip-10-80-91-13.ec2.internal内存输入0-1416016781800:39149(大小:264.0 B,免费:894.8 MB)14/11/15 01:59:42 INFO storage.BlockManagerMaster:更新块输入信息-0-1416016781800 14/11/15 01:59:42 INFO scheduler.JobScheduler:已添加作业时间1416016782000 ms 14/11/15 01:59:42 INFO network.SendingConnection:启动与[ip-的连接10-80-91-13.ec2.internal / 10.80.91.13:39149] 14/11/15 01:59:42 INFO network.SendingConnection:已连接到[ip-10-80-91-13.ec2.internal / 10.80.91.13:39149],1条消息待定14/11/15 01:59:42 INFO network.ConnectionManager:接受来自[ip-10-80-91-的连接 13.ec2.internal / 10.80.91.13:56700] 14/11/15 01:59:42 WARN storage.BlockManager:该机器上已存在块输入0-1416016781800; 不重新添加它14/11/15 01:59:42 INFO receiver.BlockGenerator:推送块输入-0-1416016781800 14/11/15 01:59:43 INFO storage.MemoryStore:ensureFreeSpace(256)调用curMem = 3776,maxMem = 938244833 14/11/15 01:59:43 INFO storage.MemoryStore:阻止输入-0-1416016782800存储为内存中的值(估计大小256.0 B,免费894.8 MB)14/11/15 01:59: 43 INFO storage.BlockManagerInfo:在内存中添加输入0-1416016782800在ip-10-80-91-13.ec2.internal:39149(大小:256.0 B,免费:894.8 MB)14/11/15 01:59: 43 INFO storage.BlockManagerMaster:块输入的更新信息-0-1416016782800 14/11/15 01:59:43 WARN storage.BlockManager:块输入-0-1416016782800已存在于此机器上; 不重新添加它14/11/15 01:59:43 INFO receiver.BlockGenerator:推送块输入-0-1416016782800 14/11/15 01:59:44 INFO scheduler.ReceiverTracker:流0收到2块14/11 / 15 01:59:44 INFO scheduler.ReceiverTracker:Stream 1收到0块14/11/15 01:59:44 INFO scheduler.JobScheduler:已添加时间作业1416016784000 ms 14/11/15 01:59:46 INFO scheduler .ReceiverTracker:流0收到0块14/11/15 01:59:46 INFO scheduler.ReceiverTracker:流1收到0块14/11/15 01:59:46 INFO scheduler.JobScheduler:已添加时间作业1416016786000 ms 14 / 11/15 01:59:46 INFO impl.CWPublisherRunnable:成功发布了17个基准。 14/11/15 01:59:46 INFO storage.MemoryStore:ensureFreeSpace(248)调用curMem = 4032,maxMem = 938244833 14/11/15 01:59:46 INFO storage.MemoryStore:阻止输入1-1416016786000存储作为内存中的值(估计大小248.0 B,免费894.8 MB)14/11/15 01:59:46 INFO storage.BlockManagerInfo:在ip-10-80-91-13.ec2的内存中添加了输入-1-1416016786000。内部:39149(大小:248.0 B,免费:894.8 MB)14/11/15 01:59:46 INFO storage.BlockManagerMaster:更新的块输入信息-1-1416016786000 14/11/15 01:59:46 WARN存储.BlockManager:该机器上已存在块输入-1-1416016786000; 不重新添加它14/11/15 01:59:46 INFO receiver.BlockGenerator:推送块输入-1-1416016786000 14/11/15 01:59:46 INFO impl.CWPublisherRunnable:已成功发布14个基准。 14/11/15 01:59:48 INFO scheduler.ReceiverTracker:流0收到0块14/11/15 01:59:48 INFO storage.MemoryStore:ensureFreeSpace(264)调用curMem = 4280,maxMem = 938244833 14 / 11/15 01:59:48 INFO scheduler.ReceiverTracker:Stream 1收到1个块14/11/15 01:59:48 INFO storage.MemoryStore:块输入-0-1416016787800存储为内存中的值(估计大小为264.0 B,免费894.8 MB)14/11/15 01:59:48 INFO storage.BlockManagerInfo:在内存中添加输入-0-1416016787800在ip-10-80-91-13.ec2.internal:39149(大小:264.0 B,免费:894.8 MB)14/11/15 01:59:48 INFO storage.BlockManagerMaster:更新了块输入信息-0-1416016787800 14/11/15 01:59:48 INFO scheduler.JobScheduler:已添加时间1416016788000 ms 14 / 11/15 01:59:48 WARN storage.BlockManager:该机器上已存在块输入0-1416016787800; 没有重新添加它14/11/15 01:59:48 INFO receiver.BlockGenerator:推送块输入-0-1416016787800 14/11/15 01:59:50 INFO scheduler.ReceiverTracker:流0收到1块14/11 / 15 01:59:50 INFO scheduler.ReceiverTracker:Stream 1收到0块14/11/15 01:59:50 INFO scheduler.JobScheduler:已添加时间作业1416016790000 ms 14/11/15 01:59:51 INFO storage .MemoryStore:ensureFreeSpace(264)调用curMem = 4544,maxMem = 938244833 14/11/15 01:59:51 INFO storage.MemoryStore:阻止输入-0-1416016790800存储为内存中的值(估计大小264.0 B,免费894.8 MB)14/11/15 01:59:51 INFO storage.BlockManagerInfo:在内存中添加输入0-1416016790800在ip-10-80-91-13.ec2.internal:39149(大小:264.0 B,免费:894.8) MB)14/11/15 01:59:51 INFO storage.BlockManagerMaster:更新了块输入的信息-0-1416016790800 14/11/15 01:59:51 WARN storage.BlockManager:块输入-0-1416016790800已经存在这台机器; 不重新添加它14/11/15 01:59:51 INFO receiver.BlockGenerator:推送块输入-0-1416016790800

这可能与您获得的工作线程数有关。 当我使用–master local [2]运行应用程序时,我遇到了同样的问题。 我花了很多时间寻找答案而一无所获。 出于好奇,我改为–master local [4]并且它奏效了。 我不知道根本原因。 也许更熟悉Spark的人可以启发我们。

注意:就我而言,我的Kinesis流有两个分片。 因此,应用程序创建了两个输入流,每个输入流一个。

感谢来自@ user3594557的提示。

来自https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#input-dstreams的两个重要说明

如果分配给应用程序的核心数小于或等于输入DStreams /接收器的数量,则系统将接收数据,但无法处理它们。

在本地运行时,如果您将主URL设置为“本地”,则只有一个核心可以运行任务。 对于甚至有一个输入DStream(文件流都没问题)的程序来说,这是不够的,因为接收器将占用该核心,并且没有剩余的核心来处理数据。