retry, retryWhen, repeat

retry()

  • Use this operator to retry failed exceptions

  • When to use it?

    • Code interacts with external systems through network

      • Examples are : RestFul API calls, DB Calls

    • these calls may fail intermittently

์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ๋ฌดํ•œํžˆ ๋‹ค์‹œ subscribe()๋ฅผ ์‹œ๋„ํ•œ๋‹ค. onComplete() ์„ ๋ฐ›์ง€ ๋ชปํ–ˆ๋‹ค๋ฉด ๋์—†์ด subscribe()๋ฅผ ๋‹ค์‹œ ์‹œ๋„ํ•œ๋‹ค.

    @Test
    void retryTest() throws InterruptedException {
        AtomicInteger index = new AtomicInteger();
        Flux<Integer> numbersWithError = Flux.fromIterable(List.of(1, 2, 3))
                .concatWith(Mono.error(new RuntimeException()))
                .onErrorResume(exception -> {
                    if (index.get() == 5) {
                        System.out.println("index equals 5");
                        return Mono.just(10);
                    } else {
                        System.out.println("index < 5");
                        return Mono.error(new RuntimeException());
                    }
                })
                .doOnError(ex -> {
                    index.getAndIncrement();
                })
                .retry()
                .log();

        numbersWithError.subscribe();
        Thread.sleep(10000L);
    }
22:04:52.564 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
22:04:52.602 [main] INFO reactor.Flux.Retry.1 - onSubscribe(FluxRetry.RetrySubscriber)
22:04:52.606 [main] INFO reactor.Flux.Retry.1 - request(unbounded)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index equals 5
22:04:52.609 [main] INFO reactor.Flux.Retry.1 - onNext(10)
22:04:52.609 [main] INFO reactor.Flux.Retry.1 - onComplete()

๋ฐ˜๋ฉด์— ์•„๋ž˜์™€ ๊ฐ™์ด parameter๋กœ retry count๋ฅผ ๋„ฃ์–ด์ฃผ๋ฉด ์ •ํ•ด์ค€ count ๋งŒํผ๋งŒ retry๋ฅผ ์‹œ๋„ํ•œ๋‹ค.

retryWhen()

retryWhen์€ ๊ณต์‹ ๋ฌธ์„œ์˜ ์„ค๋ช…์ด ๋„ˆ๋ฌด ๊ธธ์–ด์„œ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ๋งŒ ๋ฐœ์ทŒํ•ด ์™”๋‹ค. retry๋ฅผ ๋ฌด์ž‘์ • ํ•˜์ง€ ์•Š๊ณ  retrySpec์— ์˜๊ฑฐํ•˜์—ฌ retry ๋ฅผ ํ•œ๋‹ค๋Š” ๊ฒƒ์ด ํŠน์ง•์ด๋‹ค. ์‚ฌ์‹ค ์‹ค๋ฌด์—์„œ๋Š” retry ๋ณด๋‹ค retryWhen ์„ ์“ธ ๊ฐ€๋Šฅ์„ฑ์ด ํฌ๋‹ค๊ณ  ํŒ๋‹จ๋œ๋‹ค. ํŠนํžˆ ์ •์ƒ์ ์ธ ์š”์ฒญ์— ๋Œ€ํ•ด์„œ ์ƒ๋Œ€ ์„œ๋ฒ„๊ฐ€ ๊ฐ„ํ—์ ์œผ๋กœ ์ด์ƒํ•œ ๊ฐ’์„ ๋‚ด๋ ค์ค€๋‹ค๋ฉด ์ด๋ฅผ ์กฐ๊ฑด์ ์œผ๋กœ ํŒ๋‹จํ•ด์„œ retry ํ•ด์ฃผ๋Š” ๋กœ์ง์ด ํ•„์š”ํ•˜๋ฏ€๋กœ ๊ทธ ๋•Œ ์‚ฌ์šฉํ•˜๋ฉด ์ข‹๋‹ค.

    @Test
    void retryWhenTest() throws InterruptedException {
        Flux<Integer> numbersWithError = Flux.fromIterable(List.of(1, 2, 3))
                .concatWith(Mono.error(new IllegalStateException()))
                .doOnError(exception -> {
                    System.out.println("exception : " + exception.getClass().getName());
                })
                .log();

        Retry retrySpec1 = Retry.backoff(3, Duration.ofMillis(300L))
                .filter(exception -> exception instanceof IllegalStateException);

        Retry retrySpec2 = Retry.backoff(3, Duration.ofMillis(300L))
                .filter(exception -> exception instanceof IllegalAccessError);


        //numbersWithError.retryWhen(retrySpec1).subscribe();
        numbersWithError.retryWhen(retrySpec2).subscribe();

        Thread.sleep(10000L);
    }

repeat()

  • Used to repeat an existing sequence

  • This operator gets invoked after the onCompletion() event from the existing sequence

  • Use it when you have an use-case to subscribe to same publisher again

  • This operator works as long as No Exception is thrown

๋‹ค์‹œ ๊ตฌ๋…(=๋ฐ˜๋ณต)์„ ํ•˜๊ธฐ ์œ„ํ•œ ์—ฐ์‚ฐ์ž์ด๋‹ค. ๊ฐ•์˜์—์„œ๋„ ์„ค๋ช…ํ•˜๊ณ  ์žˆ๊ณ , ๊ณต์‹ ๋ฌธ์„œ์—๋„ ๋‚˜์™€์žˆ๋“ฏ์ด onComplete() ์ด ์‹คํ–‰๋˜์–ด์•ผ๋งŒ ๋ฐ˜๋ณต ๊ตฌ๋…์„ ์‹คํ–‰ํ•œ๋‹ค. retry() ์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ parameter๋ฅผ ๋„ฃ์–ด์ฃผ์ง€ ์•Š์œผ๋ฉด ๋ฌดํ•œํžˆ ๋ฐ˜๋ณตํ•˜๊ณ  parameter๋กœ ๋ฐ˜๋ณต ํšŸ์ˆ˜๋ฅผ ์ œํ•œ ํ•  ์ˆ˜ ์žˆ๋‹ค.

๋‹น์—ฐํ•œ ์ด์•ผ๊ธฐ์ด์ง€๋งŒ ๊ตฌ๋…์ค‘์— ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•  ๊ฒฝ์šฐ repeat์€ ๋™์ž‘ํ•˜์ง€ ์•Š๋Š”๋‹ค. repeat ์ด onComplete() ์ดํ›„ ์‹คํ–‰๋œ๋‹ค๋Š” ๊ฒƒ์„ ์ƒ๊ฐํ•ด๋ด๋„ ๊ทธ๋ ‡๊ณ , ๊ตฌ๋…์ค‘ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด repeat ์—ฐ์‚ฐ์ž๋ฅผ ๋งŒ๋‚˜๊ธฐ ์ „์— ์ด๋ฏธ ๊ตฌ๋… ํ๋ฆ„์ด ์ค‘๋‹จ๋˜์–ด๋ฒ„๋ฆฐ๋‹ค๋Š” ๊ฒƒ์„ ์ƒ๊ฐํ•ด๋ด๋„ ์ด๊ฒƒ์ด ์ด์น˜์— ๋งž๋‹ค๊ณ  ์ธ์‹ํ•  ์ˆ˜ ์žˆ๋‹ค.

    @Test
    void repeatWithErrorTest() {
        Flux<Integer> numbers = Flux.fromIterable(List.of(1, 2, 3))
                .concatWith(Mono.error(new RuntimeException()))
                .repeat(1).log();

        StepVerifier.create(numbers)
                .expectNext(1, 2, 3)
                .expectError(RuntimeException.class)
                .verify();
    }

    @Test
    void repeatTest() {
        Flux<Integer> numbers = Flux.fromIterable(List.of(1, 2, 3)).repeat(1).log();

        StepVerifier.create(numbers)
                .expectNext(1, 2, 3, 1, 2, 3)
                .verifyComplete();
    }

Last updated