Combine

Flux.concat vs Flux.concatWith

static ์œผ๋กœ ์ œ๊ณต๋˜๋Š” ํ•จ์ˆ˜๋‹ค. concatWith์€ ๊ทธ๊ฑธ ์‚ฌ์šฉํ•˜๋Š” publisher ์— parameter๋กœ ๋ฐ›๋Š” publisher๋ฅผ ์ด์–ด ๋ถ™์—ฌ์„œ ํ•˜๋‚˜์˜ down-stream์„ ๋งŒ๋“ค์–ด ์ฃผ๋Š” ๋ฐ˜๋ฉด์— concat ์€ ์ด์–ด ๋ถ™์ด๊ธฐ์œ„ํ•œ ์—ฌ๋Ÿฌ element ๋“ค์„ ๊ฐ€๋ณ€์ธ์ž๋กœ ๋ฐ›์•„ ์ค„ ์ˆ˜ ์žˆ๋‹ค. ๋˜ ์žฌ๋ฏธ์žˆ๋Š” ์ ์€ concat์˜ ๊ฒฝ์šฐ down-stream ์„ ๊ตฌ์„ฑํ•  element type ์— ๋Œ€ํ•ด ์กฐ๊ธˆ ๋” ์ž์œ ๋กญ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

    @Test
    void concatWithTest() {
        Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3));
        Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6));

        StepVerifier.create(numbers_1.concatWith(numbers_2).log())
                .expectNext(1, 2, 3, 4, 5, 6)
                .verifyComplete();
    }

    @Test
    void concatTest() {
        Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3));
        Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6));

        StepVerifier.create(Flux.concat(numbers_1, numbers_2, Mono.just("string")).log())
                .expectNext(1, 2, 3, 4, 5, 6, "string")
                .verifyComplete();
    }

Flux.merge vs Flux.mergeWith

๊ธฐ๋ณธ์ ์œผ๋กœ merge, mergeWith์€ ๋ณต์ˆ˜์˜ publisher ์— ๋Œ€ํ•ด์„œ ์ด๋ฅผ ํ•ฉ์ณ์„œ ํ•˜๋‚˜์˜ down-stream์œผ๋กœ ์ œ๊ณตํ•˜๋˜ ์ด๋ฅผ ๋ณ‘๋ ฌ์ฒ˜ํ•œ๋‹ค๋Š” ํŠน์ง•์ด ์žˆ๋‹ค. flatMap๊ณผ ์œ ์‚ฌํ•˜์ง€๋งŒ element์— ๋ณ€ํ˜•์„ ๊ฐ€ํ•˜์ง€ ์•Š๋Š”๋‹ค๋Š” ์ ์ด ๋‹ค๋ฅด๋‹ค๊ณ  ์ƒ๊ฐํ•˜๋ฉด ๋  ๊ฒƒ ๊ฐ™๋‹ค.

merge, mergeWith์˜ ์ฐจ์ด๋Š” concat๊ณผ concatWith๊ณผ ์œ ์‚ฌํ•˜๊ฒŒ static ์ด๋ฉด์„œ ๊ฐ€๋ณ€์ธ์ž๋กœ parameter ๋“ค์„ ๋ฐ›์•„์„œ merge ํ•ด์ฃผ๋Š” ๊ฒƒ๊ณผ ํŠน์ • publisher ์™€ parameter๋กœ ๋ฐ›์€ ํŠน์ • publisher ์™€ merge ํ•˜๋Š” ์ฐจ์ด๊ฐ€ ์žˆ์—ˆ๋‹ค.

    @Test
    void mergeTest() {
        Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3)).delayElements(Duration.ofMillis(100));
        Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6)).delayElements(Duration.ofMillis(110));

        Flux<Integer> merged = Flux.merge(numbers_1, numbers_2).log();

        StepVerifier.create(merged)
                .expectNext(1, 4, 2, 5, 3, 6)
                .verifyComplete();
    }

    @Test
    void mergeWithTest() {
        Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3)).delayElements(Duration.ofMillis(100));
        Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6)).delayElements(Duration.ofMillis(110));

        Flux<Integer> merged = numbers_1.mergeWith(numbers_2).log();

        StepVerifier.create(merged)
                .expectNext(1, 4, 2, 5, 3, 6)
                .verifyComplete();
    }

Flux.mergeSequantial

static ํ•จ์ˆ˜๋กœ ๊ฐ€๋ณ€์ธ์ž๋กœ ๋ฐ›์€ source๋“ค์„ eagerly ํ•˜๊ฒŒ subsribe() ํ•˜์ง€๋งŒ ์ตœ์ข…์ ์œผ๋กœ ๋ฐ˜ํ™˜ํ•ด์ฃผ๋Š” down-stream์€ ํ˜ธ์ถœ์„ ์‹œ์ž‘ํ•œ ์ˆœ์„œ๋Œ€๋กœ ์กฐํ•ฉํ•˜์—ฌ ๊ตฌ์„ฑํ•ด์ค€๋‹ค.

flatMapSequantial() ์€ element ๋“ค์„ ๋ณ€ํ˜•ํ•˜์—ฌ ํ˜ธ์ถœ ์ˆœ์„œ์— ๋งž๊ฒŒ down-stream์„ ๊ตฌ์„ฑํ•˜๋Š”๋ฐ ์—ฌ๊ธฐ์„œ element ๋“ค์„ ๋ณ€ํ˜•ํ•˜๋Š” ๊ฒƒ์„ ๋นผ๋ฉด mergeSequantial() ์ด๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ์„ ๊ฒƒ ๊ฐ™๋‹ค.

    @Test
    void mergeSequentialTest() {
        Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3)).delayElements(Duration.ofMillis(100));
        Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6)).delayElements(Duration.ofMillis(110));

        Flux<Integer> merged = Flux.mergeSequential(numbers_1, numbers_2).log();

        StepVerifier.create(merged)
                .expectNext(1, 2, 3, 4, 5, 6)
                .verifyComplete();
    }

Flux.zip vs Flux.zipWith

zip ์€ ๋‘ publisher ๋ฅผ ํ•˜๋‚˜๋กœ ๋ฌถ์–ด์ค„ ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค. concat์€ ๋‹จ์ˆœํžˆ publisher ๋ฅผ ์ด์–ด ๋ถ™์—ฌ์คฌ๊ณ , merge ๊ฐ€ ๋ณ‘๋ ฌ์ ์œผ๋กœ subscribe ํ•˜์—ฌ ํ•˜๋‚˜์˜ down-stream ์œผ๋กœ ํ•ฉ์ณ์คฌ๋‹ค๋ฉด zip์€ publisher ๋“ค์„ ์กฐํ•ฉํ•จ์— ์žˆ์–ด์„œ ์ข€๋” ์„ธ๋ฐ€ํ•˜๊ฒŒ ์ด๋ฅผ ์ปจํŠธ๋กคํ•˜์—ฌ down-stream ์ž์ฒด๊ฐ€ ์ด๋ฏธ ์›ํ•˜๋Š” ์ฒ˜๋ฆฌ๊ฐ€ ๋˜์–ด์„œ ๋‚˜์˜ค๋„๋ก ๊ตฌํ˜„์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

    @Test
    void zipSameCountTest() {
        Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3));
        Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6));

        Flux<Integer> ziped = Flux.zip(numbers_1, numbers_2, (num1, num2) -> num1 + num2).log();
        StepVerifier.create(ziped)
                .expectNext(5, 7, 9)
                .verifyComplete();
    }

    @Test
    void zipNotSameCountTest() {
        Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 1, 1));
        Flux<Integer> numbers_2 = Flux.fromIterable(List.of(1, 1));

        Flux<Integer> ziped = Flux.zip(numbers_1, numbers_2, (num1, num2) -> num1 + num2).log();
        StepVerifier.create(ziped)
                .expectNext(2, 2)
                .verifyComplete();
    }

์œ„ ์˜ˆ์‹œ๋Š” ํ•ฉ์ณ์คŒ๊ณผ ๋™์‹œ์— ์›ํ•˜๋Š” ํ˜•ํƒœ๋กœ ๋ณ€ํ˜•๊นŒ์ง€ ํ•œ ๊ฒƒ์ธ๋ฐ, ์ ์–ด๋„ ๋‚˜์˜ ๊ฒฝํ—˜์ƒ ์‹ค๋ฌด์—์„œ๋Š” ๋‹จ์ˆœํžˆ tuple ํ˜•ํƒœ๋กœ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋” ๋งŽ์ด ์ผ๋˜ ๊ฒƒ ๊ฐ™๋‹ค.(๋‚ด๊ฐ€ ๋ชปํ•ด์„œ ๊ทธ๋Ÿฐ๊ฐ€!?)

์ง€๊ธˆ ์ƒ๊ฐํ•ด๋ณด๋‹ˆ ํ•จ์ˆ˜์—์„œ ์˜๋„๋ฅผ ์ž˜ ๋“œ๋Ÿฌ๋‚ผ ์ˆ˜ ์žˆ๋‹ค๋ฉด tuple๋กœ ๊ฐ€์ ธ์™€์„œ ๋ฐ‘์—์„œ ์›ํ•˜๋Š” ๋กœ์ง์„ ๋„ฃ๋Š” ๊ฒƒ ๋ณด๋‹ค๋Š” ์œ„ ์˜ˆ์‹œ์ฒ˜๋Ÿผ ํ•จ์ˆ˜ ์ž์ฒด๋ฅผ parameter๋กœ ๋„ฃ์–ด์ค˜์„œ zip operator ๊ฐ€ ์ž์ฒด์ ์œผ๋กœ ๊ฐœ๋ฐœ์ž๊ฐ€ ์›ํ•˜๋Š” ๋ณ€ํ˜• ๋กœ์ง๊นŒ์ง€ ์ˆ˜ํ–‰ํ•˜๋„๋ก ํ•˜๋Š” ๊ฒƒ๋„ ๊ดœ์ฐฎ๊ฒ ๋‹ค.

์•„๋ž˜ document์™€ ์˜ˆ์ œ์ฝ”๋“œ๋Š” ๋‹จ์ˆœํžˆ ๊ฒฐํ•ฉ๋งŒ ํ•ด์ฃผ๋Š” zip ํ™œ์šฉ์— ๋Œ€ํ•œ ๋‚ด์šฉ์ด๋‹ค.

    @Test
    void simpleZipTest() {
        Flux<String> names = Flux.fromIterable(List.of("leo", "siri", "jbl"));
        Flux<String> capital = Flux.fromIterable(List.of("A", "B"));

        Flux<String> ziped = Flux.zip(names, capital)
                .map(t2 -> t2.getT1() + t2.getT2())
                .log();
        StepVerifier.create(ziped)
                .expectNext("leoA", "siriB")
                .verifyComplete();
    }

zipWith ๋„ ์ด๋ฏธ concat๊ณผ merge์—์„œ ๋‹ค๋ค˜๋˜ ๋‚ด์šฉ๊ณผ ์œ ์‚ฌํ•˜๋‹ค. zip ์ด static ๋ฉ”์†Œ๋“œ๋ผ๋ฉด zipWith์€ publisher ์˜ ๋ฉ”์†Œ๋“œ๋กœ ํŠน์ • publisher ๋ฅผ parameter๋กœ ๋ฐ›์•„์„œ tupleํ˜•ํƒœ๋กœ down-stream ์— ๋‚ด๋ ค์ค„ ์ˆ˜ ์žˆ๊ณ , ๋žŒ๋‹ค๋กœ ํ•จ์ˆ˜๋ฅผ parameter๋กœ ๋” ๋ฐ›์•„์„œ ๋ณ€ํ˜•๊นŒ์ง€ ๊ฐ€๋Šฅํ•˜๋‹ค.

    @Test
    void zipWithTest() {
        Flux<String> names = Flux.fromIterable(List.of("leo", "siri", "jbl"));
        Flux<String> capital = Flux.fromIterable(List.of("A", "B"));

        Flux<String> ziped = names.zipWith(capital, (str1, str2) -> str1 + str2).log();

        StepVerifier.create(ziped)
                .expectNext("leoA", "siriB")
                .verifyComplete();
    }

zip, zipWith ์ด ๋‚ด๋ถ€์ ์œผ๋กœ ๊ตฌ๋…๋˜๋Š” ๋ฐฉ์‹์— ๋Œ€ํ•ด์„œ ์ •ํ™•ํ•˜๊ฒŒ ์–ด๋–ป๊ฒŒ ์ž‘๋™ํ•˜๋Š”์ง€ ์˜๋ฌธ์ด ๋งŽ์•˜๋Š”๋ฐ(์•„์ง ๋ ˆํผ๋Ÿฐ์Šค๋ฅผ ์ฐพ์ง€๋Š” ๋ชปํ–ˆ์Œ) ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ ๊ทธ๋Œ€๋กœ๋กœ ์ผ๋‹จ ์ดํ•ดํ•˜๋ฉด ๋  ๊ฒƒ ๊ฐ™๋‹ค.

zip ์ด ๋˜๋Š” ๋‘ Flux ๊ฐ€ ์žˆ์„๋•Œ ์ด๊ฒƒ์ด ๋ชจ๋‘๋‹ค ์ญ‰ ๊ตฌ๋…์ด ์ผ๋‹จ ์™„๋ฃŒ๋˜๊ณ (๋ณ‘๋ ฌ์ ์œผ๋กœ ๋‘˜์„ ๋™์‹œ์—) ๋‚ด๋ถ€์ ์œผ๋กœ ์ด๋ฅผ ๊ฒฐํ•ฉ ํ•ด์ฃผ ๋Š” ๊ฒƒ์œผ๋กœ ๋ณด์ธ๋‹ค. ์™œ๋ƒํ•˜๋ฉด ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ ์ž์ฒด๊ฐ€ ๋‘˜ ๋‹ค complete ๊นŒ์ง€ ๋„๋‹ฌํ•˜๋Š” ๊ฒƒ์„ ๋ณด๋ฉด ์•Œ ์ˆ˜ ์žˆ๋‹ค. ๋”ฐ๋ผ์„œ ๊ฐœ์ˆ˜๋ฅผ ๋งž์ถฐ์ฃผ๊ธฐ ์œ„ํ•ด์„œ ํ•˜๋‚˜ emitํ•˜๊ณ  ๋‹ค๋ฅธ ๋Œ€์ƒ Flux์˜ emit์„ ๊ธฐ๋‹ค๋ฆฌ๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ ์ผ๋‹จ ์ญ‰ ๊ตฌ๋…์€ ๊ตฌ๋…๋Œ€๋กœ ์ง„ํ–‰ํ•˜๊ณ , ๊ฒฐํ•ฉ์€ ๊ฒฐํ•ฉ๋Œ€๋กœ ์ˆ˜๊ฐ€ ๋งž์„๋•Œ ๋งž์ถฐ์„œ emit ํ•ด์ฃผ๋Š” ๊ฒƒ์œผ๋กœ ๋ณด์ธ๋‹ค.

Last updated