I'm working on binding stream data that is consumed from a gRPC server-streaming call. The service sends data every X milliseconds, and I'm binding it as observable data to a div in the HTML with a simple *ngIf.
I need to handle the case where the service stops sending data: if nothing arrives for X seconds, the div should not be shown. So I'm planning to explicitly set the bound variable to null, but only when no stream data has been received for X seconds.
I'm trying to figure out the best way to do this with RxJS operators. I looked at interval, timer, and switchMap, but I'm not sure how to implement it correctly.
Service.ts
@Injectable()
export class ApiMyTestService {
client: GrpcMyTestServiceClient;
public readonly testData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient) {
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.testData$ = this.listCoreStream();
}
listCoreStream(): Observable <TestReply.AsObject> {
return new Observable(obs => {
const req = new SomeRequest();
req.setClientName("GrpcWebTestClient");
const stream = this.client.getCoreUpdates(req); // Main gRPC Server stream Api call
stream.on('status', (status: Status) => {
console.log('ApiService.getStream.status', status);
});
stream.on('data', (message: any) => {
// gRPC Api call returns stream data here. Every X millisec service sends stream data.
console.log('ApiService.getStream.data', message.toObject());
obs.next(message.toObject() as TestReply.AsObject);
});
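stream.on('end', () => {
console.log('ApiService.getStream.end');
obs.complete();
});
// Forward stream errors to the subscriber instead of dropping them silently.
stream.on('error', (err) => obs.error(err));
// Teardown: cancel the gRPC call when the subscriber unsubscribes.
return () => stream.cancel();
});
}
}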
Component.ts
export class AppComponent implements AfterViewInit, OnDestroy {
public testReply?: TestReply.AsObject;
private _subscription: Subscription;
constructor(private readonly _MyTestService: ApiMyTestService) {
this._subscription = new Subscription();
}
public ngAfterViewInit(): void {
this._subscription = this._MyTestService.testData$.subscribe((data) => {
// testReply is bound to a div, so the div is hidden when it is null or undefined.
// How do I set it to null when the service hasn't streamed any data for X seconds?
this.testReply = data;
});
}
public ngOnDestroy(): void {
this._subscription.unsubscribe();
}
}
Html:
<!-- Show this div only when we have stream data. Hide it when no data has been bound for X seconds. -->
<div *ngIf="testReply">
HTML elements that use testReply
</div>
Answer:
You can create an observable that emits the values from your service, but also emits undefined if no new value arrives within a given number of seconds:
public testReply$ = this._MyTestService.testData$.pipe(
switchMap(data => merge(
of(data),
timer(5000).pipe(mapTo(undefined))
))
);
Here's how it works:
merge creates a single observable from multiple sources, which will emit when any source emits. In this case the sources are of(data), which emits the received value immediately, and timer(5000).pipe(mapTo(undefined)), which emits undefined after 5000 ms.
switchMap subscribes to this "inner observable" for us and emits its emissions. It will also replace the inner observable with a new one each time it receives an emission. Therefore, the timer() will only emit if switchMap doesn't receive the next emission within 5000 ms.
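If switchMap is unfamiliar, it may help to see the same reset-on-every-value behaviour written out by hand with a plain timer. This is only a rough illustration of what the operators do for you, not a replacement for them. The names makeIdleReset, TimerApi and realTimers are hypothetical and do not come from RxJS or from the question.

```typescript
// Hypothetical names throughout; this only mirrors what the operators do.
interface TimerApi {
  set(fn: () => void, ms: number): unknown;
  clear(handle: unknown): void;
}

// Real timers by default; a fake can be passed in for testing.
const realTimers: TimerApi = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle as ReturnType<typeof setTimeout>),
};

function makeIdleReset<T>(
  idleMs: number,
  emit: (value: T | undefined) => void,
  timers: TimerApi = realTimers,
): (value: T) => void {
  let pending: unknown = null;
  return (value: T): void => {
    // switchMap: drop the previous inner observable, and with it the old timer.
    if (pending !== null) {
      timers.clear(pending);
    }
    // of(data): pass the fresh value through immediately.
    emit(value);
    // timer(idleMs).pipe(mapTo(undefined)): emit undefined if nothing else arrives in time.
    pending = timers.set(() => {
      pending = null;
      emit(undefined);
    }, idleMs);
  };
}
```

Calling the returned function once per stream message plays the role of the source emitting. Each call cancels the previous countdown, so undefined is only emitted after a full idleMs of silence.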
Now, you just use testReply$:
public ngAfterViewInit(): void {
this._subscription = this.testReply$.subscribe(
data => this.testReply = data
);
}
To simplify things a bit, you can leverage the AsyncPipe in your template and not need to worry about subscriptions:
<div *ngIf="testReply$ | async as testReply">
<p> {{ testReply }} </p>
</div>
export class AppComponent {
public testReply$: Observable<TestReply.AsObject | undefined> = this._MyTestService.testData$.pipe(
switchMap(data => merge(
of(data),
timer(5000).pipe(mapTo(undefined))
))
);
constructor(private readonly _MyTestService: ApiMyTestService) { }
}
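If the same idle timeout is needed in more than one place, the pipe above could be wrapped in a small custom operator. The following is a minimal sketch, assuming RxJS 6 or later. The name clearAfterIdle, the optional scheduler parameter, and the demo function are made up for illustration and are not part of RxJS. map(() => undefined) is used in place of mapTo, which I believe is deprecated in recent RxJS 7 releases; the behaviour is the same.

```typescript
import {
  Observable, SchedulerLike, Subject, VirtualTimeScheduler,
  asyncScheduler, merge, of, timer,
} from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

// Hypothetical helper: re-emits every source value, then emits undefined
// if the source stays silent for idleMs afterwards.
export function clearAfterIdle<T>(
  idleMs: number,
  scheduler: SchedulerLike = asyncScheduler, // injectable so tests can use virtual time
): (source: Observable<T>) => Observable<T | undefined> {
  return (source) =>
    source.pipe(
      switchMap((value) =>
        merge(
          of(value),
          timer(idleMs, scheduler).pipe(map(() => undefined)),
        ),
      ),
    );
}

// Small demo under virtual time, so no real waiting is involved.
export function demo(): Array<string | undefined> {
  const scheduler = new VirtualTimeScheduler();
  const source$ = new Subject<string>();
  const seen: Array<string | undefined> = [];

  source$.pipe(clearAfterIdle(5000, scheduler)).subscribe((v) => seen.push(v));

  source$.next('a');
  source$.next('b'); // arrives before the timer for 'a' fires, so that timer is discarded
  scheduler.flush(); // run the remaining timer: 5000 virtual ms of silence after 'b'
  source$.next('c');
  return seen;
}
```

In the component this would read this._MyTestService.testData$.pipe(clearAfterIdle(5000)), with the scheduler argument left at its default.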