我有两个 Lambda 函数,一个 EventProducer 和一个 EventConsumer。所需的场景如下:EventProducer 在自定义 AWS EventBridge 总线中添加一个事件,而 EventConsumer 读取该事件。
我想使用 Lambda 目标来实现这一点,但 EventBridge 似乎没有工作。我已经设法通过显式调用“AmazonEventBridge::putEvent”来推送我的事件来使消费者可以使用事件,但我没有通过返回输出和发送输出来做到这一点。
如果我使用 Lambda 或 SQS 作为目标,而不是 EventBridge,则该代码有效。当我使用 AWS CLI 发送消息时,消费者也会读取它们。
有没有人有一个使用 Lambda 目标从 Lambda 函数推送 EventBridge 中的事件的工作示例?
我的代码如下:
Handler code:
@Override
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
DataciteDoiRequest sentDirectly = newDataciteDoiRequest();
logger.info(lOG_HANDLER_HAS_RUN);
putEventDirectlyToEventBridge(sentDirectly);
DataciteDoiRequest sentThroughLambdaDestination =
sentDirectly.copy().withPublicationId(URI.create("https://localhost/fromOutputStream")).build();
writeOutput(sentThroughLambdaDestination, output);
}
private <I> void writeOutput(I event, OutputStream outputStream)
throws IOException {
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream))) {
String responseJson = Optional.ofNullable(objectMapper.writeValueAsString(event))
.map(StringUtils::replaceWhiteSpacesWithSpace)
.map(StringUtils::removeMultipleWhiteSpaces)
.orElseThrow();
logger.info(responseJson);
writer.write(responseJson);
}
}
private void putEventDirectlyToEventBridge(DataciteDoiRequest dataciteDoiRequest) {
PutEventsRequestEntry putEventsRequestEntry = new PutEventsRequestEntry()
.withDetail(dataciteDoiRequest.toString())
.withEventBusName(environment.readEnv(EVENT_BUS_ENV_VAR))
.withSource(SOURCE)
.withDetailType(dataciteDoiRequest.getType());
PutEventsRequest putEventsRequest = new PutEventsRequest().withEntries(putEventsRequestEntry);
eventBridgeClient.putEvents(putEventsRequest);
}
CloudFormation template:
EventConsumer:
Type: AWS::Serverless::Function
Properties:
CodeUri: dynamo-event-to-datacite-request
Handler: handlers.EventConsumer::handleRequest
Runtime: java11
MemorySize: 1400
Role: !GetAtt LambdaRole.Arn
Environment:
Variables:
EVENT_BUS: !GetAtt EventBus.Name
AWC_ACCOUNT_ID: !Ref AWS::AccountId
Events:
EventBridgeEvent:
Type: EventBridgeRule
Properties:
EventBusName: !GetAtt EventBus.Name
Pattern: { "detail": { "type": [ "MyType" ] } }
EventProducer:
DependsOn:
- EventBus
- FailQueue
- EventConsumer
Type: AWS::Serverless::Function
Properties:
CodeUri: dynamo-event-to-datacite-request
Handler: handlers.EventProducer::handleRequest
Runtime: java11
MemorySize: 1400
Role: !GetAtt LambdaRole.Arn
EventInvokeConfig:
DestinationConfig:
OnSuccess:
Type: EventBridge
Destination: !GetAtt EventBus.Arn
OnFailure:
Type: SQS
Destination: !GetAtt FailQueue.Arn
Environment:
Variables:
EVENT_BUS: !GetAtt EventBus.Name
EventBus:
Type: AWS::Events::EventBus
Properties:
Name: orestis-event-bus
FailQueue:
Type: AWS::SQS::Queue
Properties:
MaximumMessageSize: 262144
QueueName: orestis-failure-queue