Home > Software engineering >  dispatch and _synchronized_subscribe RxSwift
dispatch and _synchronized_subscribe RxSwift

Time:01-02

I am trying to understand core logic of RxSwift and I have simple button which will trigger and prints console when onNext(5) trigger.

I am following the Thread from left side of Xcode as below.

-RxSwift Library

// Step 1
  public func onNext(_ element: E) {
        self.on(.next(element))
    }
// Step 2
    public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }
//Step 3
@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> Void>, _ event: Event<E>) {
    bag._value0?(event) 
/// after dispatch calling, it is creating BagKey as below
    func synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on) // directly flow comes here and my question is related with this step
        return SubscriptionDisposable(owner: self, key: key)
    }
// Step 4
        MapSink.on(_:)
// Step 5 FilterSink - PublishSubject... and flow goes on prints

  • My Code

    import UIKit
    import RxSwift
    
      class ViewController: UIViewController {
    
          let publisher = PublishSubject<Int>()
      let disposeBag = DisposeBag()
    
      @IBOutlet weak var button: UIButton!
    
      override func viewDidLoad() {
          super.viewDidLoad()
    
          publisher
                  .map { $0 * 2}
                  .filter { $0 > 4 }
                  .subscribe(onNext: { n in
                      print("We've got number \(n)!")
                  })
                  .disposed(by: disposeBag)
      }
    
      @IBAction func trigger(_ sender: Any) {
          print("button triggered")
          publisher.onNext(5)
      }
     }
    

My question is what is going on at Step 3 , Can you explain please? Also I checked out the synchronized_subscribe<O: ObserverType> function calling place but could not find it; thread directly create a key and comes to inside of this function. How come RxSwift jumps to there even tough it is not called, What is the magic ?

CodePudding user response:

I'm not sure what is really being asked here... The logic is that when onNext(5) is called, it will send a Event.next(5) to each of its subscribers (of which there is only one.)

As for what functions are called, the chain is like this:

ObserverType.onNext(_ element: Element)
    PublishSubject<Element>.on(_ event: Event<Element>)
        PublishSubject<Element>.synchronized_on(_ event: Event<Element>) // gets Bag of observers
        dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) // dispatches event to each observer in the bag.
            MapSink<SourceType, Observer: ObserverType>.on(_ event: Event<Self.Element>) // this is one of the observers.

You seem to be confused by the dispatch(_:_:) function. This function takes a Bag of observers (strictly speaking, a container of closures that accept Events) and sends the event to the one Observer contained in the Bag.

  • Related