I have this code:
async def run_events_listener():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        os.getenv('CHECKPOINT_STORE_URI'), os.getenv('CHECKPOINT_CONTAINER'))
    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        os.getenv('EVENTS_HUB_URI'), consumer_group="$Default",
        eventhub_name=os.getenv('EVENTHUB_NAME'), checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")
In order to implement a graceful stop, I have to close EventHubConsumerClient correctly. run_events_listener is run as an asyncio.Task. How should I close it?
CodePudding user response:
There is no action that callers need to explicitly take to shut down cleanly. The consumer internally manages the startup/shutdown steps via a try/finally block.
Terminating your process or cancelling the asyncio task is sufficient to clean things up.
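To illustrate why cancelling the task is enough, here is a minimal sketch that runs without the Azure SDK. FakeConsumerClient is a hypothetical stand-in, not the real EventHubConsumerClient; it only mimics the "async with" shape and assumes that leaving the block closes the client. The point it shows is that the CancelledError raised inside receive() still passes through __aexit__, so cleanup runs before the task finishes.

```python
import asyncio


class FakeConsumerClient:
    """Hypothetical stand-in for EventHubConsumerClient (not the real SDK)."""

    def __init__(self):
        self.started = asyncio.Event()
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and also when the enclosing task is cancelled.
        await self.close()

    async def close(self):
        self.closed = True

    async def receive(self, on_event=None):
        # Pretend to wait for events indefinitely.
        self.started.set()
        while True:
            await asyncio.sleep(3600)


async def run_events_listener(client):
    async with client:
        await client.receive(on_event=None)


async def main():
    client = FakeConsumerClient()
    task = asyncio.create_task(run_events_listener(client))
    await client.started.wait()  # make sure the listener is inside receive()

    task.cancel()  # graceful stop: cancel the listener task
    try:
        await task  # wait until cleanup in __aexit__ has finished
    except asyncio.CancelledError:
        pass
    return client.closed
```

Awaiting the task after cancel() matters: it gives the cleanup code a chance to finish before the program moves on or the event loop shuts down.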
CodePudding user response:
The consumer is in charge of efficient resource management, aiming to keep resource usage low during periods of idleness and managing health during periods of increased use.
When the application is closing down, calling either the CloseAsync(CancellationToken) or DisposeAsync() method will ensure that network resources and other unmanaged objects are properly cleaned up.
CloseAsync(CancellationToken) closes the consumer.
DisposeAsync() completes the work of cleaning up the EventHubConsumerClient's resources, including ensuring that the client has been closed.
IsClosed specifies whether or not this EventHubConsumerClient has been closed.
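Note that CloseAsync, DisposeAsync and IsClosed are member names from the .NET client. In the Python azure-eventhub package the async client has an awaitable close() method instead. The sketch below shows the explicit-close variant of a graceful stop. StandInClient is a hypothetical stand-in so the example runs without the SDK, and it assumes that receive() returns once close() has been called; check that against the SDK version you use.

```python
import asyncio


class StandInClient:
    """Hypothetical stand-in for the async EventHubConsumerClient (not the real SDK)."""

    def __init__(self):
        self._stop = asyncio.Event()
        self.closed = False

    async def receive(self, on_event=None):
        # Assumption: receive() keeps running until close() is called.
        await self._stop.wait()

    async def close(self):
        self.closed = True
        self._stop.set()


async def main():
    client = StandInClient()
    shutdown_requested = asyncio.Event()  # hypothetical app-level stop signal

    receive_task = asyncio.create_task(client.receive())

    # A real application would set this from a signal handler;
    # here it is set directly to keep the sketch short.
    shutdown_requested.set()
    await shutdown_requested.wait()

    await client.close()  # ask the client to stop receiving
    await receive_task    # let receive() return on its own, without cancellation
    return client.closed and receive_task.done()
```

Compared with cancelling the task, this lets receive() finish normally, so the listener task ends without a CancelledError.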