Reactor execution model 3 - parallelism

Parallelism using parallel() and runOn() operator

public class PararellTest {

    @Test
    void pararellTest_1() {
        Flux<Integer> numbers = Flux.range(1, 10)
                .map(this::stop1Second)
                .log();

        StepVerifier.create(numbers)
                .expectNextCount(10)
                .verifyComplete();
    }

    @Test
    void pararellTest_2() {
        Flux<Integer> numbers = Flux.range(1, 10)
                .publishOn(Schedulers.parallel())
                .map(this::stop1Second)
                .log();

        StepVerifier.create(numbers)
                .expectNextCount(10)
                .verifyComplete();
    }

    @Test
    void pararellTest_3() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("availableProcessors : " + availableProcessors);
        
        ParallelFlux<Integer> numbers = Flux.range(1, 10)
                .parallel()
                .runOn(Schedulers.parallel())
                .map(this::stop1Second)
                .log();

        StepVerifier.create(numbers)
                .expectNextCount(10)
                .verifyComplete();
    }

    private Integer stop1Second(int value) {
        try {
            System.out.println("current value : " + value);
            Thread.sleep(1000L);
            return value;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

์ด์ „์— publishOn()๊ณผ subscribeOn()์œผ๋กœ ์ž‘์—… ์Šค๋ ˆ๋“œ๋ฅผ ์ œ์–ดํ•˜์—ฌ ์›ํ•˜๋Š” ์ž‘์—…์„ ๋ณ‘๋ ฌ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด์„œ ์‹ค์Šต์„ ํ•ด๋ณด์•˜๋Š”๋ฐ, ๊ฐ•์˜์—์„œ ์†Œ๊ฐœํ•˜๋Š” ๋˜ ๋‹ค๋ฅธ ๋ฐฉ์‹์œผ๋กœ pararell์„ ์ด์šฉํ•ด์„œ ๋กœ์ง์„ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹์„ ์†Œ๊ฐœํ•˜๊ณ  ์žˆ์—ˆ๋‹ค.

pararellTest_1, pararellTest_2๋Š” ๋ชจ๋‘ 10์ดˆ ์ด์ƒ์ด ๊ฑธ๋ฆฌ๋ฉฐ pararellTest_3์€ ๋ชจ๋“  ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š”๋ฐ 1์ดˆ ๋‚จ์ง“์ด ๊ฑธ๋ฆฐ๋‹ค. ์ฐธ๊ณ ๋กœ ๋‚ด๊ฐ€ ์ž‘์—…ํ•œ ์ปดํ“จํ„ฐ์˜ ์ฝ”์–ด ์ˆ˜๋Š” 10๊ฐœ๋ผ์„œ availableProcessors ๊ฐ€ 10์ด๋‹ค.

Parallelism using flatMap() operator

    @Test
    void pararellTest_4() {
        Flux<Integer> numbers = Flux.range(1, 10)
                .flatMap(number -> Mono.just(number)
                        .map(this::stop1Second)
                        .subscribeOn(Schedulers.parallel()))
                .log();

        StepVerifier.create(numbers)
                .expectNextCount(10)
                .verifyComplete();
    }

flatMap()์€ inner-publisher ๋“ค์„ eagarly ํ•˜๊ฒŒ ๋ชจ๋‘ subscribe()ํ•ด์„œ ๊ฐ๊ธฐ ๋‹ค๋ฅธ lifecycle์— ๋”ฐ๋ผ ๋„์ฐฉํ•˜๋Š” ์ˆœ์„œ๋Œ€๋กœ down-stream ์œผ๋กœ ๋ณด๋‚ด์ฃผ๋Š” operator ์ด๋‹ค. ์œ„ ์ฝ”๋“œ๋Š” ์ด๋ฅผ ์ด์šฉํ•ด์„œ ๋ณ‘๋ ฌ ์ˆ˜ํ–‰ํ•˜๋Š” ์˜ˆ์ œ์ฝ”๋“œ์ด๋‹ค. ์œ„์—์„œ ๋‹ค๋ฃฌ pararellTest_3 ๊ณผ ๊ฒฐ๊ณผ๋Š” ๋น„์Šทํ•˜์ง€๋งŒ flatMap()์„ ์ด์šฉํ–ˆ๋‹ค๋Š” ์ธก๋ฉด์—์„œ ๋ฐฉ์‹์ด ๋‹ค๋ฅด๋‹ค๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค.

์—ฌ๊ธฐ์„œ up-stream์˜ ์ˆœ์„œ๋ฅผ ๊ทธ๋Œ€๋กœ ์œ ์ง€ํ•˜์—ฌ down-stream ์œผ๋กœ ๋ณด๋‚ด์ฃผ๊ณ  ์‹ถ์œผ๋ฉด flatMap์„ flatMapSequantial()๋กœ ๋ฐ”๊ฟ”์ฃผ๊ธฐ๋งŒ ํ•˜๋ฉด ๋œ๋‹ค.

Last updated