목차
1. 물리적인 스레드 vs 논리적인 스레드
2. Reactor 의 Scheduler
3. Schduler 대표 operator - subscribeOn(), publishOn(), parallel()
물리적인 스레드 VS 논리적인 스레드
Reactor에서 사용되는 Scheduler는 Reactor Sequence에서 사용되는 스레드를 관리해 주는 관리자 역할을 합니다. 이를 위해 물리적인 스레드/논리적인 스레드가 무엇인지 정리하고 갑시다.
물리적인 스레드(논리적인 코어)
아래 그림은 CPU 사양이 듀얼코어 4 스레드를 도식화한 그림이다. 코어를 보면 2개의 스레드를 포함하고 있다. 이는 한 개의 물리적인 코어를 논리적으로 나눈 것을 의미하고 이를 물리적인 스레드라고 부른다. 그리고 다른 말로는 논리적인 코어라고 부른다.
논리적인 스레드
소프트웨어적으로 생성되는 스레드를 의미한다. Java 프로그래밍에서 사용되는 스레드가 논리적인 스레드의 예시이다. 논리적인 스레드는 프로세스 내에서 실행되는 세부 작업의 단위가 된다.
논리적인 스레드는 이론적으로는 메모리가 허용하는 범위 내에서 얼마든지 만들 수 있다. 그러나 물리적인 스레드의 가용 범위 내에서 실행될 수 있습니다.
정리
물리적인 스레드, 물리적인 코어를 논리적으로 나눈 것이다. 논리적인 코어라고도 불린다.
논리적인 스레드, 소프트웨어 내에서 생성되는 스레드이다. 생성은 메모리 허용 범위 내에서 얼마든지 할 수 있으나 실행 범위는 물리적인 스레드의 제약을 받는다.
Scheduler
Reactor의 Scheduler는 비동기 프로그래밍을 위해 사용되는 스레드를 관리해 주는 역할을 합니다. 다시 말해 Scheduler를 사용하여 어떤 스레드에서 무엇을 처리할지 제어합니다.
개발자가 직접 멀티스레드 환경에서 스레드를 제어하는 것은 쉬운 일이 아니라고 합니다. 이러한 문제를 최소화할 수 있도록 도와주는 것이 Reactor의 Scheduler입니다.
Scheduler를 위한 전용 Operator
Reactor 스레드 관리를 위해 대표적으로 사용되는 subscribeOn(), publishOn(), parallel() 세 가지를 살펴보도록 하겠습니다.
1. subscribeOn()
발행 직후 실행될 스레드를 지정하는 Operator입니다.
Flux.fromArray(new Integer[]{1, 3})
.subscribeOn(Schedulers.boundedElastic()) // 스레드 할당
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe")) // 발행 시점의 스레드 확인을 위한 로깅
.subscribe(data -> log.info("# onNext: {}", data));
결과
01:58:27.176 [main] INFO com.example.webfluxstart.scheduler.SubscribeOn -- # doOnSubscribe
01:58:27.183 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SubscribeOn -- # doOnNext: 1
01:58:27.184 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SubscribeOn -- # onNext: 1
01:58:27.184 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SubscribeOn -- # doOnNext: 3
01:58:27.185 [boundedElastic-1] INFO com.example.webfluxstart.scheduler.SubscribeOn -- # onNext: 3
발행 시점에 main 스레드였지만 subscribeOn 오퍼레이터를 통해 스레드를 boundedElastic 유형으로 지정하였습니다.
2. publishOn()
downstream으로 Signal을 전송할 때 실행되는 스레드를 제어하는 역할을 합니다.
Flux.fromArray(new Integer[]{1, 3})
.doOnNext(data -> log.info("# doOnNext: {}", data)) // 첫 emit 에서 스레드
.doOnSubscribe(subscription -> log.info("# doOnSubscribe")) // 발행 시점의 스레드
.publishOn(Schedulers.parallel()) // downstream 스레드 변경
.subscribe(data -> log.info("# onNext: {}", data)); // 변경된 스레드 로깅
결과
02:13:45.640 [main] INFO com.example.webfluxstart.scheduler.PublishOn -- # doOnSubscribe
02:13:45.650 [main] INFO com.example.webfluxstart.scheduler.PublishOn -- # doOnNext: 1
02:13:45.652 [main] INFO com.example.webfluxstart.scheduler.PublishOn -- # doOnNext: 3
02:13:45.652 [parallel-1] INFO com.example.webfluxstart.scheduler.PublishOn -- # onNext: 1
02:13:45.652 [parallel-1] INFO com.example.webfluxstart.scheduler.PublishOn -- # onNext: 3
이런 식으로 publishOn 밑 downstream인 subcribe 오퍼레이터에서는 parallel 스레드를 사용합니다.
3. parallel()
parallel은 병렬성과 관련있습니다. 해당 오퍼레이터의 경우 라운드 로빈 방식으로 CPU 코어 개수만큼의 스레드를 병렬로 실행합니다.
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel(4) // 파라미터로 사용하고자 하는 스레드의 개수가 들어감
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
결과
02:22:40.111 [parallel-2] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 3
02:22:40.119 [parallel-2] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 11
02:22:40.114 [parallel-4] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 7
02:22:40.114 [parallel-1] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 1
02:22:40.111 [parallel-3] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 5
02:22:40.120 [parallel-4] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 15
02:22:40.119 [parallel-2] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 19
02:22:40.120 [parallel-3] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 13
02:22:40.120 [parallel-1] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 9
02:22:40.120 [parallel-1] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 17
앞서, 논리적인 스레드는 메모리가 허용하는 범위 내에서 무한히 만들 수 있지만, 실행은 물리적인 스레드의 제약을 받는다고 했습니다. 현재 제가 코드를 실행하고 있는 환경에서 물리적인 스레드는 8개 입니다.
따라서 아래와 같이 스레드를 20개로 설정하더라도 8개 이상 사용하지 못합니다.
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel(20) // 20개로 설정했지만 8개 이상 실행하지 못함
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
결과
02:28:18.138 [parallel-3] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 5
02:28:18.138 [parallel-2] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 3
02:28:18.138 [parallel-4] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 7
02:28:18.138 [parallel-5] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 9
02:28:18.138 [parallel-6] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 11
02:28:18.138 [parallel-7] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 13
02:28:18.138 [parallel-1] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 1
02:28:18.138 [parallel-8] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 15
02:28:18.148 [parallel-3] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 21
02:28:18.148 [parallel-1] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 17
02:28:18.148 [parallel-5] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 25
02:28:18.148 [parallel-4] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 23
02:28:18.148 [parallel-2] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 19
02:28:18.148 [parallel-6] INFO com.example.webfluxstart.scheduler.Parallel -- # onNext: 27
'Spring > web-flux' 카테고리의 다른 글
Spring Webflux (6) Sequence 생성 Operator - justOrEmpty, defer, fromIterable (0) | 2023.07.25 |
---|---|
Spring Webflux (5) Scheduler 2 (1) | 2023.07.21 |
Spring Webflux (3) Backpressure (0) | 2023.07.17 |
Spring Webflux (2) Cold/Hot Sequence (0) | 2023.07.17 |
Spring Webflux (1) Reactive Streams 용어 정리 (0) | 2023.07.03 |