Combine
Flux.concat vs Flux.concatWith


static ์ผ๋ก ์ ๊ณต๋๋ ํจ์๋ค. concatWith์ ๊ทธ๊ฑธ ์ฌ์ฉํ๋ publisher ์ parameter๋ก ๋ฐ๋ publisher๋ฅผ ์ด์ด ๋ถ์ฌ์ ํ๋์ down-stream์ ๋ง๋ค์ด ์ฃผ๋ ๋ฐ๋ฉด์ concat ์ ์ด์ด ๋ถ์ด๊ธฐ์ํ ์ฌ๋ฌ element ๋ค์ ๊ฐ๋ณ์ธ์๋ก ๋ฐ์ ์ค ์ ์๋ค. ๋ ์ฌ๋ฏธ์๋ ์ ์ concat์ ๊ฒฝ์ฐ down-stream ์ ๊ตฌ์ฑํ element type ์ ๋ํด ์กฐ๊ธ ๋ ์์ ๋กญ๋ค๋ ๊ฒ์ด๋ค.
@Test
void concatWithTest() {
Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3));
Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6));
StepVerifier.create(numbers_1.concatWith(numbers_2).log())
.expectNext(1, 2, 3, 4, 5, 6)
.verifyComplete();
}
@Test
void concatTest() {
Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3));
Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6));
StepVerifier.create(Flux.concat(numbers_1, numbers_2, Mono.just("string")).log())
.expectNext(1, 2, 3, 4, 5, 6, "string")
.verifyComplete();
}
Flux.merge vs Flux.mergeWith


๊ธฐ๋ณธ์ ์ผ๋ก merge, mergeWith์ ๋ณต์์ publisher ์ ๋ํด์ ์ด๋ฅผ ํฉ์ณ์ ํ๋์ down-stream์ผ๋ก ์ ๊ณตํ๋ ์ด๋ฅผ ๋ณ๋ ฌ์ฒํ๋ค๋ ํน์ง์ด ์๋ค. flatMap๊ณผ ์ ์ฌํ์ง๋ง element์ ๋ณํ์ ๊ฐํ์ง ์๋๋ค๋ ์ ์ด ๋ค๋ฅด๋ค๊ณ ์๊ฐํ๋ฉด ๋ ๊ฒ ๊ฐ๋ค.
merge, mergeWith์ ์ฐจ์ด๋ concat๊ณผ concatWith๊ณผ ์ ์ฌํ๊ฒ static ์ด๋ฉด์ ๊ฐ๋ณ์ธ์๋ก parameter ๋ค์ ๋ฐ์์ merge ํด์ฃผ๋ ๊ฒ๊ณผ ํน์ publisher ์ parameter๋ก ๋ฐ์ ํน์ publisher ์ merge ํ๋ ์ฐจ์ด๊ฐ ์์๋ค.
@Test
void mergeTest() {
Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3)).delayElements(Duration.ofMillis(100));
Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6)).delayElements(Duration.ofMillis(110));
Flux<Integer> merged = Flux.merge(numbers_1, numbers_2).log();
StepVerifier.create(merged)
.expectNext(1, 4, 2, 5, 3, 6)
.verifyComplete();
}
@Test
void mergeWithTest() {
Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3)).delayElements(Duration.ofMillis(100));
Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6)).delayElements(Duration.ofMillis(110));
Flux<Integer> merged = numbers_1.mergeWith(numbers_2).log();
StepVerifier.create(merged)
.expectNext(1, 4, 2, 5, 3, 6)
.verifyComplete();
}Flux.mergeSequantial

static ํจ์๋ก ๊ฐ๋ณ์ธ์๋ก ๋ฐ์ source๋ค์ eagerly ํ๊ฒ subsribe() ํ์ง๋ง ์ต์ข ์ ์ผ๋ก ๋ฐํํด์ฃผ๋ down-stream์ ํธ์ถ์ ์์ํ ์์๋๋ก ์กฐํฉํ์ฌ ๊ตฌ์ฑํด์ค๋ค.
flatMapSequantial() ์ element ๋ค์ ๋ณํํ์ฌ ํธ์ถ ์์์ ๋ง๊ฒ down-stream์ ๊ตฌ์ฑํ๋๋ฐ ์ฌ๊ธฐ์ element ๋ค์ ๋ณํํ๋ ๊ฒ์ ๋นผ๋ฉด mergeSequantial() ์ด๋ผ๊ณ ํ ์ ์์ ๊ฒ ๊ฐ๋ค.
@Test
void mergeSequentialTest() {
Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3)).delayElements(Duration.ofMillis(100));
Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6)).delayElements(Duration.ofMillis(110));
Flux<Integer> merged = Flux.mergeSequential(numbers_1, numbers_2).log();
StepVerifier.create(merged)
.expectNext(1, 2, 3, 4, 5, 6)
.verifyComplete();
}Flux.zip vs Flux.zipWith

zip ์ ๋ publisher ๋ฅผ ํ๋๋ก ๋ฌถ์ด์ค ๋ ์ฌ์ฉํ๋ค. concat์ ๋จ์ํ publisher ๋ฅผ ์ด์ด ๋ถ์ฌ์คฌ๊ณ , merge ๊ฐ ๋ณ๋ ฌ์ ์ผ๋ก subscribe ํ์ฌ ํ๋์ down-stream ์ผ๋ก ํฉ์ณ์คฌ๋ค๋ฉด zip์ publisher ๋ค์ ์กฐํฉํจ์ ์์ด์ ์ข๋ ์ธ๋ฐํ๊ฒ ์ด๋ฅผ ์ปจํธ๋กคํ์ฌ down-stream ์์ฒด๊ฐ ์ด๋ฏธ ์ํ๋ ์ฒ๋ฆฌ๊ฐ ๋์ด์ ๋์ค๋๋ก ๊ตฌํ์ด ๊ฐ๋ฅํ๋ค.
@Test
void zipSameCountTest() {
Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 2, 3));
Flux<Integer> numbers_2 = Flux.fromIterable(List.of(4, 5, 6));
Flux<Integer> ziped = Flux.zip(numbers_1, numbers_2, (num1, num2) -> num1 + num2).log();
StepVerifier.create(ziped)
.expectNext(5, 7, 9)
.verifyComplete();
}
@Test
void zipNotSameCountTest() {
Flux<Integer> numbers_1 = Flux.fromIterable(List.of(1, 1, 1));
Flux<Integer> numbers_2 = Flux.fromIterable(List.of(1, 1));
Flux<Integer> ziped = Flux.zip(numbers_1, numbers_2, (num1, num2) -> num1 + num2).log();
StepVerifier.create(ziped)
.expectNext(2, 2)
.verifyComplete();
}
์ ์์๋ ํฉ์ณ์ค๊ณผ ๋์์ ์ํ๋ ํํ๋ก ๋ณํ๊น์ง ํ ๊ฒ์ธ๋ฐ, ์ ์ด๋ ๋์ ๊ฒฝํ์ ์ค๋ฌด์์๋ ๋จ์ํ tuple ํํ๋ก ๋ฐํํ๋๋ก ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ๋ ๋ง์ด ์ผ๋ ๊ฒ ๊ฐ๋ค.(๋ด๊ฐ ๋ชปํด์ ๊ทธ๋ฐ๊ฐ!?)
์ง๊ธ ์๊ฐํด๋ณด๋ ํจ์์์ ์๋๋ฅผ ์ ๋๋ฌ๋ผ ์ ์๋ค๋ฉด tuple๋ก ๊ฐ์ ธ์์ ๋ฐ์์ ์ํ๋ ๋ก์ง์ ๋ฃ๋ ๊ฒ ๋ณด๋ค๋ ์ ์์์ฒ๋ผ ํจ์ ์์ฒด๋ฅผ parameter๋ก ๋ฃ์ด์ค์ zip operator ๊ฐ ์์ฒด์ ์ผ๋ก ๊ฐ๋ฐ์๊ฐ ์ํ๋ ๋ณํ ๋ก์ง๊น์ง ์ํํ๋๋ก ํ๋ ๊ฒ๋ ๊ด์ฐฎ๊ฒ ๋ค.
์๋ document์ ์์ ์ฝ๋๋ ๋จ์ํ ๊ฒฐํฉ๋ง ํด์ฃผ๋ zip ํ์ฉ์ ๋ํ ๋ด์ฉ์ด๋ค.

@Test
void simpleZipTest() {
Flux<String> names = Flux.fromIterable(List.of("leo", "siri", "jbl"));
Flux<String> capital = Flux.fromIterable(List.of("A", "B"));
Flux<String> ziped = Flux.zip(names, capital)
.map(t2 -> t2.getT1() + t2.getT2())
.log();
StepVerifier.create(ziped)
.expectNext("leoA", "siriB")
.verifyComplete();
}zipWith ๋ ์ด๋ฏธ concat๊ณผ merge์์ ๋ค๋ค๋ ๋ด์ฉ๊ณผ ์ ์ฌํ๋ค. zip ์ด static ๋ฉ์๋๋ผ๋ฉด zipWith์ publisher ์ ๋ฉ์๋๋ก ํน์ publisher ๋ฅผ parameter๋ก ๋ฐ์์ tupleํํ๋ก down-stream ์ ๋ด๋ ค์ค ์ ์๊ณ , ๋๋ค๋ก ํจ์๋ฅผ parameter๋ก ๋ ๋ฐ์์ ๋ณํ๊น์ง ๊ฐ๋ฅํ๋ค.

@Test
void zipWithTest() {
Flux<String> names = Flux.fromIterable(List.of("leo", "siri", "jbl"));
Flux<String> capital = Flux.fromIterable(List.of("A", "B"));
Flux<String> ziped = names.zipWith(capital, (str1, str2) -> str1 + str2).log();
StepVerifier.create(ziped)
.expectNext("leoA", "siriB")
.verifyComplete();
}
zip, zipWith ์ด ๋ด๋ถ์ ์ผ๋ก ๊ตฌ๋ ๋๋ ๋ฐฉ์์ ๋ํด์ ์ ํํ๊ฒ ์ด๋ป๊ฒ ์๋ํ๋์ง ์๋ฌธ์ด ๋ง์๋๋ฐ(์์ง ๋ ํผ๋ฐ์ค๋ฅผ ์ฐพ์ง๋ ๋ชปํ์) ๋ง๋ธ ๋ค์ด์ด๊ทธ๋จ ๊ทธ๋๋ก๋ก ์ผ๋จ ์ดํดํ๋ฉด ๋ ๊ฒ ๊ฐ๋ค.
zip ์ด ๋๋ ๋ Flux ๊ฐ ์์๋ ์ด๊ฒ์ด ๋ชจ๋๋ค ์ญ ๊ตฌ๋ ์ด ์ผ๋จ ์๋ฃ๋๊ณ (๋ณ๋ ฌ์ ์ผ๋ก ๋์ ๋์์) ๋ด๋ถ์ ์ผ๋ก ์ด๋ฅผ ๊ฒฐํฉ ํด์ฃผ ๋ ๊ฒ์ผ๋ก ๋ณด์ธ๋ค. ์๋ํ๋ฉด ๋ง๋ธ ๋ค์ด์ด๊ทธ๋จ ์์ฒด๊ฐ ๋ ๋ค complete ๊น์ง ๋๋ฌํ๋ ๊ฒ์ ๋ณด๋ฉด ์ ์ ์๋ค. ๋ฐ๋ผ์ ๊ฐ์๋ฅผ ๋ง์ถฐ์ฃผ๊ธฐ ์ํด์ ํ๋ emitํ๊ณ ๋ค๋ฅธ ๋์ Flux์ emit์ ๊ธฐ๋ค๋ฆฌ๋ ๊ฒ์ด ์๋๋ผ ์ผ๋จ ์ญ ๊ตฌ๋ ์ ๊ตฌ๋ ๋๋ก ์งํํ๊ณ , ๊ฒฐํฉ์ ๊ฒฐํฉ๋๋ก ์๊ฐ ๋ง์๋ ๋ง์ถฐ์ emit ํด์ฃผ๋ ๊ฒ์ผ๋ก ๋ณด์ธ๋ค.
Last updated