😀
fistkim TECH BLOG
  • Intro
  • 강의
    • Reactive Programming in Modern Java using Project Reactor
      • Reactor execution model 1
      • Reactor execution model 2
      • Reactor execution model 3 - parallelism
      • Reactor execution model 4 - overview
      • Transform
      • Combine
      • Side Effect Methods
      • Exception/Error handling
      • retry, retryWhen, repeat
      • BackPressure
      • Cold & Hot Streams
    • NEXTSTEP 클린코드 with java 9기
      • 정리노트
    • NEXTSTEP DDD 세레나데 2기
      • CH01 도메인 주도 설계 이해
      • CH02 크게 소리 내어 모델링 하기
      • CH03 도메인 주도 설계 기본 요소
      • CH04 도메인 주도 설계 아키텍처
      • CH05 도메인 이벤트
    • NEXTSTEP 인프라 공방 1기
      • 망 분리하기
      • 통신 확인하기
      • 도커 컨테이너 이해하기
      • [미션 1] 서비스 구성하기 실습
      • [미션 2] 서비스 배포하기 실습
      • 서버 진단하기
      • 어플리케이션 진단하기
      • [미션 3] 서비스 운영하기
      • 웹 성능 진단하기
      • 부하 테스트
      • k6
      • [미션 4] 성능 테스트
      • 리버스 프록시 개선하기
      • 캐싱 활용하기
      • [미션 5] 화면 응답 개선하기
      • Redis Annotation 및 설정
      • 인덱스 이해하기 & DB 튜닝
      • [미션 6-1] 조회 성능 개선하기
      • [미션 6-2] DB 이중화 적용
    • NEXTSTEP 만들면서 배우는 Spring 3기
      • CH01 올바른 방향 바라보기
      • CH02 HTTP 이해 - 웹 서버 구현
        • HTTP 파싱
        • HTTP 웹 서버 구현
      • CH03 MVC - @MVC 프레임워크 구현
        • Servlet 다시 짚기
        • Cookie, Session 다시 짚기
        • MVC 프레임워크 구현
      • CH04 나만의 라이브러리 구현
      • CH05 DI - DI 프레임워크 구현
      • CH06 Aspect OP
    • 스프링 시큐리티
      • 스프링 시큐리티 아키텍처
      • WebAsyncManagerIntegrationFilter
      • SecurityContextPersistenceFilter
      • HeaderWriterFilter
      • CsrfFilter
      • (+) 스프링 시큐리티 + JWT
      • (+) 마치며
    • 더 자바, 코드를 조작하는 다양한 방법
      • CH01 JVM 이해하기
      • (+) 클래스 로더 이해하기
      • CH02 바이트 코드 분석 및 조작
      • (+) jacoco
      • CH03 리플렉션
      • CH04 다이나믹 프록시
      • CH05 애노테이션 프로세서
    • 더 자바, 애플리케이션을 테스트하는 다양한 방법
      • CH01 JUnit 5
      • CH02 Mockito
      • (+) Spy vs Mock
      • CH03 도커와 테스트
      • CH04 성능 테스트
      • (+) VisualVM
      • (+) 테스트 자동화
      • CH05 운영 이슈 테스트
      • CH06 아키텍처 테스트
    • 모든 개발자를 위한 HTTP 웹 기본 지식
      • CH01 인터넷 네트워크
      • CH02 HTTP 기본
      • CH03 HTTP 메서드 속성
      • CH04 HTTP 메서드 활용
      • CH05 HTTP 상태코드
      • CH06 HTTP 헤더1 - 일반 헤더
      • CH07 HTTP 헤더2 - 캐시와 조건부 요청
      • (+) HTTPS 원리
    • 스프링 프레임워크 핵심 기술
      • CH01 IOC 컨테이너
      • CH02 AOP
      • (+) 스프링 의존성 관리
      • (+) 생성자 주입 장점
    • 코딩으로 학습하는 GoF의 디자인 패턴
      • 객체 생성
        • 싱글톤 패턴
        • 팩토리 메소드 패턴
        • 추상 팩토리 패턴
        • 빌더 패턴
        • 프로토타입 패턴
      • 구조
        • 어댑터 패턴
        • 브릿지 패턴
        • 컴포짓 패턴
      • 행동
        • (작성중)
    • 실전 Querydsl
      • CH01 프로젝트 환경구성
      • CH02 예제 도메인 모델
      • CH03 기본문법
      • CH04 중급 문법
      • CH05 실무활용 (스프링 데이터 JPA와 Querydsl)
      • CH06 스프링데이터JPA 가 제공하는 Querydsl 기능
      • (+) 별칭(alias)
      • (+) Slice 쿼리
    • 스프링 데이터 JPA
      • CH01 핵심개념이해 1
      • CH02 핵심개념이해 2
      • CH03 핵심개념이해 3
      • CH04 Spring Data Common
      • CH05 Spring Data JPA
    • 실전! 스프링 부트와 JPA 활용2 - API 개발과 성능 최적화
      • CH01 지연 로딩과 조회 성능 최적화
      • CH02 컬렉션 조회 최적화
      • CH03 전체 정리
    • 초보를 위한 쿠버네티스 안내서
      • CH01 쿠버네티스 시작하기
      • CH02 쿠버네티스 알아보기
      • CH03 쿠버네티스 실습 준비
      • CH04 쿠버네티스 기본 실습
    • Flutter Provider Essential
      • CH01 Introduction
      • CH02 Provider Overview
      • CH03 TODO App
      • CH04 Weather App
      • CH05 Firebase Authentication App
    • Flutter Bloc Essential
      • CH01 Introduction
      • CH02 Bloc Overview
      • CH03 TODO App
      • CH04 Weather App
      • CH05 Firebase Authentication App
    • Flutter Advanced Course - Clean Architecture With MVVM
      • CH01 Introduction
      • CH02 Clean Architecture 4 Layer
      • CH03 MVVM
      • CH04 Data Layer
      • (+) Data Layer - response to model
      • (+) Data Layer - Network
      • CH05 Domain Layer
      • CH06 Presentation Layer
      • CH07 Application Layer
      • (+) Application Layer - l10n
      • (+) Application Layer - DI
      • (+) Application Layer - environment
    • 자바 알고리즘 입문
      • CH01 문자열
      • CH02 Array(1, 2 차원 배열)
      • CH03 Two pointers, Sliding window[효율성: O(n^2)-->O(n)]
      • CH04 HashMap, TreeSet (해쉬, 정렬지원 Set)
      • CH05 Stack, Queue(자료구조)
      • CH06 Sorting and Searching(정렬, 이분검색과 결정알고리즘)
      • CH07 Recursive, Tree, Graph(DFS, BFS 기초)
      • CH08 DFS, BFS 활용
      • CH09 Greedy Algorithm
      • CH10 dynamic programming(동적계획법)
  • 도서
    • 만들면서 배우는 클린 아키텍처
      • 학습목표
      • CH01 계층형 아키텍처의 문제는 무엇일까?
      • CH02 의존성 역전하기
      • CH03 코드 구성하기
      • CH04 유스케이스 구현하기
      • CH05 웹 어댑터 구현하기
      • CH06 영속성 어댑터 구현하기
      • CH07 아키텍처 요소 테스트하기
      • CH08 경계 간 매핑하기
      • CH09 어플리케이션 조립하기
      • CH10 아키텍처 경계 강제하기
      • CH11 의식적으로 지름길 사용하기
      • CH12 아키텍처 스타일 결정하기
    • 클린 아키텍처
      • 들어가며
      • 1부 소개
        • 1장 설계와 아키텍처란?
        • 2장 두 가지 가치에 대한 이야기
      • 2부 벽돌부터 시작하기: 프로그래밍 패러다임
        • 3장 패러다임 개요
        • 4장 구조적 프로그래밍
        • 5장 객체 지향 프로그래밍
        • 6장 함수형 프로그래밍
      • 3부 설계 원칙
        • 7장 SRP: 단일 책임 원칙
        • 8장 OCP: 개방-폐쇄 원칙
        • 9장 LSP: 리스코프 치환 원칙
        • 10장 ISP: 인터페이스 분리 원칙
        • 11장 DIP: 의존성 역전 원칙
      • 4부 컴포넌트 원칙
        • 12장 컴포넌트
        • 13장 컴포넌트 응집도
        • 14장 컴포넌트 결합
      • 5부
        • 15장 아키텍처란?
    • 스프링 입문을 위한 자바 객체 지향의 원리와 이해
      • CH01 사람을 사랑한 기술
      • CH02 자바와 절차적/구조적 프로그래밍
      • CH03 자바와 객체 지향
      • (+) 자바 코드 실행에 따른 메모리 적재과정
      • CH04 자바가 확장한 객체 지향
      • CH05 객체 지향 설계 5 원칙 - SOLID
      • CH06 스프링이 사랑한 디자인 패턴
      • CH07 스프링 삼각형과 설정 정보
      • (부록) 람다(lambda)
    • 객체지향의 사실과 오해
      • CH01 협력하는 객체들의 공동체
      • CH02 이상한 나라의 객체
      • CH03 타입과 추상화
      • CH04 역할, 책임, 협력
      • CH05 책임과 메시지
      • CH06 객체 지도
      • CH07 함께 모으기
      • (+) 인터페이스 개념 바로잡기
    • 도메인 주도 개발 시작하기
      • CH01 도메인 모델 시작하기
      • CH02 아키텍처 개요
      • CH03 애그리거트
      • CH04 리포지터리와 모델 구현
      • CH05 스프링 데이터 JPA를 이용한 조회 기능
      • CH06 응용 서비스와 표현 영역
      • CH07 도메인 서비스
      • CH08 애그리거트 트랜잭션 관리
      • CH09 도메인 모델과 바운디드 컨텍스트
      • CH10 이벤트
      • CH11 CQRS
    • 자바 ORM 표준 JPA 프로그래밍
      • CH01 JPA 소개
      • CH02 JPA 시작
      • CH03 영속성 관리
      • CH04 엔티티 매핑
      • CH05 연관관계 매핑 기초
      • CH06 다양한 연관관계 매핑
      • CH07 고급 매핑
      • CH08 프록시와 연관관계 관리
      • CH09 값 타입
      • CH10 객체지향 쿼리 언어
      • CH11 웹 애플리케이션 제작
      • CH12 스프링 데이터 JPA
      • CH13 웹 애플리케이션과 영속성 관리
      • CH14 컬렉션과 부가 기능
      • CH15 고급 주제와 성능 최적화
      • CH16 트랜잭션과 락, 2차 캐시
    • 소프트웨어 세상을 여는 컴퓨터과학
      • CH01 컴퓨터 과학 소개
      • CH02 데이터 표현과 디지털 논리
    • 이펙티브 자바
      • 1 장 들어가기
      • 2장 객체 생성과 파괴
        • [01] 생성자 대신 정적 팩터리 메서드를 고려하라
        • [02] 생성자에 매개변수가 많다면 빌더를 고려하라
        • [03] private 생성자나 열거 타입으로 싱글턴임을 보증하라
        • [04] 인스턴스화를 막으려거든 private 생성자를 사용하라
        • [05] 자원을 직접 명시하지 말고 의존 객체 주입을 사용하라
        • [06] 불필요한 객체 생성을 피하라
        • [07] 다 쓴 객체 참조를 해제하라
        • [08] finalizer 와 cleaner 사용을 피하라
        • [09] try-finally 보다는 try-with-resources 를 사용하라
      • 3장 모든 객체의 공통 메서드
        • [10] equals는 일반 규약을 지켜 재정의하라
        • [11] equals 를 재정의하려거든 hashCode도 재정의하라
        • [12] toString 을 항상 재정의하라
        • [13] clone 재정의는 주의해서 진행하라
        • [14] Comparable 을 구현할지 고려하라
      • 4장 클래스와 인터페이스
        • [15] 클래스와 멤버의 접근 권한을 최소화하라
  • 토픽
    • 서버 모니터링
      • CPU 사용량
      • 메모리 사용량
      • 스레드 풀
    • Spring Boot Monitoring
      • Spring actuator
      • Spring eureka
      • Prometheus
      • grafana
      • Spring actuator + Prometheus + grafana
    • JAVA 데일리 토픽
      • 메모리 누수(memory leak)
      • 객체 참조의 유형
      • 커스텀 스레드 풀
      • Mark And Compact
      • serialVersionUID 이해하기
      • 함수형 인터페이스
      • 메소드 참조
      • equals()와 hashCode()가 무엇이고 역할이 무엇인지
      • StringBuffer vs StringBuilder
      • String vs StringBuilder, StringBuffer
      • String interning
    • JAVA GC
    • 프로그래머스 문제 풀기
      • 해시
      • 스택/큐
      • 힙(Heap)
      • 정렬
      • 완전탐색
      • DFS/BFS
    • 데이터베이스 구성 및 작동 흐름
    • 데이터베이스 JOIN 원리
    • 객체지향생활체조 원칙
    • 상태(state), 상속(inheritance), 합성(composition) 의 상관관계
    • java enum은 메모리에 언제, 어떻게 할당되는가
    • Checked Exception vs UnChecked Exception
    • Reactive Streams 원리탐구 - 간단한 예제 직접 작성해보기
    • Flutter Basic
    • Flutter StatefulWidget 생명주기
    • Flutter 가 위젯을 그리는 원리
    • Flutter 클린 아키텍처
      • application layer
        • 패키지 구조 및 레이어 설명
        • environment
        • dependency injection
        • go_router
        • foreground & background
        • 다국어처리 (l10n, i18n)
        • Global 처리(시스템 점검, fore->back 등)
        • connection_manager
        • permission_manager
        • push_notification_manager
        • firebase 연동
      • data layer
        • 패키지 구조 및 레이어 설명
        • network
        • repository
      • domain layer
        • 패키지 구조 및 레이어 설명
      • presentation layer
        • 패키지 구조 및 레이어 설명
        • resources
    • 기술 관련 포스팅 읽기
  • 기타
    • 작업일지
      • 2023. 10
      • 2023. 09
      • 2023. 08
      • 2023. 07
      • 2023. 06
      • 2023. 05
      • 2023. 04
      • 2023. 03
      • 2023. 02
      • 2023. 01
      • 2022. 12
    • Business Model
      • 아이디어 불패의 법칙
      • 린 모바일 앱 개발
      • 린 스타트업
      • 제로투원
      • MIT 스타트업 바이블
      • 린치핀
    • 백로그 종합
Powered by GitBook
On this page
  • Reactor Threading and Schedulers
  • 기본적으로 subscribe()한 thread가 whole pipeline execution을 수행한다
  • subscribeOn() vs publishOn()
  • subscribeOn()
  • publishOn()
  1. 강의
  2. Reactive Programming in Modern Java using Project Reactor

Reactor execution model 2

Reactor 스레드 동작 탐구

PreviousReactor execution model 1NextReactor execution model 3 - parallelism

Last updated 1 year ago

Reactor Threading and Schedulers

Reactor의 개요에서도 이미 확인했듯이 Reactor의 실행 모델을 이해하는 것의 핵심은 내부적인 스레드 동작에 대한 파악에 있다고 생각한다.

마침 이에 대해서 정리가 잘된 글()을 발견하여 이를 보면서 이론적인 정리를 하고, 카카오 Tech 에 올라온 Reactor 관련 포스팅()을 통해서 실습하며 이론을 확인해본다.

기본적으로 subscribe()한 thread가 whole pipeline execution을 수행한다

위 소제목이 핵심적인 원리이다. 아래 코드를 보자.

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
            .map(String::toUpperCase)
            .filter(cityName -> cityName.length() <= 8)
            .map(cityName -> cityName.concat(" City"))
            .log();

    cities.subscribe();

  }
}
INFO 14040 --- [main] reactor.Flux.MapFuseable.1  : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1  : | request(unbounded)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1  : | onNext(NEW YORK City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1  : | onNext(LONDON City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1  : | onNext(PARIS City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1  : | onComplete()

위 예제코드를 보면 main thread가 구독부터 발행까지 전체 파이프라인을 책임지고 수행하고 있다.

"The same thread that performs a subscription will be used for the whole pipeline execution."

이미 앞서 살펴 보았듯이 비동기, 논블로킹의 동작을 통해 얻을 수 있는 장점을 고려해본다면, 위와 같이 하나의 스레드가 모든 것을 처리하는 경우에는 성능에 이점이 없다.

이러한 구독의 흐름에서 특정한 Flux 또는 Mono에 대해 처음 구독을 시작한 thread가 아니라 이를 처리할 thread를 따로 분기하여 담당하게 만들면 수행 속도가 더 빨라질 수 있다. 이 때 특정한 pool을 지정해주고 subscribe 혹은 publish시 thread를 거기서 꺼내오게 지정해줄 수 있다.

이 thread pool에 대해서 Reactor 에서는 Schdulers 라는 Factory 클래스를 제공한다. 이를 이용하면 Flux 또는 Mono 수행에 사용되는 thread 를 switch 할 수 있다. 아래는 Schedulers의 종류들이다.

  • Schedulers.parallel() – It has a fixed pool of workers. The number of threads is equivalent to the number of CPU cores.

  • Schedulers.boundElastic() – It has a bounded elastic thread pool of workers. The number of threads can grow based on the need. The number of threads can be much bigger than the number of CPU cores. Used mainly for making blocking IO calls.

  • Schedulers.single() – Reuses the same thread for all callers.

그리고 아래 method 들을 이용해서 Reactor 가 어떤 Schedulers 들을 쓸지를 명령 할 수 있다.

  • The publishOn method

  • The subscribeOn method

subscribeOn() vs publishOn()

subscribeOn()

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
            .subscribeOn(Schedulers.boundedElastic())
            .map(String::toUpperCase)
            .filter(cityName -> cityName.length() <= 8)
            .map(cityName -> cityName.concat(" City"))
            .log();

    cities.subscribe();

  }
}
Output:
INFO 7500 --- [main] reactor.Flux.Map.1  : onSubscribe(FluxMap.MapSubscriber)
INFO 7500 --- [main] reactor.Flux.Map.1  : request(unbounded)
INFO 7500 --- [boundedElastic-1] reactor.Flux.Map.1  : onNext(NEW YORK City)
INFO 7500 --- [boundedElastic-1] reactor.Flux.Map.1  : onNext(LONDON City)
INFO 7500 --- [boundedElastic-1] reactor.Flux.Map.1  : onNext(PARIS City)
INFO 7500 --- [boundedElastic-1] reactor.Flux.Map.1  : onComplete()

위와 같이 subscribe()를 하고 onSubscribe() 를 수행하며 Subscription을 넘기면서 request()를 수행하는 것 까지 구독을 시작한 thread인 main 이 수행하고, 그 이후부터는 다른 thread가 처리를 담당한 것을 확인할 수 있다.

publishOn()

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just("New York", "London", "Paris", "Amsterdam")
            .map(ReactiveJavaTutorial::stringToUpperCase)
            .publishOn(Schedulers.boundedElastic())
            .map(ReactiveJavaTutorial::concat)
            .subscribe();
  }

  private static String stringToUpperCase(String name) {
    System.out.println("stringToUpperCase: " + Thread.currentThread().getName());
    return name.toUpperCase();
  }

  private static String concat(String name) {
    System.out.println("concat: " + Thread.currentThread().getName());
    return name.concat(" City");
  }
}
stringToUpperCase: main
stringToUpperCase: main
stringToUpperCase: main
concat: boundedElastic-1
concat: boundedElastic-1
concat: boundedElastic-1

publishOn()을 만나기 직전까지는 구독을 시작한 thread에 의해서 처리되다가, publishOn을 만난 직후부터 처리를 담당해주는 thread 가 변경된 것을 확인할 수 있다.

publishOn()은 operator와 유사하게 수행이 된다고 보면 된다. up-stream과 down-stream 사이에 존재하면서 명시된 Schedulers의 pool 에서 thread를 꺼내와 이를 처리하도록 thread 를 switch 해준다.

subscribeOn()과 publishOn()의 가장 큰 차이점은 publishOn()은 operator 처럼 pipe line 어디에든 원하는 곳에 넣어서 사용할 수 있고, subscribeOn()은 어디에 위치하든 whole reactive chain 에 적용이 된다는 것이다.

"That is the main difference between the subscribeOn and publishOn operators since the subscribeOn will apply the provided Scheduler to the whole reactive chain, no matter where we placed it."

보면 subscribe() 를 시작한 thread가 op1, op2 까지 처리를 하고 publishOn 이후부터 thread 가 바뀌는 것을 볼 수 있다.

subscribeOn 은 동작하는 것이 약간 다르다. 위 publishOn과는 달리 숫자의 순서가 subscribe가 1이 아닌 것을 알 수 있다. Flux를 구성할때 안에 subscribeOn 이 있으면 무조건 subscribe 를 시작할때 subscribeOn이 지정해준 thread 로 시작을 한다는 의미이다.

그래서 subscribeOn은 reactive chain 어디에 있든지 간에 전체 pipe에 영향을 줄 수 밖에 없는 것이다.

이건 subscribeOn 과 publishOn이 섞인 경우이다. 이거 그림의 숫자가 좀 잘못된 것 같은데 영상에서는 이렇게 나오긴 했다. 아무튼 subscribeOn 이 op1의 앞에 있든, op2의 뒤에 있든, op1과 op2의 중간에 있든 publishOn을 만나기 전까지의 chain 전체에는 subscribeOn에서 명시한 thread로 동작하고 publishOn이후는 publishOn에서 명시한 thread가 처리를 하며 subscribe 내부의 로직은 결국 구독이 되어 나온 시점이므로 publishOn에서 명시한 thread가 이를 처리하는게 맞다.

publishOn() 활용 예제

public class ExecutionModelTest {

    @Test
    void publishOnTest() {

        Flux<String> alphabet1 = Flux.fromIterable(List.of("a", "b", "c"))
                .map(this::toUpperCase);
        Flux<String> alphabet2 = Flux.fromIterable(List.of("d", "e", "f"))
                .map(this::toUpperCase);

        StepVerifier.create(alphabet1.mergeWith(alphabet2).log())
                .expectNextCount(6)
                .verifyComplete();
    }

    @Test
    void publishOnTestAsync() {

        Flux<String> alphabet1 = Flux.fromIterable(List.of("a", "b", "c"))
                .publishOn(Schedulers.parallel())
                .map(this::toUpperCase);
        Flux<String> alphabet2 = Flux.fromIterable(List.of("d", "e", "f"))
                .publishOn(Schedulers.parallel())
                .map(this::toUpperCase);

        StepVerifier.create(alphabet1.mergeWith(alphabet2).log())
                .expectNextCount(6)
                .verifyComplete();
    }

    private String toUpperCase(String value) {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return value.toUpperCase();
    }

}
/Users/jungkwonkim/Library/Java/JavaVirtualMachines/corretto-11.0.15/Contents/Home/bin/java -ea -Didea.test.cyclic.buffer.size=1048576 -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=54498:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/jungkwonkim/.m2/repository/org/junit/platform/junit-platform-launcher/1.8.2/junit-platform-launcher-1.8.2.jar:/Users/jungkwonkim/.m2/repository/org/junit/platform/junit-platform-engine/1.8.2/junit-platform-engine-1.8.2.jar:/Users/jungkwonkim/.m2/repository/org/opentest4j/opentest4j/1.2.0/opentest4j-1.2.0.jar:/Users/jungkwonkim/.m2/repository/org/junit/platform/junit-platform-commons/1.8.2/junit-platform-commons-1.8.2.jar:/Users/jungkwonkim/.m2/repository/org/apiguardian/apiguardian-api/1.1.2/apiguardian-api-1.1.2.jar:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar:/Applications/IntelliJ IDEA.app/Contents/plugins/junit/lib/junit5-rt.jar:/Applications/IntelliJ IDEA.app/Contents/plugins/junit/lib/junit-rt.jar:/Users/jungkwonkim/Lab/App/reactor-study/build/classes/java/test:/Users/jungkwonkim/Lab/App/reactor-study/build/classes/java/main:/Users/jungkwonkim/Lab/App/reactor-study/build/resources/main:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/io.projectreactor/reactor-test/3.4.0/ad22a711534639a5f34dd696594c7a1ade5f8e83/reactor-test-3.4.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/io.projectreactor/reactor-core/3.4.0/683c9c676c438e5945b0f12808575f22160c5e54/reactor-core-3.4.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-test/2.6.7/8fe7761bc8609b832d8cc1104ff59a0512ef9aaf/spring-boot-starter-test-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter/2.6.7/59bd62cd60e6e7bb2a525931947c13548b207288/spring-boot-starter-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk8/1.7.0-Beta/836df62ad6bfc8a2ed7d4ba59a49bf2d251a75b5/kotlin-stdlib-jdk8-1.7.0-Beta.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.jupiter/junit-jupiter/5.5.1/4e3393c2238d1cf48d29b3c79f0c0928e873bc11/junit-jupiter-5.5.1.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.reactivestreams/reactive-streams/1.0.3/d9fb7a7926ffa635b3dcaa5049fb2bfa25b3e7d0/reactive-streams-1.0.3.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-test-autoconfigure/2.6.7/b95d7d094604e315cbd0c7746d700acdddeee95d/spring-boot-test-autoconfigure-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-test/2.6.7/78741985ff50aecc5cd2776c74804a288ffc208d/spring-boot-test-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-test/5.3.19/84d0f06bfe2433173880240808203b69bc40244/spring-test-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-core/5.3.19/344ff3b291d7fdfdb08e865f26238a6caa86acc5/spring-core-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/com.jayway.jsonpath/json-path/2.6.0/67f565b424f7903a12d4f5b9361b11462ecacdac/json-path-2.6.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/jakarta.xml.bind/jakarta.xml.bind-api/2.3.3/48e3b9cfc10752fba3521d6511f4165bea951801/jakarta.xml.bind-api-2.3.3.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.assertj/assertj-core/3.21.0/27a14d6d22c4e3d58f799fb2a5ca8eaf53e6942a/assertj-core-3.21.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.hamcrest/hamcrest/2.2/1820c0968dba3a11a1b30669bb1f01978a91dedc/hamcrest-2.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.mockito/mockito-junit-jupiter/4.0.0/b76de25bd6e5d8f7924d0536729c0076e37e9396/mockito-junit-jupiter-4.0.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.mockito/mockito-core/4.0.0/f5195e0c4a45716bbd2d1d29173adbd148acce3a/mockito-core-4.0.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.skyscreamer/jsonassert/1.5.0/6c9d5fe2f59da598d9aefc1cfc6528ff3cf32df3/jsonassert-1.5.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.xmlunit/xmlunit-core/2.8.4/35be57989ca80eefa03161b211630e319a8f36c6/xmlunit-core-2.8.4.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-autoconfigure/2.6.7/d96e51e7a13f16c07f60654071f157f99a0b99be/spring-boot-autoconfigure-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot/2.6.7/aef2fff50d49feb43a0726ab7e4945914186e06/spring-boot-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-logging/2.6.7/b49b159bb93086c76fa42555ca074c7ebe9d5fe6/spring-boot-starter-logging-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/jakarta.annotation/jakarta.annotation-api/1.3.5/59eb84ee0d616332ff44aba065f3888cf002cd2d/jakarta.annotation-api-1.3.5.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.yaml/snakeyaml/1.29/6d0cdafb2010f1297e574656551d7145240f6e25/snakeyaml-1.29.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk7/1.7.0-Beta/ab6a690623c06efecd79fd7774de41a29c792d9d/kotlin-stdlib-jdk7-1.7.0-Beta.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib/1.7.0-Beta/fb95185efb95996ea0f51dd7aa3e746f5f697236/kotlin-stdlib-1.7.0-Beta.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.jupiter/junit-jupiter-params/5.8.2/ddeafe92fc263f895bfb73ffeca7fd56e23c2cce/junit-jupiter-params-5.8.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.jupiter/junit-jupiter-api/5.8.2/4c21029217adf07e4c0d0c5e192b6bf610c94bdc/junit-jupiter-api-5.8.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-jcl/5.3.19/eae3d8b8728133782d9e41f9b3ecbd19a4146114/spring-jcl-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/net.minidev/json-smart/2.4.8/7c62f5f72ab05eb54d40e2abf0360a2fe9ea477f/json-smart-2.4.8.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.36/6c62681a2f655b49963a5983b8b0950a6120ae14/slf4j-api-1.7.36.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/jakarta.activation/jakarta.activation-api/1.2.2/99f53adba383cb1bf7c3862844488574b559621f/jakarta.activation-api-1.2.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy/1.11.22/8b4c7fa5562a09da1c2a9ab0873cb51f5034d83f/byte-buddy-1.11.22.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.11.22/2fbcf3210dfc09b42242e3b66a5281cc5b9adb80/byte-buddy-agent-1.11.22.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/com.vaadin.external.google/android-json/0.0.20131108.vaadin1/fa26d351fe62a6a17f5cda1287c1c6110dec413f/android-json-0.0.20131108.vaadin1.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-context/5.3.19/d663259767f8fb66229209db0422d65978235525/spring-context-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/ch.qos.logback/logback-classic/1.2.11/4741689214e9d1e8408b206506cbe76d1c6a7d60/logback-classic-1.2.11.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-to-slf4j/2.17.2/17dd0fae2747d9a28c67bc9534108823d2376b46/log4j-to-slf4j-2.17.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.slf4j/jul-to-slf4j/1.7.36/ed46d81cef9c412a88caef405b58f93a678ff2ca/jul-to-slf4j-1.7.36.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-common/1.7.0-Beta/5339108aef3a69a5030701ff3ebe9b811dfc54ac/kotlin-stdlib-common-1.7.0-Beta.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains/annotations/13.0/919f0dfe192fb4e063e7dacadee7f8bb9a2672a9/annotations-13.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.apiguardian/apiguardian-api/1.1.2/a231e0d844d2721b0fa1b238006d15c6ded6842a/apiguardian-api-1.1.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.platform/junit-platform-commons/1.8.2/32c8b8617c1342376fd5af2053da6410d8866861/junit-platform-commons-1.8.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.opentest4j/opentest4j/1.2.0/28c11eb91f9b6d8e200631d46e20a7f407f2a046/opentest4j-1.2.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/net.minidev/accessors-smart/2.4.8/6e1bee5a530caba91893604d6ab41d0edcecca9a/accessors-smart-2.4.8.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-aop/5.3.19/339b0ad295a7af51ac716bd89d36a4fb0febaf5d/spring-aop-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-beans/5.3.19/4bc68c392ed320c9ab5dc439d7f2deb83f03fe76/spring-beans-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-expression/5.3.19/14346b7b84721f61d2b23d3c8baa60c6655527a6/spring-expression-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/ch.qos.logback/logback-core/1.2.11/a01230df5ca5c34540cdaa3ad5efb012f1f1f792/logback-core-1.2.11.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-api/2.17.2/f42d6afa111b4dec5d2aea0fe2197240749a4ea6/log4j-api-2.17.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.ow2.asm/asm/9.1/a99500cf6eea30535eeac6be73899d048f8d12a8/asm-9.1.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.jupiter/junit-jupiter-engine/5.8.2/c598b4328d2f397194d11df3b1648d68d7d990e3/junit-jupiter-engine-5.8.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.objenesis/objenesis/3.2/7fadf57620c8b8abdf7519533e5527367cb51f09/objenesis-3.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.platform/junit-platform-engine/1.8.2/b737de09f19864bd136805c84df7999a142fec29/junit-platform-engine-1.8.2.jar com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit5 com.fistkim.reactorstudy.execution.ExecutionModelTest,publishOnTest
07:06:44.856 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
07:06:44.889 [main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
07:06:44.895 [main] INFO reactor.Flux.Merge.1 - request(unbounded)
07:06:45.906 [main] INFO reactor.Flux.Merge.1 - onNext(A)
07:06:46.912 [main] INFO reactor.Flux.Merge.1 - onNext(B)
07:06:47.915 [main] INFO reactor.Flux.Merge.1 - onNext(C)
07:06:48.921 [main] INFO reactor.Flux.Merge.1 - onNext(D)
07:06:49.924 [main] INFO reactor.Flux.Merge.1 - onNext(E)
07:06:50.930 [main] INFO reactor.Flux.Merge.1 - onNext(F)
07:06:50.933 [main] INFO reactor.Flux.Merge.1 - onComplete()

Process finished with exit code 0
/Users/jungkwonkim/Library/Java/JavaVirtualMachines/corretto-11.0.15/Contents/Home/bin/java -ea -Didea.test.cyclic.buffer.size=1048576 -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=54501:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/jungkwonkim/.m2/repository/org/junit/platform/junit-platform-launcher/1.8.2/junit-platform-launcher-1.8.2.jar:/Users/jungkwonkim/.m2/repository/org/junit/platform/junit-platform-engine/1.8.2/junit-platform-engine-1.8.2.jar:/Users/jungkwonkim/.m2/repository/org/opentest4j/opentest4j/1.2.0/opentest4j-1.2.0.jar:/Users/jungkwonkim/.m2/repository/org/junit/platform/junit-platform-commons/1.8.2/junit-platform-commons-1.8.2.jar:/Users/jungkwonkim/.m2/repository/org/apiguardian/apiguardian-api/1.1.2/apiguardian-api-1.1.2.jar:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar:/Applications/IntelliJ IDEA.app/Contents/plugins/junit/lib/junit5-rt.jar:/Applications/IntelliJ IDEA.app/Contents/plugins/junit/lib/junit-rt.jar:/Users/jungkwonkim/Lab/App/reactor-study/build/classes/java/test:/Users/jungkwonkim/Lab/App/reactor-study/build/classes/java/main:/Users/jungkwonkim/Lab/App/reactor-study/build/resources/main:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/io.projectreactor/reactor-test/3.4.0/ad22a711534639a5f34dd696594c7a1ade5f8e83/reactor-test-3.4.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/io.projectreactor/reactor-core/3.4.0/683c9c676c438e5945b0f12808575f22160c5e54/reactor-core-3.4.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-test/2.6.7/8fe7761bc8609b832d8cc1104ff59a0512ef9aaf/spring-boot-starter-test-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter/2.6.7/59bd62cd60e6e7bb2a525931947c13548b207288/spring-boot-starter-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk8/1.7.0-Beta/836df62ad6bfc8a2ed7d4ba59a49bf2d251a75b5/kotlin-stdlib-jdk8-1.7.0-Beta.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.jupiter/junit-jupiter/5.5.1/4e3393c2238d1cf48d29b3c79f0c0928e873bc11/junit-jupiter-5.5.1.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.reactivestreams/reactive-streams/1.0.3/d9fb7a7926ffa635b3dcaa5049fb2bfa25b3e7d0/reactive-streams-1.0.3.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-test-autoconfigure/2.6.7/b95d7d094604e315cbd0c7746d700acdddeee95d/spring-boot-test-autoconfigure-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-test/2.6.7/78741985ff50aecc5cd2776c74804a288ffc208d/spring-boot-test-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-test/5.3.19/84d0f06bfe2433173880240808203b69bc40244/spring-test-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-core/5.3.19/344ff3b291d7fdfdb08e865f26238a6caa86acc5/spring-core-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/com.jayway.jsonpath/json-path/2.6.0/67f565b424f7903a12d4f5b9361b11462ecacdac/json-path-2.6.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/jakarta.xml.bind/jakarta.xml.bind-api/2.3.3/48e3b9cfc10752fba3521d6511f4165bea951801/jakarta.xml.bind-api-2.3.3.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.assertj/assertj-core/3.21.0/27a14d6d22c4e3d58f799fb2a5ca8eaf53e6942a/assertj-core-3.21.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.hamcrest/hamcrest/2.2/1820c0968dba3a11a1b30669bb1f01978a91dedc/hamcrest-2.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.mockito/mockito-junit-jupiter/4.0.0/b76de25bd6e5d8f7924d0536729c0076e37e9396/mockito-junit-jupiter-4.0.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.mockito/mockito-core/4.0.0/f5195e0c4a45716bbd2d1d29173adbd148acce3a/mockito-core-4.0.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.skyscreamer/jsonassert/1.5.0/6c9d5fe2f59da598d9aefc1cfc6528ff3cf32df3/jsonassert-1.5.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.xmlunit/xmlunit-core/2.8.4/35be57989ca80eefa03161b211630e319a8f36c6/xmlunit-core-2.8.4.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-autoconfigure/2.6.7/d96e51e7a13f16c07f60654071f157f99a0b99be/spring-boot-autoconfigure-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot/2.6.7/aef2fff50d49feb43a0726ab7e4945914186e06/spring-boot-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-logging/2.6.7/b49b159bb93086c76fa42555ca074c7ebe9d5fe6/spring-boot-starter-logging-2.6.7.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/jakarta.annotation/jakarta.annotation-api/1.3.5/59eb84ee0d616332ff44aba065f3888cf002cd2d/jakarta.annotation-api-1.3.5.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.yaml/snakeyaml/1.29/6d0cdafb2010f1297e574656551d7145240f6e25/snakeyaml-1.29.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk7/1.7.0-Beta/ab6a690623c06efecd79fd7774de41a29c792d9d/kotlin-stdlib-jdk7-1.7.0-Beta.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib/1.7.0-Beta/fb95185efb95996ea0f51dd7aa3e746f5f697236/kotlin-stdlib-1.7.0-Beta.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.jupiter/junit-jupiter-params/5.8.2/ddeafe92fc263f895bfb73ffeca7fd56e23c2cce/junit-jupiter-params-5.8.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.jupiter/junit-jupiter-api/5.8.2/4c21029217adf07e4c0d0c5e192b6bf610c94bdc/junit-jupiter-api-5.8.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-jcl/5.3.19/eae3d8b8728133782d9e41f9b3ecbd19a4146114/spring-jcl-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/net.minidev/json-smart/2.4.8/7c62f5f72ab05eb54d40e2abf0360a2fe9ea477f/json-smart-2.4.8.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.36/6c62681a2f655b49963a5983b8b0950a6120ae14/slf4j-api-1.7.36.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/jakarta.activation/jakarta.activation-api/1.2.2/99f53adba383cb1bf7c3862844488574b559621f/jakarta.activation-api-1.2.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy/1.11.22/8b4c7fa5562a09da1c2a9ab0873cb51f5034d83f/byte-buddy-1.11.22.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.11.22/2fbcf3210dfc09b42242e3b66a5281cc5b9adb80/byte-buddy-agent-1.11.22.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/com.vaadin.external.google/android-json/0.0.20131108.vaadin1/fa26d351fe62a6a17f5cda1287c1c6110dec413f/android-json-0.0.20131108.vaadin1.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-context/5.3.19/d663259767f8fb66229209db0422d65978235525/spring-context-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/ch.qos.logback/logback-classic/1.2.11/4741689214e9d1e8408b206506cbe76d1c6a7d60/logback-classic-1.2.11.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-to-slf4j/2.17.2/17dd0fae2747d9a28c67bc9534108823d2376b46/log4j-to-slf4j-2.17.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.slf4j/jul-to-slf4j/1.7.36/ed46d81cef9c412a88caef405b58f93a678ff2ca/jul-to-slf4j-1.7.36.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-common/1.7.0-Beta/5339108aef3a69a5030701ff3ebe9b811dfc54ac/kotlin-stdlib-common-1.7.0-Beta.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.jetbrains/annotations/13.0/919f0dfe192fb4e063e7dacadee7f8bb9a2672a9/annotations-13.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.apiguardian/apiguardian-api/1.1.2/a231e0d844d2721b0fa1b238006d15c6ded6842a/apiguardian-api-1.1.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.platform/junit-platform-commons/1.8.2/32c8b8617c1342376fd5af2053da6410d8866861/junit-platform-commons-1.8.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.opentest4j/opentest4j/1.2.0/28c11eb91f9b6d8e200631d46e20a7f407f2a046/opentest4j-1.2.0.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/net.minidev/accessors-smart/2.4.8/6e1bee5a530caba91893604d6ab41d0edcecca9a/accessors-smart-2.4.8.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-aop/5.3.19/339b0ad295a7af51ac716bd89d36a4fb0febaf5d/spring-aop-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-beans/5.3.19/4bc68c392ed320c9ab5dc439d7f2deb83f03fe76/spring-beans-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.springframework/spring-expression/5.3.19/14346b7b84721f61d2b23d3c8baa60c6655527a6/spring-expression-5.3.19.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/ch.qos.logback/logback-core/1.2.11/a01230df5ca5c34540cdaa3ad5efb012f1f1f792/logback-core-1.2.11.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-api/2.17.2/f42d6afa111b4dec5d2aea0fe2197240749a4ea6/log4j-api-2.17.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.ow2.asm/asm/9.1/a99500cf6eea30535eeac6be73899d048f8d12a8/asm-9.1.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.jupiter/junit-jupiter-engine/5.8.2/c598b4328d2f397194d11df3b1648d68d7d990e3/junit-jupiter-engine-5.8.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.objenesis/objenesis/3.2/7fadf57620c8b8abdf7519533e5527367cb51f09/objenesis-3.2.jar:/Users/jungkwonkim/.gradle/caches/modules-2/files-2.1/org.junit.platform/junit-platform-engine/1.8.2/b737de09f19864bd136805c84df7999a142fec29/junit-platform-engine-1.8.2.jar com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit5 com.fistkim.reactorstudy.execution.ExecutionModelTest,publishOnTestAsync
07:07:03.163 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
07:07:03.206 [main] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
07:07:03.209 [main] INFO reactor.Flux.Merge.1 - request(unbounded)
07:07:04.241 [parallel-1] INFO reactor.Flux.Merge.1 - onNext(A)
07:07:04.250 [parallel-2] INFO reactor.Flux.Merge.1 - onNext(D)
07:07:05.244 [parallel-1] INFO reactor.Flux.Merge.1 - onNext(B)
07:07:05.256 [parallel-2] INFO reactor.Flux.Merge.1 - onNext(E)
07:07:06.250 [parallel-1] INFO reactor.Flux.Merge.1 - onNext(C)
07:07:06.261 [parallel-2] INFO reactor.Flux.Merge.1 - onNext(F)
07:07:06.267 [parallel-2] INFO reactor.Flux.Merge.1 - onComplete()

Process finished with exit code 0

각각 publishOnTest(), publishOnTestAsync() 에 대한 로그이다. 일부러 toUpperCast()에서 delay를 1초씩 발생시켰다.

먼저 publishOnTest() 는 6초 정도 걸렸다. 왜냐하면 각 알파벳을 처리하는데 1초 이상이 소요되는데 publishOnTest()는 따로 스레드를 할당해주지 않아서 subscribe()를 시작한 main 스레드 하나가 이를 모두 처리해야하기 때문에 모든 delay를 main 스레드 하나가 모두 감내해야 하기 때문이다.

반면에 publishOnTestAsync()는 3초 정도 걸렸다. 왜냐하면 alpabet1, alpabet2 각각에 .publishOn(Schedulers.parallel())을 설정해줌으로써 각각 병렬적으로 다른 스레드로 처리하도록 지시해뒀기 때문에 두 flux가 병렬처리 되도록 했다. 이를 subscribeOn()으로 바꿔줘도 동일한 결과를 볼 수 있다. 로그를 보면 request() 까지 main 스레드가 처리하고 그 뒤 작업부터 parallel 스레드가 처리한 것을 확인할 수 있다.

이 부분에 대해서 그림으로 너무 잘 설명해놓은 자료가 있어 첨부한다. 모든 그림은 이 유투브()에서 가져왔다. 20분 40초 부터 해당 부분에 관련된 내용이 나온다.

https://alegrucoding.com/reactor-execution-model-threading-and-schedulers/
https://tech.kakao.com/2018/05/29/reactor-programming/
https://www.youtube.com/watch?v=hfupNIxzNP4&t=2194s