Home > Back-end >  What is the best way to stop an image uploading and restart it using RxSwift?
What is the best way to stop an image uploading and restart it using RxSwift?

Time:05-06

I'm trying to create some functionality for uploading images to remote server using RxSwift.

My upload function is below:

func upload(image: UIImage) -> Single<UploadResponse>  {
    guard let data = image.jpegData(compressionQuality: 0.6) else { return .never()}
    let target = UserEndpoint.addNewProfileImage(data: data, nama: "image", fileName: "image.jpeg", mimeType: "image/jpeg")
    return networkProvider.request(target)}
  }

And how I call it:

selectImageTrigger
      .flatMapLatest { [weak self] image -> Observable<UploadResponse> in
        guard let self = self else { return .never()}
        return self.upload(image: image)
          .trackError(self.errorTracker)
          .trackActivity(self.activityIndicator)
          .catchErrorJustComplete()
      }
      .subscribe()
      .disposed(by: rx.disposeBag)

And I also need to make it possible to stop uploading images simply by clicking on the "stop" button.

Here's my approach, and it looks, in my opinion, ugly. But anyway it works :).

 var token: Disposable?
    
    selectedImage
      .subscribe(onNext: { [weak self] image  in
        guard let self = self else { return }
        token = self.upload(image: image)
          .trackError(self.errorTracker)
          .trackActivity(self.activityIndicator)
          .catchErrorJustComplete()
          .subscribe() 
      })
      .disposed(by: rx.disposeBag)
    
    
    stopTrigger
      .subscribe(onNext: { _ in
        token?.dispose()
      })
      .disposed(by: rx.disposeBag)

I use RxAlamofire which means that in order to cancel the request I have to dispose the subscription. But I want to stop and repeat (dispose and resubscribe?) at any time.

So what's the best way to do it?

CodePudding user response:

Here's a cleaner way... The merge emits an Optional image, if it emit nil, that will stop the current upload (if there is one).

Whenever a new event comes in, the flatMapLatest operator will dispose the previous Observable and subscribe to the new one.

The function will emit a next event containing an UploadResponse if the upload completes, and will emit next(nil) if an upload is stopped.

func example(select: Observable<UIImage>, stop: Observable<Void>) -> Observable<UploadResponse?> {
    Observable.merge(
        select.map(Optional.some),
        stop.map(to: Optional.none)
    )
    .flatMapLatest { [weak self] (image) -> Observable<UploadResponse?> in
        guard let self = self else { return .empty() }
        guard let image = image else { return .just(nil) }
        return self.upload(image: image)
            .map(Optional.some)
            .trackError(self.errorTracker)
            .trackActivity(self.activityIndicator)
            .asObservable()
    }
}

Also, that return .never() in the upload(image:) function is a mistake. It should either be .error or .empty and if the latter, then the function needs to return a Maybe instead of a Single.

CodePudding user response:

I have created an extension based on @daniel-t answer

extension ObservableType {
  public func flatMapLatestCancellable<Source: ObservableConvertibleType>(cancel: Observable<Void>, _ selector: @escaping (Element) throws -> Source)
  -> Observable<Source.Element?> {
    Observable<Element?>.merge(
      self.map(Optional.some),
      cancel.map { _ in Optional.none }
    )
      .flatMapLatest { element -> Observable<Source.Element?> in
        guard let element = element else { return .just(nil)}
        return try selector(element)
          .asObservable()
          .map(Optional.some)
      }
  }
}

Simple usage:

 start
      .flatMapLatestCancellable(cancel: cancel, { [unowned self] _ in
        return anyTask()
          .catch { error in
            print(error.localizedDescription)
            return .empty()
          }
      })
      .observe(on: MainScheduler.instance)
      .subscribe(onNext: { item in
        print(item)
      })
      .disposed(by: disposeBag)
  • Related