RxJava calling PublishSubject only once instead of multiple times

Time:10-27

I'm trying to get data from a paged API. After each call I have to request the next page by setting a new page number, and once the API responds with an empty array I have to stop the observable.

So I made Retrofit return an Observable that I can subscribe to:

    private fun getProducts(page: Int, lastId: Int): Observable<Response<List<ProdottoBarcode>>>? {
        val observable = urlServer?.let {
            RetrofitClient.getInstance()?.getService()?.getProducts(it, "A", page, lastId)
        }

        return observable
    }

The subscription is then made via a PublishSubject, like this:

    private fun getAllProducts(): Observable<Response<List<ProdottoBarcode>>> {
        // TODO: get response headers with MAX_PRODUCTS and return it to activity
        var currentPage = 1
        val subject: PublishSubject<Response<List<ProdottoBarcode>>> = PublishSubject.create()
        return subject.doOnSubscribe {
            getProducts(currentPage, 0)?.subscribe(subject)
        }.doOnNext {
            val products = it.body()
            val lastId = it.headers().get("lastId")?.toInt()

            if (products?.isEmpty() == true) {
                subject.onComplete()
            } else {
                currentPage += 1
                lastId?.let { id -> getProducts(currentPage, id)?.subscribe(subject) }
            }
        }
    }

And I subscribe to the data in my service's handleMessage():

    override fun handleMessage(msg: Message) {
        getAllProducts().subscribe {
            val products = it.body()
            if (products != null) {
                for (product in products) {
                    scope.launch {
                        // TODO: return to activity counter of insert items
                        repository.insert(product)
                    }
                }
            }
        }

        stopSelf(msg.arg1)
    }

The issue is that getAllProducts() stops after the second page is fetched, even though there is more data.

So doOnNext() is invoked only twice.

Answer:

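It looks like you have a reentrancy and recursion problem: every page's Observable is subscribed to the same subject from inside that subject's own doOnNext, and each of those Observables also forwards its onComplete to the subject, which terminates it while later pages are still pending.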
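One part of that problem can be reproduced in isolation. The following is a minimal sketch, assuming RxJava 3 (`io.reactivex.rxjava3`) is on the classpath; `subjectDemo` is a made-up name for illustration. When a subject is subscribed to a source, it receives that source's onComplete as well, and a terminated PublishSubject ignores everything that arrives afterwards.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical demo: two sources feed the same subject, one after the other.
fun subjectDemo(): List<Int> {
    val received = mutableListOf<Int>()
    val subject = PublishSubject.create<Int>()
    subject.subscribe { received.add(it) }

    // The first source emits 1 and then forwards onComplete to the subject.
    Observable.just(1).subscribe(subject)
    // The subject is already terminated, so this value never reaches the observer.
    Observable.just(2).subscribe(subject)

    return received
}

fun main() {
    println(subjectDemo()) // [1]
}
```

In the question's code the same thing can happen as soon as any single page request completes, regardless of how many pages remain.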

You could instead use range to generate the page numbers, concatMap each page number to the service call, and stop when a response has an empty body:

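    // One-element array so the lambdas can update the value (Java locals must be effectively final).
    int[] lastId = { 0 };
    Observable.range(1, Integer.MAX_VALUE - 1)
        // concatMap starts the next call only after the previous one has completed.
        .concatMap(currentPage -> getProducts(currentPage, lastId[0]))
        // Emits the empty (or missing) page as the last item, then completes.
        .takeUntil(response -> response.body() == null || response.body().isEmpty())
        .doOnNext(response -> {
            String header = response.headers().get("lastId");
            if (header != null) {
                lastId[0] = Integer.parseInt(header);
            }
        });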
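
Since the question's code is Kotlin, here is a rough Kotlin sketch of the same range/concatMap/takeUntil idea, assuming RxJava 3. `Page`, `fakeGetProducts`, `fetchAllPages` and `requests` are hypothetical stand-ins for the Retrofit response and service call, used only so the example runs without a network. The `Predicate` is spelled out explicitly to avoid any overload ambiguity between the two `takeUntil` variants when called from Kotlin.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.Predicate

// Hypothetical stand-in for Response<List<ProdottoBarcode>> plus its "lastId" header.
data class Page(val items: List<String>, val lastId: Int)

// Records every (page, lastId) pair the fake service was asked for.
val requests = mutableListOf<Pair<Int, Int>>()

// Hypothetical stand-in for the Retrofit call: three pages of data, then an empty page.
fun fakeGetProducts(page: Int, lastId: Int): Observable<Page> =
    Observable.fromCallable {
        requests.add(page to lastId)
        if (page <= 3) Page(listOf("item-$page"), lastId = page * 10)
        else Page(emptyList(), lastId)
    }

// defer gives every subscriber its own lastId cursor.
fun fetchAllPages(): Observable<Page> = Observable.defer {
    var lastId = 0
    Observable.range(1, Int.MAX_VALUE - 1)
        // The next page is requested only after the previous request has completed,
        // so lastId has already been updated by doOnNext below.
        .concatMap { page -> fakeGetProducts(page, lastId) }
        // Emits the empty page as the last item, then completes.
        .takeUntil(Predicate<Page> { it.items.isEmpty() })
        .doOnNext { lastId = it.lastId }
}

fun main() {
    val pages = fetchAllPages().toList().blockingGet()
    println(pages.flatMap { it.items }) // [item-1, item-2, item-3]
    println(requests)                   // [(1, 0), (2, 10), (3, 20), (4, 30)]
}
```

Note that the final empty page is still delivered downstream, so a subscriber that inserts products simply sees an empty list for it.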