Friday, 5 September 2025

Introduction to Amazon Kinesis Data Streams

 


Amazon Kinesis Data Streams is one of the four Amazon Kinesis services. It lets you stream data easily at any scale.

  • There are no servers to manage: the on-demand mode eliminates the need to provision or manage capacity and scales automatically.
  • Pay only for what you use.
  • Built-in integrations with other AWS services make it easy to create analytics, serverless, and application integration solutions.
  • Ingest and collect terabytes of data per day from application and service logs, clickstream data, sensor data, and in-app user events to power live dashboards, generate metrics, and deliver data into data lakes.
  • Build applications for high-frequency event data such as clickstream data, and gain access to insights in seconds, not days, using AWS Lambda or Amazon Managed Service for Apache Flink.
  • Pair with AWS Lambda to respond to or adjust immediate occurrences within the event-driven applications in your environment, at any scale.

Producers continually push data to Kinesis Data Streams, and consumers process the data in real time. A producer can be, for example, CloudWatch Logs, and a consumer can be a Lambda function.
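
For a quick taste of the producer side, a single record can be pushed from the command line with put-record (a sketch: the stream name, partition key, region, and profile are placeholders, and AWS CLI v2 expects the --data value to be Base64-encoded by default):

% aws kinesis put-record \
    --stream-name my-stream \
    --partition-key "user-42" \
    --data "$(printf 'hello from a producer' | base64)" \
    --region us-east-2 \
    --profile my-profile

The response contains the ShardId the record was written to and the SequenceNumber that Kinesis Data Streams assigned to it.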


Data Stream << Shards << Data Records


A Kinesis data stream is a set of shards. Each shard holds a sequence of data records.

A data record is the unit of data stored in a Kinesis data stream. Data records are composed of:
  • sequence number
    • within the shard
    • assigned by Kinesis Data Streams
  • partition key
  • data blob, which is an immutable sequence of bytes
    • Kinesis Data Streams does not inspect, interpret, or change the data in the blob in any way
    • A data blob can be up to 1 MB.

How to find all shards in a stream?

% aws kinesis list-shards \
--stream-name my-stream \
--region us-east-2 \
--profile my-profile
{
    "Shards": [
        {
            "ShardId": "shardId-000000000000",
            "HashKeyRange": {
                "StartingHashKey": "0",
                "EndingHashKey": "440282366920938463463374607431768211455"
            },
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49663754454378916691333541504734985347376184017408753666"
            }
        }
    ]
}


Note the HashKeyRange: Kinesis hashes each record's partition key to a 128-bit integer and writes the record to the shard whose HashKeyRange contains that value.

Shard Iterator


Shard iterators are a fundamental tool for consumers to process data in the correct order and without missing records. They provide fine control over the reading position, making it possible to build scalable, reliable stream processing workflows.

A shard iterator is a reference marker that specifies the exact position within a shard from which data reading should begin and continue sequentially. It enables consumers to access, read, and process records from a particular location within a data stream shard.

Different types of shard iterators control where reading starts:
  • AT_SEQUENCE_NUMBER: Starts reading at the specified sequence number.
  • AFTER_SEQUENCE_NUMBER: Starts reading just after the specified sequence number.
  • TRIM_HORIZON: Starts from the oldest available data in the shard.
  • LATEST: Starts from the newest data, i.e. only records added after the iterator is created.
  • AT_TIMESTAMP: Starts from a specific timestamp in the shard.
When reading repeatedly from a stream, the shard iterator is updated after each GetRecords request (with the NextShardIterator returned by the API).

This mechanism allows applications to seamlessly resume reading from the correct point in the stream.

Proper management of shard iterators is essential to avoid data loss or duplicate reads, especially as data retention policies and processing speeds can affect the availability of data in the stream.
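
A minimal polling loop built on that mechanism could look like this (a sketch: it assumes $ITERATOR already holds a shard iterator obtained as shown in the next section, that jq is installed, and it omits error handling, resharding, and checkpointing):

% while [ -n "$ITERATOR" ] && [ "$ITERATOR" != "null" ]; do
      RESPONSE=$(aws kinesis get-records \
          --shard-iterator "$ITERATOR" \
          --limit 100 \
          --region us-east-2 \
          --profile my-profile)
      # print how many records arrived in this batch
      echo "$RESPONSE" | jq '.Records | length'
      # continue from where this GetRecords call left off
      ITERATOR=$(echo "$RESPONSE" | jq -r '.NextShardIterator')
      # stay well under the per-shard GetRecords limits
      sleep 1
  done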


How to get an iterator in a shard?


A shard iterator is obtained using the GetShardIterator API, which marks the spot in the shard to start reading records.

The command aws kinesis get-shard-iterator is used to obtain a pointer (iterator) to a specific position in a shard of a Kinesis stream. We need this iterator to actually read records from the stream using aws kinesis get-records.


% ITERATOR=$(aws kinesis get-shard-iterator \
    --stream-name my-stream \
    --shard-id shardId-000000000000 \
    --shard-iterator-type TRIM_HORIZON \
    --query 'ShardIterator' \
    --output text \
    --region us-east-2 \
    --profile my-profile)

ShardIterator is a token we pass to aws kinesis get-records to actually fetch the data.

% echo $ITERATOR
AAAAAAAAAAHf65JkbV8ZJQ...Exsy8WerU5Z8LKI8wtXm95+blpXljd0UWgDs7Seo9QlpikJI/6U=


We can now read records directly from the stream:

% aws kinesis get-records \
--shard-iterator $ITERATOR \
--limit 50 \
--region us-east-2 \
--profile my-profile
{
    "Records": [
        {
            "SequenceNumber": "49666716197061357389751170868210642185623031110770360322",
            "ApproximateArrivalTimestamp": "2025-09-03T17:37:00.343000+01:00",
            "Data": "H4sIAAAAAAAA/+3YTW/TMBgH8K8S...KaevxIQAA",
            "PartitionKey": "54dc991cd70ae6242c35f01972968478"
        },
        {
            "SequenceNumber": "49666716197061357389751170868211851111442645739945066498",
            "ApproximateArrivalTimestamp": "2025-09-03T17:37:00.343000+01:00",
            "Data": "H4sIAAAAAAAA/+3YS2vcM...889uPBV8BVXceAAA=",
            "PartitionKey": "e4e9ad254c154281a67d05a33fa0ea31"
        },
        ...
   ]
}

The Data field in each record is Base64-encoded. That's because Kinesis doesn't know what your producers are sending (it could be JSON, gzipped text, protobuf, etc.), so it just delivers raw bytes.

Why does each record read from the shard have a different PartitionKey?


Each record in a Kinesis stream can have a different PartitionKey because the PartitionKey is chosen by the data producer for each record and is not tied directly to the way records are read from the stream or which iterator type is used. Even when reading from a single shard using TRIM_HORIZON, records within that shard may have different PartitionKeys because the PartitionKey is used to route records to shards at the time of writing—not to group records within a shard during reading.

How Partition Keys and Shards Work


The PartitionKey determines to which shard a record is sent by hashing the key and mapping it to a shard's hash key range. Multiple records with different PartitionKeys can end up in the same shard, especially if the total number of unique partition keys is greater than the number of shards, or if the hash function maps them together. When reading from a shard (with any iterator, including TRIM_HORIZON), the records are read in order of arrival, but each can have any PartitionKey defined at ingest time.
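
A rough way to verify this mapping from the shell (a sketch: it assumes GNU md5sum and bc are available, and uses one of the PartitionKeys from the get-records output above) is to hash the partition key with MD5 and compare the resulting 128-bit integer with the shards' hash key ranges:

% KEY="e4e9ad254c154281a67d05a33fa0ea31"
% HEX=$(printf '%s' "$KEY" | md5sum | cut -d' ' -f1 | tr '[:lower:]' '[:upper:]')
% echo "ibase=16; $HEX" | bc

The printed decimal value falls between the StartingHashKey and EndingHashKey of exactly one shard returned by aws kinesis list-shards; that is the shard the record was routed to (unless the producer overrode routing with an ExplicitHashKey).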

Reading and PartitionKeys


Using TRIM_HORIZON just means starting at the oldest record available in the shard. It does not guarantee that all records have the same PartitionKey, only that they are the oldest records remaining for that shard. Records from a single shard will often have various PartitionKeys, mixed together as per their original ingest order. Therefore, it is normal and expected to see a variety of PartitionKeys when reading a batch of records from the same shard with TRIM_HORIZON.


Writing and Reading

Data records written by the same producer can end up in different shards if the producer chooses different PartitionKeys for those records. The distribution of records across shards is determined by the hash of the PartitionKey assigned to each record, not by the producer or the order of writing.

Distribution Across Shards

  • If a producer sends data with varying PartitionKeys, Kinesis uses those keys' hashes to assign records to shards; thus, even the same producer's records can be spread across multiple shards.
  • If the producer uses the same PartitionKey for all records, then all its records will go to the same shard, preserving strict ordering for that key within the shard.
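
This is easy to observe with a batch write (a sketch: the stream, keys, and payloads are placeholders, the stream is assumed to have more than one shard, and AWS CLI v2 expects the Data values to be Base64-encoded):

% aws kinesis put-records \
    --stream-name my-stream \
    --records \
        PartitionKey=order-1,Data=$(printf 'first' | base64) \
        PartitionKey=order-2,Data=$(printf 'second' | base64) \
        PartitionKey=order-1,Data=$(printf 'third' | base64) \
    --query 'Records[].ShardId' \
    --region us-east-2 \
    --profile my-profile

Both order-1 records report the same ShardId, while the order-2 record may report a different one, depending on where its hash falls in the shards' HashKeyRanges.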

Reading N Records: Producer and Sequence

When reading N records from a shard iterator, those records are the next available records in that shard, in the order they arrived in that shard. These records will not necessarily all be from the same producer, nor are they guaranteed to be N consecutive records produced by any single producer.

Records from different producers, as well as from the same producer if it used multiple PartitionKeys, can appear in any sequence within a shard, depending on how PartitionKeys are mapped at write time.

In summary, unless a producer always uses the same PartitionKey, its records may spread across shards, and any batch read from a shard iterator will simply reflect the ordering of records within that shard, including records from multiple producers and PartitionKeys.


How to get record content in a human-readable format?

We need to extract the Data field from a record, Base64-decode it, and then process the data further. In our example the payload is gzip-compressed JSON (that's what a CloudWatch Logs → Kinesis subscription delivers), so we need to decompress it and parse it as JSON:


% aws kinesis get-records \
--shard-iterator $ITERATOR \
--limit 1 \
--query 'Records[0].Data' \
--output text \
--region us-east-2 \
--profile my-profile \
| base64 --decode \
| gzip -d \
| jq .

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789999",
  "logGroup": "/aws/lambda/my-lambda",
  "logStream": "2025/09/03/[$LATEST]202865ca545544deb61360d571180d45",
  "subscriptionFilters": [
    "my-lambda-MainSubscriptionFilter-r1YlqNvCVDqk"
  ],
  "logEvents": [
    {
      "id": "39180581104302115620949753249891540826922898325656305664",
      "timestamp": 1756918020250,
      "message": "{\"log.level\":\"info\",\"@timestamp\":\"2025-09-03T16:47:00.250Z\",\"log.origin\":{\"function\":\"github.com/elastic/apm-aws-lambda/app.(*App).Run\",\"file.name\":\"app/run.go\",\"file.line\":98},\"message\":\"Exiting due to shutdown event with reason spindown\",\"ecs.version\":\"1.6.0\"}\n"
    },
    {
      "id": "39180581104302115620949753249891540826922898325656305665",
      "timestamp": 1756918020250,
      "message": "{\"log.level\":\"warn\",\"@timestamp\":\"2025-09-03T16:47:00.250Z\",\"log.origin\":{\"function\":\"github.com/elastic/apm-aws-lambda/apmproxy.(*Client).forwardLambdaData\",\"file.name\":\"apmproxy/apmserver.go\",\"file.line\":357},\"message\":\"Dropping lambda data due to error: metadata is not yet available\",\"ecs.version\":\"1.6.0\"}\n"
    },
    {
      "id": "39180581104302115620949753249891540826922898325656305666",
      "timestamp": 1756918020250,
      "message": "{\"log.level\":\"warn\",\"@timestamp\":\"2025-09-03T16:47:00.250Z\",\"log.origin\":{\"function\":\"github.com/elastic/apm-aws-lambda/apmproxy.(*Client).forwardLambdaData\",\"file.name\":\"apmproxy/apmserver.go\",\"file.line\":357},\"message\":\"Dropping lambda data due to error: metadata is not yet available\",\"ecs.version\":\"1.6.0\"}\n"
    }
  ]
}



Reference: Amazon Kinesis Data Streams Terminology and Concepts (AWS documentation)

Introduction to Amazon Kinesis

 


Amazon Kinesis is a serverless streaming data service with four service types:

  • Amazon Kinesis Video Streams - to securely stream video from connected devices to AWS for analytics, machine learning (ML), playback, and other processing
  • Amazon Kinesis Data Streams - to easily stream data at any scale
  • Amazon Data Firehose - to reliably load real-time streams into data lakes, warehouses, and analytics services
  • Amazon Managed Service for Apache Flink - to transform and analyze streaming data in real time