在JAVA中使用Lambda的AWS DynamoDB触发器

我试图在dynamodb流事件上触发用Java编写的AWS lambda函数。 亚马逊有一个相同的指南,在这里使用NodeJS http://docs.aws.amazon.com/lambda/latest/dg/wt-ddb-create-test-function.html

NodeJS的测试输入(来自上面的链接)看起来像一个SNS事件,所以我尝试在Java中使用相应的SNSEvent类作为我的处理程序方法的输入。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.events.SNSEvent; import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord; import java.util.List; public class RecomFunction { public void handler(SNSEvent event, Context context) { LambdaLogger logger = context.getLogger(); List records = event.getRecords(); if (records != null) { for (SNSRecord record : records) { if (record != null) { logger.log("SNS record: " + record.getSNS().getMessage()); } } } } } 

不幸的是,record.getSNS()返回NULL导致NullPointerexception

有一个相关的问题,但没有给出具体的答案: 使用Lambda设置DynamoDB触发器

这段代码对我有用。 您可以使用它在Lambda函数中接收和处理DynamoDB事件 –

 public class Handler implements RequestHandler { @Override public Void handleRequest(DynamodbEvent dynamodbEvent, Context context) { for (DynamodbStreamRecord record : dynamodbEvent.getRecords()) { if (record == null) { continue; } // Your code here } return null; } } 

同样,您可以使用SNSEventSNSRecord来处理Amazon SNS事件。

这对我来说很有用 – 例如DynamoDB流事件:

 import com.amazonaws.services.lambda.runtime.RequestHandler; ... public class DynamoStreamHandler implements RequestHandler { @Override public Void handleRequest(Object o, Context context) { LinkedHashMap lhm = (LinkedHashMap) o; ...etc. } } 

它们似乎使用了一个定制的JSON映射器,它使用了MapList对象。 通过测试和打印日志来validation其他事件类型是非常简单的(但很乏味)。 (叹)

编辑:如果~5 MB开销没问题,您可以使用aws-lambda-java-events library v1.1.0提供的DynamodbEvent.DynamodbStreamRecord ,如AWS Lambda演练3: AWS Lambda文档中的处理Amazon DynamoDB事件(Java)中所述。

创建一个处理InputStream的处理程序,读入InputStream的内容(只是JSON),然后反序列化它以获取所需的数据。

 import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import com.amazonaws.services.lambda.runtime.Context; public class MyHandler { public void handler(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); int letter; while((letter = inputStream.read()) != -1) { baos.write(letter); } //Send the contents of baos to a JSON deserializer ... } } 

这有点麻烦,但据我所知,AWS目前没有提供更高级别的Java lambda接口来使用DynamoDB Streams。 我在这里有一个完整的例子,详细介绍了如何对JSON流进行反序列化以获取数据的Java对象。

您的代码在CloudWatch日志中导致以下exception,

类没有实现适当的处理程序接口 ….

一旦我将代码更改为以下内容,我就可以获得SNS消息了。

 public class RecomFunction implements RequestHandler { public Void handleRequest(SNSEvent event, Context context) { ... return null; } } 

要从事件处理程序中获取反序列化的对象:1)在处理程序中使用以下内容从输入流中获取json:

 private String getJsonFrom(InputStream stream) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); int letter; while ((letter = stream.read()) != -1) baos.write(letter); return new String(baos.toByteArray()); } 

2)然后创建一个特定的反序列化器从json派生对象。 在DynamoDB事件的情况下’NewImage’。 这里有一个例子。