Amazon Kinesis Data Streams is one of four Amazon Kinesis services. It makes it easy to stream data at any scale.
- There are no servers to manage
- Two capacity modes:
- on-demand: eliminates the need to provision or manage the capacity required for running applications; provisioning and scaling are automatic
- provisioned
- Pay only for what you use
- Built-in integrations with other AWS services to create analytics, serverless, and application integration solutions
- To ingest and collect terabytes of data per day from application and service logs, clickstream data, sensor data, and in-app user events to power live dashboards, generate metrics, and deliver data into data lakes
- To build applications for high-frequency event data such as clickstream data, and gain access to insights in seconds, not days, using AWS Lambda or Amazon Managed Service for Apache Flink
- To pair with AWS Lambda and react to events as they occur in the event-driven applications in your environment, at any scale
Producers continually push data to Kinesis Data Streams, and consumers process the data in real time. A producer can be, for example, CloudWatch Logs, and a consumer can be, for example, a Lambda function.
A producer puts data records into shards and a consumer gets data records from shards. Consumers use shards for parallel data processing and for consuming data in the exact order in which it is stored. If writes and reads exceed the shard limits, the producer and consumer applications receive throttling errors, which can be handled through retries.
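Throttled writes are typically retried with exponential backoff. A minimal sketch; the `put_record` callable is a hypothetical stand-in for a real Kinesis client call, not an actual SDK function:

```python
import random
import time

def put_with_backoff(put_record, record, max_attempts=5, base_delay=0.1):
    """Retry a throttled put with exponential backoff and jitter.

    `put_record` is any callable that raises an exception when the
    shard's write limit is exceeded (hypothetical stand-in for a real
    Kinesis client call).
    """
    for attempt in range(max_attempts):
        try:
            return put_record(record)
        except Exception:
            if attempt == max_attempts - 1:
                raise  # give up after the last attempt
            # Exponential backoff: 0.1 s, 0.2 s, 0.4 s, ... plus jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            time.sleep(delay)
```

The jitter spreads retries from many producers over time, so they do not all hit the shard limit again in the same instant.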
Capacity Modes
- Provisioned
- data stream capacity is fixed
- 1 shard has fixed capacities:
- Write: up to 1 MiB/second and 1,000 records/second
- Read: up to 2 MiB/second (shared across all consumers of the shard)
- N shards multiply the read/write capacity by N
- On-demand
- data stream capacity scales automatically
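The per-shard limits of provisioned mode make sizing a simple calculation. A small sketch, assuming the shared (non-fan-out) read limit of 2 MiB/s per shard; the workload numbers in the example are hypothetical:

```python
import math

# Per-shard provisioned-mode limits (from the Kinesis documentation)
WRITE_MIB_PER_SEC = 1        # 1 MiB/s of writes per shard
WRITE_RECORDS_PER_SEC = 1000 # 1,000 records/s of writes per shard
READ_MIB_PER_SEC = 2         # 2 MiB/s of reads per shard

def shards_needed(write_mib_s, write_records_s, read_mib_s):
    """Smallest shard count that satisfies all three per-shard limits."""
    return max(
        math.ceil(write_mib_s / WRITE_MIB_PER_SEC),
        math.ceil(write_records_s / WRITE_RECORDS_PER_SEC),
        math.ceil(read_mib_s / READ_MIB_PER_SEC),
    )

# Hypothetical workload: 3 MiB/s and 2,500 records/s in, 10 MiB/s out
print(shards_needed(3, 2500, 10))  # 5 (the read side dominates here)
```

Whichever of the three dimensions is tightest determines the shard count; here the 10 MiB/s read requirement needs 5 shards even though 3 would cover the writes.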
Data Retention Period
A Kinesis data stream stores records for 24 hours by default, and for up to 8,760 hours (365 days). You can update the retention period via the Kinesis Data Streams console or by using the IncreaseStreamRetentionPeriod and DecreaseStreamRetentionPeriod operations.
The default retention period of 24 hours covers scenarios where intermittent lags in processing require catch-up with the real-time data. A seven-day retention lets you reprocess data for up to seven days to resolve potential downstream data losses. Long-term data retention greater than seven days and up to 365 days lets you reprocess old data for use cases such as algorithm back testing, data store backfills, and auditing.
Data Stream << Shards << Data Records
A Kinesis data stream is a set of shards. Each shard contains a sequence of data records.
A data record is the unit of data stored in a Kinesis data stream. Data records are composed of:
- Sequence number
- within the shard
- assigned by Kinesis Data Streams
- Partition key
- used to isolate and route records to different shards of a data stream
- specified by your data producer while adding data to a Kinesis data stream. For example, let’s say you have a data stream with two shards (shard 1 and shard 2). You can configure your data producer to use two partition keys (key A and key B) so that all records with key A are added to shard 1 and all records with key B are added to shard 2.
- Data blob
- an immutable sequence of bytes
- data of interest your data producer adds to a data stream
- Kinesis Data Streams does not inspect, interpret, or change the data in the blob in any way
- A data blob (the data payload before Base64-encoding) can be up to 1 MB.
How to find all shards in a stream?
% aws kinesis list-shards \
--stream-name my-stream \
--region us-east-2 \
--profile my-profile
{
    "Shards": [
        {
            "ShardId": "shardId-000000000000",
            "HashKeyRange": {
                "StartingHashKey": "0",
                "EndingHashKey": "340282366920938463463374607431768211455"
            },
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49663754454378916691333541504734985347376184017408753666"
            }
        }
    ]
}
Note the HashKeyRange: Kinesis hashes each record's partition key (MD5) into this 128-bit key space to determine which shard the record is written to. With a single shard, the range covers the whole key space (0 to 2^128 - 1).
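The routing can be sketched in a few lines: Kinesis takes the MD5 hash of the partition key and places the record in the shard whose HashKeyRange contains that 128-bit value. The two-shard layout below is illustrative:

```python
import hashlib

def hash_key(partition_key: str) -> int:
    """Kinesis maps a partition key to a 128-bit integer via MD5."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")

def shard_for(partition_key: str, shards):
    """Find the shard whose HashKeyRange contains the key's MD5 hash.

    `shards` is a list of dicts shaped like the `list-shards` output above.
    """
    h = hash_key(partition_key)
    for shard in shards:
        r = shard["HashKeyRange"]
        if int(r["StartingHashKey"]) <= h <= int(r["EndingHashKey"]):
            return shard["ShardId"]
    raise ValueError("no shard covers this hash key")

# Illustrative layout: two shards splitting the 128-bit key space in half
MAX_HASH = 2**128 - 1
shards = [
    {"ShardId": "shardId-000000000000",
     "HashKeyRange": {"StartingHashKey": "0",
                      "EndingHashKey": str(MAX_HASH // 2)}},
    {"ShardId": "shardId-000000000001",
     "HashKeyRange": {"StartingHashKey": str(MAX_HASH // 2 + 1),
                      "EndingHashKey": str(MAX_HASH)}},
]
```

Because the mapping is a pure function of the key, the same partition key always lands in the same shard, which is what preserves per-key ordering.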
Shard Iterator
A shard iterator is a reference marker that specifies the exact position within a shard from which data reading should begin and continue sequentially. It enables consumers to access, read, and process records from a particular location within a data stream shard. Shard iterators are the fundamental tool for processing data in the correct order without missing records: they give fine control over the reading position, making it possible to build scalable, reliable stream-processing workflows.
Different types of shard iterators control where reading starts:
- AT_SEQUENCE_NUMBER: reads starting from the specified sequence number.
- AFTER_SEQUENCE_NUMBER: reads starting just after the specified sequence number.
- TRIM_HORIZON: starts from the oldest record still available in the shard.
- LATEST: starts from the newest records, i.e. records added after the iterator is created.
- AT_TIMESTAMP: starts from a specific timestamp in the shard.
When reading repeatedly from a stream, the shard iterator is updated after each GetRecords request (with the NextShardIterator returned by the API).
This mechanism allows applications to seamlessly resume reading from the correct point in the stream.
Proper management of shard iterators is essential to avoid data loss or duplicate reads, especially as data retention policies and processing speeds can affect the availability of data in the stream.
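The iterator types above can be illustrated with an in-memory model of a shard (a plain list of records, oldest first). This is a sketch of the semantics only, not the real API:

```python
def get_shard_iterator(shard, iterator_type, sequence_number=None, timestamp=None):
    """Return a starting index into `shard` (a list of records, oldest first).

    In-memory illustration of the five iterator types; each record is a
    dict with 'seq' (sequence number) and 'ts' (arrival timestamp) fields.
    """
    if iterator_type == "TRIM_HORIZON":
        return 0                   # oldest retained record
    if iterator_type == "LATEST":
        return len(shard)          # only records added after this point
    if iterator_type == "AT_SEQUENCE_NUMBER":
        return next(i for i, r in enumerate(shard) if r["seq"] == sequence_number)
    if iterator_type == "AFTER_SEQUENCE_NUMBER":
        return next(i for i, r in enumerate(shard) if r["seq"] == sequence_number) + 1
    if iterator_type == "AT_TIMESTAMP":
        return next(i for i, r in enumerate(shard) if r["ts"] >= timestamp)
    raise ValueError(iterator_type)

def get_records(shard, iterator, limit):
    """Return (records, next_iterator), mimicking GetRecords/NextShardIterator."""
    batch = shard[iterator:iterator + limit]
    return batch, iterator + len(batch)
```

Repeatedly calling `get_records` with the returned `next_iterator` walks the shard in order, which is exactly the GetRecords/NextShardIterator loop described below.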
How to get an iterator in a shard?
A shard iterator is obtained using the GetShardIterator API, which marks the spot in the shard to start reading records.
The command aws kinesis get-shard-iterator is used to obtain a pointer (iterator) to a specific position in a shard of a Kinesis stream. We need this iterator to actually read records from the stream using aws kinesis get-records.
% ITERATOR=$(aws kinesis get-shard-iterator \
--stream-name my-stream \
--shard-id shardId-000000000000 \
--shard-iterator-type TRIM_HORIZON \
--query 'ShardIterator' \
--output text \
--region us-east-2 \
--profile my-profile)
The ShardIterator is a token we pass to aws kinesis get-records to actually fetch the data.
% echo $ITERATOR
AAAAAAAAAAHf65JkbV8ZJQ...Exsy8WerU5Z8LKI8wtXm95+blpXljd0UWgDs7Seo9QlpikJI/6U=
We can now read records directly from the stream:
% aws kinesis get-records \
--shard-iterator $ITERATOR \
--limit 50 \
--region us-east-2 \
--profile my-profile
{
    "Records": [
        {
            "SequenceNumber": "49666716197061357389751170868210642185623031110770360322",
            "ApproximateArrivalTimestamp": "2025-09-03T17:37:00.343000+01:00",
            "Data": "H4sIAAAAAAAA/+3YTW/TMBgH8K8S...KaevxIQAA",
            "PartitionKey": "54dc991cd70ae6242c35f01972968478"
        },
        {
            "SequenceNumber": "49666716197061357389751170868211851111442645739945066498",
            "ApproximateArrivalTimestamp": "2025-09-03T17:37:00.343000+01:00",
            "Data": "H4sIAAAAAAAA/+3YS2vcM...889uPBV8BVXceAAA=",
            "PartitionKey": "e4e9ad254c154281a67d05a33fa0ea31"
        },
        ...
    ]
}
The Data field in each record is Base64-encoded. That's because Kinesis doesn't know what your producers are sending (it could be JSON, gzipped text, protobuf, etc.), so it just delivers raw bytes.
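Since the blob is opaque, the consumer has to decode it itself. A minimal sketch for the CloudWatch Logs case (Base64-decode, gunzip, parse JSON), using a synthetic payload rather than real stream output:

```python
import base64
import gzip
import json

def decode_record_data(data_b64: str) -> dict:
    """Base64-decode and gunzip a record's Data field, then parse JSON
    (the format a CloudWatch Logs subscription filter delivers)."""
    raw = base64.b64decode(data_b64)
    return json.loads(gzip.decompress(raw))

# Build a synthetic payload the way CloudWatch Logs would package it:
payload = {"messageType": "DATA_MESSAGE", "logEvents": [{"message": "hello"}]}
encoded = base64.b64encode(gzip.compress(json.dumps(payload).encode())).decode()

print(decode_record_data(encoded)["messageType"])  # DATA_MESSAGE
```

This is the Python equivalent of the `base64 --decode | gzip -d | jq .` shell pipeline used later in this section.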
Why does each record read from the shard have a different PartitionKey?
Each record in a Kinesis stream can have a different PartitionKey because the PartitionKey is chosen by the data producer for each record and is not tied directly to the way records are read from the stream or which iterator type is used. Even when reading from a single shard using TRIM_HORIZON, records within that shard may have different PartitionKeys because the PartitionKey is used to route records to shards at the time of writing—not to group records within a shard during reading.
How Partition Keys and Shards Work
The PartitionKey determines to which shard a record is sent by hashing the key and mapping it to a shard's hash key range. Multiple records with different PartitionKeys can end up in the same shard, especially if the total number of unique partition keys is greater than the number of shards, or if the hash function maps them together. When reading from a shard (with any iterator, including TRIM_HORIZON), the records are read in order of arrival, but each can have any PartitionKey defined at ingest time.
Reading and PartitionKeys
Using TRIM_HORIZON just means starting at the oldest record available in the shard. It does not guarantee all records have the same PartitionKey, only that they are the oldest records remaining for that shard. Records from a single shard will often have various PartitionKeys, all mixed together as per their original ingest. Therefore, it is normal and expected to see a variety of PartitionKeys when reading a batch of records from the same shard with TRIM_HORIZON.
Writing and Reading
Data records written by the same producer can end up in different shards if the producer chooses different PartitionKeys for those records. The distribution of records across shards is determined by the hash of the PartitionKey assigned to each record, not by the producer or the order of writing.
Distribution Across Shards
If a producer sends data with varying PartitionKeys, Kinesis uses those keys' hashes to assign records to shards; thus, even the same producer's records can be spread across multiple shards.
If the producer uses the same PartitionKey for all records, then all its records will go to the same shard, preserving strict ordering for that key within the shard.
Reading N Records: Producer and Sequence
When reading N records from a shard iterator, those records are the next available records in that shard, in the order they arrived in that shard. These records will not necessarily all be from the same producer, nor are they guaranteed to be N consecutive records produced by any single producer.
Records from different producers, as well as from the same producer if it used multiple PartitionKeys, can appear in any sequence within a shard, depending on how PartitionKeys are mapped at write time.
In summary, unless a producer always uses the same PartitionKey, its records may spread across shards, and any batch read from a shard iterator will simply reflect the ordering of records within that shard, including records from multiple producers and PartitionKeys.
How to get record contents in a human-readable format?
We need to extract the Data field from a record, Base64-decode it, and then process the data further. In our example the payload is gzip-compressed JSON (that is what a CloudWatch Logs → Kinesis subscription delivers), so we decompress it and parse it as JSON:
% aws kinesis get-records \
--shard-iterator $ITERATOR \
--limit 1 \
--query 'Records[0].Data' \
--output text \
--region us-east-2 \
--profile my-profile \
| base64 --decode \
| gzip -d \
| jq .
{
    "messageType": "DATA_MESSAGE",
    "owner": "123456789999",
    "logGroup": "/aws/lambda/my-lambda",
    "logStream": "2025/09/03/[$LATEST]202865ca545544deb61360d571180d45",
    "subscriptionFilters": [
        "my-lambda-MainSubscriptionFilter-r1YlqNvCVDqk"
    ],
    "logEvents": [
        {
            "id": "39180581104302115620949753249891540826922898325656305664",
            "timestamp": 1756918020250,
            "message": "{\"log.level\":\"info\",\"@timestamp\":\"2025-09-03T16:47:00.250Z\",\"log.origin\":{\"function\":\"github.com/elastic/apm-aws-lambda/app.(*App).Run\",\"file.name\":\"app/run.go\",\"file.line\":98},\"message\":\"Exiting due to shutdown event with reason spindown\",\"ecs.version\":\"1.6.0\"}\n"
        },
        {
            "id": "39180581104302115620949753249891540826922898325656305665",
            "timestamp": 1756918020250,
            "message": "{\"log.level\":\"warn\",\"@timestamp\":\"2025-09-03T16:47:00.250Z\",\"log.origin\":{\"function\":\"github.com/elastic/apm-aws-lambda/apmproxy.(*Client).forwardLambdaData\",\"file.name\":\"apmproxy/apmserver.go\",\"file.line\":357},\"message\":\"Dropping lambda data due to error: metadata is not yet available\",\"ecs.version\":\"1.6.0\"}\n"
        },
        {
            "id": "39180581104302115620949753249891540826922898325656305666",
            "timestamp": 1756918020250,
            "message": "{\"log.level\":\"warn\",\"@timestamp\":\"2025-09-03T16:47:00.250Z\",\"log.origin\":{\"function\":\"github.com/elastic/apm-aws-lambda/apmproxy.(*Client).forwardLambdaData\",\"file.name\":\"apmproxy/apmserver.go\",\"file.line\":357},\"message\":\"Dropping lambda data due to error: metadata is not yet available\",\"ecs.version\":\"1.6.0\"}\n"
        }
    ]
}
Metrics to Observe
Basic and enhanced CloudWatch Metrics
Kinesis Data Streams and Amazon CloudWatch are integrated so that you can collect, view, and analyze CloudWatch metrics for your Kinesis data streams. This integration supports basic stream-level and enhanced shard-level monitoring for Kinesis data streams.
- Basic (stream-level) metrics – Stream-level data is sent automatically every minute at no charge.
- Enhanced (shard-level) metrics – Shard-level data is sent every minute for an additional cost.
A stream has producers and consumers. If the rate of writing new records is higher than the rate of reading them, records will reach their retention age (24 hours by default), and the oldest unread records will be removed from the stream and lost forever.
To prevent this, we can monitor stream metrics and set alarms when they reach critical thresholds.
GetRecords.IteratorAgeMilliseconds
It measures how old the oldest record returned by GetRecords is (how far our consumer lags). A very large value → our consumer(s) aren’t keeping up with the incoming write rate.
The age of the last record in all GetRecords calls made against a Kinesis stream, measured over the specified time period. Age is the difference between the current time and when the last record of the GetRecords call was written to the stream. The Minimum and Maximum statistics can be used to track the progress of Kinesis consumer applications. A value of zero indicates that the records being read are completely caught up with the stream. Shard-level metric name: IteratorAgeMilliseconds.
Meaningful Statistics: Minimum, Maximum, Average, Samples
Unit info: Milliseconds
There are 86,400,000 milliseconds in a day, so if this metric rises above that value (with the default 24-hour retention), some records are being trimmed before they are read, i.e. lost.
A growing iterator age is the classic "consumer is falling behind" symptom.
IteratorAgeMilliseconds = 86.4M → very high; the backlog has reached the default 24-hour retention and unread records are being trimmed.
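The alarm thresholds follow directly from the retention window. A small sketch of a classification rule; the 50% warning fraction is an arbitrary choice, not an AWS recommendation:

```python
DAY_MS = 24 * 60 * 60 * 1000  # 86,400,000 ms: the default 24 h retention

def lag_status(iterator_age_ms, retention_ms=DAY_MS, warn_fraction=0.5):
    """Classify consumer lag relative to the stream's retention window."""
    if iterator_age_ms >= retention_ms:
        return "data-loss"  # oldest unread records are being trimmed
    if iterator_age_ms >= warn_fraction * retention_ms:
        return "warning"    # backlog is eating into the safety margin
    return "ok"

print(lag_status(1_000))       # ok
print(lag_status(43_200_000))  # warning
print(lag_status(86_400_000))  # data-loss
```

In practice you would wire a rule like this into a CloudWatch alarm on GetRecords.IteratorAgeMilliseconds (Maximum statistic), firing well before the metric approaches the retention limit.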
GetRecords.Bytes
The number of bytes retrieved from the Kinesis stream, measured over the specified time period. Minimum, Maximum, and Average statistics represent the bytes in a single GetRecords operation for the stream in the specified time period.
Shard-level metric name: OutgoingBytes
Meaningful Statistics: Minimum, Maximum, Average, Sum, Samples
Unit info: Bytes
GetRecords - sum (MiB/second)
GetRecords - sum (Count)
- GetRecords.Records decreasing → Lambda is lagging
GetRecords iterator age - maximum (Milliseconds)
GetRecords latency - average (Milliseconds)
GetRecords success - average (Ratio)
- GetRecords.Success decreasing → Lambda is not keeping up
Incoming data - sum (MiB/second)
Incoming data - sum (Count)
PutRecord - sum (MiB/second)
PutRecords - sum (MiB/second)
PutRecord latency - average (Milliseconds)
PutRecords latency - average (Milliseconds)
PutRecord success - average (Ratio)
PutRecords successful records - average (Percent)
PutRecords failed records - average (Percent)
PutRecords throttled records - average (Percent)
Read throughput exceeded - average (Ratio)
Write throughput exceeded - average (Count)
Addressing Bottlenecks
If a Lambda function is the stream consumer, then with just 1 shard only 1 Lambda invocation can read at a time (per shard, with the default of one concurrent batch). If the log rate exceeds what that shard can handle, the iterator age skyrockets.
It is possible to change the number of shards on a live Kinesis stream:
aws kinesis update-shard-count \
--stream-name your-stream-name \
--target-shard-count 4 \
--scaling-type UNIFORM_SCALING
Each shard = 1 MiB/s write and 2 MiB/s read, so 4 shards = 4 MiB/s write and 8 MiB/s read.
Lambda will then process 4 batches of records in parallel (one per shard).
To speed up Lambda processing, consider increasing its memory allocation, e.g. from 256 to 512 MB (CPU is allocated in proportion to memory). In the AWS Console, this can be done in Lambda >> Configuration >> General configuration.
In Kinesis stream trigger, we can try to increase:
- batch size: 100 --> 400
- concurrent batches per shard: 1 --> 5
Example of settings for Kinesis stream trigger:
- Activate trigger: Yes
- Batch size: 400
- Batch window: None
- Concurrent batches per shard: 5
- Event source mapping ARN: arn:aws:lambda:us-east-2:123456789012:event-source-mapping:80bd81a9-c175-4af5-9aa9-8926b0587f40
- Last processing result: OK
- Maximum age of record: -1
- Metrics: None
- On-failure destination: None
- Report batch item failures: No
- Retry attempts: -1
- Split batch on error: No
- Starting position: TRIM_HORIZON
- Tags: View
- Tumbling window duration: None
- UUID: 80bd81a9-c175-4af5-9aa9-8926b0587f40
In AWS Console, this can be done in Lambda >> Configuration >> Triggers.
What does “Concurrent batches per shard” do?
- Each shard can have multiple batches being processed at the same time.
- Default is 1, meaning: the next batch from a shard won’t be sent to Lambda until the previous batch finishes.
- If your Lambda is slow or variable in duration, this can create a backlog because only one batch per shard is in flight at a time.
When to increase it?
- If IteratorAgeMilliseconds is very high → Lambda cannot keep up with the stream.
- If Lambda execution duration is variable → a single batch in flight per shard limits throughput.
- If you have sufficient Lambda concurrency (in our case, up to 15) → you can safely allow multiple batches per shard.
Recommended approach
- Start with 2–5 concurrent batches per shard
- This allows Kinesis to send multiple batches from the same shard to Lambda simultaneously.
- Observe if IteratorAgeMilliseconds decreases.
- Monitor Lambda throttles and duration
- Ensure your Lambda’s memory/cpu and timeout can handle multiple concurrent batches.
- If there is currently no throttling → there is room to increase concurrency.
- Adjust batch size if needed
- Larger batch sizes may help throughput, but smaller batch sizes + higher concurrency often reduce latency.
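A back-of-the-envelope model (with hypothetical numbers) shows why more concurrent batches per shard drains a backlog faster when Lambda duration, not the shard read limit, is the bottleneck:

```python
import math

def drain_time_s(backlog_records, batch_size, batch_duration_s, concurrent_batches):
    """Seconds to clear a backlog if Lambda duration is the bottleneck.

    Simplified model: each in-flight batch processes `batch_size` records
    every `batch_duration_s` seconds; shard read limits and new incoming
    records are ignored.
    """
    batches = math.ceil(backlog_records / batch_size)
    waves = math.ceil(batches / concurrent_batches)
    return waves * batch_duration_s

# Hypothetical numbers: 100,000-record backlog, 2 s per 400-record batch
print(drain_time_s(100_000, 400, 2.0, 1))  # 500.0 s with 1 batch in flight
print(drain_time_s(100_000, 400, 2.0, 5))  # 100.0 s with 5
```

The model ignores new incoming records and shard read limits, so it is an upper bound on the benefit; still, it makes clear that with one batch in flight per shard, Lambda duration directly caps per-shard throughput.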
Important notes
- Increasing concurrency per shard does not increase shard limits; it just allows more parallelism per shard.
- If Lambda fails batches, retries are per batch → more concurrency increases the number of batches being retried simultaneously.
Practical starting point:
- Set Concurrent batches per shard = 5
- Keep batch size = 400
- Monitor:
- IteratorAgeMilliseconds
- GetRecords.Records
- Lambda duration / concurrency
- Adjust up/down based on observed backlog.
Whatever you change, make a single change at a time and monitor performance afterwards. That gives a clear picture of the impact of that particular parameter and its value.
Reference: Amazon Kinesis Data Streams Terminology and Concepts (AWS documentation)