Reactor execution model 3 - parallelism
Parallelism using parallel() and runOn() operator
public class PararellTest {
@Test
void pararellTest_1() {
Flux<Integer> numbers = Flux.range(1, 10)
.map(this::stop1Second)
.log();
StepVerifier.create(numbers)
.expectNextCount(10)
.verifyComplete();
}
@Test
void pararellTest_2() {
Flux<Integer> numbers = Flux.range(1, 10)
.publishOn(Schedulers.parallel())
.map(this::stop1Second)
.log();
StepVerifier.create(numbers)
.expectNextCount(10)
.verifyComplete();
}
@Test
void pararellTest_3() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
System.out.println("availableProcessors : " + availableProcessors);
ParallelFlux<Integer> numbers = Flux.range(1, 10)
.parallel()
.runOn(Schedulers.parallel())
.map(this::stop1Second)
.log();
StepVerifier.create(numbers)
.expectNextCount(10)
.verifyComplete();
}
private Integer stop1Second(int value) {
try {
System.out.println("current value : " + value);
Thread.sleep(1000L);
return value;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
์ด์ ์ publishOn()๊ณผ subscribeOn()์ผ๋ก ์์ ์ค๋ ๋๋ฅผ ์ ์ดํ์ฌ ์ํ๋ ์์ ์ ๋ณ๋ ฌ ์ํํ๋ ๋ฐฉ๋ฒ์ ๋ํด์ ์ค์ต์ ํด๋ณด์๋๋ฐ, ๊ฐ์์์ ์๊ฐํ๋ ๋ ๋ค๋ฅธ ๋ฐฉ์์ผ๋ก pararell์ ์ด์ฉํด์ ๋ก์ง์ ๋ณ๋ ฌ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ ์๊ฐํ๊ณ ์์๋ค.
pararellTest_1, pararellTest_2๋ ๋ชจ๋ 10์ด ์ด์์ด ๊ฑธ๋ฆฌ๋ฉฐ pararellTest_3์ ๋ชจ๋ ์์ ์ ์ํํ๋๋ฐ 1์ด ๋จ์ง์ด ๊ฑธ๋ฆฐ๋ค. ์ฐธ๊ณ ๋ก ๋ด๊ฐ ์์ ํ ์ปดํจํฐ์ ์ฝ์ด ์๋ 10๊ฐ๋ผ์ availableProcessors ๊ฐ 10์ด๋ค.
Parallelism using flatMap() operator
@Test
void pararellTest_4() {
Flux<Integer> numbers = Flux.range(1, 10)
.flatMap(number -> Mono.just(number)
.map(this::stop1Second)
.subscribeOn(Schedulers.parallel()))
.log();
StepVerifier.create(numbers)
.expectNextCount(10)
.verifyComplete();
}
flatMap()์ inner-publisher ๋ค์ eagarly ํ๊ฒ ๋ชจ๋ subscribe()ํด์ ๊ฐ๊ธฐ ๋ค๋ฅธ lifecycle์ ๋ฐ๋ผ ๋์ฐฉํ๋ ์์๋๋ก down-stream ์ผ๋ก ๋ณด๋ด์ฃผ๋ operator ์ด๋ค. ์ ์ฝ๋๋ ์ด๋ฅผ ์ด์ฉํด์ ๋ณ๋ ฌ ์ํํ๋ ์์ ์ฝ๋์ด๋ค. ์์์ ๋ค๋ฃฌ pararellTest_3 ๊ณผ ๊ฒฐ๊ณผ๋ ๋น์ทํ์ง๋ง flatMap()์ ์ด์ฉํ๋ค๋ ์ธก๋ฉด์์ ๋ฐฉ์์ด ๋ค๋ฅด๋ค๊ณ ํ ์ ์๋ค.
์ฌ๊ธฐ์ up-stream์ ์์๋ฅผ ๊ทธ๋๋ก ์ ์งํ์ฌ down-stream ์ผ๋ก ๋ณด๋ด์ฃผ๊ณ ์ถ์ผ๋ฉด flatMap์ flatMapSequantial()๋ก ๋ฐ๊ฟ์ฃผ๊ธฐ๋ง ํ๋ฉด ๋๋ค.
Last updated