I am using Boto3 with Kinesis. I want Kinesis to return a single item from the stream.
This is my code:
kinesis = boto3.client('kinesis')
iterator = kinesis.get_shard_iterator(
StreamName = "requests",
ShardIteratorType='LATEST'
)
response = kinesis.get_records(
ShardIterator = iterator,
Limit = 1,
StreamName = "requests",
)
print(response)
It fails with this error:
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in input: "ShardId"
on the line where I initialize the iterator, so I know I'm missing a parameter, but what do I set ShardId
to? If the answer varies from stream to stream, what exactly is a ShardId
?
Thanks in advance!
CodePudding user response:
The core problem is that get_shard_iterator
requires a ShardId
argument passed to it.
The shard ID is the unique identifier of a shard within a stream.
To obtain the ShardId
(or ShardId
s to then pick from) for your stream, first call describe_stream
.
stream = client.describe_stream(StreamName='requests')
This will return to you a response object with a list of stream objects with a JsonPath of response.StreamDescription.Shards
.
Each object in this array will have a ShardId
. Dependent on how you've configured your Kinesis stream, you will have 1 or more shard objects in this array.
If you have 1 shard, the below should allow you to pick the first shard ID & proceed with the rest of your code:
kinesis = boto3.client('kinesis')
stream = client.describe_stream(StreamName='requests')
shard_id = stream['StreamDescription']['Shards'][0]['ShardId']
iterator = kinesis.get_shard_iterator(
StreamName = "requests",
ShardIteratorType='LATEST',
ShardId = shard_id
)
response = kinesis.get_records(
ShardIterator = iterator,
Limit = 1
)
print(response)
To learn more about shards themselves (out of scope), start by reading this great SO answer by John Rotenstein.