본문 바로가기

Spring/web-flux

Spring Webflux (5) Scheduler 2

목차

1. publishOn(), subscribeOn() 동작 이해

2. Scheduler 종류

 

 

publishOn()과 subscribeOn() 동작 이해


앞서 Spring Webflux (4) Scheduler 1에서 Scheduler Operator 에 대해 소개했는데요. 중요한 포인트를 다시 한 번 간략히 정리하고 넘어가겠습니다.

 

 

publishOn()

 

 

downstream으로 signal을 전송할 때 실행되는 스레드를 제어합니다. 쉽게 말하면 publishOn() 아래 선언된 operator 의 스레드를 제어합니다. 따라서 처음 fromArray 를 통해 데이터를 가져오는 부분은 Main 메서드에서 수행합니다.

 

실제 코드를 통해 결과를 확인해봅시다.

 

Flux.fromArray(new Integer[]{1,2,3,4,5,6})
        .doOnNext(data -> log.info("# doOnNext {}", data))
        .publishOn(Schedulers.boundedElastic()) // 스레드 제어
        .filter(data -> data % 2 == 0) // 짝수만 필터링
        .doOnNext(data -> log.info("# even filter {}", data))
        .subscribe();

Thread.sleep(500L);

 

결과

17:50:49.097 [main] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 1
17:50:49.109 [main] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 2
17:50:49.109 [main] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 3
17:50:49.109 [main] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 4
17:50:49.109 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # even filter 2
17:50:49.109 [main] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 5
17:50:49.109 [main] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 6
17:50:49.109 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # even filter 4
17:50:49.109 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # even filter 6

 

publishOn 오퍼레이터 upstream에 있는 doOnNext 오퍼레이터의 로깅 결과를 보면 main 메서드에서 실행된 것을 볼 수 있습니다.

 

 

subscribeOn()

 

반면 subscribeOn은 발행 직후 실행될 스레드를 제어합니다. 이에 따라 donOnSubscribe 에서 찍히는 로그를 보면 subscribeOn을 통해 설정한 스레드인 boundedElasitc thread 를 볼 수 있습니다.

 

 

Flux.fromArray(new Integer[]{1,2,3,4,5,6})
        .doOnNext(data -> log.info("# doOnNext {}", data))
        .doOnSubscribe(init -> log.info("# init"))
        .subscribeOn(Schedulers.boundedElastic())
        .filter(data -> data % 2 == 0)
        .doOnNext(data -> log.info("# even filter {}", data))
        .subscribe();

 

결과

18:22:38.778 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # init
18:22:38.783 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 1
18:22:38.786 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 2
18:22:38.786 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # even filter 2
18:22:38.786 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 3
18:22:38.787 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 4
18:22:38.787 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # even filter 4
18:22:38.787 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 5
18:22:38.787 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # doOnNext 6
18:22:38.787 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SchedulerOperator -- # even filter 6

 

 

정리

publishOn() : downstream 의 스레드를 제어

subscribeOn() 발행 직후 스레드를 제어

 

 

 

scheduler 종류


Reactor 에서 지원하는 scheduler 몇 가지를 정리하도록 하겠습니다. 많은 scheduler가 존재하는데요 포스팅에서 정리할 scheduler는 순서대로 immediate, single, boundedElasitc, parallel 입니다.

 

1. immediate()

별도의 스레드를 추가로 생성하지 않고 현재 스레드에서 작업을 처리합니다. 바로 코드로 살펴봅시다.

 

Flux.fromArray(new Integer[]{1,3,5,7})
        .publishOn(Schedulers.parallel())
        .filter(data -> data > 3)
        .doOnNext(data -> log.info("# doOnNext filter: {}", data))
        .publishOn(Schedulers.immediate())
        .map(data -> data * 10)
        .doOnNext(data -> log.info("# doOnNext map : {}", data))
        .subscribe();

Thread.sleep(200L);

 

결과

18:33:08.835 [parallel-1] INFO com.example.webfluxstart.scheduler.immediate -- # doOnNext filter: 5
18:33:08.842 [parallel-1] INFO com.example.webfluxstart.scheduler.immediate -- # doOnNext map : 50
18:33:08.843 [parallel-1] INFO com.example.webfluxstart.scheduler.immediate -- # doOnNext filter: 7
18:33:08.843 [parallel-1] INFO com.example.webfluxstart.scheduler.immediate -- # doOnNext map : 70

 

굳이 스레드를 변경하지 않는 immediate() 를 위 예제에서는 사용할 필요가 있나 싶습니다. 

 

public void execute(Scheduler scheduler)

 

하지만 위와 같이 Scheduler 를 인자로 받는 API 가 존재할 때, 현재 스레드를 변경하지 않는 용도로 사용할 때 유용할 것입니다. 

 

 

2. single()

single()은 스레드 하나만 생성해서 Scheduler가 제거되기 전까지 재사용하는 방식입니다.

 

public class Single {
    public static void main(String[] args) throws InterruptedException {
        doTask("task1", Schedulers.single())
                .subscribe(data -> log.info("# onNext: {}", data));

        doTask("task2", Schedulers.single())
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(200L);
    }

    private static Flux<Integer> doTask(String taskName, Scheduler scheduler) {
        return Flux.fromArray(new Integer[]{1,3,5,7})
                .publishOn(scheduler)
                .filter(data -> data < 3)
                .doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
                .map(data -> data * 10)
                .doOnNext(data -> log.info("# {} doOnNext map : {}", taskName, data));
    }
}

 

 

결과

18:42:04.973 [single-1] INFO com.example.webfluxstart.scheduler.Single -- # task1 doOnNext filter: 1
18:42:04.976 [single-1] INFO com.example.webfluxstart.scheduler.Single -- # task1 doOnNext map : 10
18:42:04.976 [single-1] INFO com.example.webfluxstart.scheduler.Single -- # onNext: 10
18:42:04.977 [single-1] INFO com.example.webfluxstart.scheduler.Single -- # task2 doOnNext filter: 1
18:42:04.977 [single-1] INFO com.example.webfluxstart.scheduler.Single -- # task2 doOnNext map : 10
18:42:04.977 [single-1] INFO com.example.webfluxstart.scheduler.Single -- # onNext: 10

 

위 로깅 결과와 같이 single-1 한 스레드에서 작업을 처리하는 것을 볼 수 있습니다. 하나의 스레드로 다수의 작업을 처리해야 되므로 지연 시간이 짧은 작업을 처리하는 것이 효과적입니다.

 

 

3. boundedElastic()

boundedElastic()은 ExecutorService 기반의 스레드 풀을 생성한 후, 그 안에서 정해진 수 만큼의 스레드를 사용하여 작업을 처리하고 작업이 종료된 스레드는 반납하여 재사용합니다.

 

참고로 ExecutorService Java5에서 추가된 Executor의 구현체입니다. 주요 특징으로는 스레드를 직접 생성하는 것이 아니라 스레드 풀에서 스레드를 꺼내와서 작업을 실행합니다. 이외에도 Callable 인터페이스를 실행할 수 있기도 합니다. 자세한 내용은 Java - Executor 을 참고하시면 좋을 것 같습니다.

 

다시 돌아와서 책의 내용을 정리하겠습니다. 기본적으로 CPU 코어 수 x 10개 만큼의 스레드를 생성합니다. 모든 스레드가 작업을 처리하고 있다면 이용 가능한 스레드가 생길 때까지 최대 100,000개의 작업이 큐(Queue)에서 대기할 수 있습니다. 

 

예제와 다르게 실제 애플리케이션에서는 데이터베이스, HTTP 요청과 같은 Blocking I/O 작업을 통해 전달받은 데이터를 데이터 소스로 사용하는 경우가 많습니다. 이러한 Blocking I/O 작업을 효과적으로 처리하기 위한 방식입니다.

 

실행 시간이 긴 Blocking I/O 작업이 포함된 경우, 다른 Non-Blocking 처리에 영향을 주지 않도록 전용 스레드를 할당해서 Blocking I/O 작업을 처리하기 때문에 처리 시간을 효율적으로 사용할 수 있습니다.

 

 

 

4. parallel()

scheduler 포스팅 1편에서도 정리했는데요. parallel()은 Non-Blocking I/O에 최적화되어 있는 Scheduler라고 합니다. 하드웨어의 물리적인 스레드 수 만큼 스레드를 생성합니다.

 

책의 설명이 간략해서 제 나름대로 추론을 해본다면 parallel은 생성한 스레드 수 만큼 작업을 병렬적으로 수행합니다. 만약 Blocking I/O 라면 병렬적으로 생성된 스레드들이 결국 작업의 결과를 기다려야 합니다. 생성된 스레드가 모두 Blocking 상태가 된다면 병렬적으로 처리하는 것의 효과가 떨어질 것입니다. 이러한 이유로 인해 결과를 기다리지 않아도 되는 Non-Blocking 작업에 최적화되어 있다는 것이 아닐까 하는 생각이 드네요.

 

 

 


위에서 설명한 4가지 Scheduler 에 대한 설명은 책에 나와있는 내용이기도 하지만, Reactor API docs 에서도 확인할 수 있는 내용입니다.