티스토리 뷰

실습 환경

  • Java 11
  • IntelliJ
  • Springboot

Project 생성 시 dependency 에서 Spring Reactive Web 추가
혹은
projectreactor 의 dependency를 추가한다.

<dependency> 
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
 </dependency>

Reactive Streams 등장 배경

리액티브 프로그래밍을 구현한 라이브러리들이 동시다발적으로 구현되기 시작했다. 이에 하나의 규칙을 정해서, 여러 리액티브 프로그래밍 구현체들이 상호 변환 가능하도록 만들자는 목소리가 나오기 시작했다.

It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.

그렇게 만들어진 규칙이 바로 Reactive Streams이다. RxJava, Reactor 등이 Reactive Streams를 기반으로 만들어졌다.


3가지 인터페이스

Reactive Streams는 매우 간단한 3개의 Interface로 이루어졌다.

  • Publisher
    • 0개 ~ 무한한 갯수의 data를 생성해서, Subcriber에게 data를 publish
      public interface Publisher<T> {
      public void subscribe(Subscriber<? super T> s);
      }
  • Subscriber
    • publisher로부터 데이터를 받았을 때, 데이터를 받는 과정 중 오류가 생기거나 데이터를 모두 받았을 때 어떻게 처리할지 정의
    • subcription.request(n)를 이용해 데이터를 몇 개 가져올지 publisher에게 요청할 수 있음(backpressure)
      public interface Subscriber<T> {
      public void onSubscribe(Subscription s);
      public void onNext(T t);
      public void onError(Throwable t);
      public void onComplete();
      }
  • Subscription
    • publisher가 subscriber에게 data를 어떻게 넘겨줄지 정의해 둔 객체
      public interface Subscription {
      public void request(long n);
      public void cancel();
      }

Protocol

Reactive-streams 에는 여러 규칙이 있지만 그중에서도 Publisher 구현체가 반드시 지켜야 하는 핵심 프로토콜이 있다.

onSubscribe onNext* (onError | onComplete)?

위 Method 흐름은 Publisher 가 따라야하는 Method 호출 흐름으로 보면 된다.

  • onSubscribe : Subscriber에게 구독이 시작되었음을 알림
  • onNext : Subscriber에게 다음 element를 넘겨줌. ‘*’ 은 onNext가 0번~무한 번까지 이어질 수 있음을 의미.
  • onError : reactive flow 중 에러 발생시 호출. onComplete과 동시에 호출될 수 없으며 onError 호출 시 reactive flow는 종료됨.
  • onComplete : reactive flow 완료시 호출. onError와 동시에 호출될 수 없으며 onComplete 호출 시 reactive flow는 종료됨.

순서도

  1. Publisher.subscribe(subscriber) 를 이용해 Publisher에게 subscribe(구독) 신청을 보낸다.
  2. Publisher가 onSubscribe(subscription) 를 호출해 Subscriber에게 구독이 시작되었음을 알린다.
    이때, subscriber가 데이터 흐름을 제어할 수 있는 subscription 을 함께 넘기는 것에 유의하자.
  3. subscriber가 subscription에게 request(n) 을 이용해 데이터를 요청한다. 여기서 n은 ’n 개의 데이터를 줘!’를 의미한다. (backpressure)
  4. Publisher로부터, 생성된 data가 subscription에게 전달된다.

위 그림 상에는 Publisher로부터 data가 시작되는 것으로 나와있지만, 사실 (2)에서 subscriber에게 subscription을 넘겨줄 때 이미 data callback이 subscription에 들어있는 상태다. 다시 말해 (4)는 정확히 말하자면, Publisher로부터 받은 dataCallback을 활용해서 데이터를 얻어오는 과정이다.

  1. subscription 이 onNext 로 subscriber에게 데이터를 전달한다.
  2. flow가 완료되면 onComplete , 에러 발생 시 onError 를 호출하면서 flow가 종료된다.

코드로 구현해보기

Iterator로부터 데이터를 받아서 이를 Publish - Subscribe 하는 코드를 구현해보자

Iterator

Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

Subscribe 시작

iterPub.subscribe(logSub);

Publisher & Subscription

Publisher<Integer> iterPub = new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            //-- subscription
            Iterator<Integer> iterator = iter.iterator();

            @Override
            public void request(long l) {
                try {
                    while(iterator.hasNext()){
                        subscriber.onNext(iterator.next());
                    }
                    subscriber.onComplete();
                } catch (RuntimeException e) {
                    subscriber.onError(e);
                }
            }
            @Override
            public void cancel() {
            }
        });
        //-- end of subscription
    }
};
  • subscribe에서 onSubscribe() 를 가장 먼저 호출하는 것에 주목하자.
  • publisher는 onSubscribe를 호출할 때 데이터를 처리하는 로직을 subscription에 미리 담아서 subscriber에게 전달한다.

Subscriber

Subscriber<Integer> logSub = new Subscriber<>() {
    Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        log.info("onSubscribe");
        this.subscription = subscription;    // ------- (1)
        subscription.request(Long.MAX_VALUE);   //있는거 다 줘
    }

    @Override
    public void onNext(Integer integer) {
        log.info("onNext : {}", integer);    // -------- (2)
    }

    @Override
    public void onError(Throwable throwable) {
        log.info("onError : {}",throwable);
    }

    @Override
    public void onComplete() {
        log.info("onComplete");
    }
};
  • (1) subscription을 저장해놓고 쓰는 형태.
  • (2) 예제에서는 단순히 log 만 호출했지만, publisher로부터 데이터를 받아서 처리하는 로직을 추가해야 한다면 onNext() 에 구현하면 된다. 추가로, onNext에서 subscription.request(n) 형태로 backpressure를 조절할 수도 있다. (이번 예제에서는 생략)

전체 코드

@Slf4j
public class PubSubPractice {

    public static void main(String[] args) {

        Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        Publisher<Integer> iterPub = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new Subscription() {
                    Iterator<Integer> iterator = iter.iterator();

                    @Override
                    public void request(long l) {
                        try {
                            while(iterator.hasNext()){
                                subscriber.onNext(iterator.next());
                            }
                            subscriber.onComplete();
                        } catch (RuntimeException e) {
                            subscriber.onError(e);
                        }

                    }
                    @Override
                    public void cancel() {
                    }
                });
            }
        };

        Subscriber<Integer> logSub = new Subscriber<>() {

            Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription){
                log.info("onSubscribe");
                this.subscription = subscription;
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.info("onNext : {}", integer);

            }

            @Override
            public void onError(Throwable throwable) {
                log.info("onError : {}",throwable);
            }

            @Override
            public void onComplete() {
                log.info("onComplete");
            }
        };

        iterPub.subscribe(logSub);
    }


}

다음 포스팅에서는 Operator로 작동하는 Publisher를 추가로 구현해보자.


References

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/10   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
글 보관함