Tag: amazon kinesis

Kinesis:关闭工人的最佳/安全方法是什么?

我正在使用AWS Kinesis客户端库 。 我需要一种方法在部署期间关闭Kinesis Worker线程,这样我就停在检查点而不是在processRecords()的中间。 我在Worker.java看到了一个关闭boolean,但它是私有的。 我需要的原因是检查点和幂等性对我来说至关重要,我不想在批处理中间杀死进程。 [编辑] 感谢@CaptainMurphy,我注意到Worker.java公开了shutdown()方法,它安全地关闭了worker和LeaseCoordinator 。 它没有做的是在IRecordProcessor调用shutdown()任务。 它突然终止了IRecordProcessor而不用担心状态。 我确实理解KCL不保证检查点之间的幂等性,并且开发人员应该使设计容错,但我觉得在LeaseCoordinator停止之前, IRecordProcessor应该正确关闭, LeaseCoordinator 。

AWS Lambda性能问题

我使用与aws lambda(java)集成的aws api网关,但我在这种方法中看到了一些严重的问题。 删除服务器并让您的应用程序开箱即用的概念非常好,但这是我面临的问题。 我的lambda做了两件简单的事情 – validation从客户端收到的有效负载,然后将其发送到kinesis流,以便从另一个lambda进行进一步处理(你会问为什么我不直接发送到流,只使用1个lambda我只想说我想分离逻辑并有一层抽象,并且能够告诉客户他正在发送无效数据。) 在lambda的实施中,我集成了弹簧DI。 到现在为止还挺好。 我开始进行性能测试。 我模拟了50个并发用户,每个请求发出4个请求,请求之间有5秒。 所以发生了什么 – 在lambda的冷启动中,我初始化了spring的应用程序上下文,但似乎在lambda未启动时有如此多的同时请求正在做一些奇怪的事情。 这是上下文初始化时间的屏幕截图。 我们从截图中看到的是,初始化上下文的时间有很大差异。 我对所发生的事情的假设是,当接收到如此多的请求且没有“活动”lambda时,它会为每个请求初始化一个lambda容器,同时它“阻塞”其中一些(具有大量时间的那些) 18s)直到其他已经开始准备好了。 所以也许它可以同时启动容器的内部限制。 问题是如果你没有平均分配的流量,这将不时发生,一些请求将会超时。 我们不希望这种情况发生。 接下来的事情就是在没有弹簧容器的情况下进行一些测试,因为我的想法是“好的,初始化很重,让我们只做普通的旧java对象初始化”。 不幸的是,同样的事情发生了(可能只是减少了一些请求的3s容器初始化)。 以下是测试数据的更详细截图: 所以我记录了整个lambda执行时间(从构造到结束),kinesis客户端初始化以及实际将数据发送到流,因为这些是lambda中最重的操作。 我们仍然有18岁或者其他什么时候,但有趣的是,时代在某种程度上是成比例的。 因此,如果整个lambda需要18秒,大约7-8s是客户端初始化,6-7用于将数据发送到流,而剩下4-5秒用于lambda中的其他操作,目前只是validation。 另一方面,如果我们采用其中一个小时间(这意味着它重用已经开始的lambda),即820ms,则kinesis客户端初始化需要100ms,数据发送需要340ms,validation需要400ms。 所以这再次推动了我内心因为一些限制而睡觉的想法。 下一个屏幕截图显示了当lamda已经启动时下一轮请求发生的情况: 所以我们没有这么大的时间,是的,我们在一些请求中仍然有一些相对较大的增量(对我而言也很奇怪),但事情看起来好多了。 所以我正在寻找一个实际上知道幕后发生了什么的人的澄清,因为对于使用云的严肃应用程序来说这不是一个好的行为,因为它具有“无限”的可能性。 另一个问题与区域中帐户内所有lambda中lambda-200并发调用的另一个限制有关。 对我而言,对于拥有大量流量的大型应用程序来说,这也是一个很大的限制。 因此,我的商业案例(我不知道将来)或多或少是火,忘了请求。 我开始考虑改变网关直接将数据发送到流的方式的逻辑,另一个lambda负责validation和进一步处理。 是的,我正在失去当前的抽象(目前我不需要),但我多次增加应用程序的可用性。 你怎么看?

如何将数据从服务器放到Kinesis Stream

我是Kinesis的新手。 读出我发现的文档,我可以创建Kinesis Stream来从Producer获取数据。 然后使用KCL将从Stream读取此数据以进一步处理。 我理解如何通过实现IRecordProcessor来编写KCL应用程序。 然而,关于如何将数据放在Kinesis流上的第一阶段仍然不清楚。 我们是否有一些确实需要实现的AWS API。 场景:我有一台服务器,可以从文件夹中的各种来源连续获取数据。 每个文件夹都包含文本文件,其行包含用于分析工作的必需属性。 我必须将所有这些数据推送到Kinesis Stream。 我需要一些代码,如下面的类putData方法将用于Kinesis流中 public class Put { AmazonKinesisClient kinesisClient; Put() { String accessKey = “My Access Key here” ; String secretKey = “My Secret Key here” ; AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); kinesisClient = new AmazonKinesisClient(credentials); kinesisClient.setEndpoint(“kinesis.us-east-1.amazonaws.com”, “kinesis”, “us-east-1”); System.out.println(“starting the Put Application”); } […]

Apache Spark Kinesis示例不起作用

我正在尝试运行JavaKinesisWordCountASL示例。 该示例似乎连接到我的Kinesis Stream并从流中获取数据(如下面的日志所示)。 但是,Sparks不会调用示例中传递给unionStreams.flatMap方法的调用函数,也不会打印任何wordcount。 我尝试使用Java 8和Java 7运行。我在ubuntu实例上运行它。 同样的例子适用于我的macbook。 14/11/15 01:59:42 INFO scheduler.ReceiverTracker:Stream 1收到0个块14/11/15 01:59:42 INFO storage.MemoryStore:ensureFreeSpace(264)调用curMem = 3512,maxMem = 938244833 14 / 11/15 01:59:42 INFO storage.MemoryStore:阻止输入-0-1416016781800存储为内存中的值(估计大小264.0 B,免费894.8 MB)14/11/15 01:59:42 INFO storage.BlockManagerInfo:已添加ip-10-80-91-13.ec2.internal内存输入0-1416016781800:39149(大小:264.0 B,免费:894.8 MB)14/11/15 01:59:42 INFO storage.BlockManagerMaster:更新块输入信息-0-1416016781800 14/11/15 01:59:42 INFO scheduler.JobScheduler:已添加作业时间1416016782000 ms 14/11/15 01:59:42 INFO network.SendingConnection:启动与[ip-的连接10-80-91-13.ec2.internal / 10.80.91.13:39149] 14/11/15 01:59:42 INFO network.SendingConnection:已连接到[ip-10-80-91-13.ec2.internal / 10.80.91.13:39149],1条消息待定14/11/15 01:59:42 INFO […]