Triggering multiple workflows with DynamoDB Streams
1. The scenario
I love event-driven architectures. They have a natural flow, require minimal code, and are well-suited for microservices.
A common use case I have encountered is that after a write is committed to the database, other services in the application need to trigger their business logic based on the new data.
Developers often write code to make this work. A typical solution looks like this (pseudo-code):
import { SNSClient, PublishCommand } from "@aws-sdk/client-sns";

const snsClient = new SNSClient({});

async function writeData(data) {
  // 1. Write data and wait for the success response
  // (db stands in for your data access layer)
  const databaseResponse = await db.write(data);
  // 2. Publish a message to SNS (Message must be a string)
  await snsClient.send(new PublishCommand({
    Message: JSON.stringify(data),
    TopicArn: TOPIC_ARN,
  }));
}
First, the function persists the data to the database and waits for a success response. Then, it publishes a message to a service like SNS, where multiple subscribers can process the information.
Alternatively, the function might try to handle everything:
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { SFNClient, StartExecutionCommand } from "@aws-sdk/client-sfn";

const sqsClient = new SQSClient({});
const sfnClient = new SFNClient({});

async function fanout(data) {
  const messageBody = JSON.stringify(data);
  // Send a message to an SQS queue
  await sqsClient.send(new SendMessageCommand({
    QueueUrl: QUEUE1_URL,
    MessageBody: messageBody,
  }));
  // Send a message to another SQS queue
  await sqsClient.send(new SendMessageCommand({
    QueueUrl: QUEUE2_URL,
    MessageBody: messageBody,
  }));
  // Start a Step Functions state machine execution
  await sfnClient.send(new StartExecutionCommand({
    stateMachineArn: STATE_MACHINE_ARN,
    input: messageBody,
  }));
  // etc.
}
I would avoid code like this. Whether these calls run in parallel or sequentially, you will need to implement robust error handling and retry mechanisms, and the code can quickly become complex, requiring significant debugging time.
Sometimes, writing such code is unavoidable. However, some AWS database services offer excellent support for event-driven solutions. With DynamoDB, you can capture data changes by enabling streams.
DynamoDB offers two streaming models: DynamoDB Streams and Kinesis Data Streams for DynamoDB.
This article focuses on DynamoDB Streams and how to implement streamlined event-driven architectures by capturing data changes.
2. DynamoDB Streams
Wouldn’t it be great if the writeData function’s only responsibility were to save new or updated items to DynamoDB, and other services in the application could receive notifications about new data and react independently?
I’ll answer my own question: Yes, it would be great.
DynamoDB Streams is designed for this purpose. Here are some key features:
- Stores records for up to 24 hours.
- Each record appears exactly once, and for each item, records appear in the same sequence as the actual modifications.
- Lambda reads from the stream are free (this alone makes DynamoDB Streams appealing).
- Supports up to two concurrent consumers per shard for single-region (not global) tables via the DynamoDB Streams GetRecords API. Reads from additional consumers may be throttled.
This sounds promising, except for the last point. Only two consumers? That limits us to basic fanout-like solutions.
Despite the API read restriction, we can still use streams to trigger multiple downstream services and design fanout architectures.
Let’s explore some solutions.
3. Using Lambda functions
I use Lambda functions whenever possible, so I’m excited that pairing DynamoDB Streams with Lambda presents many opportunities.
3.1. Lambda as a consumer
When Lambda is added as a stream consumer, the Lambda service handles everything through event-source mapping settings. You can configure options like batch size or an on-failure destination. Lambda uses a polling invocation model, querying the stream four times per second.
You can also configure up to five filter criteria per Lambda function to match specific item attributes. Lambda only invokes the function when a matching record appears in the stream, eliminating the need for filtering logic in your code. This can save significant costs by avoiding invocations for irrelevant events.
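For illustration, here’s a sketch of creating such an event source mapping with the AWS SDK for JavaScript v3. The ARNs, function name, and filter pattern are placeholders, not values from a real setup:

import {
  LambdaClient,
  CreateEventSourceMappingCommand,
} from "@aws-sdk/client-lambda";

const lambdaClient = new LambdaClient({});

await lambdaClient.send(new CreateEventSourceMappingCommand({
  EventSourceArn: "arn:aws:dynamodb:...:table/orders/stream/...", // placeholder
  FunctionName: "process-orders", // hypothetical function
  StartingPosition: "LATEST",
  BatchSize: 100,
  // Invoke the function only for newly inserted items
  FilterCriteria: {
    Filters: [{ Pattern: JSON.stringify({ eventName: ["INSERT"] }) }],
  },
  // Send records that exhaust their retries to a queue (placeholder ARN)
  DestinationConfig: {
    OnFailure: { Destination: "arn:aws:sqs:...:stream-failures" },
  },
}));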
3.2. Fanout with Lambda
How can we run multiple business logic processes on database change records?
You can configure multiple Lambda functions (more than two) to be triggered by the same DynamoDB stream. Each function receives and processes the same records.
Lambda functions don’t count as separate consumers, as the Lambda service acts as the consumer. When a new record is available (which is almost always, given the use cases of streams), Lambda invokes the functions simultaneously, scaling their concurrency as needed (and configured).
DynamoDB automatically adjusts the number of shards, and you have no control over this.
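As a minimal sketch, one of these consumer functions might look like this. The unmarshall helper converts DynamoDB’s attribute-value format into plain objects, and processNewItem is a placeholder for the function’s business logic:

import { unmarshall } from "@aws-sdk/util-dynamodb";

export const handler = async (event) => {
  for (const record of event.Records) {
    // Each record describes one data modification: INSERT, MODIFY, or REMOVE
    if (record.eventName === "INSERT") {
      const newItem = unmarshall(record.dynamodb.NewImage);
      await processNewItem(newItem); // placeholder for this function's logic
    }
  }
};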
Fanout with Lambda functions? Absolutely!
4. Other fanout options
Not every use case suits Lambda functions, and not everyone prefers Lambda. How can we trigger multiple workflows that don’t rely on Lambda?
4.1. Intermediary service
The two-concurrent-consumers-per-shard limit can be a bottleneck. One solution is an intermediary service that receives records from the stream and forwards them to another service, which is then responsible for triggering the interested parties.
4.2. Fanout with EventBridge Pipes
A great candidate for an intermediary service is EventBridge Pipes.
EventBridge Pipes connects a source to a target.
DynamoDB Streams is a supported source for a pipe, with direct integration, so no external tools (like a Lambda function) are needed. EventBridge polls the stream shards at four requests per second.
For a fanout architecture, you can choose an SNS topic or an EventBridge event bus as the target.
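Here’s a sketch of creating such a pipe with the SDK. The name, role, and ARNs are placeholders, and the role must allow the pipe to read from the stream and publish to the topic:

import { PipesClient, CreatePipeCommand } from "@aws-sdk/client-pipes";

const pipesClient = new PipesClient({});

await pipesClient.send(new CreatePipeCommand({
  Name: "orders-stream-fanout", // hypothetical name
  RoleArn: "arn:aws:iam::123456789012:role/pipe-role", // placeholder
  Source: "arn:aws:dynamodb:...:table/orders/stream/...", // placeholder
  SourceParameters: {
    DynamoDBStreamParameters: {
      StartingPosition: "LATEST",
      BatchSize: 10,
    },
  },
  Target: "arn:aws:sns:...:orders-topic", // placeholder
}));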
SNS supports a wide range of subscribers, including SQS queues, Lambda functions, or email addresses.
EventBridge buses support up to five targets per rule, offering a smaller fanout than SNS.
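If the pipe’s target is an event bus, a rule can then fan events out to its targets. As a sketch, with hypothetical names and placeholder ARNs (the event pattern is an assumption and must match whatever events your pipe puts on the bus):

import {
  EventBridgeClient,
  PutRuleCommand,
  PutTargetsCommand,
} from "@aws-sdk/client-eventbridge";

const ebClient = new EventBridgeClient({});

await ebClient.send(new PutRuleCommand({
  Name: "orders-fanout-rule", // hypothetical
  EventBusName: "orders-bus", // hypothetical
  EventPattern: JSON.stringify({ source: ["my-pipe-source"] }), // assumption
}));

// Up to five targets per rule
await ebClient.send(new PutTargetsCommand({
  Rule: "orders-fanout-rule",
  EventBusName: "orders-bus",
  Targets: [
    { Id: "queue1", Arn: "arn:aws:sqs:...:queue1" }, // placeholder
    {
      Id: "workflow",
      Arn: "arn:aws:states:...:stateMachine:orders", // placeholder
      RoleArn: "arn:aws:iam::123456789012:role/eb-target-role", // placeholder
    },
  ],
}));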
A Step Functions state machine is also a valid target for EventBridge Pipes. You can create fanout-like solutions using the Parallel state, where different workflows run independently.
This is an interesting solution worth exploring.
4.3. Fanout with a Lambda function
Lambda functions can also serve as intermediary services. A router function can act as the stream consumer, forwarding events to services like SNS or EventBridge after receiving a record.
Advantages of using Lambda as an intermediary include simplicity and the control provided by custom code. It may also be more cost-effective than EventBridge Pipes.
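As a minimal sketch, such a router might forward stream records to an EventBridge event bus. The bus name, source, and detail shape here are assumptions:

import { EventBridgeClient, PutEventsCommand } from "@aws-sdk/client-eventbridge";
import { unmarshall } from "@aws-sdk/util-dynamodb";

const ebClient = new EventBridgeClient({});

export const handler = async (event) => {
  // Convert each stream record into a custom event
  const entries = event.Records.map((record) => ({
    EventBusName: "app-bus", // hypothetical bus name
    Source: "app.orders", // hypothetical source
    DetailType: record.eventName, // INSERT, MODIFY, or REMOVE
    Detail: JSON.stringify({
      newImage: record.dynamodb.NewImage
        ? unmarshall(record.dynamodb.NewImage)
        : null,
    }),
  }));

  // PutEvents accepts at most 10 entries per call
  for (let i = 0; i < entries.length; i += 10) {
    await ebClient.send(new PutEventsCommand({ Entries: entries.slice(i, i + 10) }));
  }
};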
4.4. If none of these work
If none of these solutions fit your use case, I’m sorry to hear that!
I believe a small Lambda intermediary function should suit most architectures. If not, you can retrieve stream records directly using the GetRecords API and deploy code to poll the stream on any AWS compute service. Be mindful of the two-concurrent-consumers-per-shard rule when implementing custom solutions.
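For the adventurous, here’s a bare-bones polling sketch with the low-level SDK. Error handling, re-sharding, and checkpointing are omitted, and the stream ARN is a placeholder:

import {
  DynamoDBStreamsClient,
  DescribeStreamCommand,
  GetShardIteratorCommand,
  GetRecordsCommand,
} from "@aws-sdk/client-dynamodb-streams";

const streamsClient = new DynamoDBStreamsClient({});
const STREAM_ARN = "arn:aws:dynamodb:...:table/orders/stream/..."; // placeholder

const { StreamDescription } = await streamsClient.send(
  new DescribeStreamCommand({ StreamArn: STREAM_ARN }),
);

for (const shard of StreamDescription.Shards) {
  // Start at the oldest record still retained in the shard
  let { ShardIterator } = await streamsClient.send(
    new GetShardIteratorCommand({
      StreamArn: STREAM_ARN,
      ShardId: shard.ShardId,
      ShardIteratorType: "TRIM_HORIZON",
    }),
  );

  while (ShardIterator) {
    const { Records, NextShardIterator } = await streamsClient.send(
      new GetRecordsCommand({ ShardIterator }),
    );
    for (const record of Records ?? []) {
      console.log(record.eventName, record.dynamodb?.Keys); // placeholder logic
    }
    ShardIterator = NextShardIterator;
    // Pause between polls to avoid hammering the shard
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}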
5. Summary
You can react to database changes in an event-driven way using DynamoDB Streams.
The simplest approach is to use streams with Lambda functions. If Lambda isn’t suitable, EventBridge Pipes or a Lambda intermediary function can distribute data change events to an SNS topic or EventBridge event bus, where you can add multiple subscribers or targets.
6. Further Reading
- Change data capture with Amazon DynamoDB - DynamoDB stream options
- DynamoDB Streams limits - Throttling considerations
- Using event filtering with a DynamoDB event source - Event filtering options in Lambda