I'm trying to create some functionality for uploading images to remote server using RxSwift.
My upload function is below:
func upload(image: UIImage) -> Single<UploadResponse> {
guard let data = image.jpegData(compressionQuality: 0.6) else { return .never()}
let target = UserEndpoint.addNewProfileImage(data: data, nama: "image", fileName: "image.jpeg", mimeType: "image/jpeg")
return networkProvider.request(target)}
}
And how I call it:
selectImageTrigger
.flatMapLatest { [weak self] image -> Observable<UploadResponse> in
guard let self = self else { return .never()}
return self.upload(image: image)
.trackError(self.errorTracker)
.trackActivity(self.activityIndicator)
.catchErrorJustComplete()
}
.subscribe()
.disposed(by: rx.disposeBag)
And I also need to make it possible to stop uploading images simply by clicking on the "stop" button.
Here's my approach, and it looks, in my opinion, ugly. But anyway it works :).
var token: Disposable?
selectedImage
.subscribe(onNext: { [weak self] image in
guard let self = self else { return }
token = self.upload(image: image)
.trackError(self.errorTracker)
.trackActivity(self.activityIndicator)
.catchErrorJustComplete()
.subscribe()
})
.disposed(by: rx.disposeBag)
stopTrigger
.subscribe(onNext: { _ in
token?.dispose()
})
.disposed(by: rx.disposeBag)
I use RxAlamofire which means that in order to cancel the request I have to dispose the subscription. But I want to stop and repeat (dispose and resubscribe?) at any time.
So what's the best way to do it?
CodePudding user response:
Here's a cleaner way... The merge
emits an Optional image, if it emit nil
, that will stop the current upload (if there is one).
Whenever a new event comes in, the flatMapLatest
operator will dispose the previous Observable and subscribe to the new one.
The function will emit a next event containing an UploadResponse if the upload completes, and will emit next(nil) if an upload is stopped.
func example(select: Observable<UIImage>, stop: Observable<Void>) -> Observable<UploadResponse?> {
Observable.merge(
select.map(Optional.some),
stop.map(to: Optional.none)
)
.flatMapLatest { [weak self] (image) -> Observable<UploadResponse?> in
guard let self = self else { return .empty() }
guard let image = image else { return .just(nil) }
return self.upload(image: image)
.map(Optional.some)
.trackError(self.errorTracker)
.trackActivity(self.activityIndicator)
.asObservable()
}
}
Also, that return .never()
in the upload(image:)
function is a mistake. It should either be .error
or .empty
and if the latter, then the function needs to return a Maybe
instead of a Single
.
CodePudding user response:
I have created an extension based on @daniel-t answer
extension ObservableType {
public func flatMapLatestCancellable<Source: ObservableConvertibleType>(cancel: Observable<Void>, _ selector: @escaping (Element) throws -> Source)
-> Observable<Source.Element?> {
Observable<Element?>.merge(
self.map(Optional.some),
cancel.map { _ in Optional.none }
)
.flatMapLatest { element -> Observable<Source.Element?> in
guard let element = element else { return .just(nil)}
return try selector(element)
.asObservable()
.map(Optional.some)
}
}
}
Simple usage:
start
.flatMapLatestCancellable(cancel: cancel, { [unowned self] _ in
return anyTask()
.catch { error in
print(error.localizedDescription)
return .empty()
}
})
.observe(on: MainScheduler.instance)
.subscribe(onNext: { item in
print(item)
})
.disposed(by: disposeBag)