😀
fistkim TECH BLOG
  • Intro
  • 강의
    • Reactive Programming in Modern Java using Project Reactor
      • Reactor execution model 1
      • Reactor execution model 2
      • Reactor execution model 3 - parallelism
      • Reactor execution model 4 - overview
      • Transform
      • Combine
      • Side Effect Methods
      • Exception/Error handling
      • retry, retryWhen, repeat
      • BackPressure
      • Cold & Hot Streams
    • NEXTSTEP 클린코드 with java 9기
      • 정리노트
    • NEXTSTEP DDD 세레나데 2기
      • CH01 도메인 주도 설계 이해
      • CH02 크게 소리 내어 모델링 하기
      • CH03 도메인 주도 설계 기본 요소
      • CH04 도메인 주도 설계 아키텍처
      • CH05 도메인 이벤트
    • NEXTSTEP 인프라 공방 1기
      • 망 분리하기
      • 통신 확인하기
      • 도커 컨테이너 이해하기
      • [미션 1] 서비스 구성하기 실습
      • [미션 2] 서비스 배포하기 실습
      • 서버 진단하기
      • 어플리케이션 진단하기
      • [미션 3] 서비스 운영하기
      • 웹 성능 진단하기
      • 부하 테스트
      • k6
      • [미션 4] 성능 테스트
      • 리버스 프록시 개선하기
      • 캐싱 활용하기
      • [미션 5] 화면 응답 개선하기
      • Redis Annotation 및 설정
      • 인덱스 이해하기 & DB 튜닝
      • [미션 6-1] 조회 성능 개선하기
      • [미션 6-2] DB 이중화 적용
    • NEXTSTEP 만들면서 배우는 Spring 3기
      • CH01 올바른 방향 바라보기
      • CH02 HTTP 이해 - 웹 서버 구현
        • HTTP 파싱
        • HTTP 웹 서버 구현
      • CH03 MVC - @MVC 프레임워크 구현
        • Servlet 다시 짚기
        • Cookie, Session 다시 짚기
        • MVC 프레임워크 구현
      • CH04 나만의 라이브러리 구현
      • CH05 DI - DI 프레임워크 구현
      • CH06 Aspect OP
    • 스프링 시큐리티
      • 스프링 시큐리티 아키텍처
      • WebAsyncManagerIntegrationFilter
      • SecurityContextPersistenceFilter
      • HeaderWriterFilter
      • CsrfFilter
      • (+) 스프링 시큐리티 + JWT
      • (+) 마치며
    • 더 자바, 코드를 조작하는 다양한 방법
      • CH01 JVM 이해하기
      • (+) 클래스 로더 이해하기
      • CH02 바이트 코드 분석 및 조작
      • (+) jacoco
      • CH03 리플렉션
      • CH04 다이나믹 프록시
      • CH05 애노테이션 프로세서
    • 더 자바, 애플리케이션을 테스트하는 다양한 방법
      • CH01 JUnit 5
      • CH02 Mockito
      • (+) Spy vs Mock
      • CH03 도커와 테스트
      • CH04 성능 테스트
      • (+) VisualVM
      • (+) 테스트 자동화
      • CH05 운영 이슈 테스트
      • CH06 아키텍처 테스트
    • 모든 개발자를 위한 HTTP 웹 기본 지식
      • CH01 인터넷 네트워크
      • CH02 HTTP 기본
      • CH03 HTTP 메서드 속성
      • CH04 HTTP 메서드 활용
      • CH05 HTTP 상태코드
      • CH06 HTTP 헤더1 - 일반 헤더
      • CH07 HTTP 헤더2 - 캐시와 조건부 요청
      • (+) HTTPS 원리
    • 스프링 프레임워크 핵심 기술
      • CH01 IOC 컨테이너
      • CH02 AOP
      • (+) 스프링 의존성 관리
      • (+) 생성자 주입 장점
    • 코딩으로 학습하는 GoF의 디자인 패턴
      • 객체 생성
        • 싱글톤 패턴
        • 팩토리 메소드 패턴
        • 추상 팩토리 패턴
        • 빌더 패턴
        • 프로토타입 패턴
      • 구조
        • 어댑터 패턴
        • 브릿지 패턴
        • 컴포짓 패턴
      • 행동
        • (작성중)
    • 실전 Querydsl
      • CH01 프로젝트 환경구성
      • CH02 예제 도메인 모델
      • CH03 기본문법
      • CH04 중급 문법
      • CH05 실무활용 (스프링 데이터 JPA와 Querydsl)
      • CH06 스프링데이터JPA 가 제공하는 Querydsl 기능
      • (+) 별칭(alias)
      • (+) Slice 쿼리
    • 스프링 데이터 JPA
      • CH01 핵심개념이해 1
      • CH02 핵심개념이해 2
      • CH03 핵심개념이해 3
      • CH04 Spring Data Common
      • CH05 Spring Data JPA
    • 실전! 스프링 부트와 JPA 활용2 - API 개발과 성능 최적화
      • CH01 지연 로딩과 조회 성능 최적화
      • CH02 컬렉션 조회 최적화
      • CH03 전체 정리
    • 초보를 위한 쿠버네티스 안내서
      • CH01 쿠버네티스 시작하기
      • CH02 쿠버네티스 알아보기
      • CH03 쿠버네티스 실습 준비
      • CH04 쿠버네티스 기본 실습
    • Flutter Provider Essential
      • CH01 Introduction
      • CH02 Provider Overview
      • CH03 TODO App
      • CH04 Weather App
      • CH05 Firebase Authentication App
    • Flutter Bloc Essential
      • CH01 Introduction
      • CH02 Bloc Overview
      • CH03 TODO App
      • CH04 Weather App
      • CH05 Firebase Authentication App
    • Flutter Advanced Course - Clean Architecture With MVVM
      • CH01 Introduction
      • CH02 Clean Architecture 4 Layer
      • CH03 MVVM
      • CH04 Data Layer
      • (+) Data Layer - response to model
      • (+) Data Layer - Network
      • CH05 Domain Layer
      • CH06 Presentation Layer
      • CH07 Application Layer
      • (+) Application Layer - l10n
      • (+) Application Layer - DI
      • (+) Application Layer - environment
    • 자바 알고리즘 입문
      • CH01 문자열
      • CH02 Array(1, 2 차원 배열)
      • CH03 Two pointers, Sliding window[효율성: O(n^2)-->O(n)]
      • CH04 HashMap, TreeSet (해쉬, 정렬지원 Set)
      • CH05 Stack, Queue(자료구조)
      • CH06 Sorting and Searching(정렬, 이분검색과 결정알고리즘)
      • CH07 Recursive, Tree, Graph(DFS, BFS 기초)
      • CH08 DFS, BFS 활용
      • CH09 Greedy Algorithm
      • CH10 dynamic programming(동적계획법)
  • 도서
    • 만들면서 배우는 클린 아키텍처
      • 학습목표
      • CH01 계층형 아키텍처의 문제는 무엇일까?
      • CH02 의존성 역전하기
      • CH03 코드 구성하기
      • CH04 유스케이스 구현하기
      • CH05 웹 어댑터 구현하기
      • CH06 영속성 어댑터 구현하기
      • CH07 아키텍처 요소 테스트하기
      • CH08 경계 간 매핑하기
      • CH09 어플리케이션 조립하기
      • CH10 아키텍처 경계 강제하기
      • CH11 의식적으로 지름길 사용하기
      • CH12 아키텍처 스타일 결정하기
    • 클린 아키텍처
      • 들어가며
      • 1부 소개
        • 1장 설계와 아키텍처란?
        • 2장 두 가지 가치에 대한 이야기
      • 2부 벽돌부터 시작하기: 프로그래밍 패러다임
        • 3장 패러다임 개요
        • 4장 구조적 프로그래밍
        • 5장 객체 지향 프로그래밍
        • 6장 함수형 프로그래밍
      • 3부 설계 원칙
        • 7장 SRP: 단일 책임 원칙
        • 8장 OCP: 개방-폐쇄 원칙
        • 9장 LSP: 리스코프 치환 원칙
        • 10장 ISP: 인터페이스 분리 원칙
        • 11장 DIP: 의존성 역전 원칙
      • 4부 컴포넌트 원칙
        • 12장 컴포넌트
        • 13장 컴포넌트 응집도
        • 14장 컴포넌트 결합
      • 5부
        • 15장 아키텍처란?
    • 스프링 입문을 위한 자바 객체 지향의 원리와 이해
      • CH01 사람을 사랑한 기술
      • CH02 자바와 절차적/구조적 프로그래밍
      • CH03 자바와 객체 지향
      • (+) 자바 코드 실행에 따른 메모리 적재과정
      • CH04 자바가 확장한 객체 지향
      • CH05 객체 지향 설계 5 원칙 - SOLID
      • CH06 스프링이 사랑한 디자인 패턴
      • CH07 스프링 삼각형과 설정 정보
      • (부록) 람다(lambda)
    • 객체지향의 사실과 오해
      • CH01 협력하는 객체들의 공동체
      • CH02 이상한 나라의 객체
      • CH03 타입과 추상화
      • CH04 역할, 책임, 협력
      • CH05 책임과 메시지
      • CH06 객체 지도
      • CH07 함께 모으기
      • (+) 인터페이스 개념 바로잡기
    • 도메인 주도 개발 시작하기
      • CH01 도메인 모델 시작하기
      • CH02 아키텍처 개요
      • CH03 애그리거트
      • CH04 리포지터리와 모델 구현
      • CH05 스프링 데이터 JPA를 이용한 조회 기능
      • CH06 응용 서비스와 표현 영역
      • CH07 도메인 서비스
      • CH08 애그리거트 트랜잭션 관리
      • CH09 도메인 모델과 바운디드 컨텍스트
      • CH10 이벤트
      • CH11 CQRS
    • 자바 ORM 표준 JPA 프로그래밍
      • CH01 JPA 소개
      • CH02 JPA 시작
      • CH03 영속성 관리
      • CH04 엔티티 매핑
      • CH05 연관관계 매핑 기초
      • CH06 다양한 연관관계 매핑
      • CH07 고급 매핑
      • CH08 프록시와 연관관계 관리
      • CH09 값 타입
      • CH10 객체지향 쿼리 언어
      • CH11 웹 애플리케이션 제작
      • CH12 스프링 데이터 JPA
      • CH13 웹 애플리케이션과 영속성 관리
      • CH14 컬렉션과 부가 기능
      • CH15 고급 주제와 성능 최적화
      • CH16 트랜잭션과 락, 2차 캐시
    • 소프트웨어 세상을 여는 컴퓨터과학
      • CH01 컴퓨터 과학 소개
      • CH02 데이터 표현과 디지털 논리
    • 이펙티브 자바
      • 1 장 들어가기
      • 2장 객체 생성과 파괴
        • [01] 생성자 대신 정적 팩터리 메서드를 고려하라
        • [02] 생성자에 매개변수가 많다면 빌더를 고려하라
        • [03] private 생성자나 열거 타입으로 싱글턴임을 보증하라
        • [04] 인스턴스화를 막으려거든 private 생성자를 사용하라
        • [05] 자원을 직접 명시하지 말고 의존 객체 주입을 사용하라
        • [06] 불필요한 객체 생성을 피하라
        • [07] 다 쓴 객체 참조를 해제하라
        • [08] finalizer 와 cleaner 사용을 피하라
        • [09] try-finally 보다는 try-with-resources 를 사용하라
      • 3장 모든 객체의 공통 메서드
        • [10] equals는 일반 규약을 지켜 재정의하라
        • [11] equals 를 재정의하려거든 hashCode도 재정의하라
        • [12] toString 을 항상 재정의하라
        • [13] clone 재정의는 주의해서 진행하라
        • [14] Comparable 을 구현할지 고려하라
      • 4장 클래스와 인터페이스
        • [15] 클래스와 멤버의 접근 권한을 최소화하라
  • 토픽
    • 서버 모니터링
      • CPU 사용량
      • 메모리 사용량
      • 스레드 풀
    • Spring Boot Monitoring
      • Spring actuator
      • Spring eureka
      • Prometheus
      • grafana
      • Spring actuator + Prometheus + grafana
    • JAVA 데일리 토픽
      • 메모리 누수(memory leak)
      • 객체 참조의 유형
      • 커스텀 스레드 풀
      • Mark And Compact
      • serialVersionUID 이해하기
      • 함수형 인터페이스
      • 메소드 참조
      • equals()와 hashCode()가 무엇이고 역할이 무엇인지
      • StringBuffer vs StringBuilder
      • String vs StringBuilder, StringBuffer
      • String interning
    • JAVA GC
    • 프로그래머스 문제 풀기
      • 해시
      • 스택/큐
      • 힙(Heap)
      • 정렬
      • 완전탐색
      • DFS/BFS
    • 데이터베이스 구성 및 작동 흐름
    • 데이터베이스 JOIN 원리
    • 객체지향생활체조 원칙
    • 상태(state), 상속(inheritance), 합성(composition) 의 상관관계
    • java enum은 메모리에 언제, 어떻게 할당되는가
    • Checked Exception vs UnChecked Exception
    • Reactive Streams 원리탐구 - 간단한 예제 직접 작성해보기
    • Flutter Basic
    • Flutter StatefulWidget 생명주기
    • Flutter 가 위젯을 그리는 원리
    • Flutter 클린 아키텍처
      • application layer
        • 패키지 구조 및 레이어 설명
        • environment
        • dependency injection
        • go_router
        • foreground & background
        • 다국어처리 (l10n, i18n)
        • Global 처리(시스템 점검, fore->back 등)
        • connection_manager
        • permission_manager
        • push_notification_manager
        • firebase 연동
      • data layer
        • 패키지 구조 및 레이어 설명
        • network
        • repository
      • domain layer
        • 패키지 구조 및 레이어 설명
      • presentation layer
        • 패키지 구조 및 레이어 설명
        • resources
    • 기술 관련 포스팅 읽기
  • 기타
    • 작업일지
      • 2023. 10
      • 2023. 09
      • 2023. 08
      • 2023. 07
      • 2023. 06
      • 2023. 05
      • 2023. 04
      • 2023. 03
      • 2023. 02
      • 2023. 01
      • 2022. 12
    • Business Model
      • 아이디어 불패의 법칙
      • 린 모바일 앱 개발
      • 린 스타트업
      • 제로투원
      • MIT 스타트업 바이블
      • 린치핀
    • 백로그 종합
Powered by GitBook
On this page
  1. 강의
  2. Reactive Programming in Modern Java using Project Reactor

Reactor execution model 4 - overview

Reactor execution model 이해를 바탕으로 실습해보기

PreviousReactor execution model 3 - parallelismNextTransform

Last updated 1 year ago

카카오Tech 에 이에 관해서 매우 정리가 잘된 글()을 발견해서 똑같이 실습해보면서 코드를 점점 개선해 나가보았다.

위 포스팅 전반에서 개선 포인트들을 짚어주고 설명해주는데, 이 과정 그대로 따라하면서 실제로 실행해서 로그를 관찰해보았다.

public class FruitExample1 {
    public static void main(String[] args) {
        final List<String> basket1 = Arrays.asList(new String[]{"kiwi", "orange", "lemon", "orange", "lemon", "kiwi"});
        final List<String> basket2 = Arrays.asList(new String[]{"banana", "lemon", "lemon", "kiwi"});
        final List<String> basket3 = Arrays.asList(new String[]{"strawberry", "orange", "lemon", "grape", "strawberry"});
        final List<List<String>> baskets = Arrays.asList(basket1, basket2, basket3);
        final Flux<List<String>> basketFlux = Flux.fromIterable(baskets);

        basketFlux.concatMap(basket -> {
            final Mono<List<String>> distinctFruits = Flux.fromIterable(basket).distinct().collectList();
            final Mono<Map<String, Long>> countFruitsMono = Flux.fromIterable(basket)
                    .groupBy(fruit -> fruit) // 바구니로 부터 넘어온 과일 기준으로 group을 묶는다.
                    .concatMap(groupedFlux -> groupedFlux.count()
                            .map(count -> {
                                final Map<String, Long> fruitCount = new LinkedHashMap<>();
                                fruitCount.put(groupedFlux.key(), count);
                                return fruitCount;
                            }) // 각 과일별로 개수를 Map으로 리턴
                    ) // concatMap으로 순서보장
                    .reduce((accumulatedMap, currentMap) -> new LinkedHashMap<String, Long>() {
                        {
                            putAll(accumulatedMap);
                            putAll(currentMap);
                        }
                    }); // 그동안 누적된 accumulatedMap에 현재 넘어오는 currentMap을 합쳐서 새로운 Map을 만든다. // map끼리 putAll하여 하나의 Map으로 만든다.
            return Flux.zip(distinctFruits, countFruitsMono, (distinct, count) -> new FruitInfo(distinct, count));
        }).subscribe(System.out::println);
    }
}

위 코드는 각 basket의 과일의 종류를 distincti 처리해주고, 각 종류마다 개수를 파악해주는 일을 수행한다. 각각 distinctFruits, countFruitsMono가 그 역할을 하는데, 위 코드의 문제점은 두 가지이다.

  • reactor의 장점인 비동기, 논블로킹이 전혀 쓰이지 않고 있다. 특별히 병렬처리를 지시하지 않았기 때문에 기본적인 execution 원칙에 의해서 구독을 시작한 thread가 모든 것을 처리할 것이기 때문이다.

  • basketFlux가 emit하는 basket 을 distinctFruits, countFruitsMono 가 각각 순회한다. 어차피 basket 을 순회하며 무엇인가 처리를 할 것이라면 한번만 순회하면 되는데 두번씩 돌리는 것이다.(비효율)

먼저 아래와 같은 코드로 두 작업을 각각 비동기로 병렬 수행시켜서 로직을 개선할 수 있다.

public class FruitExample2 {
    public static void main(String[] args) throws InterruptedException {
        final List<String> basket1 = Arrays.asList(new String[]{"kiwi", "orange", "lemon", "orange", "lemon", "kiwi"});
        final List<String> basket2 = Arrays.asList(new String[]{"banana", "lemon", "lemon", "kiwi"});
        final List<String> basket3 = Arrays.asList(new String[]{"strawberry", "orange", "lemon", "grape", "strawberry"});
        final List<List<String>> baskets = Arrays.asList(basket1, basket2, basket3);
        final Flux<List<String>> basketFlux = Flux.fromIterable(baskets);

        basketFlux.concatMap(basket -> {
            final Mono<List<String>> distinctFruits = Flux.fromIterable(basket).log().distinct().collectList().subscribeOn(Schedulers.parallel());
            final Mono<Map<String, Long>> countFruitsMono = Flux.fromIterable(basket).log().groupBy(fruit -> fruit) // 바구니로 부터 넘어온 과일 기준으로 group을 묶는다.
                    .concatMap(groupedFlux -> groupedFlux.count().map(count -> {
                                final Map<String, Long> fruitCount = new LinkedHashMap<>();
                                fruitCount.put(groupedFlux.key(), count);
                                return fruitCount;
                            }) // 각 과일별로 개수를 Map으로 리턴
                    ) // concatMap으로 순서보장
                    .reduce((accumulatedMap, currentMap) -> new LinkedHashMap<String, Long>() {
                        {
                            putAll(accumulatedMap);
                            putAll(currentMap);
                        }
                    }) // 그동안 누적된 accumulatedMap에 현재 넘어오는 currentMap을 합쳐서 새로운 Map을 만든다. // map끼리 putAll하여 하나의 Map으로 만든다.
                    .subscribeOn(Schedulers.parallel());
            return Flux.zip(distinctFruits, countFruitsMono, (distinct, count) -> new FruitInfo(distinct, count));
        }).subscribe(System.out::println);

        Thread.sleep(5000L);
    }

}

별달리 해준 것은 없고 각각의 distinctFruits, countFruitsMono 에 대해서 subscribeOn()을 통해 thread를 별도로 지정해주었다. 로그를 확인해보면 main thread가 아니라 parallel 에서 꺼내온 thread가 처리에 개입하고 있음을 확인할 수 있다.

Thread.sleep()을 해준 이유는, 초기 코드와는 달리 병렬 수행을 위해서 데몬 thread에게 처리를 지시함으로써 데몬 thread 가 백그라운드로 작업을 처리해주는데, 이와중에 main thread가 종료됨으로써 applicaion이 끝나버리기 때문이다.

JVM doesn’t wait for daemon thread to finish but it waits for User Thread

이제 비동기 문제는 개선을 해보았으니 동일한 source 에 대해서 두 번 순회하는 비효율을 거둬보자.


public class FruitExample3 {

    public static void main(String[] args) throws InterruptedException {

        final List<String> basket1 = Arrays.asList(new String[]{"kiwi", "orange", "lemon", "orange", "lemon", "kiwi"});
        final List<String> basket2 = Arrays.asList(new String[]{"banana", "lemon", "lemon", "kiwi"});
        final List<String> basket3 = Arrays.asList(new String[]{"strawberry", "orange", "lemon", "grape", "strawberry"});
        final List<List<String>> baskets = Arrays.asList(basket1, basket2, basket3);
        final Flux<List<String>> basketFlux = Flux.fromIterable(baskets);


        basketFlux.concatMap(basket -> {
            final Flux<String> source = Flux.fromIterable(basket).log().publish().autoConnect(2);
            final Mono<List<String>> distinctFruits = source.distinct().collectList().log().subscribeOn(Schedulers.parallel());
            final Mono<Map<String, Long>> countFruitsMono = source
                    .groupBy(fruit -> fruit) // 바구니로 부터 넘어온 과일 기준으로 group을 묶는다.
                    .concatMap(groupedFlux -> groupedFlux.count()
                            .map(count -> {
                                final Map<String, Long> fruitCount = new LinkedHashMap<>();
                                fruitCount.put(groupedFlux.key(), count);
                                return fruitCount;
                            }) // 각 과일별로 개수를 Map으로 리턴
                    ) // concatMap으로 순서보장
                    .reduce((accumulatedMap, currentMap) -> new LinkedHashMap<String, Long>() {
                        {
                            putAll(accumulatedMap);
                            putAll(currentMap);
                        }
                    }) // 그동안 누적된 accumulatedMap에 현재 넘어오는 currentMap을 합쳐서 새로운 Map을 만든다. // map끼리 putAll하여 하나의 Map으로 만든다.
                    .log()
                    .subscribeOn(Schedulers.parallel());
            return Flux.zip(distinctFruits, countFruitsMono, (distinct, count) -> new FruitInfo(distinct, count));
        }).subscribe(e -> System.out.println(Thread.currentThread() + " || " + e.toString()));

        Thread.sleep(5000L);
    }

}

distinctFruits, countFruitsMono 두 작업은 데몬 thread를 할당해주어 병렬 수행하면서도 source를 hot으로 변경하여 한번만 순회하도록 처리된 코드이다.

즉, 이전에는 cold 로 관리가 되어 구독자가 구독을 시작하는 시점에 각각 스트림을 발생 시켰다면, 이번에는 hot으로 전환하여 특정 조건, 특정 시점이 되었을때 스트림을 발생시켜서 이를 구독하는 구독자들에게 동일한 데이터를 일괄적으로 흘려준 것이다. 이렇게 쓸데없이 동일한 스트림을 두 번 발생시키던 비효율을 한 번만 발생시키도록 개선할 수 있다.

hot과 cold 에 대해서는 udemy 강의를 듣고 더 정리할 생각이다.

이 부분에 대한 이해를 하는 과정에서 이 글() 이 도움이 좀 되었다. 애초에 JVM 에서 thread의 구분을 두고 있고 각 thread 에 대해서 JVM이 인식하고 처리해주는 방식이 다르다. 여기서 적용되는 원칙은 아래와 같다.

https://tech.kakao.com/2018/05/29/reactor-programming/
https://www.geeksforgeeks.org/difference-between-daemon-threads-and-user-threads-in-java/