Cold & Hot Streams

Cold Streams Overview

  • Cold Stream is a type of Stream which emits the elements from beginning to end for every new subscription.

  • ex. HTTP Cal with similar request, DB call with similar request

Hot Streams Overview

  • Data is emitted continuously

  • Any new subscriber will only get he current state of the Reactive Stream

    • Type 1 : Waits for the first subscription from the subscriber and emits the data continuously

    • Type 2 : Emits the data continuously without the need for subscription

  • ex. Stock Tickers - Emits stock updates continuously as the change, Uber Driver Tracking - Emits the of the current position of the Driver Continuously.

Hot publishers, on the other hand, do not depend on any number of subscribers. They might start publishing data right away and would continue doing so whenever a new Subscriber comes in (in which case, the subscriber would see only new elements emitted after it subscribed). For hot publishers, something does indeed happen before you subscribe.

  • Hot publisher๋Š” subscriber์— ์˜์กดํ•˜์ง€ ์•Š๋Š”๋‹ค. subscriber ๊ฐ€ ์–ธ์ œ ๋ถ™์–ด์„œ ์–ธ์ œ ๊ตฌ๋…์„ ํ•˜๋“  ์ƒ๊ด€์—†๋‹ค.

  • subscribe์ „์— ์ด๋ฏธ publisher๋Š” ๋ญ”๊ฐ€๋ฅผ ํ•œ๋‹ค.

Sometimes, you may want to not defer only some processing to the subscription time of one subscriber, but you might actually want for several of them to rendezvous and then trigger the subscription and data generation.

  • ๋ฉ€ํ‹ฐ ํ”„๋กœ์„ธ์‹ฑ์„ ํ•ด์•ผํ•  ๋•Œ ์–ด๋–ค ๊ฒƒ์„ ์ž ์‹œ ๋ฉˆ์ถ”๊ฒŒ ํ•œ๋‹ค๋˜๊ฐ€ ํ•˜์ง€ ์•Š๊ณ  ๊ฐ์ž ์•Œ์•„์„œ ๋ณ‘๋ ฌ์ˆ˜ํ–‰์„ ํ•˜๊ณ , ๊ฐ๊ฐ์˜ ์ฒ˜๋ฆฌ๊ฒฐ๊ณผ๋ฅผ ํ•œ๋ฐ ๋ชจ์•„์„œ ์žฌ์ฒ˜๋ฆฌ๋ฅผ ํ•ด์•ผํ•  ๊ฒฝ์šฐ๊ฐ€ ์žˆ๋‹ค๋Š” ๋œป.

This is what ConnectableFlux is made for. Two main patterns are covered in the Flux API that return a ConnectableFlux: publish and replay.

  • ConnectableFlux ๋ฅผ ์“ฐ๋ฉด ์œ„์™€ ๊ฐ™์€ ๊ฒฝ์šฐ๋ฅผ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ๋‹ค.

package com.fistkim.reactorstudy.hotcold;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;

import java.time.Duration;

public class HotColdTest {

    @Test
    void hotTest_1() {
        Flux<Integer> numbers = Flux.range(1, 10)
                .delayElements(Duration.ofSeconds(1));
        ConnectableFlux<Integer> connectableNumbers = numbers.publish();
        connectableNumbers.connect();
        connectableNumbers.subscribe(num -> System.out.println("subscriber 1 : " + num));
        stopSeconds(5);
        connectableNumbers.subscribe(num -> System.out.println("subscriber 2 : " + num));

        stopSeconds(10);
    }

    @Test
    void autoConnectTest() {
        Flux<Integer> numbers = Flux.range(1, 10)
                .delayElements(Duration.ofSeconds(1));
        Flux<Integer> numbersFlux = numbers.publish().autoConnect(2);
        numbersFlux.subscribe(num -> System.out.println("subscriber 1 : " + num));
        stopSeconds(5);
        numbersFlux.subscribe(num -> System.out.println("subscriber 2 : " + num));

        stopSeconds(10);
    }

    @Test
    void refConnectTest() {
        Flux<Integer> numbers = Flux.range(1, 10)
                .delayElements(Duration.ofSeconds(1));
        Flux<Integer> numbersFlux = numbers.publish().refCount(2);
        var a = numbersFlux.subscribe(number -> System.out.println("subscriber 1 : " + number));
        var b = numbersFlux.subscribe(number -> System.out.println("subscriber 2 : " + number));
        a.dispose();
        b.dispose();

        stopSeconds(10);
    }

    @Test
    void virtualTimerTest() {
        VirtualTimeScheduler.getOrSet();
        Flux<Integer> numbers = Flux.range(1, 10)
                .delayElements(Duration.ofSeconds(1));

        StepVerifier.withVirtualTime(() -> numbers)
                .thenAwait(Duration.ofSeconds(12))
                .expectNextCount(10)
                .verifyComplete();
    }

    private void stopSeconds(Integer second) {
        try {
            Thread.sleep(Long.parseLong(second.toString()) * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Last updated