Kafka - 이벤트 기반의 비동기 작업 처리 구조로 알림 기능 구현하기 (3)

2026. 1. 22. 04:47·dev/infra

도커 환경에서의 Kafka 설정

Kafka의 로그 서버는 스프링 애플리케이션과 분리된 외부 브로커로 동작하며, 프로듀서와 컨슈머 클라이언트를 통해 애플리케이션과 통신한다. 따라서 일반적으로 도커 컨테이너나 별도의 서버 환경에서 운영된다. 여기서는 Kafka를 도커 컨테이너 안에서 실행하는 방식으로 설계했다.

 

스프링 애플리케이션은 로컬 환경에서는 IDE에서, 배포 환경에서는 도커 컨테이너 안에서 실행되므로 어느 환경에서 접속하더라도 도커 내부의 Kafka 브로커를 정확히 식별해 연결해야 한다. 이를 위해 Kafka는 도커 환경에서 포트를 2개 열어 두고, 각각에 리스너를 설정하여 두 가지 방식으로 접근하는 클라이언트와 모두 통신할 수 있도록 구성한다. 도커 내부에서는 클라이언트끼리 직접 연결 가능하므로 외부에서 접속할 포트의 매핑만 명시했다.

  • 개발 환경 (도커 밖): 로컬 호스트에서 IDE로 실행 중인 Spring 애플리케이션은 KAFKA_BOOTSTRAP_SERVERS=localhost:29092로 접속
  • 배포 환경 (도커 안): 도커 컨테이너에서 돌아가는 Spring 애플리케이션은 도커 내부 네트워크를 통해 KAFKA_BOOTSTRAP_SERVERS=kafka:9092로 직접 접속

다음은 도커 컨테이너에서 돌아가는 단일 브로커 Kafka 설정이다.

    kafka:
      image: confluentinc/cp-kafka:latest
      container_name: kafka
      ports:
        - "29092:29092"    # 로컬 호스트 접속 매핑용
      environment:
        # 필수 KRaft 설정
        CLUSTER_ID: "hjeeg3q1SoCw7IKoRw-rMQ"
        KAFKA_NODE_ID: 1
        KAFKA_PROCESS_ROLES: "broker,controller"  # 브로커와 컨트롤러 역할
        KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"  # 컨트롤러 통신용 주소

        # 리스너 설정 (KRafts 모드용 CONTROLLER)
        KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:29092'
        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092'
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
        KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

        # Mac(ARM) 호환 설정 및 성능 최적화
        _JAVA_OPTIONS: "-XX:UseSVE=0"
        KAFKA_HEAP_OPTS: "-Xms512M -Xmx512M"   # JVM 힙 메모리
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
        KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      volumes:
        - kafka_data:/var/lib/kafka/data   # Docker 내부 볼륨만 사용
      restart: unless-stopped

  volumes:
    kafka_data:

 

앞서 언급한 ZooKeeper 기반 구조와의 차이로 인해 KRaft 모드에서는 전용 설정 항목들이 존재한다.

  • 각 브로커는 CLUSTER_ID와 KAFKA_NODE_ID를 통해 소속 클러스터와 노드 식별자를 명시하며 PROCESS_ROLES 설정에 controller를 포함함으로써 컨트롤러 역할을 수행할 수 있게 한다.
  • 외부 코디네이터(ZooKeeper) 없이 Raft 합의 알고리즘을 통해 활성 컨트롤러를 선출하므로, 노드 간 통신을 위한 격리된 전용 채널이 필수적이다. 이를 위해 CONTROLLER_LISTENER_NAMES에 채널 명칭을 정의하고, LISTENERS 설정에 실제 대기 포트(주로 9093)를 할당한다.
  • 컨트롤러 채널은 브로커 간 내부 통신 전용이다. 모든 브로커는 도커 내부에서 실행되며, 도커 환경에서는 컨테이너 이름을 통해 즉시 식별 및 라우팅이 가능하다. 따라서 외부 클라이언트에게 자신의 주소를 알리는 용도인 ADVERTISED_LISTENERS에는 컨트롤러 주소를 추가할 필요는 없다
  • LISTENER QUORUM_VOTERS에는 클러스터 내에서 컨트롤러 투표권을 가진 노드들의 ID와 주소가 명시된다.

KRaft 모드의 Kafka는 구동 전 CLUSTER_ID 생성을 통한 스토리지 초기화 과정을 거치는데, 따로 설정해두지 않았다면 다음 오류 메시지가 뜨며 실행조차 되지 않는다.


 

다음은 다중 브로커 구조에서의 설정이다.

version: "3.9"
services:
  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    ports:
      - "29092:29092"
    environment:
      CLUSTER_ID: "hjeeg3q1SoCw7IKoRw-rMQ" # 모든 노드 동일 필수
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"	# controller 역할 추가
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:29092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
    volumes:
      - kafka1_data:/var/lib/kafka/data

  kafka2:
    image: confluentinc/cp-kafka:latest
    container_name: kafka2
    ports:
      - "29093:29093"
    environment:
      CLUSTER_ID: "hjeeg3q1SoCw7IKoRw-rMQ"
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:29093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:29093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
    volumes:
      - kafka2_data:/var/lib/kafka/data

  kafka3:
    image: confluentinc/cp-kafka:latest
    container_name: kafka3
    ports:
      - "29094:29094"
    environment:
      CLUSTER_ID: "hjeeg3q1SoCw7IKoRw-rMQ"
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:29094"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9092,PLAINTEXT_HOST://localhost:29094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
    volumes:
      - kafka3_data:/var/lib/kafka/data

volumes:
  kafka1_data:
  kafka2_data:
  kafka3_data:

 

다중 브로커 구조에서 추가로 신경쓸 점은 다음과 같다.

  • 같은 클러스터를 구성하는 브로커들은 Cluster_ID 를 통일한다.
  • 각 브로커는 외부 클라이언트와의 통신을 위한 포트와, 내부 컨트롤러 간 합의를 위한 전용 제어 채널에 각각 독립된 LISTENER를 바인딩한다.
  • KAFKA_CONTROLLER_QUORUM_VOTERS 설정에는 클러스터 내 활성 컨트롤러 투표권을 가진 모든 노드의 정보를 명시한다. 예를 들어, 1@kafka1:9093은 노드 ID 1번에 해당하는 컨트롤러의 주소가 kafka1:9093임을 의미하며 노드들은 이 명부를 바탕으로 상호 연결 및 코디네이팅을 수행한다.

KRaft 모드 Kafka 클러스터 구조
Zookeeper 기반 Kafka 클러스터 구조


Kafka 클라이언트 및 인프라 구성

다음은 애플리케이션에 Kafka 도입을 위한 기본적인 클라이언트와 인프라 구성이다

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ProducerFactory<String, DomainEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, DomainEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, DomainEvent> consumerFactory() {
        JsonDeserializer<DomainEvent> deserializer = new JsonDeserializer<>(DomainEvent.class);
        deserializer.addTrustedPackages("*");		//보안상 설정 필요
        deserializer.setUseTypeMapperForKey(false);

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    // 재시도 + DLQ 설정
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, DomainEvent> kafkaTemplate) {
        DeadLetterPublishingRecoverer recoverer =
                new DeadLetterPublishingRecoverer(kafkaTemplate,
                        (record, ex) -> new TopicPartition(record.topic() + "-dlq", record.partition()));

        // 1초 간격, 최대 3회 재시도
        FixedBackOff backOff = new FixedBackOff(1000L, 3);

        return new DefaultErrorHandler(recoverer, backOff);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> kafkaListenerContainerFactory(KafkaTemplate<String, DomainEvent> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, DomainEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // 컨슈머 병렬 처리 스레드 수
        factory.setCommonErrorHandler(errorHandler(kafkaTemplate)); //에러 발생시 처리 로직
        return factory;
    }


}

 

기본적으로 애플리케이션과 Kafka 브로커 간의 안정적인 메시지 송수신을 위해 ProducerFactory와 ConsumerFactory를 정의하여 각 인스턴스의 핵심 설정을 구성한다. 분산 환경에서 이벤트 객체는 네트워크를 통해 전송되어야 하므로, 데이터의 규격화와 호환성을 위해 JSON 형식의 직렬화 및 역직렬화 과정을 거친다. Kafka 클라이언트의 기본 설정을 주입받고 문자열 Key와 JSON 형식 Value를 처리하는 Serializer/Deserializer를 명시적으로 설정하였다.

 

Producer가 발행하고, Consumer가 소비할 이벤트 객체를 담는 KafkaTemplate은 ProducerFactory를 인자로 받는다. Kafka에서 메시지는 일단 Producer가 생성하면 구독한 Consumer가 알아서 받아가는 구조이므로 메시지에게 Consumer 정보는 필요없다.

 

이전에 언급했듯이 Kafka는 시스템 장애 발생 시 애플리케이션이 자체적인 복구 메커니즘을 구현할 수 있는 환경을 제공하는데 DefaultErrorHandler로 이를 구현할 수 있다. 처리에 실패한 데이터를 유실하지 않고 별도의 토픽으로 분리하여  DLQ(Dead Letter Queue)라는 곳에 저장함으로써 장애 원인 분석 및 사후 재처리를 위한 물리적 환경을 제공한다. 이를 활용해 애플리케이션은 일부 이벤트 처리 실패가 전체 이벤트 흐름을 중단시키지 않도록 하는 부분 실패 허용 메커니즘을 구현할 수 있다.

  • 재시도 메커니즘 (FixedBackOff): 일시적인 네트워크 장애나 일시적 오류 시 1초 간격으로 3번 재시도하도록 한 재처리 로직
  • DLQ (Dead Letter Queue): 3번의 재시도 후에도 실패한 작업은 원본토픽-dlq라는 별도의 스토리지로 격리, 일시적이지 않은 장애나 서버 재시작에 상황에 대응하여 완료하지 못한 작업을 기억

ConcurrentKafkaListenerContainerFactory는 KafkaListener의 실행 환경을 구성하는 핵심 설정이다. 컨슈머 그룹 내에서 파티션의 병렬 처리 수준을 설정하고, 메시지 전달 및 복구 매커니즘을 제어함으로써 Kafka 컨슈머의 생명주기를 오케스트레이션한다.

 

concurrency를 3으로 설정하면 리스너 내에서 브로커와 통신하는 전용 컨슈머 워커 스레드 3개가 할당된다. 이 스레드는 각자 전용 파티션을 담당해서 폴링(Polling)을 수행하며 파티션 단위 병렬 처리를 가능하게 한다. 이 설정을 주입받은 컨슈머 그룹 수 X concurrency 값 만큼의 스레드 자원이 할당되므로 서버의 자원을 고려하여 설정해야한다. 특히 토픽의 파티션 개수보다 큰 값을 설정하는 경우 항상 유휴 상태인 스레드가 생기므로 주의해야한다. 

 

기본적으로 이 설정은 Kafka 전용 이벤트의 추상화인 DomainEvent 인터페이스를 기준으로 공통 적용되도록 햇다. 애플리케이션에서 상황에 따라 발행되는 실제 이벤트 객체는 모두 DomainEvent를 상속받아 각 도메인별 비즈니스 로직에 맞게 확장되도록 설계했다.


Kafka를 적용한 이벤트 처리 구조

공연 도메인 서비스의 등록 승인 메서드

이제 Kafka를 도입함으로써, 기존 예시의 등록 승인 흐름에서 세 종류의 알림 생성이 어떻게 처리되는지 살펴보자.

    @Transactional
    public AdminAmateurShowSummaryResponseDTO approveShow(Long showId) {
        
        AmateurShow show = amateurShowRepository.findById(showId)
                .orElseThrow(() -> new GeneralException(ErrorStatus.AMATEURSHOW_NOT_FOUND));

        show.approve();

        Member performer  = show.getMember();

        // 승인 트랜잭션 커밋에 대해 이벤트 발행
        eventPublisher.publishEvent(
                new ApproveCommitEvent(show.getId(), performer.getId()
                )
        );

        return AdminAmateurShowSummaryResponseDTO.from(show);
    }

 

먼저 도메인 로직상 순서 보장을 위해 @TransactionalEventListener의 phase=AFTER_COMMIT을 이용한다. approveShow()는 먼저 커밋에 대한 Spring 이벤트인 ApproveCommitEvent를 발행해서 알림 생성 로직이 승인 트랜잭션 커밋 이후에 호출되도록 한다.


도메인 이벤트 리스너

@Service
@RequiredArgsConstructor
public class ApproveCommitEventListener {
    private final ApprovalShowProducer approvalShowProducer;

    @Async
    @TransactionalEventListener (phase = TransactionPhase.AFTER_COMMIT)
    public void onApproveCommit(ApproveCommitEvent event) {

        //APPROVED 수정 트랜잭션 커밋 이벤트 감지 후 kafka 이벤트 발송
        try {
            approvalShowProducer.publish(
                    new ApprovalShowEvent(event.amateurShowId(), event.performerId())
            );
        } catch (Exception e) {
            throw new IllegalStateException(
                    "승인 이벤트 Kafka 발행 실패",
                    e
            );
        }
    }
}

 

AFTER_COMMIT 설정으로 승인 트랜잭션 후에 호출된 onApproveCommit 리스너는 Kafka 전용 이벤트 ApprovalShowEvent를 생성해서 approvalShowProducer.publish()를 호출한다. 

 

이때 Kafka 프로듀서의 publish 메서드는 내부적으로 KafkaTemplate.send()를 호출하는데 이는 별도의 자원을 사용해 외부 브로커와의  통신을 수반하는 작업이다. 일반적으로 이런 외부 연동 로직은 핵심 도메인 흐름과 격리하기 위해 @Async를 적용하여 비동기로 처리한다. 


Kafka 이벤트

public record ApprovalShowEvent(
        Long amateurShowId,
        Long performerId
) implements DomainEvent {

    @Override
    public DomainEventType getEventType() {
        return DomainEventType.SHOW_APPROVED;
    }
}

 

 ApprovalShowEvent는 앞서 설계한 Kafka 전용 이벤트 DomainEvent의 구현체이다.


Kafka 프로듀서

ApprovalShowEvent를 Kafka 브로커에게 전송하는 프로듀서 클라이언트인 approvalShowProducer는 다음과 같다.

@Component
@RequiredArgsConstructor
public class ApprovalShowProducer {

    private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
    private static final String TOPIC = "approval-show-topic";

    public void publish(ApprovalShowEvent event) {
        if (event == null) return;

        // amateurShowId로 파티션
        kafkaTemplate.send(TOPIC, event.amateurShowId().toString(), event);
    }

}

 

KafkaTemplate에 ApprovalShowEvent를 담아 approval-show-topic을 토픽을 붙여 브로커에게 전송한다. 이때 파티션 결정에 쓰일 key로 amateurShowId를 보내는데, 이는 Kafka 서버에서 파티셔닝을 위한 해시 계산에 사용되기 때문에 String으로 변환하여 전달한다. 이 메시지를 받은 Kafka 브로커는 ApprovalShowEvent를 Topic에 맞게 분류하고 전달받은 amateurShowId로 저장될 파티션을 결정한다.


Kafka 메시지 토픽 설정

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic approvalShowTopic() {
        return TopicBuilder.name("approval-show-topic")
                .partitions(3)
                .replicas(3)	//브로커 수보다 클 수 없음
                .build();
    }
}

 

Kafka 서버가 3개의 브로커 구조라고 가정하자. 위의 설정에 따라 approval-show-topic은 3개의 파티션과 3개의 replica로 구성된다. 예를 들어 amateuerShowId가 34, 15, 60인 이벤트가 연속적으로 들어왔다고 하면 브로커별 approval-show-topic의 구조는 다음과 같다. 

id=34인 이벤트 전송 시
id=15인 이벤트 전송 시
id=60인 이벤트 전송 시


Kafka 컨슈머

이제 이렇게 Kafka 브로커에 저장된 이벤트를 컨슈머가 어떻게 소비하는지 알아보자.

@Component
@RequiredArgsConstructor
public class ApprovalConsumerForSubscribers {

    private final NoticeService noticeService;

    @KafkaListener(
            topics = "approval-show-topic",
            groupId = "subscriber-group",
            containerFactory = "kafkaListenerContainerFactory"
    )

    @Transactional
    public void consume(ApprovalShowEvent event) {
        if (event == null) return;

        noticeService.notifySubscribers(event);

    }
}

 

@Component
@RequiredArgsConstructor
public class ApprovalConsumerForPerformer {

    private final NoticeService noticeService;

    @KafkaListener(
            topics = "approval-show-topic",
            groupId = "approved-group",
            containerFactory = "kafkaListenerContainerFactory"
    )

    @Transactional
    public void consume(ApprovalShowEvent event) {
        if (event == null) return;

        noticeService.notifyPerformer(event);

    }
}

 

@Component
@RequiredArgsConstructor
public class ApprovalConsumerForRecommendation {

    private final NoticeService noticeService;

    @KafkaListener(
            topics = "approval-show-topic",
            groupId = "recommended-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    @Transactional
    public void consume(ApprovalShowEvent event) {
        if (event == null) return;

        noticeService.notifyOthers(event);
    }
}

 

@KafkaListener로 approval-show-topic 토픽을 구독하면 브로커로부터 해당 토픽의 메시지를 받아올 수 있다. groupId로 묶인 컨슈머 그룹은 물리적으로 분리된 환경에서도 일관성이 보장된 데이터를 제공받아 병렬 처리가 가능하다. Kafka는 동일 그룹 내의 컨슈머들에게 파티션을 중복 없이 배분하여 처리량을 수평적으로 확장함과 동시에 각 파티션에는 할당된  담당 컨슈머만 접근하도록 제어함으로써 파티션 단위의 순차 처리를 보장한다.


Kafka 적용 비동기 이벤트 처리 구조에서 approveShow의 스레드별 실행 흐름

 

위는 amateurShowId=8인 공연의 등록을 승인함으로써 프로듀서가 approval-show-topic 메시지를 발행하고, 이 토픽을 구독한 세 컨슈머 그룹이 받아 각자 스레드에서 후속 작업을 비동기로 처리하는 상황이다. T2, T3, T4 스레드를 할당받은 컨슈머 그룹은 각각 구독자 알림, 등록자 알림, 추천 알림 생성을 수행한다.


Kafka 적용 비동기 이벤트 처리 구조에서 approveShow후 rejectShow의 스레드별 실행 흐름

 

위는 amateurShowId=23인 공연의 등록을 승인하고, amateurShowId=26인 공연의 등록을 반려한 상황이다. 프로듀서가 approval-show-topic 메시지를 발행해서 Broker 1에게 전송하자마자 구독 중인 컨슈머는 이를 가져와 세 종류의 알림 생성을 비동기로 처리한다.

 

메인 애플리케이션은 곧바로 reject-show-topic을 발행해서 Broker 2에게 전송하고, rejected-group의 컨슈머는 브로커로부터 이를 받아와 T5 스레드에서 등록 반려에 따른 후속 작업을 처리한다. T2, T4 스레드에서는 등록 승인에 따른 후속 작업이 실행 중인 와중에 T5에서는 등록 거부의 후속 작업이 독립적으로 실행되며 높은 처리량을 확보한다. 


Kafka를 매개체로 활용하는 것만으로도 비동기 처리를 구현하고, 프로듀서와 컨슈머의 생명주기를 격리할 수 있다.

 

Producer-Message-Consumer 구조는 스프링 컨텍스트 내부에서 관리되던 객체를 직접 소비하는 방식이 아니라, 외부 브로커를 통해 유입된 데이터를 독립적으로 처리하는 구조이다. 따라서 클라이언트 설정에서 KafkaListenerContainerFactory는 JVM으로부터 각 컨슈머 그룹이 고유한 워커 스레드를 물리적으로 할당받도록 한다. @KafkaListener로 이 설정을 주입받은 각 컨슈머는 각자의 스레드를 할당받고 애플리케이션의 메인 흐름과 분리되어 실행된다.

 

따라서 같은 JVM 안에 있더라도 프로듀서와 컨슈머는 서로 다른 스레드에서 동작하며, 컨슈머는 프로듀서가 아닌 Kafka 브로커로부터 전달된 메시지를 비동기적으로 처리한다. 이로 인해 애플리케이션 재시작 상황에서도 유실 걱정 없이 안정적 재처리를 보장할 수 있다.

 

이 구조는 동일한 애플리케이션 내에 구현되어 있더라도 호출자와 피호출자의 생명주기를 논리적으로 격리한 효과를 내어 향후 물리적으로 격리되는 MSA 구조에서도 바로 적용 가능한 유연한 설계가 된다. 내구성 뿐만 아니라, 하드웨어의 추가만으로 무한한 처리량 증가를 보장하므로 앞서 정의한 5가지 문제(응답 지연 문제, 순서 문제, 롤백 전파 문제, 처리량 문제, 내구성 문제)를 모두 해결한다.


Kafka가 적용된 비동기 처리 구조의 가치

단일 애플리케이션

Kafka의 진정한 가치는 분산 애플리케이션에서 마이크로서비스의 병렬 처리에서 드러나지만 단일 애플리케이션이라고 그 체감이 아예 없진 않다. 특히 fan-out 작업과 같은 대규모 처리 상황에서 높은 안정성을 유지한다.

 

시스템 내 모든 공연의 등록을 취소하는 메서드가 있다고 가정하자. 이 메서드를 수행하면 DB에 저장된 모든 공연의 등록 상태 column이 REJECTED로 변경되며 이에 따른 후속 작업은 rejected-show-topic을 구독한 KafkaListener에 의해 수행된다. 시스탬에 10000개의 공연이 등록된 상태라고 하자.

Kafka 적용 비동기 이벤트 처리 구조의 10000개 이벤트 처리 동작

 

위와 같이 3개의 replica와 partition으로 설정된 rejected-show-topic과 concurrency=3으로 설정된 리스너 환경에서 메인 서비스는 비동기적으로 처리되므로 10000번째 이벤트 발행 즉시 응답을 반환할 수 있다. 이 메시지를 구독한 rejected-group의 3개의 워커 스레드들은 할당받은 파티션을 기반으로 리더 브로커를 폴링(polling)하며 메시지를 소비한다.

 

생성된 10000개의 이벤트들은 key에 따라 각 파티션에 균일하게(약 3333개씩) 분산 저장되며 컨슈머는 설정된 배치 단위(max.poll.records)만큼 퍼와서 각자 스레드에서 처리한다. 이때 각 워커 스레드는 자신에게 할당된 파티션의 메시지만을 순차적으로 처리하며, 이러한 구조를 통해 제한된 자원 내에서도 안정적인 병렬 처리가 가능하다.

 

Kafka 적용 vs 미적용

Kafka 구조에서는 메시지가 토픽과 파티션을 통해 디스크 기반으로 분산 저장되며 컨슈머는 자신의 처리 가능한 속도에 맞춰 메시지를 소비한다. 이로 인해 자연스럽게 안정적인 backpressure가 보장된다.

 backpressure : 시스템의 처리 속도를 초과하는 입력을 제어함으로써 전체 시스템의 안정성을 유지하는 메커니즘

 

Kafka는 Producer의 전송 속도가 Consumer의 처리 속도를 초과하더라도 밀린 메시지를 디스크 기반 로그에 저장하여 안정적으로 버퍼링한다. 또한 Consumer가 처리 가능한 양만큼 데이터를 가져오는 pull 방식으로 동작하기 때문에 시스템 전체의 처리 속도는 자연스럽게 Consumer의 처리 능력에 맞춰진다. 이와 같은 구조를 통해 Kafka는 유입 속도와 처리 속도간 불일치를 효과적으로 흡수하며 안정적인 backpressure를 제공하는 버퍼 역할을 수행한다.

반면 Spring이 제공하는 @Async 기반 이벤트 처리 구조는 스레드 풀과 메모리 큐를 기반으로 동작하며 동시에 실행 가능한 작업 수는 스레드 풀의 크기로 제한된다. 스레드 수를 초과한 작업은 JVM 힙 메모리 상의 큐에 적재되며 작업의 유입 속도가 처리 속도를 초과할 경우 큐에 작업이 지속적으로 누적된다.

 

이 과정에서 큐의 크기가 제한되어 있지 않거나 과도하게 크다면 메모리 사용량이 점점 증가하게 되고 결국 힙 메모리가 고갈되면서 OutOfMemoryError(OOM)에 의해 프로그램이 다운될 수 있다. 반대로 큐의 크기가 제한된 경우에는 이를 초과한 작업이 거부되거나 손실될 수 있어 안정적인 처리가 어려워진다.

 

또한 메모리 사용량이 증가하는 상황에서는 이를 회수하기 위한 GC(Garbage Collection)가 빈번하게 수행되고 이 과정에서 Stop-The-World(STW)가 발생하여 애플리케이션의 응답 지연이 나타난다. 이러한 현상이 반복되면 전체 시스템의 처리량이 급격히 저하되고 서비스 중단으로 이어질 수 있다.

 

결과적으로 @Async 기반 구조는 작업이 메모리에 의존하여 누적되기 때문에 대규모 fan-out과 같이 순간적으로 많은 이벤트가 발생하는 상황에서 안정적인 부하 제어가 어렵고 Kafka와 같은 메시지 큐 기반 구조에 비해 확장성과 안정성이 떨어진다.


분산 애플리케이션

Kafka의 진가가 가장 잘 드러나는 건 MSA 구조에서이다. 여러 개의 마이크로서비스들이 Kafka 브로커를 통해 정보를 전달하고 각자의 작업을 유기적으로 수행한다. 

 

이와 같은 예시 상황을 가정해보자.

다음은 커뮤니티 기능과 관련있는 3개의 마이크로서비스를 운영하는 분산 애플리케이션 환경이다.

먼저 Community 마이크로서비스는 게시글, 댓글, 대댓글을 관리하는 핵심적인 커뮤니티 도메인 기능을 담당한다. 새로운 댓글이 생성되면 comment-created-topic 메시지를 브로커에 전송한다.

Patrol 마이크로서비스는 커뮤니티의 건전성을 위해 악의적인 내용이 있는지 AI를 이용하여 주기적으로 모니터링한다. Community 마이크로서비스에서 발행한 모든 토픽을 구독하는 community-check-group 리스너를 운영한다. 만약 악성 댓글이 감지된다면 해당 댓글을 삭제하고 malicious-comment-topic 메시지를 브로커에 전송한다.

User Report 마이크로서비스는 사용자 행동 데이터를 수집하여 이상 행동 감지 및 통계 지표를 제공한다. 이상 행동이나 악의적인 행동을 한 유저는 블랙리스트에 기록해둔다. malicious가 붙은 모든 토픽을 구독하는 malicious-event-group 리스너를 운영한다.

 

분산 애플리케이션에서 커뮤니티 기능을 담당하는 마이크로서비스(community-check-group의 concurrency=1)
분산 애플리케이션에서 커뮤니티 기능을 담당하는 마이크로서비스(community-check-group의 concurrency=3)

 

위의 그림은 악성 댓글 2개를 작성했을 때 각 마이크로서비스가 어떻게 작동하는지를 나타낸 그림으로 community-check-group의 concurrency 설정이 각각 1, 3인 경우이다.

 

먼저 Community 마이크로서비스에서 2개의 댓글 생성에 대한 이벤트를 생성해서 Broker 1,2로 각각 전송했다.

 

Patrol 마이크로서비스의 community-check-group 리스너가 Broker 1로부터 첫번째 댓글에 대한 메시지를 받아와 내용을 검사하는 checkMalicious를 실행한다. 이에 악성 내용을 감지하고 삭제한 후 malicious-comment-topic 메시지를 Broker 2에 전송한다.

 

이후 리스너가 Broker 2로부터 받아온 두번째 댓글에 대해서도 checkMalicious를 실행한다. 두번째 댓글에서도 악성 내용을 감지하고 삭제 후 malicious-comment-topic 메시지를 Broker 2에 전송한다.(concurrency = 1 일때 동기, 3일때 비동기 처리)

 

User Report 마이크로서비스의 malicious-event-group 리스너는 Broker 2에서 malicious-comment-topic 메시지를 받아와 댓글 작성자를 블랙리스트에 올리는 blackList 메서드를 실행한다.


서버 장애로 인한 재시작 상황

서버 장애로 인한 재시작 상황(community-check- group의 concurrency=3)

 

Patrol 마이크로서비스가 첫번째 댓글에 대한 메시지 처리 후 두번째 댓글 이벤트에 대한 checkMalicious 실행 중 다운됐다고 하자. 

Patrol 마이크로서비스가 중단된 작업을 재개하는 과정은 다음과 같다.(concurrency=3)

  1. 서버 재시작 후, community-check-group 리스너의 워커 스레드(T1, T2, T3)는 그룹 코디네이터 역할을 수행하는 브로커에게 comment-created-topic과 각자 담당 파티션을 보내며 커밋 오프셋(해당 파티션에서 다음 순서로 처리해야 할 오프셋)을 물어본다. (T1은 parition 2, T2는 partition 1, T3는 partition 0 담당)
  2. 커밋 오프셋이 변한 과정은 다음과 같다.(partition 0, partition 1, parition 2), LEO(Log End Offset) 
    1. 첫번째 댓글 이벤트 생성, 처리 전: (3, 1, 2), LEO=(3, 1, 3)
    2. 첫번째 댓글 이벤트 처리 후 커밋: (3, 1, 3), LEO=(3, 1, 3)
    3. 두번째 댓글 이벤트 생성, 처리 전: (3, 1, 3), LEO=(3, 2, 3)
    4. 두번째 댓글 이벤트 처리 중 서버 재시작: (3, 1, 3), LEO=(3, 2, 3)
  3. 그룹 코디네이터 역할을 수행하는 브로커는 내부적으로 운영하는 컨슈머 오프셋 로그를 참조하여 각 워커 스레드에게 (담당 파티션, 커밋 오프셋, 리더 브로커 주소) 형식의 응답을 보낸다. 응답을 받은 각 워커 스레드는 자신이 담당하는 파티션의 리더 브로커에게 커밋 오프셋에 해당하는 메시지의 fetch 요청을 한다.(T1은 3, T2는 1, T3는 3 요청)
    • T1은 Broker 1에게 partition 2의 #3에 해당하는 메시지를 요청
    • T2는 Broker 2에게 partition 1의 #1에 해당하는 메시지를 요청
    • T3는 Broker 1에게 partition 0의 #3에 해당하는 메시지를 요청
  4. 각 브로커는 요청받은 오프셋의 메시지가 존재하면 바로 전송, 없으면 최대한 늦게 응답
데이터가 없을 때마다 "없음"이라고 즉시 응답할 경우 컨슈머는 바로 "지금은?"이라고 물어보는 무의미한 폴링이 발생하며 네트워크 자원이 낭비
  • Broker 1은 T1로부터 요청받은 partition 2의 #3인 메시지가 없음(0, 1, 2까지만 존재) -> 지연 응답
  • Broker 2는 T2로부터 요청받은 partition 1의 #1인 메시지가 존재(0, 1 존재) -> 즉시 응답
  • Broker 1은 T3로부터 요청받은 partition 0의 #3인 메시지가 없음(0, 1, 2까지만 존재) -> 지연 응답

 5. 서버 재시작 후 T2는 Broker 2로부터 partition 1의 #1인 메시지를 전달받고 두번째 댓글 이벤트에 대해 checkMalicious를 호출하여 중단된 작업을 안전하게 재처리하게 된다.

서버 재시작 상황에서 comment-created-topic의 파티션 상태

 

 

Kafka의 메시지 시스템은 기존 단일 애플리케이션에서 인메모리로 처리되던 서비스 간 데이터 전송과 메서드 호출 구조를 JVM 바깥으로 옮겨, MSA 환경에서 수평적 확장과 마이크로서비스 간 통신을 내구성 있게 보장한다.

 

 

 

Kafka를 적용한 이벤트 기반 비동기 작업 처리 구조로 구현한 알림 기능이 제대로 동작하는지는 다음 시간에 검증해보자.

'dev > infra' 카테고리의 다른 글

Kafka - 이벤트 기반의 비동기 작업 처리 구조로 알림 기능 구현하기 (4)  (0) 2026.01.22
Kafka - 이벤트 기반의 비동기 작업 처리 구조로 알림 기능 구현하기 (2)  (1) 2026.01.21
Kafka - 이벤트 기반의 비동기 작업 처리 구조로 알림 기능 구현하기 (1)  (0) 2026.01.19
'dev/infra' 카테고리의 다른 글
  • Kafka - 이벤트 기반의 비동기 작업 처리 구조로 알림 기능 구현하기 (4)
  • Kafka - 이벤트 기반의 비동기 작업 처리 구조로 알림 기능 구현하기 (2)
  • Kafka - 이벤트 기반의 비동기 작업 처리 구조로 알림 기능 구현하기 (1)
cusum26
cusum26
  • cusum26
    CUSUMlog
    cusum26
  • 전체
    오늘
    어제
    • 분류 전체보기 (18)
      • dev (15)
        • blockchain (1)
        • ai (6)
        • web (0)
        • infra (4)
        • app (4)
      • cs (1)
        • blockchain (1)
      • scalability (2)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    FanoutTask
    lazy fanout
    Kafka
    bccprm
    Merkle Trie
    min.insync.replicas
    group metadata
    kafkaListenerContainerFactory
    도메인 이벤트
    kafka ui
    KafkaConfig
    __consumer_offsets
    acks
    비동기
    consumer offset
    codex skill
    fanout-on-write
    컨슈머 오프셋
    fanout-on-read
    msa
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
cusum26
Kafka - 이벤트 기반의 비동기 작업 처리 구조로 알림 기능 구현하기 (3)
상단으로

티스토리툴바