How to Queue/Delay a function from starting if that same function is running elsewhere

Time:08-27

I'm fairly new to threading and would like to ask how to fix this problem. A printing function (from a printing class) is called from 3 different places.

Current pseudo-code is like this.

fun printOrderDirect(params) {
  PrintClass.execute(params) // using Rx
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object: Subscriber<Int>() {
      override fun onNext(t: Int) { response = t }
      override fun onCompleted() {}
      override fun onError(e: Throwable?) { log(e) }
    })
}

fun printOrderRemote(params) { // called when receiving signal online
  PrintClass.execute(params).observeOn(Schedulers.immediate())
    .subscribe(object: Subscriber<Int>() {
      override fun onNext(t: Int) { response = t }
      override fun onCompleted() {
        // Do some filtering and call another print job for online order
        PrintClass.execute(params).observeOn(Schedulers.immediate())
          .subscribe(object: Subscriber<Int>() {
            override fun onNext(t: Int) { response = t }
            override fun onCompleted() {}
            override fun onError(e: Throwable?) { log(e) }
          })
      }
      override fun onError(e: Throwable) { log(e) }
    })
}

The PrintClass code:

class PrintClass {
  fun execute(params): Observable<Int> {
    return Observable.create<Int> { subscriber ->
      // do a lot of filtering, formatting stuffs
      val task = PrintTask()
      task.execute().observeOn(Schedulers.immediate())
        .subscribe(object: Subscriber<Int>() {
          override fun onNext(t: Int?) {}
          override fun onCompleted() {
            subscriber.onNext(RESPONSE_OK)
            subscriber.onCompleted()
          }
          override fun onError(e: Throwable) {
            subscriber.onNext(RESPONSE_FAIL)
            subscriber.onError(e)
          }
        })
    }
      .onBackpressureBuffer()
      .subscribeOn(Schedulers.newThread())
  }
}

The last 2 calls to the print job are chained, so they are fine: the first job has to complete before the second one starts.

But when many online orders come in at once (order updates are not real-time; they sync once every 90 seconds, and many orders can pile up in that window), or even when a direct order and an online order are printed at the same time, the printer can get congested (its buffer or cache, I don't know what it's called, seems to be pretty small).

I would like to know how to keep PrintClass.execute() from running while another call to it is still in progress. I don't want to cancel it, just delay/queue it until the previous one has finished.
The printer dies every time 3 online orders come in.

I can't use Kotlin coroutines because the Kotlin version in the current codebase is very old. I did my best to learn Rx and have already migrated from AsyncTask to Rx.

I've tried calling Lock.lock() before starting PrintClass.execute() in the two functions and Lock.unlock() in onCompleted(), but that freezes my UI and causes an ANR error if the print jobs take too long.

Thanks everyone in advance.

CodePudding user response:

You could use a global serialized PublishSubject and submit the params to it. A concatMap then makes sure only one PrintTask.execute runs at a time and captures the result.

// Kotlin version, assuming RxJava 1 as used in the question
val subject = PublishSubject.create<Params>().toSerialized()

subject.observeOn(Schedulers.io())
  .concatMap { params ->
    // concatMap subscribes to the next print job only after the previous one completes
    PrintTask.execute(params)
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext { t -> response = t }
      // log the failure and swallow it so one bad job does not terminate the queue
      .onErrorResumeNext { e: Throwable -> log(e); Observable.empty<Int>() }
  }
  .subscribe()

fun printOrderDirect(params: Params) {
  subject.onNext(params)
}

fun printOrderRemote(params: Params) {
  subject.onNext(params)
}
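
If it helps to see the queueing idea on its own, here is a minimal JDK-only sketch of the same "one job at a time, in order" behavior. It swaps the subject and concatMap for a single-threaded executor, so it is not the Rx solution above, just an illustration of the principle. FakePrinter, PrintQueue, enqueue and runDemo are made-up names for this example, and the 50 ms sleep stands in for the real printer call.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the real printer: blocks while "printing" and
// records start/end events so the ordering can be inspected afterwards.
class FakePrinter {
    val events = CopyOnWriteArrayList<String>()

    fun print(orderId: Int): Int {
        events.add("start $orderId")
        Thread.sleep(50) // simulated print time (assumption)
        events.add("end $orderId")
        return 0 // plays the role of RESPONSE_OK
    }
}

// All callers share one single-threaded executor, so jobs run one at a time
// in submission order. submit() only enqueues, so the caller is not blocked.
class PrintQueue(private val printer: FakePrinter) {
    private val worker = Executors.newSingleThreadExecutor()

    fun enqueue(orderId: Int): Future<Int> =
        worker.submit(Callable<Int> { printer.print(orderId) })

    fun shutdownAndWait() {
        worker.shutdown()
        worker.awaitTermination(5, TimeUnit.SECONDS)
    }
}

fun runDemo(): List<String> {
    val printer = FakePrinter()
    val queue = PrintQueue(printer)
    // Three "orders" submitted from three different threads at about the same time.
    val callers = (1..3).map { id -> Thread { queue.enqueue(id) } }
    callers.forEach { it.start() }
    callers.forEach { it.join() }
    queue.shutdownAndWait()
    return printer.events.toList()
}

fun main() {
    println(runDemo())
}
```

Which order gets printed first depends on thread scheduling, but every "start" should be followed directly by its own "end", meaning the jobs never overlap. This also hints at why the Lock attempt froze the UI: lock() makes the calling thread wait, whereas enqueue() (like subject.onNext above) returns right away and leaves the waiting to the worker.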