웹 애플리케이션을 만듬에 있어 예외를 처리하는 것은 재밋기도하고 매우 중요한 작업이라고 생각하는데요. 그러다보니 관련 오퍼레이터 대부분이 중요하다고 생각되더라구요. 이번 포스팅에서는 5개의 예외 처리 오퍼레이터를 정리합니다.
목차와 함께 해당 오퍼레이터의 특징을 요약하였습니다.
목차
1. error : 예외 전파
2. onErrorReturn : 예외 전파 X -> 다른 값으로 대체하여 emit
3. onErrorResume : 예외 전파 X -> 새로운 Publisher로 대체
4. onErrorContinue : 예외 전파 X -> 후속 작업을 진행
5. retry : 예외 발생 시 재시도, 설정한 재시도 횟수 안에 작업을 마치지 못할 경우 예외 전파
error
error() 오퍼레이터가 시그널을 받으면 Publisher를 종료합니다. 예를 들어 아래 1부터 5를 emit하는 Flux가 있을 때 3을 emit할 때 예외가 발생하게 됩니다. 이후 4, 5는 emit하지 않고 Flux 를 종료합니다.
public static void main(String[] args) {
Flux.range(1, 5)
.flatMap(num -> {
if (num % 3 == 0) {
return Flux.error(
new IllegalArgumentException("Not allowed multiple of 3"));
} else {
return Mono.just(num);
}
})
.subscribe(data -> log.info("# onNext: {}", data),
error -> log.error("#onError: ", error));
}
실행 결과
12:04:07.169 [main] INFO com.example.operators.error.Error -- # onNext: 1
12:04:07.173 [main] INFO com.example.operators.error.Error -- # onNext: 2
12:04:07.180 [main] ERROR com.example.operators.error.Error -- #onError:
onErrorReturn
마블 다이어그램을 보면 알수있듯이 onErrorReturn 오퍼레이터는 대체 값을 emit 합니다. 여기서 중요한 것은 예외를 전파하지 않는다는 점입니다.
아래 Flux 에서는 기존대로라면 3번 째 데이터가 null이기 때문에 NPE가 발생해야 합니다. 하지만 onErrorReturn 을 통해 대체 값을 대신 emit 합니다.
public static void main(String[] args) {
Flux.just("spring", "java", null, "jpa")
.map(topic -> topic.toUpperCase())
.onErrorReturn("ANONYMOUS")
.subscribe(log::info);
}
실행 결과
12:09:27.973 [main] INFO com.example.operators.error.OnErrorReturn -- SPRING
12:09:27.976 [main] INFO com.example.operators.error.OnErrorReturn -- JAVA
12:09:27.976 [main] INFO com.example.operators.error.OnErrorReturn -- ANONYMOUS
여기서 추가적으로 예외가 발생하면 이후 남은 데이터는 emit하지 않고 Publisher를 종료합니다.
onErrorReturn 오퍼레이터를 통해서 특정 예외를 타겟해서 대체 값을 emit 할 수도 있습니다. Topic 객체의 name 이 spring일 때 NotAllowedException이 발생하도록 하였습니다.
public static void main(String[] args) {
List<Topic> topics = List.of(topic("spring"), topic(null), topic("java"), topic("jpa"));
Flux.fromIterable(topics)
.filter(topic -> topic.validate())
.map(topic -> topic.toUpper())
.onErrorReturn(NullPointerException.class,"ANONYMOUS")
.onErrorReturn(NotAllowedException.class,"spring is not allowed")
.subscribe(log::info);
}
실행 결과
12:36:50.331 [main] INFO com.example.operators.error.OnErrorReturnV2 -- spring is not allowed
onErrorResume
마블 다이어그램을 보면 새로운 예외(X 표시가 예외를 의미)가 발생하면 새로운 sequence가 생성되는 것을 볼 수 있스비다. 즉, onErrorResume은 예외가 발생했을 때 downstream으로 예외를 전파하지 않고 대체 Publisher를 리턴합니다.
예제를 보시면 onErrorResume 오퍼레이터를 더 잘 이해할 수 있을 것이에요. 예제가 조금 길어서 코드를 나눠서 설명하겠습니다.
해당 코드는 캐시에 저장된 데이터를 의미합니다.
private static Map<Long, Item> itemsLocalCache = Map.of(
1l, Item.builder().id(1l).name("마라탕").price(15000).build(),
2l, Item.builder().id(2l).name("차돌짬뽕").price(12000).build(),
3l, Item.builder().id(3l).name("연어 포케").price(10000).build()
);
해당 코드는 데이터베이스에 저장된 데이터를 의미합니다.
private static Map<Long, Item> itemDatabase = Map.of(
1l, Item.builder().id(1l).name("마라탕").price(15000).build(),
2l, Item.builder().id(2l).name("차돌짬뽕").price(12000).build(),
3l, Item.builder().id(3l).name("연어 포케").price(10000).build(),
4l, Item.builder().id(4l).name("짜장면").price(9000).build()
);
일반적인 look aside cache 패턴인 사용자 요청 -> 캐시 -> 데이터베이스 순으로 접근한다고 가정합시다. 데이터베이스는 캐시 미스가 일어날 때 접근합니다.
예제 코드
public static void main(String[] args) {
Long requestItemId = 4l;
Mono.just(requestItemId)
.map(id -> itemsLocalCache.get(id)) // key 값이 존재하지 않을 경우 NPE 발생
.onErrorResume(NullPointerException.class, error ->
Mono.just(requestItemId)
.doOnNext(data -> log.info("캐시에 데이터가 존재하지 않아 DB를 조회합니다."))
.map(id -> itemDatabase.get(id)))
.subscribe(data -> log.info("data {}", data));
}
실행 결과
13:27:05.237 [main] INFO com.operators.error.OnErrorResume -- 캐시에 데이터가 존재하지 않아 DB를 조회합니다.
13:27:05.242 [main] INFO com.operators.error.OnErrorResume -- data Item(id=4, name=짜장면, price=9000)
onErrorContinue
onErrorContinue 오퍼레이터는 예외가 발생했을 때, 예외 영역 내에 있는 데이터는 제거하고 Upstream에서 후속 데이터를 emit하는 방식으로 에러를 복수할 수 있도록 해줍니다. 쉽게 말해 에러가 발생하더라도 emit 해야할 데이터가 남아있다면 Publisher를 종료하지 않고 작업을 진행합니다.
예제 코드
public static void main(String[] args) {
Flux
.just(1,2,4,0,6,12)
.map(num -> 12 / num)
.onErrorContinue(ArithmeticException.class, (error, num) ->
log.error("error : {}, num: {}", error.getMessage(), num))
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:" , error)
);
}
실행 결과
13:47:10.641 [main] INFO com.example.operators.error.OnErrorContinue -- # onNext: 12
13:47:10.647 [main] INFO com.example.operators.error.OnErrorContinue -- # onNext: 6
13:47:10.647 [main] INFO com.example.operators.error.OnErrorContinue -- # onNext: 3
13:47:10.653 [main] ERROR com.example.operators.error.OnErrorContinue -- error : / by zero, num: 0
13:47:10.654 [main] INFO com.example.operators.error.OnErrorContinue -- # onNext: 2
13:47:10.654 [main] INFO com.example.operators.error.OnErrorContinue -- # onNext: 1
retry
retry는 에러가 발생했을 때 처음부터 데이터를 다시 emit 합니다. 외부 API 호출, 데이터베이스 접근 등 결과 값을 받는데 시간이 오래걸릴 때 유용할 수 있을 것 같습니다.
예제 코드를 통해 retry 오퍼레이터를 다시 살펴봅시다.
public static void main(String[] args) throws InterruptedException {
final int[] count = {1};
Flux.range(1, 3)
.delayElements(Duration.ofSeconds(1)) // 1초 마다 데이터 emit
.map(num -> {
try {
if (num == 3 && count[0] == 1) {
count[0]++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {}
return num;
})
.timeout(Duration.ofMillis(1500)) // 작업이 1.5초 이상 걸리면 인터럽트가 발생
.retry(1) // 에러 발생 시 1번 재시도
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError: ", error),
() -> log.info("#onComplete"));
Thread.sleep(7000);
}
데이터 3 count[0] 이 1일 때, if문 분기로 접어들어 시간이 1초+ 걸리는 작업을 수행합니다. 이로 인해 1.5초가 넘게 됩니다.
(딜레이 1초, if 분기 작업 1초) 이로 인해 에러가 발생하게 됩니다.
실행 결과
14:10:37.713 [parallel-1] INFO com.example.operators.error.Retry -- # onNext: 1
14:10:38.724 [parallel-4] INFO com.example.operators.error.Retry -- # onNext: 2
14:10:41.240 [parallel-7] INFO com.example.operators.error.Retry -- # onNext: 1
14:10:42.246 [parallel-2] INFO com.example.operators.error.Retry -- # onNext: 2
14:10:43.257 [parallel-4] INFO com.example.operators.error.Retry -- # onNext: 3
14:10:43.258 [parallel-4] INFO com.example.operators.error.Retry -- #onComplete
마지막으로 retry 횟수안에 작업을 성공시키지 못하면 예외가 전파되게 됩니다.
실행 결과
14:11:42.434 [parallel-1] INFO com.example.operators.error.Retry -- # onNext: 1
14:11:43.458 [parallel-4] INFO com.example.operators.error.Retry -- # onNext: 2
14:11:45.976 [parallel-7] INFO com.example.operators.error.Retry -- # onNext: 1
14:11:46.988 [parallel-2] INFO com.example.operators.error.Retry -- # onNext: 2
14:11:48.496 [parallel-3] ERROR com.example.operators.error.Retry -- # onError:
'Spring > web-flux' 카테고리의 다른 글
Blocking I/O, Non-Blocking I/O (0) | 2023.08.04 |
---|---|
Spring Webflux (9) WebClient (0) | 2023.07.29 |
Spring Webflux (7) Sequence 변환 Operator - map, flatMap, zip (0) | 2023.07.27 |
Spring Webflux (6) Sequence 생성 Operator - justOrEmpty, defer, fromIterable (0) | 2023.07.25 |
Spring Webflux (5) Scheduler 2 (1) | 2023.07.21 |