aws_dynamodb_cdc

Stream item-level changes from DynamoDB tables using DynamoDB Streams. This input automatically manages shards, checkpoints progress for recovery, and processes multiple shards concurrently.

Introduced in version 4.79.0.

Use this reference to:

  • Look up configuration options for DynamoDB CDC streaming

  • Find metadata fields available for message processing

  • Identify checkpointing and performance tuning settings

Common

input:
  label: ""
  aws_dynamodb_cdc:
    tables: []
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon
    snapshot_mode: none

Advanced

input:
  label: ""
  aws_dynamodb_cdc:
    tables: []
    table_discovery_mode: single
    table_tag_filter: ""
    table_discovery_interval: 5m
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 1000
    poll_interval: 1s
    start_from: trim_horizon
    checkpoint_limit: 1000
    max_tracked_shards: 10000
    throttle_backoff: 100ms
    snapshot_mode: none
    snapshot_segments: 1
    snapshot_batch_size: 100
    snapshot_throttle: 100ms
    snapshot_deduplicate: true
    snapshot_buffer_size: 100000
    region: "" # No default (optional)
    endpoint: "" # No default (optional)
    tcp:
      connect_timeout: 0s
      keep_alive:
        idle: 15s
        interval: 15s
        count: 9
      tcp_user_timeout: 0s
    credentials:
      profile: "" # No default (optional)
      id: "" # No default (optional)
      secret: "" # No default (optional)
      token: "" # No default (optional)
      from_ec2_role: "" # No default (optional)
      role: "" # No default (optional)
      role_external_id: "" # No default (optional)

Prerequisites

The source DynamoDB table must have DynamoDB Streams enabled. You can enable streams with one of these view types:

  • KEYS_ONLY: Only the key attributes of the modified item

  • NEW_IMAGE: The entire item as it appears after the modification

  • OLD_IMAGE: The entire item as it appeared before the modification

  • NEW_AND_OLD_IMAGES: Both the new and old item images
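
If streams are not yet enabled on the source table, you can turn them on with the AWS CLI (a sketch assuming a table named my-table; choose the view type your pipeline needs):

```shell
# Enable a stream that captures both old and new item images.
aws dynamodb update-table \
  --table-name my-table \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

# Confirm the stream specification on the table.
aws dynamodb describe-table \
  --table-name my-table \
  --query "Table.StreamSpecification"
```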

Checkpointing

Checkpoints are stored in a separate DynamoDB table (configured via checkpoint_table). This table is created automatically if it does not exist. On restart, the input resumes from the last checkpointed position for each shard.
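
The recovery behavior can be pictured with a small sketch (illustrative only — the real checkpoint table schema and attribute names are internal to the plugin):

```python
# Illustrative model of per-shard checkpoint recovery. AFTER_SEQUENCE_NUMBER,
# TRIM_HORIZON, and LATEST are real DynamoDB Streams shard iterator types.
def resume_position(checkpoints: dict, shard_id: str, start_from: str):
    """Return where to begin reading a shard after a restart.

    checkpoints maps shard IDs to the last acknowledged sequence number.
    """
    if shard_id in checkpoints:
        # Resume just after the last checkpointed record.
        return ("AFTER_SEQUENCE_NUMBER", checkpoints[shard_id])
    # No checkpoint yet: fall back to the start_from setting.
    return ("TRIM_HORIZON", None) if start_from == "trim_horizon" else ("LATEST", None)

checkpoints = {"shardId-000000000001": "49590338271490256608559692538361571095"}
print(resume_position(checkpoints, "shardId-000000000001", "trim_horizon"))
# → ('AFTER_SEQUENCE_NUMBER', '49590338271490256608559692538361571095')
print(resume_position(checkpoints, "shardId-000000000002", "latest"))
# → ('LATEST', None)
```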

Alternative components

For longer retention (up to 1 year, versus 24 hours for DynamoDB Streams) and higher read throughput, consider using Kinesis Data Streams for DynamoDB with the aws_kinesis input instead.

Message structure

Each CDC event is delivered as a JSON message with the following structure. Use these fields in your Bloblang mappings with this.<field>:

{
  "eventID": "abc123-",                    (1)
  "eventName": "INSERT | MODIFY | REMOVE",   (2)
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "tableName": "my-table",                   (3)
  "dynamodb": {
    "keys": {                                (4)
      "pk": "user#123",
      "sk": "profile"
    },
    "newImage": {                            (5)
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice",
      "email": "alice@example.com"
    },
    "oldImage": {                            (6)
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice Smith"
    },
    "sequenceNumber": "12345678901234567890", (7)
    "sizeBytes": 256,
    "streamViewType": "NEW_AND_OLD_IMAGES"
  }
}
1 Unique identifier for this change event.
2 Type of change: INSERT (new item), MODIFY (updated item), or REMOVE (deleted item).
3 Name of the source DynamoDB table.
4 Primary key attributes of the changed item. Always present.
5 Item state after the change. Present for INSERT and MODIFY events (requires NEW_IMAGE or NEW_AND_OLD_IMAGES stream view type).
6 Item state before the change. Present for MODIFY and REMOVE events (requires OLD_IMAGE or NEW_AND_OLD_IMAGES stream view type).
7 Position of this record in the shard, used for ordering and checkpointing.
DynamoDB attribute values are automatically unmarshalled from DynamoDB’s type format ({"S": "value"}) to plain values ("value").
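
The unmarshalling step is equivalent to the following sketch (a simplified re-implementation for illustration only — the input performs this conversion for you):

```python
# Convert DynamoDB's typed attribute format into plain JSON-style values.
# Simplified: covers the common type descriptors (S, N, BOOL, NULL, L, M, SS, NS).
def unmarshal(attr):
    (tag, value), = attr.items()
    if tag == "S":
        return value
    if tag == "N":
        # DynamoDB numbers arrive as strings; parse to int or float.
        return float(value) if "." in value else int(value)
    if tag == "BOOL":
        return value
    if tag == "NULL":
        return None
    if tag == "L":
        return [unmarshal(v) for v in value]
    if tag == "M":
        return {k: unmarshal(v) for k, v in value.items()}
    if tag in ("SS", "NS"):
        return list(value)
    raise ValueError(f"unsupported type descriptor: {tag}")

raw = {"M": {"pk": {"S": "user#123"}, "age": {"N": "30"}, "active": {"BOOL": True}}}
print(unmarshal(raw))
# → {'pk': 'user#123', 'age': 30, 'active': True}
```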

Example mapping

pipeline:
  processors:
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.keys = this.dynamodb.keys
        root.new_data = this.dynamodb.newImage
        root.old_data = this.dynamodb.oldImage

Metadata

This input adds the following metadata fields to each message:

  • dynamodb_shard_id: The shard ID from which the record was read

  • dynamodb_sequence_number: The sequence number of the record in the stream

  • dynamodb_event_name: The type of change: INSERT, MODIFY, or REMOVE

  • dynamodb_table: The name of the DynamoDB table
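
These metadata fields can drive routing or filtering in a mapping. For example (a sketch using Bloblang's @ metadata syntax):

```yaml
pipeline:
  processors:
    - mapping: |
        # Copy selected metadata into the payload.
        root = this
        root.change_type = @dynamodb_event_name
        root.source_table = @dynamodb_table
        # Drop deletes, keeping only inserts and updates.
        root = if @dynamodb_event_name == "REMOVE" { deleted() }
```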

Metrics

This input emits the following metrics:

  • dynamodb_cdc_shards_tracked: Total number of shards being tracked (gauge)

  • dynamodb_cdc_shards_active: Number of shards currently being read from (gauge)

Fields

batch_size

Maximum number of records to read per shard in a single request. Valid range: 1-1000.

Type: int

Default: 1000

checkpoint_limit

Maximum number of unacknowledged messages before forcing a checkpoint update. Lower values provide better recovery guarantees but increase write overhead.

Type: int

Default: 1000

checkpoint_table

DynamoDB table name for storing checkpoints. The table is created automatically if it does not exist.

Type: string

Default: redpanda_dynamodb_checkpoints

credentials

Optional manual configuration of AWS credentials. For more information, see Amazon Web Services.

Type: object

credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.

Requires version 4.2.0 or later.

Type: bool

credentials.id

The ID of credentials to use.

Type: string

credentials.profile

A profile from ~/.aws/credentials to use.

Type: string

credentials.role

A role ARN to assume.

Type: string

credentials.role_external_id

An external ID to provide when assuming a role.

Type: string

credentials.secret

The secret for the credentials being used.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

credentials.token

The token for the credentials being used, required when using short term credentials.

Type: string

endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string

max_tracked_shards

Maximum number of shards to track simultaneously. Prevents memory issues with extremely large tables.

Type: int

Default: 10000

poll_interval

Time to wait between polling attempts when no records are available.

Type: string

Default: 1s

region

The AWS region to target.

Type: string

snapshot_batch_size

Records per scan request during snapshot. Maximum 1000. Lower values provide better backpressure control but require more API calls.

Type: int

Default: 100

snapshot_buffer_size

Maximum number of CDC events to buffer for deduplication (approximately 100 bytes per entry). If the buffer is exceeded, deduplication is disabled and duplicates may be emitted.

Type: int

Default: 100000

snapshot_deduplicate

Deduplicate records that appear in both snapshot and CDC stream. Requires buffering CDC events during snapshot. If buffer is exceeded, deduplication is disabled to prevent data loss.

Type: bool

Default: true
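
Conceptually, deduplication buffers the primary keys of CDC events seen during the snapshot, so a scanned row that was already emitted as a CDC event is suppressed. A rough model (illustrative only; not the plugin's actual implementation):

```python
# Illustrative model of snapshot/CDC deduplication with a bounded buffer.
def dedupe_snapshot(snapshot_rows, cdc_keys_seen, buffer_size):
    """Yield snapshot rows whose key was not already emitted via CDC.

    If more CDC keys arrive than buffer_size allows, deduplication is
    abandoned and every snapshot row is emitted (duplicates over data loss).
    """
    if len(cdc_keys_seen) > buffer_size:
        # Buffer overflow: disable dedup rather than risk dropping rows.
        yield from snapshot_rows
        return
    for row in snapshot_rows:
        if row["pk"] not in cdc_keys_seen:
            yield row

rows = [{"pk": "a"}, {"pk": "b"}, {"pk": "c"}]
print(list(dedupe_snapshot(rows, {"b"}, buffer_size=100)))
# → [{'pk': 'a'}, {'pk': 'c'}]
```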

snapshot_mode

  • none: Streams CDC events only (default).

  • snapshot_only: Performs a one-time full table scan with no ongoing streaming.

  • snapshot_and_cdc: Scans the entire table, then streams changes.

Type: string

Default: none

Options: none, snapshot_only, snapshot_and_cdc

snapshot_segments

Number of parallel scan segments (1-10). Higher parallelism scans faster but consumes more Read Capacity Units (RCUs). A lower value is safer to start with.

Type: int

Default: 1

snapshot_throttle

Minimum time between scan requests per segment. Use this to limit Read Capacity Unit (RCU) consumption during snapshot.

Type: string

Default: 100ms
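
Together, snapshot_throttle, snapshot_batch_size, and snapshot_segments bound the scan rate. A quick back-of-the-envelope calculation (illustrative; actual RCU cost also depends on item size and consistency mode):

```python
# Upper bound on snapshot read rate for given settings.
def max_items_per_second(batch_size: int, throttle_ms: int, segments: int) -> float:
    requests_per_second = 1000 / throttle_ms      # per segment
    return batch_size * requests_per_second * segments

# Defaults: 100 items per scan, one scan every 100ms, one segment.
print(max_items_per_second(100, 100, 1))
# → 1000.0 (items/second at most)
```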

start_from

Where to start reading when no checkpoint exists. trim_horizon starts from the oldest available record; latest starts from new records only.

Type: string

Default: trim_horizon

Options: trim_horizon, latest

table_discovery_interval

Interval for rescanning and discovering new tables when using tag or includelist mode. Set to 0 to disable periodic rescanning.

Type: string

Default: 5m

table_discovery_mode

  • single: Streams from the tables specified in the tables list.

  • tag: Auto-discovers tables by tags (ignores the tables field).

  • includelist: Streams from tables in the tables list. Use single instead; includelist is kept for backward compatibility.

Type: string

Default: single

Options: single, tag, includelist

table_tag_filter

Multi-tag filter in the format key1:v1,v2;key2:v3,v4. Matches tables where (key1=v1 OR key1=v2) AND (key2=v3 OR key2=v4). Required when table_discovery_mode is tag.

Type: string

Default: ""

tables[]

List of table names to stream from. Required for the single and includelist discovery modes; ignored when table_discovery_mode is tag.

Type: array

Default: []

tcp

TCP socket configuration.

Type: object

tcp.connect_timeout

Maximum amount of time a dial will wait for a connect to complete. Zero disables.

Type: string

Default: 0s

tcp.keep_alive

TCP keep-alive probe configuration.

Type: object

tcp.keep_alive.count

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

Type: int

Default: 9

tcp.keep_alive.idle

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

Type: string

Default: 15s

tcp.keep_alive.interval

Duration between keep-alive probes. Zero defaults to 15s.

Type: string

Default: 15s

tcp.tcp_user_timeout

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.

Type: string

Default: 0s

throttle_backoff

Time to wait when applying backpressure due to too many in-flight messages.

Type: string

Default: 100ms

Examples

Consume CDC events

Read change events from a DynamoDB table with streams enabled.

input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1

Start from latest

Only process new changes, ignoring existing stream data.

input:
  aws_dynamodb_cdc:
    tables: [orders]
    start_from: latest
    region: us-west-2

Snapshot and CDC

Scan all existing records, then stream ongoing changes.

input:
  aws_dynamodb_cdc:
    tables: [products]
    snapshot_mode: snapshot_and_cdc
    snapshot_segments: 5
    region: us-east-1

Auto-discover tables by tag

Automatically discover and stream from all tables with a specific tag.

input:
  aws_dynamodb_cdc:
    table_discovery_mode: tag
    table_tag_filter: "stream-enabled:true"
    table_discovery_interval: 5m
    region: us-east-1

Auto-discover tables by multiple tags

Discover tables matching multiple tag criteria with OR logic per key, AND logic across keys.

input:
  aws_dynamodb_cdc:
    table_discovery_mode: tag
    table_tag_filter: "environment:prod,staging;team:data,analytics"
    table_discovery_interval: 5m
    region: us-east-1
    # Matches tables with: (environment=prod OR environment=staging) AND (team=data OR team=analytics)

Stream from multiple specific tables

Stream from an explicit list of tables simultaneously.

input:
  aws_dynamodb_cdc:
    table_discovery_mode: includelist
    tables:
      - orders
      - customers
      - products
    region: us-west-2

Suggested reading

For common patterns including filtering events, routing to Kafka or S3, and detecting changed fields, see the DynamoDB CDC Patterns cookbook.