본문 바로가기

Spring/web-flux

Spring Webflux (6) Sequence 생성 Operator - justOrEmpty, defer, fromIterable

Reactor API에서 제공하는 오퍼레이터의 종류는 매우 다양합니다. 현재 읽고있는 책에서도 오퍼레이터를 8종류로 분류해서 설명하는데요. 저의 상상력 부족 혹은 현재 구상하고 있는 프로젝트를 고려했을 때 8종류를 모두 정리하는 것은 비효율적이라고 생각했어요. 무엇보다 활용처가 딱히 떠오르지 않는 것들도 많았습니다.

 

개인적으로 범용성이 높다고 생각하는 오퍼레이터 위주로 설명하도록 하겠습니다. Sequence 생성 Operator는 쉽게 말해 Publisher를 만드는 기능을 담당하고 있습니다.

 

 

목차

1. justOrEmpty

2. defer

3. fromIterable

 

justOrEmpty


 

 

justOrEmpty 마블 다이어그램

 

justOrEmpty()는 just()를 확장한 오퍼레이터입니다. 만약 null, 다르게 말하면 Optional.empty() 일 경우 onComplete 시그널을 전송합니다. just() 경우 emit 할 데이터가 null 일 경우 NPE를 던집니다.

 

public static void main(String[] args) {
    Mono
        .justOrEmpty(null)
        .subscribe(
                data -> {},
                error -> {},
                () -> log.info("# onComplete"));
}

 

 

실행 결과

13:37:58.131 [main] INFO com.example.webfluxstart.operators.create.Just -- # onComplete

 

 

 

defer


 

defer 마블 다이어그램

 

defer() 오퍼레이터는 오퍼레이터를 선언한 시점에 데이터를 emit하는 것이 아니라 구독(subscribe)하는 시점까지 데이터emit을 지연(deferred)하고 구독 시점에 데이터를 emit하는 publisher를 생성합니다. 아래 코드를 통해 이것이 어떤 의미인지 파악해보도록 하겠습니다.

 

public static void main(String[] args) throws InterruptedException {
    log.info("# start: {}", LocalDateTime.now());
    Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
    Mono<LocalDateTime> deferMono = Mono.defer(() -> Mono.just(LocalDateTime.now()));

    Thread.sleep(2000);

    justMono.subscribe(data -> log.info("# onNext just1: {}", data));
    deferMono.subscribe(data -> log.info("# onNext defer1: {}", data));

    Thread.sleep(2000);

    justMono.subscribe(data -> log.info("# onNext just2: {}", data));
    deferMono.subscribe(data -> log.info("# onNext defer2: {}", data));
}

 

1. just(), defer() 를 통해 시퀀스를 생성하여 현재 시간을 emit 하도록 정의했습니다.

2. 2초 지연 후, justMono, deferMono 각각 한 번씩 구독합니다.

3. 다시 2초 지연 후, justMono, deferMono 각각 한 번씩 구독합니다.

 

실행 결과

13:57:16.318 [main] INFO com.operators.create.Defer -- # start: 2023-07-25T13:57:16.315411900
13:57:18.450 [main] INFO com.operators.create.Defer -- # onNext just1: 2023-07-25T13:57:16.324514
13:57:18.451 [main] INFO com.operators.create.Defer -- # onNext defer1: 2023-07-25T13:57:18.451255100
13:57:20.467 [main] INFO com.operators.create.Defer -- # onNext just2: 2023-07-25T13:57:16.324514
13:57:20.469 [main] INFO com.operators.create.Defer -- # onNext defer2: 2023-07-25T13:57:20.469145600

 

결과를 보면 deferMono의 경우 예상대로 emit된 현재 시간 데이터가 2초의 간격을 두고 emit되었음을 알 수 있습니다.

그런데 justMono의 경우 출력된 현재 시간 데이터가 지연 시간과 상관없이 동일한 시간을 출력합니다.

 

그 이유는 just가 구독 여부와는 상관없이 데이터를 emit하는 Hot Publisher이기 때문입니다. 그리고 구독이 발생하면 emit된 데이터를 다시 재생하여 Subscriber에게 전달합니다. 따라서 justMono의 경우 출력 결과가 동일합니다.

 

돌아와서 defer() 오퍼레이터는 emit을 실제 구독이 일어나는 시점까지 미룰 수 있기 때문에 불필요한 메서드 호출을 막을 수 있습니다. 예를 들어 아래와 같은 경우 defer() 오퍼레이터를 사용하면 효율적일 수 있다.

 

public static void main(String[] args) throws InterruptedException {
    log.info("# start: {}", LocalDateTime.now());
    Mono.just("Hello")
            .delayElement(Duration.ofSeconds(3))
            .switchIfEmpty(sayDefault())
//                .switchIfEmpty(Mono.defer(() -> sayDefault()))
            .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(3500);
}

private static Mono<String> sayDefault() {
    log.info("# Say Hi");
    return Mono.just("Hi");
}

 

 

실행 결과

18:16:31.144 [main] INFO com.operators.create.Defer2 -- # start: 2023-07-25T18:16:31.140977600
18:16:31.293 [main] INFO com.operators.create.Defer2 -- # Say Hi
18:16:34.329 [parallel-1] INFO com.operators.create.Defer2 -- # onNext: Hello

 

switchIfEmpty()는 emit되는 데이터가 존재하지 않을 때 디폴트로 emit할 데이터를 지정하는 오퍼레이터입니다. 실행 결과를 보면 just() 오퍼레이터를 통해 "hello" 라는 String 타입 데이터가 들어갔음에도 불구하고 sayDefault() 메서드가 실행된다. defer()를 사용할 경우, 이런 불필요한 실행을 막을 수 있다.

 

 

 

fromIterable


 

fromIterable 마블 다이어그램

 

fromIterable() 오퍼레이터는 Iterable 인터페이스를 구현하고 있는 객체의 데이터를 emit하는 Flux를 생성합니다.

 

public static void main(String[] args) {
    Flux
        .fromIterable(People.samples())
        .subscribe(people -> log.info("이름 {}, 나이 {}", people.getName(), people.getAge())
        );
}

@Getter
static class People {
    String name;
    Integer age;

    public People(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public static List<People> samples() {
        return List.of(
                new People("jxx", 15),
                new People("xuni", 24),
                new People("yani", 21),
                new People("shy", 39));
    }
}

 

 

실행 결과

13:43:52.184 [main] INFO com.example.webfluxstart.operators.create.FromIterable -- 이름 jxx, 나이 15
13:43:52.190 [main] INFO com.example.webfluxstart.operators.create.FromIterable -- 이름 xuni, 나이 24
13:43:52.191 [main] INFO com.example.webfluxstart.operators.create.FromIterable -- 이름 yani, 나이 21
13:43:52.191 [main] INFO com.example.webfluxstart.operators.create.FromIterable -- 이름 shy, 나이 39