Home > OS >  How to signal an infinite observable to stop/continue emission?
How to signal an infinite observable to stop/continue emission?

Time:06-20

Problem description

I writing an Android application in Java to handle RFID reading based on some proprietary library and hardware. The RFID reader has a JNI library, which acts as an API to the hardware. This library is thread safe (as they say) but I cannot make the scanning process run on the Android's UI thread. I also don't want to use AsyncTask because it has started being deprecated I want the user to be able to start/resume the scanning process based on the click of a button without the process starting each time from the beginning.

I have the following code inside a class Scanner that is a singleton object.

class Scanner {
/*...*/
    public Observable<Product> scan() {
        return Observable.create(emitter -> {
            while (true) {
                 UHFTAGInfo TAG = RFIDReader.inventorySingleTag();
                 if (TAG != null) {
                    emitter.onNext(new Product(TAG.getEPC()));
                 }
            }
        });
    }
/*...*/
}

On the UI side, I have a ScanFragment and some buttons. This is the OnClickListener() of the scanButton that triggers the scanning process.

scanButton.setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View view) {
         Scanner.getInstance().scan()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Product>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {

                }

                @Override
                public void onNext(@NonNull Product product) {
                    listProducts.add(product);
                    adapter.notifyDataSetChanged();
                }

                @Override
                public void one rror(@NonNull Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

Questions

  • What is the correct way to stop/resume/end the above infinite emitting Observable when another stopButton/resumeButton is pressed or when user moves to another fragment?
  • I have thought many different things like an Atomic flag without and with an Observable for it and using takeUntil for that Observable. But will takeUntil stop my Observable and I will have to start it from the beginning?

CodePudding user response:

First of all, you have an unconditional infinite loop which is unable to respond to when the downstream disposes the sequence. Use while (!emitter.isDisposed()).

Stopping a sequence is done via Disposable.dispose. You get one Disposable in onSubscribe so you'll have to make that available to the button that you want to stop a sequence.

Resuming is essentially subscribing to the sequence again, i.e., what you already do in that onClick callback.

  • Related