retry, retryWhen, repeat

retry()

  • Use this operator to retry failed exceptions

  • When to use it?

    • Code interacts with external systems through network

      • Examples are : RestFul API calls, DB Calls

    • these calls may fail intermittently

에러가 발생하면 무한히 다시 subscribe()를 시도한다. onComplete() 을 받지 못했다면 끝없이 subscribe()를 다시 시도한다.

    @Test
    void retryTest() throws InterruptedException {
        AtomicInteger index = new AtomicInteger();
        Flux<Integer> numbersWithError = Flux.fromIterable(List.of(1, 2, 3))
                .concatWith(Mono.error(new RuntimeException()))
                .onErrorResume(exception -> {
                    if (index.get() == 5) {
                        System.out.println("index equals 5");
                        return Mono.just(10);
                    } else {
                        System.out.println("index < 5");
                        return Mono.error(new RuntimeException());
                    }
                })
                .doOnError(ex -> {
                    index.getAndIncrement();
                })
                .retry()
                .log();

        numbersWithError.subscribe();
        Thread.sleep(10000L);
    }
22:04:52.564 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
22:04:52.602 [main] INFO reactor.Flux.Retry.1 - onSubscribe(FluxRetry.RetrySubscriber)
22:04:52.606 [main] INFO reactor.Flux.Retry.1 - request(unbounded)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index equals 5
22:04:52.609 [main] INFO reactor.Flux.Retry.1 - onNext(10)
22:04:52.609 [main] INFO reactor.Flux.Retry.1 - onComplete()

반면에 아래와 같이 parameter로 retry count를 넣어주면 정해준 count 만큼만 retry를 시도한다.

retryWhen()

retryWhen은 공식 문서의 설명이 너무 길어서 마블 다이어그램만 발췌해 왔다. retry를 무작정 하지 않고 retrySpec에 의거하여 retry 를 한다는 것이 특징이다. 사실 실무에서는 retry 보다 retryWhen 을 쓸 가능성이 크다고 판단된다. 특히 정상적인 요청에 대해서 상대 서버가 간헐적으로 이상한 값을 내려준다면 이를 조건적으로 판단해서 retry 해주는 로직이 필요하므로 그 때 사용하면 좋다.

    @Test
    void retryWhenTest() throws InterruptedException {
        Flux<Integer> numbersWithError = Flux.fromIterable(List.of(1, 2, 3))
                .concatWith(Mono.error(new IllegalStateException()))
                .doOnError(exception -> {
                    System.out.println("exception : " + exception.getClass().getName());
                })
                .log();

        Retry retrySpec1 = Retry.backoff(3, Duration.ofMillis(300L))
                .filter(exception -> exception instanceof IllegalStateException);

        Retry retrySpec2 = Retry.backoff(3, Duration.ofMillis(300L))
                .filter(exception -> exception instanceof IllegalAccessError);


        //numbersWithError.retryWhen(retrySpec1).subscribe();
        numbersWithError.retryWhen(retrySpec2).subscribe();

        Thread.sleep(10000L);
    }

repeat()

  • Used to repeat an existing sequence

  • This operator gets invoked after the onCompletion() event from the existing sequence

  • Use it when you have an use-case to subscribe to same publisher again

  • This operator works as long as No Exception is thrown

다시 구독(=반복)을 하기 위한 연산자이다. 강의에서도 설명하고 있고, 공식 문서에도 나와있듯이 onComplete() 이 실행되어야만 반복 구독을 실행한다. retry() 와 마찬가지로 parameter를 넣어주지 않으면 무한히 반복하고 parameter로 반복 횟수를 제한 할 수 있다.

당연한 이야기이지만 구독중에 에러가 발생할 경우 repeat은 동작하지 않는다. repeat 이 onComplete() 이후 실행된다는 것을 생각해봐도 그렇고, 구독중 에러가 발생하면 repeat 연산자를 만나기 전에 이미 구독 흐름이 중단되어버린다는 것을 생각해봐도 이것이 이치에 맞다고 인식할 수 있다.

    @Test
    void repeatWithErrorTest() {
        Flux<Integer> numbers = Flux.fromIterable(List.of(1, 2, 3))
                .concatWith(Mono.error(new RuntimeException()))
                .repeat(1).log();

        StepVerifier.create(numbers)
                .expectNext(1, 2, 3)
                .expectError(RuntimeException.class)
                .verify();
    }

    @Test
    void repeatTest() {
        Flux<Integer> numbers = Flux.fromIterable(List.of(1, 2, 3)).repeat(1).log();

        StepVerifier.create(numbers)
                .expectNext(1, 2, 3, 1, 2, 3)
                .verifyComplete();
    }

Last updated