Reactor execution model 3 - parallelism

Parallelism using parallel() and runOn() operator

public class PararellTest {

    @Test
    void pararellTest_1() {
        Flux<Integer> numbers = Flux.range(1, 10)
                .map(this::stop1Second)
                .log();

        StepVerifier.create(numbers)
                .expectNextCount(10)
                .verifyComplete();
    }

    @Test
    void pararellTest_2() {
        Flux<Integer> numbers = Flux.range(1, 10)
                .publishOn(Schedulers.parallel())
                .map(this::stop1Second)
                .log();

        StepVerifier.create(numbers)
                .expectNextCount(10)
                .verifyComplete();
    }

    @Test
    void pararellTest_3() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("availableProcessors : " + availableProcessors);
        
        ParallelFlux<Integer> numbers = Flux.range(1, 10)
                .parallel()
                .runOn(Schedulers.parallel())
                .map(this::stop1Second)
                .log();

        StepVerifier.create(numbers)
                .expectNextCount(10)
                .verifyComplete();
    }

    private Integer stop1Second(int value) {
        try {
            System.out.println("current value : " + value);
            Thread.sleep(1000L);
            return value;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

이전에 publishOn()κ³Ό subscribeOn()으둜 μž‘μ—… μŠ€λ ˆλ“œλ₯Ό μ œμ–΄ν•˜μ—¬ μ›ν•˜λŠ” μž‘μ—…μ„ 병렬 μˆ˜ν–‰ν•˜λŠ” 방법에 λŒ€ν•΄μ„œ μ‹€μŠ΅μ„ ν•΄λ³΄μ•˜λŠ”λ°, κ°•μ˜μ—μ„œ μ†Œκ°œν•˜λŠ” 또 λ‹€λ₯Έ λ°©μ‹μœΌλ‘œ pararell을 μ΄μš©ν•΄μ„œ λ‘œμ§μ„ λ³‘λ ¬μ²˜λ¦¬ν•˜λŠ” 방식을 μ†Œκ°œν•˜κ³  μžˆμ—ˆλ‹€.

pararellTest_1, pararellTest_2λŠ” λͺ¨λ‘ 10초 이상이 걸리며 pararellTest_3은 λͺ¨λ“  μž‘μ—…μ„ μˆ˜ν–‰ν•˜λŠ”λ° 1초 남짓이 κ±Έλ¦°λ‹€. 참고둜 λ‚΄κ°€ μž‘μ—…ν•œ μ»΄ν“¨ν„°μ˜ μ½”μ–΄ μˆ˜λŠ” 10κ°œλΌμ„œ availableProcessors κ°€ 10이닀.

Parallelism using flatMap() operator

flatMap()은 inner-publisher 듀을 eagarly ν•˜κ²Œ λͺ¨λ‘ subscribe()ν•΄μ„œ 각기 λ‹€λ₯Έ lifecycle에 따라 λ„μ°©ν•˜λŠ” μˆœμ„œλŒ€λ‘œ down-stream 으둜 λ³΄λ‚΄μ£ΌλŠ” operator 이닀. μœ„ μ½”λ“œλŠ” 이λ₯Ό μ΄μš©ν•΄μ„œ 병렬 μˆ˜ν–‰ν•˜λŠ” μ˜ˆμ œμ½”λ“œμ΄λ‹€. μœ„μ—μ„œ 닀룬 pararellTest_3 κ³Ό κ²°κ³ΌλŠ” λΉ„μŠ·ν•˜μ§€λ§Œ flatMap()을 μ΄μš©ν–ˆλ‹€λŠ” μΈ‘λ©΄μ—μ„œ 방식이 λ‹€λ₯΄λ‹€κ³  ν•  수 μžˆλ‹€.

μ—¬κΈ°μ„œ up-stream의 μˆœμ„œλ₯Ό κ·ΈλŒ€λ‘œ μœ μ§€ν•˜μ—¬ down-stream 으둜 보내주고 μ‹ΆμœΌλ©΄ flatMap을 flatMapSequantial()둜 λ°”κΏ”μ£ΌκΈ°λ§Œ ν•˜λ©΄ λœλ‹€.

Last updated