I'm using Tokio runtime and I'm trying to build an interface which will return messages from several streams , but in this instance I cannot consume the streams with StreamMap
or any merge
method (streams will be owned by a wrapping object), so I plan to use select!
statement for that:
struct Listener {
alice: StructWithStreamField,
bob: StructWithStreamField,
}
impl Listener {
async fn listen(&mut self) -> Option<MessageAndSource> {
select! {
maybe_message = self.bob.stream.next() => {
let message = maybe_message?;
return Some(MessageAndSource::FromBob(message))
}
maybe_message = self.alice.stream.next() => {
let message = maybe_message?;
return Some(MessageAndSource::FromAlice(message))
}
}
}
}
Granted the stream
field owned by bob
and alice
actually implements Stream
, can I be sure that cancelling listen
method is always safe, since there is no await
point in between receiving message from stream and returning it? In more general sense, are there any guidelines for assessing cancel-safety? Or maybe there is a better suited solution?
CodePudding user response:
Yes. If the futures select!
awaits are cancellation safe, so is listen()
.
To verify a future is cancellation safe you need to inspect all points it .await
at (including nested) and make sure that if we stop the future there no data will be lost. This is because futures can be only cancelled at .await
s. Since there are no .await
s, there's nothing to verify.
Of course, the user can still drop the result of listen()
, but the same applies to any function, async or not.