Cold & Hot Streams
Cold Streams Overview
Cold Stream is a type of Stream which emits the elements from beginning to end for every new subscription.
ex. HTTP Cal with similar request, DB call with similar request
Hot Streams Overview
Data is emitted continuously
Any new subscriber will only get he current state of the Reactive Stream
Type 1 : Waits for the first subscription from the subscriber and emits the data continuously
Type 2 : Emits the data continuously without the need for subscription
ex. Stock Tickers - Emits stock updates continuously as the change, Uber Driver Tracking - Emits the of the current position of the Driver Continuously.
Hot publishers, on the other hand, do not depend on any number of subscribers. They might start publishing data right away and would continue doing so whenever a new
Subscriber
comes in (in which case, the subscriber would see only new elements emitted after it subscribed). For hot publishers, something does indeed happen before you subscribe.
Hot publisher๋ subscriber์ ์์กดํ์ง ์๋๋ค. subscriber ๊ฐ ์ธ์ ๋ถ์ด์ ์ธ์ ๊ตฌ๋ ์ ํ๋ ์๊ด์๋ค.
subscribe์ ์ ์ด๋ฏธ publisher๋ ๋ญ๊ฐ๋ฅผ ํ๋ค.
Sometimes, you may want to not defer only some processing to the subscription time of one subscriber, but you might actually want for several of them to rendezvous and then trigger the subscription and data generation.
๋ฉํฐ ํ๋ก์ธ์ฑ์ ํด์ผํ ๋ ์ด๋ค ๊ฒ์ ์ ์ ๋ฉ์ถ๊ฒ ํ๋ค๋๊ฐ ํ์ง ์๊ณ ๊ฐ์ ์์์ ๋ณ๋ ฌ์ํ์ ํ๊ณ , ๊ฐ๊ฐ์ ์ฒ๋ฆฌ๊ฒฐ๊ณผ๋ฅผ ํ๋ฐ ๋ชจ์์ ์ฌ์ฒ๋ฆฌ๋ฅผ ํด์ผํ ๊ฒฝ์ฐ๊ฐ ์๋ค๋ ๋ป.
This is what
ConnectableFlux
is made for. Two main patterns are covered in theFlux
API that return aConnectableFlux
:publish
andreplay
.
ConnectableFlux ๋ฅผ ์ฐ๋ฉด ์์ ๊ฐ์ ๊ฒฝ์ฐ๋ฅผ ํด๊ฒฐํ ์ ์๋ค.
package com.fistkim.reactorstudy.hotcold;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;
import java.time.Duration;
public class HotColdTest {
@Test
void hotTest_1() {
Flux<Integer> numbers = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1));
ConnectableFlux<Integer> connectableNumbers = numbers.publish();
connectableNumbers.connect();
connectableNumbers.subscribe(num -> System.out.println("subscriber 1 : " + num));
stopSeconds(5);
connectableNumbers.subscribe(num -> System.out.println("subscriber 2 : " + num));
stopSeconds(10);
}
@Test
void autoConnectTest() {
Flux<Integer> numbers = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1));
Flux<Integer> numbersFlux = numbers.publish().autoConnect(2);
numbersFlux.subscribe(num -> System.out.println("subscriber 1 : " + num));
stopSeconds(5);
numbersFlux.subscribe(num -> System.out.println("subscriber 2 : " + num));
stopSeconds(10);
}
@Test
void refConnectTest() {
Flux<Integer> numbers = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1));
Flux<Integer> numbersFlux = numbers.publish().refCount(2);
var a = numbersFlux.subscribe(number -> System.out.println("subscriber 1 : " + number));
var b = numbersFlux.subscribe(number -> System.out.println("subscriber 2 : " + number));
a.dispose();
b.dispose();
stopSeconds(10);
}
@Test
void virtualTimerTest() {
VirtualTimeScheduler.getOrSet();
Flux<Integer> numbers = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1));
StepVerifier.withVirtualTime(() -> numbers)
.thenAwait(Duration.ofSeconds(12))
.expectNextCount(10)
.verifyComplete();
}
private void stopSeconds(Integer second) {
try {
Thread.sleep(Long.parseLong(second.toString()) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Last updated