관리 메뉴

김종권의 iOS 앱 개발 알아가기

[iOS - swift] [오픈소스 까보기] RxSwift - Observable과 Observer 구현부 본문

오픈소스 까보기

[iOS - swift] [오픈소스 까보기] RxSwift - Observable과 Observer 구현부

jake-kim 2023. 8. 28. 01:30

RxSwift의 스트림 처리

  • RxSwift를 사용할 때 보통 Observable 인스턴스를 .subscribe하여 onNext, onError, onCompleted로 처리가 가능
  • RxSwift는 위에서 아래로 stream 형태로 비동기 처리를 코드상으로는 순서대로 눈에보기 쉽게 처리가 가능
API.download(file: "http://www...")
  .subscribe(onNext: { data in
    // Append data to temporary file
  },
  onError: { error in
    // Display error to user
  },
  onCompleted: {
    // Use downloaded file
  })
  • 이번 포스팅 글에서 RxSwift의 Observable과 Observer 구현부를 분석하여 어떻게 위처럼 처리가 가능한지 파악하는게 목적

RxSwift의 Observable 구현부

  • init(), deinit()
    • 초기화할때 leak 체크를 위해 Resource에 하나를 추가
    • deinit될때는 Resouce 하나 제거
  • subscribe메소드
    • 파라미터로 Observer 타입을 받아서, Disposable을 반환
    • Observer 타입에서는 onNext, onError, onCompleted와 같은 이벤트 처리가 가능해야하므로 아래에서 알아볼 것이지만 Closure타입임을 예측 가능
    • 구현부에서는 rxAbstractMethod()가 호출 - subscribe메소드를 반드시 서브 클래스에서 구현하라는 일종의 방어코드
// Observable.swift

public typealias RxObservable<Element> = RxSwift.Observable<Element>

public class Observable<Element> : ObservableType {
    init() {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<Element> { self }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }
}
  • rxAbstractMethod()의 정체
    • swift에서는 추상 메소드를 구현할 수 없으니, 이 메소드가 호출되는 쪽의 메소드가 override 되지 않으면 일부러 crash가 발생하게끔 런타임때 체크하기 위함
// Rx.swift

/// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
    rxFatalError("Abstract method", file: file, line: line)
}
  • Observable에서는 subscribe 메소드가 핵심인데, 여기서 Observer 타입을 파라미터로 받으므로 Observer 타입 이해가 중요

RxSwift의 Observer 타입

  • Observable.swift안에 subscribe 메소드에 있는 Observer 타입
    • 사용하는 쪽에서 이 메소드를 사용하면 closure형태로 onNext, onError, onCompleted 상태가 있으므로 ObserverType은 closure형태일 것
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    rxAbstractMethod()
}
  • ObserverType 프로토콜은 단순히 Observer들이 갖추어야 하는 onNext와 같은 이벤트 행위들이 정의되어 있을 것
  • ObserverType.swift 구현부
    • 단순히 on 메소드가 존재
    • on 메소드는 observer들에게 이벤트가 일어났다고 알려주는 메소드
// ObserverType.swift

/// Supports push-style iteration over an observable sequence.
public protocol ObserverType {
    /// The type of elements in sequence that observer can observe.
    associatedtype Element

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<Element>)
}
  • 바로 밑에 extension으로 onNext, onCompleted, onError 메소드가 존재
    • ObserverType은 이벤트를 발생하는 역할을 하고, 각 이벤트는 onNext, onCompleted, onError가 존재
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: Element))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
@frozen public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}


public struct AnyObserver<Element> : ObserverType {
    public typealias EventHandler = (Event<Element>) -> Void
    private let observer: EventHandler
    
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    
    public func on(_ event: Event<Element>) {
        self.observer(event)
    }
    
    public func asObserver() -> AnyObserver<Element> {
        self
    }
}

정리

  • Observable에 generics로 특정 타입을 선언하면 이 타입은 subscirbe 메소드 사용 가능
    • subscribe 메소드의 파라미터에는 ObserverType을 받도록 구현
    • ObserverType은 onNext, onError, onCompleted 메소드가 있는 추상화된 프로토콜 타입
    • ObserverType을 준수하는 AnyObserver들을 보면 생성자에서 closure를 받아서 이 closure를 저장하고 있다가 과업이 끝나면 이 closure를 실행시켜주는데 next, error, completed 케이스 형태로 반환해줌
  • 즉, 사용하는 쪽에서 Observable을 만들고 subscribe를 하면 ObserverType이 따로 클로저를 저장하고 있다가 비동기 처리가 끝나면 next, error, completed 타입으로 클로저를 실행시켜주는 것

* 참고

https://github.com/ReactiveX/RxSwift

Comments