retry()
Use this operator to retry failed exceptions
When to use it?
Code interacts with external systems through network
Examples are : RestFul API calls, DB Calls
these calls may fail intermittently
์๋ฌ๊ฐ ๋ฐ์ํ๋ฉด ๋ฌดํํ ๋ค์ subscribe()๋ฅผ ์๋ํ๋ค. onComplete() ์ ๋ฐ์ง ๋ชปํ๋ค๋ฉด ๋์์ด subscribe()๋ฅผ ๋ค์ ์๋ํ๋ค.
@Test
void retryTest() throws InterruptedException {
AtomicInteger index = new AtomicInteger();
Flux<Integer> numbersWithError = Flux.fromIterable(List.of(1, 2, 3))
.concatWith(Mono.error(new RuntimeException()))
.onErrorResume(exception -> {
if (index.get() == 5) {
System.out.println("index equals 5");
return Mono.just(10);
} else {
System.out.println("index < 5");
return Mono.error(new RuntimeException());
}
})
.doOnError(ex -> {
index.getAndIncrement();
})
.retry()
.log();
numbersWithError.subscribe();
Thread.sleep(10000L);
}
22:04:52.564 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
22:04:52.602 [main] INFO reactor.Flux.Retry.1 - onSubscribe(FluxRetry.RetrySubscriber)
22:04:52.606 [main] INFO reactor.Flux.Retry.1 - request(unbounded)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.607 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index < 5
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(1)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(2)
22:04:52.608 [main] INFO reactor.Flux.Retry.1 - onNext(3)
index equals 5
22:04:52.609 [main] INFO reactor.Flux.Retry.1 - onNext(10)
22:04:52.609 [main] INFO reactor.Flux.Retry.1 - onComplete()
๋ฐ๋ฉด์ ์๋์ ๊ฐ์ด parameter๋ก retry count๋ฅผ ๋ฃ์ด์ฃผ๋ฉด ์ ํด์ค count ๋งํผ๋ง retry๋ฅผ ์๋ํ๋ค.
retryWhen()
retryWhen์ ๊ณต์ ๋ฌธ์์ ์ค๋ช
์ด ๋๋ฌด ๊ธธ์ด์ ๋ง๋ธ ๋ค์ด์ด๊ทธ๋จ๋ง ๋ฐ์ทํด ์๋ค. retry๋ฅผ ๋ฌด์์ ํ์ง ์๊ณ retrySpec์ ์๊ฑฐํ์ฌ retry ๋ฅผ ํ๋ค๋ ๊ฒ์ด ํน์ง์ด๋ค. ์ฌ์ค ์ค๋ฌด์์๋ retry ๋ณด๋ค retryWhen ์ ์ธ ๊ฐ๋ฅ์ฑ์ด ํฌ๋ค๊ณ ํ๋จ๋๋ค. ํนํ ์ ์์ ์ธ ์์ฒญ์ ๋ํด์ ์๋ ์๋ฒ๊ฐ ๊ฐํ์ ์ผ๋ก ์ด์ํ ๊ฐ์ ๋ด๋ ค์ค๋ค๋ฉด ์ด๋ฅผ ์กฐ๊ฑด์ ์ผ๋ก ํ๋จํด์ retry ํด์ฃผ๋ ๋ก์ง์ด ํ์ํ๋ฏ๋ก ๊ทธ ๋ ์ฌ์ฉํ๋ฉด ์ข๋ค.
@Test
void retryWhenTest() throws InterruptedException {
Flux<Integer> numbersWithError = Flux.fromIterable(List.of(1, 2, 3))
.concatWith(Mono.error(new IllegalStateException()))
.doOnError(exception -> {
System.out.println("exception : " + exception.getClass().getName());
})
.log();
Retry retrySpec1 = Retry.backoff(3, Duration.ofMillis(300L))
.filter(exception -> exception instanceof IllegalStateException);
Retry retrySpec2 = Retry.backoff(3, Duration.ofMillis(300L))
.filter(exception -> exception instanceof IllegalAccessError);
//numbersWithError.retryWhen(retrySpec1).subscribe();
numbersWithError.retryWhen(retrySpec2).subscribe();
Thread.sleep(10000L);
}
repeat()
Used to repeat an existing sequence
This operator gets invoked after the onCompletion() event from the existing sequence
Use it when you have an use-case to subscribe to same publisher again
This operator works as long as No Exception is thrown
๋ค์ ๊ตฌ๋
(=๋ฐ๋ณต)์ ํ๊ธฐ ์ํ ์ฐ์ฐ์์ด๋ค. ๊ฐ์์์๋ ์ค๋ช
ํ๊ณ ์๊ณ , ๊ณต์ ๋ฌธ์์๋ ๋์์๋ฏ์ด onComplete() ์ด ์คํ๋์ด์ผ๋ง ๋ฐ๋ณต ๊ตฌ๋
์ ์คํํ๋ค. retry() ์ ๋ง์ฐฌ๊ฐ์ง๋ก parameter๋ฅผ ๋ฃ์ด์ฃผ์ง ์์ผ๋ฉด ๋ฌดํํ ๋ฐ๋ณตํ๊ณ parameter๋ก ๋ฐ๋ณต ํ์๋ฅผ ์ ํ ํ ์ ์๋ค.
๋น์ฐํ ์ด์ผ๊ธฐ์ด์ง๋ง ๊ตฌ๋
์ค์ ์๋ฌ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ repeat์ ๋์ํ์ง ์๋๋ค. repeat ์ด onComplete() ์ดํ ์คํ๋๋ค๋ ๊ฒ์ ์๊ฐํด๋ด๋ ๊ทธ๋ ๊ณ , ๊ตฌ๋
์ค ์๋ฌ๊ฐ ๋ฐ์ํ๋ฉด repeat ์ฐ์ฐ์๋ฅผ ๋ง๋๊ธฐ ์ ์ ์ด๋ฏธ ๊ตฌ๋
ํ๋ฆ์ด ์ค๋จ๋์ด๋ฒ๋ฆฐ๋ค๋ ๊ฒ์ ์๊ฐํด๋ด๋ ์ด๊ฒ์ด ์ด์น์ ๋ง๋ค๊ณ ์ธ์ํ ์ ์๋ค.
@Test
void repeatWithErrorTest() {
Flux<Integer> numbers = Flux.fromIterable(List.of(1, 2, 3))
.concatWith(Mono.error(new RuntimeException()))
.repeat(1).log();
StepVerifier.create(numbers)
.expectNext(1, 2, 3)
.expectError(RuntimeException.class)
.verify();
}
@Test
void repeatTest() {
Flux<Integer> numbers = Flux.fromIterable(List.of(1, 2, 3)).repeat(1).log();
StepVerifier.create(numbers)
.expectNext(1, 2, 3, 1, 2, 3)
.verifyComplete();
}