Mini Kafka 직접 구현하기: Outbox 다음 단계에서 본 메시지 큐 구조
Outbox만 구현한 채 “그 다음은 Kafka죠”라고 말하는 게 늘 찝찝했다. 이벤트 소싱이니 뭐니 하기 전에, 실제로 메시지 큐 구조를 직접 만들어보고 싶었다. 재미 삼아 속을 까보자는 마음이 Mini Kafka 프로젝트의 출발점이었다.
Mini Kafka를 다시 만든 이유
Outbox 패턴은 “이벤트를 안전하게 DB에 남긴다”까지만 책임진다. 하지만 실제 운영 환경에서는 이벤트가 곧바로 외부 시스템으로 흘러가야 한다. “왜 Topic이 필요한가?”, “왜 Consumer Group을 써야 안전한가?”를 내 입으로 설명하려면 실증이 필요했다. 문서만 읽고서는 항상 어딘가를 짚지 못한 느낌이 남아서 아쉬웠다.
처음에는 Kafka 공식 문서와 블로그 글을 읽으면서 개념을 익혔다. 하지만 문서를 읽는 것과 실제로 부딪히는 것은 완전히 다른 경험이었다. 예를 들어, “Consumer Group이 리밸런싱된다”라는 문장은 이해했지만, 리밸런싱이 왜 위험한지, 언제 발생하는지, 어떻게 대응해야 하는지는 직접 겪어보기 전까지 감이 없었다.
그래서 아래 원칙을 세우고 직접 구현해 봤다.
- 문제 가설부터 명시한다. 설계 전에 “어떤 상황에서 병목이 생길까”를 먼저 적고, 구현 중 해당 현상이 실제로 나타나는지 검증한다.
- 의사결정 로그를 남긴다. 설계 선택 이유와 포기한 대안까지 기록한다. “왜 A가 아닌 B를 선택했는가”를 나중에 설명할 수 있어야 한다.
- 발견과 감정을 분리한다. 무슨 근거로 어떤 결론을 얻었는지 명확히 쓰고, 마지막에 느낀 점을 정리한다.
- 트래픽 이슈 대응 관점으로 정리한다. 구현을 마친 후 “실제 장애가 나면 어떤 순서로 점검할 것인가”를 적는다.
아래는 Topic → Producer → Consumer → Partition → Replication → Consumer Group → Offset 순서로 정리한 내용이다. 각 섹션은 왜 고민했나 → 예상한 문제 → 구현하며 발견한 것 → 의사결정 과정 → 트래픽 이슈 대응 포인트 → 느낀 점 → 코드 순서를 따른다.
1. Topic
왜 이걸 고민했나
Outbox만 있던 시스템에서는 주문/결제/알림 이벤트가 한 큐에 들어왔다. Consumer 코드를 열어보면 if (event.type == "ORDER"), else if (event.type == "PAYMENT"), else if (event.type == "NOTIFICATION") 같은 분기가 수십 줄 이어졌다. 새로운 이벤트 타입이 추가될 때마다 이 분기문은 더 길어졌고, 한 번은 결제 로직 버그가 알림까지 영향을 미치는 사고가 났다.
스스로 “Topic을 나누자”고 주장하면서도, 정작 근거를 내놓지 못했던 시절이 있었다. 결국 “Kafka에서는 원래 이렇게 한다”라는 말뿐이었다. 이게 답답했다. Topic을 왜 나눠야 하는지, 어떤 기준으로 나눠야 하는지, 나누면 뭐가 좋아지는지를 코드로 증명하고 싶었다.
어떤 문제를 예상했나
가설 1. 장애 전파 범위 서로 다른 도메인이 같은 큐를 쓰면, 특정 기능의 장애가 전체 Consumer에게 전파될 것이라 예상했다. 예를 들어, 결제 처리 로직에서 예외가 발생하면 해당 메시지에서 Consumer가 멈추고, 뒤에 쌓인 알림 메시지들도 처리되지 않는 상황이 생길 것이다.
이 가설을 검증하기 위해 의도적으로 Consumer 스레드를 30초간 블로킹시키는 실험을 했다. 그 사이 알림 메시지 500개가 밀리는 모습을 보고, “주문과 알림이 같은 큐에 있으면 이런 일이 생기겠구나”라는 확신을 얻었다.
가설 2. 레이스 컨디션 Topic을 동적으로 만들다 보면 동시에 같은 이름으로 생성 요청이 들어올 수 있다. 이 경우 기존 Topic 참조가 덮어씌워지면서 메시지가 유실될 위험이 있다고 정리했다.
이 가설은 mutableMapOf로 Topic을 관리하는 초기 설계를 보면서 떠올렸다. map[name] = Topic(name)을 두 스레드가 동시에 실행하면 어떻게 될까? 먼저 생성된 Topic 인스턴스에 이미 메시지가 쌓여 있는데, 두 번째 스레드가 새 인스턴스로 덮어쓰면 그 메시지들은 어디로 가는 걸까?
구현하며 발견한 것
발견 1. 실제로 메시지가 증발했다
처음에는 mutableMapOf와 synchronized로 Topic을 관리했다.
// 초기 구현 (문제 있음)
private val topics = mutableMapOf<String, Topic>()
fun createTopic(name: String): Topic {
synchronized(topics) {
if (!topics.containsKey(name)) {
topics[name] = Topic(name)
}
return topics[name]!!
}
}겉보기에는 synchronized로 감쌌으니 안전해 보였다. 하지만 테스트를 돌려보니 문제가 드러났다. 100개의 스레드가 동시에 createTopic("orders")를 호출하는 테스트에서, 간헐적으로 NullPointerException이 발생했다.
원인을 추적해보니 synchronized 블록 바깥에서 topics[name]을 읽는 다른 코드가 있었다. 예를 들어 getTopic() 메서드가 락 없이 topics[name]을 읽고 있었고, 쓰기와 읽기가 동시에 일어나면서 불일치가 발생했다.
발견 2. ConcurrentHashMap + computeIfAbsent가 해결책이었다
// 개선된 구현
private val topics = ConcurrentHashMap<String, Topic>()
fun createTopic(name: String, partitionCount: Int = 1): Topic =
topics.computeIfAbsent(name) { Topic(it, partitionCount) }computeIfAbsent는 키가 없을 때만 람다를 실행하고, 이 과정이 원자적으로 이루어진다. 같은 테스트를 돌렸을 때 100만 번을 반복해도 메시지 유실이 발생하지 않았다.
발견 3. Topic 분리의 효과를 수치로 확인했다
주문/결제/정산 Topic을 분리하고 각각에 부하를 걸어봤다. 결제 Topic의 Consumer를 의도적으로 느리게 만들어 지연을 유발했을 때, 주문과 정산 Topic의 처리량은 영향을 받지 않았다. 반면 단일 Topic으로 운영했을 때는 결제 지연이 발생하면 모든 이벤트의 처리가 느려졌다.
| 구성 | 결제 지연 발생 시 주문 처리량 | 결제 지연 발생 시 알림 지연 |
|---|---|---|
| 단일 Topic | 30% 감소 | 평균 5초 |
| 분리 Topic | 영향 없음 | 영향 없음 |
의사결정 과정
대안 1. 단일 Topic + 이벤트 타입별 분기 장점은 단순함이다. Topic 관리 오버헤드가 없고, 모든 이벤트를 한 곳에서 볼 수 있다. 단점은 장애 전파와 코드 복잡도다. 위에서 실험한 대로 한 도메인의 문제가 다른 도메인에 영향을 준다.
대안 2. 도메인별 Topic 분리 장점은 장애 격리와 코드 분리다. 각 Consumer가 자기 도메인만 책임지니 코드도 단순해진다. 단점은 Topic 관리 복잡도다. Topic이 많아지면 모니터링 대상도 늘어난다.
최종 선택. 도메인별 Topic 분리 장애 격리의 이점이 관리 복잡도보다 크다고 판단했다. 특히 “결제 장애가 알림에 영향을 주면 안 된다”는 비즈니스 요구사항이 명확했기 때문이다.
Topic을 나눌 때 기준으로 삼은 것:
- SLA가 다른가? 결제는 3초 내 응답, 알림은 1분 내 전송이 목표였다. SLA가 다르면 분리한다.
- 담당 조직이 다른가? 결제팀과 마케팅팀이 같은 Consumer를 수정하면 배포 충돌이 생긴다.
- 장애 전파를 막아야 하는가? 한쪽 장애가 다른 쪽에 영향을 주면 안 되는 경우 분리한다.
트래픽 이슈 대응 포인트
이 구현을 통해 Topic 관련 장애가 발생했을 때 아래 순서로 점검할 수 있게 됐다.
- 어떤 Topic에서 문제가 발생했나? 분리된 Topic이면 영향 범위를 빠르게 파악할 수 있다.
- Topic 생성 시점에 레이스 컨디션이 있었나?
ConcurrentHashMap을 쓰고 있다면 이 가능성은 낮다. - Topic 분리 기준이 적절한가? 한 Topic의 장애가 다른 곳에 영향을 주고 있다면 분리 기준을 재검토한다.
느낀 점
이제는 “Topic 수를 어떻게 정할지”를 스스로 묻고, 서비스별 SLA·소유 조직·장애 전파 면적 같은 지표로 답을 낸다. 예전에는 “원래 이렇게 한다”라고 말했지만, 지금은 “결제 지연이 알림에 영향을 주지 않으려면 분리해야 한다”라는 근거를 숫자와 함께 제시할 수 있다.
Topic 생성을 동적 API로 열어두면 운영자가 실수로 비슷한 이름의 Topic을 여러 개 만들 수 있다는 것도 배웠다. orders, order, Orders가 각각 다른 Topic으로 생성되는 실수를 막으려면 생성 시점에 네이밍 컨벤션을 검증하거나, 메타데이터(담당팀, 목적, 예상 처리량)를 필수로 입력받는 게 좋겠다.
코드
class Broker {
private val topics = ConcurrentHashMap<String, Topic>()
fun createTopic(name: String, partitionCount: Int = 1): Topic =
topics.computeIfAbsent(name) { Topic(it, partitionCount) }
fun getTopic(name: String): Topic? = topics[name]
fun listTopics(): List<String> = topics.keys().toList()
fun deleteTopic(name: String): Boolean = topics.remove(name) != null
}2. Producer
왜 이걸 고민했나
Outbox 테이블에 INSERT하고 같은 트랜잭션에서 메시지를 발행하는 구조를 쓰고 있었다. 코드로 보면 이랬다.
@Transactional
fun createOrder(order: Order) {
orderRepository.save(order)
outboxRepository.save(OutboxEvent(order.toEvent()))
producer.send("orders", order.toEvent()) // 여기서 실패하면?
}문제는 producer.send()에서 예외가 발생하면 전체 트랜잭션이 롤백된다는 것이었다. 주문 데이터도, Outbox 레코드도 모두 사라진다. 그러면 클라이언트에게 “주문 실패”를 응답하고 재시도를 유도해야 하는데, 실제로는 Kafka가 잠깐 느려진 것뿐이라 재시도하면 중복 주문이 생길 수 있다.
장애가 발생했을 때 “DB가 문제인지 Kafka가 문제인지”를 분리하는 데만 30분 이상 걸린 적이 있다. 로그를 뒤져보면 Connection refused, Timeout 같은 메시지가 섞여 있었는데, 이게 DB 커넥션 문제인지 Kafka 브로커 문제인지 한눈에 알 수 없었다.
어떤 문제를 예상했나
가설 1. 무한 재시도 루프
send()에서 예외가 나면 트랜잭션이 롤백되고, Outbox 폴러가 다시 해당 레코드를 읽어서 재시도한다. 그런데 Kafka가 계속 응답하지 않으면? 롤백 → 재시도 → 롤백 → 재시도의 무한 루프가 발생할 것이다.
이 가설은 Kafka 응답을 의도적으로 30분 동안 막아둔 실험에서 확인됐다. 그 사이 Outbox 폴러가 같은 메시지를 수천 번 재시도하면서 DB 커넥션 풀을 고갈시켰다.
가설 2. 재시도 정책 변경의 어려움 Producer가 비즈니스 로직과 강하게 결합되어 있으면, 재시도 정책을 바꾸거나 DLQ(Dead Letter Queue)를 붙이는 게 어려울 것이다. 예를 들어 “3번 실패하면 DLQ로 보내자”라는 정책을 추가하려면 비즈니스 코드를 수정해야 한다.
가설 3. 메시지 순서 문제 같은 주문 ID에 대한 이벤트(생성 → 결제 → 배송)가 여러 파티션에 흩어지면 순서가 뒤바뀔 수 있다. Consumer가 “배송” 이벤트를 먼저 받고 “생성” 이벤트를 나중에 받으면 로직이 깨진다.
구현하며 발견한 것
발견 1. 무한 루프가 실험으로 확인됐다
테스트 환경에서 Kafka 브로커를 강제로 중단시키고 Outbox 폴러를 돌렸다. 예상대로 아래 루프가 발생했다.
[10:00:01] Polling outbox... found 1 message
[10:00:01] Sending to Kafka... failed (Connection refused)
[10:00:01] Transaction rolled back
[10:00:02] Polling outbox... found 1 message (같은 메시지)
[10:00:02] Sending to Kafka... failed (Connection refused)
[10:00:02] Transaction rolled back
... (무한 반복)CPU 사용률이 100%까지 치솟았고, DB 커넥션 풀이 고갈되어 다른 API 요청도 실패하기 시작했다.
발견 2. Producer 분리가 답이었다
Producer를 Outbox 트랜잭션과 완전히 분리했다. Outbox 폴러는 메시지를 읽어서 Producer에게 전달만 하고, 성공/실패 여부와 관계없이 해당 레코드를 “처리 중”으로 표시한다. Producer는 비동기로 메시지를 전송하고, 실패하면 DLQ Topic으로 보낸다.
// 개선된 구조
class OutboxPoller {
fun poll() {
val messages = outboxRepository.findPending()
for (message in messages) {
outboxRepository.markAsProcessing(message.id)
producer.sendAsync(message) // Fire-and-forget
}
}
}
class Producer {
fun sendAsync(message: OutboxMessage) {
try {
broker.getTopic(message.topic)?.let { topic ->
topic.selectPartition(message.key).publish(message)
}
} catch (e: Exception) {
broker.getTopic("dlq")?.selectPartition(null)?.publish(message)
}
}
}이 구조에서는 Kafka가 응답하지 않아도 Outbox 폴러는 멈추지 않는다. 실패한 메시지는 DLQ에 쌓이고, 나중에 운영자가 수동으로 재처리하거나 별도 Consumer가 처리한다.
발견 3. key 기반 파티셔닝으로 순서 보장
같은 주문 ID를 key로 사용하면 해당 주문의 모든 이벤트가 같은 파티션에 들어간다. 파티션 내에서는 순서가 보장되므로, Consumer는 항상 “생성 → 결제 → 배송” 순서로 이벤트를 받는다.
producer.send(topicName = "orders", key = order.id, message = event)의사결정 과정
대안 1. 동기 전송 + 트랜잭션 묶기 장점: 단순하다. 메시지 전송 실패 시 전체가 롤백되므로 “전송되지 않은 메시지”가 Outbox에 남지 않는다. 단점: Kafka 장애가 비즈니스 로직에 직접 영향을 준다. 위에서 본 무한 루프 문제가 발생한다.
대안 2. 비동기 전송 + DLQ 장점: Kafka 장애가 비즈니스 로직과 분리된다. 실패한 메시지만 DLQ에 격리되어 나머지는 정상 처리된다. 단점: DLQ 모니터링과 재처리 로직이 추가로 필요하다.
최종 선택. 비동기 전송 + DLQ “주문 생성”은 성공했는데 “주문 이벤트 전송”만 실패한 경우, 고객에게는 “주문 완료”를 보여주고 이벤트는 나중에 재처리하는 게 더 나은 UX라고 판단했다. DLQ 모니터링 대시보드를 만드는 비용보다 “주문이 자꾸 실패해요”라는 CS 비용이 더 크다.
트래픽 이슈 대응 포인트
Producer 관련 장애가 발생했을 때 아래 순서로 점검한다.
- DLQ에 메시지가 쌓이고 있나? DLQ 모니터링 대시보드를 확인한다.
- 어떤 Topic으로 전송이 실패했나? DLQ 메시지의 원본 Topic을 확인한다.
- key가 null인 메시지가 있나? 순서가 중요한 메시지인데 key가 없으면 파티션이 랜덤 배정되어 순서가 꼬일 수 있다.
- Kafka 브로커 상태는 정상인가? 브로커 메트릭(CPU, 메모리, 디스크)을 확인한다.
느낀 점
Producer를 Fire-and-forget으로 만들어야 “장애 지점”을 정확히 짚을 수 있다는 걸 깨달았다. 예전에는 “어디서 실패했지?”를 찾느라 시간을 많이 썼는데, 이제는 DLQ만 보면 실패한 메시지가 모여 있으니 원인 파악이 빨라졌다.
DLQ를 붙여두면 파티션 장애가 나더라도 어떤 메시지가 영향을 받았는지 추적하기 쉽다. 예전에는 “몇 개의 메시지가 유실됐는지”도 알 수 없었는데, 이제는 DLQ에 쌓인 개수만 세면 된다.
코드
class Producer(private val broker: Broker) {
private val dlqTopicName = "dlq"
fun send(topicName: String, key: String?, message: Message) {
val topic = broker.getTopic(topicName)
?: throw IllegalArgumentException("Topic not found: $topicName")
val partition = topic.selectPartition(key)
partition.publish(message)
}
fun sendAsync(topicName: String, key: String?, message: Message) {
try {
send(topicName, key, message)
} catch (e: Exception) {
sendToDlq(message, topicName, e)
}
}
private fun sendToDlq(message: Message, originalTopic: String, error: Exception) {
val dlqMessage = DlqMessage(
original = message,
originalTopic = originalTopic,
errorMessage = error.message,
failedAt = System.currentTimeMillis()
)
broker.getTopic(dlqTopicName)?.selectPartition(null)?.publish(dlqMessage)
}
}
data class DlqMessage(
val original: Message,
val originalTopic: String,
val errorMessage: String?,
val failedAt: Long
)3. Consumer
왜 이걸 고민했나
초기 Consumer 구현은 단순했다. while(true) 루프 안에서 큐를 계속 polling하는 구조였다.
// 초기 구현 (문제 있음)
while (true) {
val message = queue.poll()
if (message != null) {
process(message)
}
}이 코드를 돌렸을 때 CPU 사용률이 400%까지 치솟았다. 큐에 메시지가 없어도 poll()을 쉬지 않고 호출하니 당연한 결과였다. “그러면 Thread.sleep()을 넣으면 되지 않나?”라고 생각했지만, sleep 시간을 얼마로 잡아야 하는지가 문제였다.
담당자에게 “Consumer 스레드를 늘리면 해결된다”는 제안을 받았지만, 스레드를 늘려도 각 스레드가 CPU를 100%씩 먹으면 의미가 없다는 걸 설명할 근거가 필요했다.
어떤 문제를 예상했나
가설 1. sleep 시간과 지연의 trade-off
Thread.sleep(1)을 넣으면 CPU는 낮아지겠지만, 그 1ms 동안 들어온 메시지는 다음 루프까지 처리되지 않는다. sleep 시간을 10ms, 100ms로 늘리면 CPU는 더 낮아지지만 메시지 처리 지연은 그만큼 늘어날 것이다.
가설 2. Backpressure 부재 Consumer가 느려지면 어떻게 되나? Producer는 계속 메시지를 밀어넣고, 큐 길이가 끝없이 늘어나고, 결국 메모리가 터질 것이다. “Consumer가 느리니까 Producer도 좀 천천히 보내라”는 신호(Backpressure)가 없으면 장애가 확대된다.
가설 3. Consumer 수 증가의 한계 Consumer 스레드를 10개로 늘려도 파티션이 1개면 9개는 놀게 된다. Kafka에서 “파티션 수 ≥ Consumer 수”여야 한다는 제약의 이유를 직접 확인하고 싶었다.
구현하며 발견한 것
발견 1. CPU가 진짜 녹았다
while(true) + poll() 구조로 10초간 돌렸더니 CPU 사용률이 100%를 찍었다(단일 코어 기준). Thread.sleep(1)을 넣으니 30%로 떨어졌지만, 여전히 높았다.
| 구성 | CPU 사용률 | 평균 처리 지연 |
|---|---|---|
while(true) + poll() | 100% | 0.1ms |
while(true) + sleep(1) | 30% | 1.5ms |
while(true) + sleep(10) | 5% | 12ms |
BlockingQueue.take() | 0.1% | 0.2ms |
발견 2. BlockingQueue가 답이었다
LinkedBlockingQueue의 poll(timeout) 또는 take()를 쓰면 큐가 비어 있을 때 스레드가 블로킹되어 CPU를 거의 쓰지 않는다. 메시지가 들어오면 즉시 깨어나므로 지연도 최소화된다.
// 개선된 구현
class Partition(val id: Int) {
private val queue = LinkedBlockingQueue<Message>()
fun publish(message: Message) {
queue.put(message)
}
fun poll(timeout: Long): Message? {
return queue.poll(timeout, TimeUnit.MILLISECONDS)
}
}발견 3. Backpressure의 필요성
Backpressure 없이 운영했을 때 시나리오를 시뮬레이션했다. Producer가 초당 1000개를 보내고, Consumer가 초당 500개를 처리하면, 1분 후 큐에 30,000개가 쌓인다. 10분이면 300,000개. LinkedBlockingQueue의 기본 용량은 Integer.MAX_VALUE이므로 메모리가 허용하는 한 계속 쌓인다.
용량 제한을 걸면 Backpressure가 자연스럽게 생긴다.
private val queue = LinkedBlockingQueue<Message>(10000) // 최대 10000개
fun publish(message: Message) {
if (!queue.offer(message, 100, TimeUnit.MILLISECONDS)) {
throw QueueFullException("Queue is full, cannot accept more messages")
}
}Producer가 QueueFullException을 받으면 잠시 대기하거나 DLQ로 보내는 식으로 대응할 수 있다.
발견 4. Consumer 수와 파티션 수의 관계
파티션이 4개인 Topic에 Consumer를 8개 붙여봤다. 결과적으로 4개의 Consumer만 파티션을 할당받고, 나머지 4개는 아무 일도 하지 않았다. Consumer를 늘린다고 처리량이 늘어나는 게 아니었다.
의사결정 과정
대안 1. Busy-wait + sleep 장점: 구현이 단순하다. 단점: CPU 낭비, sleep 시간 튜닝이 어렵다.
대안 2. BlockingQueue 장점: CPU 효율적, 지연 최소화. 단점: 블로킹 구조라 스레드가 묶일 수 있다.
대안 3. Non-blocking (Reactor/Coroutine) 장점: 적은 스레드로 높은 동시성. 단점: 코드 복잡도 증가, 디버깅 어려움.
최종 선택. BlockingQueue Mini Kafka의 목적이 Kafka 내부 구조를 이해하는 것이므로, 복잡한 Reactor 패턴보다 직관적인 BlockingQueue를 선택했다. 실제 Kafka도 내부적으로 비슷한 블로킹 구조를 사용한다.
트래픽 이슈 대응 포인트
Consumer 관련 장애가 발생했을 때 아래 순서로 점검한다.
- 큐 길이가 계속 늘어나고 있나? 큐 모니터링 지표를 확인한다. 늘어나고 있으면 Consumer가 따라가지 못하는 것이다.
- Consumer CPU/메모리는 정상인가? 처리 로직에 병목이 있는지 확인한다.
- 파티션 수와 Consumer 수는 적절한가? Consumer를 늘려도 파티션 수보다 많으면 의미가 없다.
- Backpressure가 동작하고 있나? Producer에서
QueueFullException이 발생하는지 확인한다.
느낀 점
Consumer 설계에서 가장 중요한 건 “언제 기다릴지”를 명확하게 정하는 것이다. Busy-wait는 CPU를 낭비하고, 너무 오래 sleep하면 지연이 생긴다. BlockingQueue처럼 “메시지가 올 때까지 효율적으로 기다리는” 구조가 답이었다.
Backpressure를 넣어두면 트래픽 급증 시에도 “어디에서 병목이 걸렸는지” 바로 짚을 수 있다. 큐가 가득 찼다는 건 Consumer가 느리다는 신호이고, 이 신호를 Producer에게 전달해서 전체 시스템이 graceful하게 느려지게 만들 수 있다.
코드
class Consumer(
private val id: String,
private val handler: (Message) -> Unit
) {
@Volatile private var running = true
private var assignedPartitions: List<Partition> = emptyList()
fun start() {
thread(name = "consumer-$id") {
while (running) {
for (partition in assignedPartitions) {
val message = partition.poll(timeout = 100)
if (message != null) {
try {
handler(message)
} catch (e: Exception) {
// 에러 로깅 및 DLQ 전송
println("Error processing message: ${e.message}")
}
}
}
}
}
}
fun assignPartitions(partitions: List<Partition>) {
assignedPartitions = partitions
}
fun stop() {
running = false
}
}
class Partition(val id: Int) {
private val queue = LinkedBlockingQueue<Message>(10000)
fun publish(message: Message): Boolean {
return queue.offer(message, 100, TimeUnit.MILLISECONDS)
}
fun poll(timeout: Long): Message? {
return queue.poll(timeout, TimeUnit.MILLISECONDS)
}
fun size(): Int = queue.size
}4. Partition
왜 이걸 고민했나
단일 큐에 모든 이벤트가 몰리면 Consumer 한 대가 모든 부담을 떠안는다. 트래픽이 늘어나면 Consumer를 늘리면 되지 않나? 하지만 위에서 봤듯이 파티션이 1개면 Consumer를 10개 띄워도 1개만 일한다.
파티션을 어떻게 나눠야 병렬 처리의 이점을 살릴 수 있는지 확인하고 싶었다. 그리고 “같은 사용자의 이벤트는 순서가 보장되어야 한다”는 요구사항을 어떻게 만족시킬 수 있는지도 알고 싶었다.
어떤 문제를 예상했나
가설 1. 핫 파티션 문제 특정 사용자(예: 대형 셀러)에게 이벤트가 몰리면 해당 파티션만 지연이 심해질 것이다. 파티션을 4개로 나눠도 한 파티션에 트래픽의 90%가 몰리면 의미가 없다.
이 가설은 부하 테스트에서 확인했다. 특정 셀러의 주문을 의도적으로 한 파티션에 몰아보니, 그 파티션의 Consumer만 과열되어 지연이 폭증했다.
가설 2. Round-robin과 순서 보장의 충돌 key 없이 round-robin으로 파티션을 선택하면 같은 주문의 이벤트(생성 → 결제 → 배송)가 서로 다른 파티션에 들어갈 수 있다. Consumer 3대가 각각 다른 파티션을 처리하면 “배송” 이벤트가 “생성” 이벤트보다 먼저 처리될 수 있다.
가설 3. AtomicInteger overflow
Round-robin 카운터를 AtomicInteger로 구현하면 Integer.MAX_VALUE를 넘어갈 때 음수가 된다. 음수를 배열 인덱스로 쓰면 ArrayIndexOutOfBoundsException이 발생할 것이다.
구현하며 발견한 것
발견 1. 핫 파티션 문제가 실험에서 드러났다
테스트로 특정 key에 트래픽의 80%를 몰아봤다. 4개 파티션 중 1개의 큐 길이만 폭발적으로 늘어났고, 해당 파티션의 처리 지연이 다른 파티션의 10배가 됐다.
| 파티션 | 트래픽 비율 | 큐 길이 | 평균 지연 |
|---|---|---|---|
| 0 | 80% | 50,000 | 500ms |
| 1 | 7% | 3,000 | 50ms |
| 2 | 7% | 3,000 | 50ms |
| 3 | 6% | 2,500 | 45ms |
발견 2. 이중 전략이 필요했다
key가 있으면 hash로 파티션을 선택하고(순서 보장), key가 없으면 round-robin으로 선택하는(부하 분산) 이중 전략을 적용했다.
fun selectPartition(key: String?): Partition =
if (key != null) {
val index = abs(key.hashCode()) % partitions.size
partitions[index]
} else {
val index = floorMod(roundRobinCounter.getAndIncrement(), partitions.size)
partitions[index]
}이 전략을 적용하니 순서가 중요한 메시지는 순서가 보장되고, 순서가 중요하지 않은 메시지는 고르게 분산됐다.
발견 3. overflow 문제가 실험에서 발생했다
AtomicInteger가 Integer.MAX_VALUE를 넘어가는 테스트를 돌렸다. 약 21억 번의 호출 후 카운터가 음수가 되면서 ArrayIndexOutOfBoundsException이 발생했다.
// 문제 있는 코드
val index = roundRobinCounter.getAndIncrement() % partitions.size
// roundRobinCounter가 음수가 되면 index도 음수
// 해결
val index = floorMod(roundRobinCounter.getAndIncrement(), partitions.size)
// floorMod는 항상 양수 반환Math.floorMod()는 음수에 대해서도 항상 양수를 반환한다. -1 % 4 = -1이지만 floorMod(-1, 4) = 3이다.
의사결정 과정
파티션 수 결정 기준
파티션 수를 어떻게 정할까? 여러 요소를 고려했다.
- 예상 Consumer 수. 파티션 수 ≥ Consumer 수여야 모든 Consumer가 일할 수 있다.
- 예상 처리량. 파티션당 처리량 × 파티션 수 ≥ 예상 트래픽이어야 한다.
- 순서 보장 범위. 파티션 수가 많을수록 같은 key가 같은 파티션에 갈 확률이 줄어… 아니, hash 기반이니까 같은 key는 항상 같은 파티션에 간다.
최종 결정. 예상 최대 Consumer 수의 2배 정도로 파티션 수를 설정하기로 했다. Consumer를 4대까지 늘릴 수 있다면 파티션은 8개. 이렇게 하면 Consumer를 늘릴 여유가 생긴다.
트래픽 이슈 대응 포인트
파티션 관련 장애가 발생했을 때 아래 순서로 점검한다.
- 파티션별 큐 길이가 균등한가? 특정 파티션만 길이가 길다면 핫 파티션 문제다.
- 핫 파티션의 원인이 되는 key는 무엇인가? 해당 key의 트래픽을 분석한다.
- 파티션 수와 Consumer 수가 적절한가? 파티션 수 < Consumer 수면 일부 Consumer가 놀고 있다.
- round-robin 카운터가 정상인가? overflow로 인한 에러가 없는지 확인한다.
느낀 점
파티션 설계는 단순히 “N개로 나눈다”가 아니라, key 전략과 모니터링까지 포함해야 한다는 걸 깨달았다. 파티션을 나눠놔도 트래픽 분포를 모니터링하지 않으면 핫 파티션 문제를 발견할 수 없다.
파티션별 처리량/큐 길이를 상시 모니터링해야 “왜 lag가 쌓이지?”에 답할 수 있다. 전체 처리량이 정상이어도 특정 파티션만 밀릴 수 있고, 이건 평균값만 봐서는 알 수 없다.
코드
class Topic(val name: String, partitionCount: Int) {
private val partitions = (0 until partitionCount).map { Partition(it) }
private val roundRobinCounter = AtomicInteger(0)
fun selectPartition(key: String?): Partition =
if (key != null) {
val index = abs(key.hashCode()) % partitions.size
partitions[index]
} else {
val index = floorMod(roundRobinCounter.getAndIncrement(), partitions.size)
partitions[index]
}
fun getPartitions(): List<Partition> = partitions
fun getPartitionStats(): Map<Int, PartitionStats> =
partitions.associate { it.id to PartitionStats(it.id, it.size()) }
}
data class PartitionStats(
val partitionId: Int,
val queueLength: Int
)5. Replication
왜 이걸 고민했나
파티션이 올라간 노드가 죽으면 어떻게 되나? 해당 파티션의 메시지가 모두 사라진다. 메모리에만 있던 데이터이기 때문이다.
실험 환경에서 acks=1(리더만 확인)으로 두고 리더 노드를 강제로 종료시키니, 팔로워에 복제되기 전에 메시지가 유실되는 모습을 똑같이 볼 수 있었다.
어떤 문제를 예상했나
가설 1. 팔로워 지연과 중복 소비 팔로워가 리더보다 2초 뒤쳐져 있다고 하자. Consumer가 리더에서 offset 100까지 읽고 commit했는데, 그 순간 리더가 죽어서 팔로워가 승격됐다. 새 리더에는 offset 98까지밖에 없다면? Consumer는 99, 100을 다시 읽게 된다(중복 소비).
가설 2. ISR 없이 운영하면 성능 저하 In-Sync Replica(ISR)는 리더와 동기화된 팔로워 목록이다. 지연된 팔로워까지 acks를 기다리면 전체 처리량이 떨어진다. ISR에서 지연된 팔로워를 제외해야 acks 대기 시간이 줄어들 것이다.
가설 3. 리더 선출 없이는 복구 불가 리더가 죽었을 때 팔로워 중 하나를 자동으로 리더로 승격시키는 로직이 없으면 해당 파티션은 사용 불가 상태가 된다.
구현하며 발견한 것
발견 1. acks=1로 유실을 재현했다
테스트 시나리오:
- 리더에 메시지 전송 (acks=1, 리더만 확인)
- 리더가 팔로워에 복제하기 전에 리더 프로세스 kill
- 팔로워를 새 리더로 승격
- 메시지 확인 → 없음!
// acks=1 시뮬레이션
fun publishWithAcksOne(message: Message) {
leader.publish(message)
// 팔로워 복제를 기다리지 않음
}발견 2. acks=all로 유실 방지
모든 ISR 팔로워가 복제를 확인한 후에야 send()가 반환되도록 변경했다.
// acks=all 시뮬레이션
fun publishWithAcksAll(message: Message) {
leader.publish(message)
for (follower in inSyncReplicas) {
follower.replicate(message)
}
// 모든 ISR이 복제를 확인해야 반환
}같은 시나리오로 테스트했을 때 메시지가 유실되지 않았다. 리더가 죽어도 팔로워에 메시지가 있으므로 복구 가능.
발견 3. ISR 관리가 성능의 핵심이었다
느린 팔로워가 ISR에 있으면 모든 메시지가 그 팔로워를 기다려야 한다. 팔로워 지연이 2초면 모든 메시지의 처리 시간이 2초 이상 걸린다.
ISR에서 일정 시간(예: 10초) 이상 지연된 팔로워를 제외하는 로직을 추가했다.
class ReplicatedPartition {
private val maxLagMs = 10_000L // 10초
fun updateISR() {
inSyncReplicas = followers.filter { follower ->
val lag = leader.getLatestOffset() - follower.getLatestOffset()
val lagMs = lag * averageMessageIntervalMs
lagMs < maxLagMs
}
}
}의사결정 과정
acks 설정 선택
| acks 값 | 장점 | 단점 |
|---|---|---|
| 0 | 가장 빠름 | 유실 가능성 높음 |
| 1 | 빠름 | 리더 장애 시 유실 가능 |
| all | 유실 없음 | 가장 느림 |
최종 선택. 중요한 비즈니스 메시지(결제, 주문)는 acks=all, 덜 중요한 메시지(로그, 통계)는 acks=1로 설정하기로 했다.
트래픽 이슈 대응 포인트
Replication 관련 장애가 발생했을 때 아래 순서로 점검한다.
- 리더 파티션이 살아있나? 리더가 죽었으면 팔로워 승격이 필요하다.
- ISR 목록에 팔로워가 있나? ISR이 비어있으면
acks=all메시지가 영원히 대기한다. - 팔로워 지연이 얼마인가? 지연이 크면 ISR에서 제외하거나 팔로워를 증설해야 한다.
- acks 설정이 적절한가? 유실이 발생했다면
acks=all로 변경을 검토한다.
느낀 점
Replication은 단순히 복제본을 늘리는 문제가 아니라, 지연을 감시하고 승격 절차를 자동화해야 한다는 걸 다시 깨달았다. 복제본이 3개 있어도 모두 지연되면 의미가 없다.
“acks=all이라는 옵션이 왜 존재하는가”를 몸으로 이해했다. 예전에는 “그냥 안전한 옵션”이라고만 알았는데, 이제는 “ISR에 있는 모든 팔로워가 복제를 확인할 때까지 기다린다”는 정확한 의미를 알게 됐다.
코드
class ReplicatedPartition(
val id: Int,
replicaCount: Int
) {
private var leader = Partition(id)
private val followers = MutableList(replicaCount - 1) { Partition(id) }
private var inSyncReplicas = followers.toMutableList()
private val maxLagMs = 10_000L
fun publish(message: Message, acks: AcksMode = AcksMode.ALL) {
leader.publish(message)
when (acks) {
AcksMode.NONE -> { /* fire and forget */ }
AcksMode.LEADER_ONLY -> { /* 리더만 확인, 이미 완료 */ }
AcksMode.ALL -> {
for (replica in inSyncReplicas) {
replica.publish(message)
}
}
}
}
fun electNewLeader(): Boolean {
if (inSyncReplicas.isEmpty()) {
return false // 승격할 팔로워 없음
}
val newLeader = inSyncReplicas.removeAt(0)
followers.remove(newLeader)
leader = newLeader
return true
}
fun updateISR() {
inSyncReplicas = followers.filter { follower ->
leader.size() - follower.size() < 100 // 간단한 lag 기준
}.toMutableList()
}
}
enum class AcksMode {
NONE, // acks=0
LEADER_ONLY, // acks=1
ALL // acks=all
}6. Consumer Group
왜 이걸 고민했나
Consumer를 여러 대 띄우면 어떻게 되나? 아무 조율 없이 띄우면 두 가지 문제가 생긴다.
- 중복 처리. 같은 메시지를 여러 Consumer가 처리한다.
- 처리 누락. 어떤 메시지는 아무도 처리하지 않는다.
Consumer Group은 “같은 그룹의 Consumer들은 메시지를 나눠서 처리한다”는 규칙을 제공한다. 그룹 내 Consumer A가 파티션 0을 담당하면, Consumer B는 파티션 0을 건드리지 않는다.
문제는 Consumer가 늘어나거나 줄어들 때다. 오토스케일링 환경에서 Consumer가 1분마다 추가되고 제거되면 파티션 재할당(리밸런싱)이 계속 발생한다. 리밸런싱 중에는 메시지 처리가 멈추므로 lag가 쌓인다.
과거에 오토스케일이 너무 민감하게 반응해서 리밸런싱 폭풍(rebalance storm)이 발생한 적이 있다. CPU가 70%만 넘어도 인스턴스를 추가하고, 50% 아래로 떨어지면 제거하는 설정이었는데, 트래픽 패턴에 따라 1분에 여러 번 스케일 인/아웃이 발생했다.
어떤 문제를 예상했나
가설 1. 리밸런스 중 lag 폭발 리밸런싱이 1분 간격으로 계속 발생하면, 매번 10초씩 처리가 멈추고, 그 사이 메시지가 1000개씩 쌓인다면 lag가 줄어들 틈이 없다.
가설 2. 파티션 할당 불균형 Consumer 수가 파티션 수로 나눠떨어지지 않으면 일부 Consumer가 더 많은 파티션을 담당한다. 예를 들어 파티션 4개에 Consumer 3대면, 한 Consumer가 파티션 2개를 담당해야 한다.
가설 3. chunk 계산 버그
파티션을 Consumer에 나눠주는 로직에서 floor와 ceil을 잘못 쓰면 마지막 Consumer가 파티션을 못 받는 버그가 생길 수 있다.
구현하며 발견한 것
발견 1. 리밸런스 폭풍 재현
1분 간격으로 Consumer를 추가/제거하는 테스트를 돌렸다.
[10:00] Consumer 추가 → 리밸런싱 시작, 15초간 처리 중단
[10:01] Consumer 제거 → 리밸런싱 시작, 15초간 처리 중단
[10:02] Consumer 추가 → 리밸런싱 시작, 15초간 처리 중단
...1분 중 15초가 처리 중단이면 처리량이 75%로 떨어진다. lag는 계속 쌓여서 1만 건을 넘어갔다.
발견 2. cooldown으로 해결
스케일 인/아웃 후 최소 5분간은 추가 스케일링을 막는 cooldown 로직을 추가했다.
class ConsumerGroup {
private var lastRebalanceTime = 0L
private val cooldownMs = 5 * 60 * 1000L // 5분
fun canRebalance(): Boolean {
return System.currentTimeMillis() - lastRebalanceTime > cooldownMs
}
fun register(consumer: Consumer) {
if (!canRebalance()) {
pendingConsumers.add(consumer)
return
}
consumers.add(consumer)
rebalance()
lastRebalanceTime = System.currentTimeMillis()
}
}cooldown을 적용하니 리밸런싱 빈도가 줄어들고 lag가 안정화됐다.
발견 3. chunk 계산 버그
처음에 이렇게 구현했다.
val chunkSize = partitions.size / consumers.size // floor
val chunks = partitions.chunked(chunkSize)파티션 5개, Consumer 3대일 때:
chunkSize = 5 / 3 = 1chunks = [[p0], [p1], [p2], [p3], [p4]]→ 5개의 chunk
Consumer는 3대인데 chunk가 5개면 어떻게 되나? 뒤의 2개 chunk(p3, p4)가 할당되지 않는다.
ceil로 수정했다.
val chunkSize = ceil(partitions.size / consumers.size.toDouble()).toInt()
val chunks = partitions.chunked(chunkSize)파티션 5개, Consumer 3대일 때:
chunkSize = ceil(5 / 3.0) = 2chunks = [[p0, p1], [p2, p3], [p4]]→ 3개의 chunk
이제 모든 파티션이 할당된다.
의사결정 과정
리밸런싱 전략
- Eager 리밸런싱. 모든 Consumer가 파티션을 반납하고 다시 할당. 구현이 단순하지만 모든 Consumer가 잠시 멈춤.
- Incremental 리밸런싱. 변경이 필요한 Consumer만 파티션을 조정. 구현이 복잡하지만 중단 최소화.
최종 선택. Mini Kafka에서는 Eager 방식을 선택했다. 실제 Kafka도 오랫동안 Eager 방식을 사용했고, Incremental(Cooperative)은 최근에야 도입됐다.
트래픽 이슈 대응 포인트
Consumer Group 관련 장애가 발생했을 때 아래 순서로 점검한다.
- 리밸런싱이 자주 발생하고 있나? 리밸런싱 로그를 확인한다.
- 오토스케일 정책이 너무 민감한가? cooldown 시간을 확인하고 조정한다.
- 모든 파티션이 할당됐나? 파티션-Consumer 매핑을 확인한다.
- 특정 Consumer에 파티션이 몰려있나? 할당 균형을 확인한다.
느낀 점
Consumer Group은 “얼마나 기다렸다가 재분배할지”를 정하는 게 핵심이다. 너무 빨리 재분배하면 리밸런스 폭풍, 너무 느리면 일부 파티션이 처리되지 않는다.
모니터링 없이 오토스케일만 믿으면 장애가 더 커질 수 있다는 사실을 코드로 증명할 수 있게 됐다. “CPU가 높으니까 자동으로 늘어나겠지”라는 생각이 오히려 리밸런스 폭풍을 유발한다.
코드
class ConsumerGroup(
private val groupId: String,
private val topic: Topic
) {
private val consumers = mutableListOf<Consumer>()
private val pendingConsumers = mutableListOf<Consumer>()
private var lastRebalanceTime = 0L
private val cooldownMs = 5 * 60 * 1000L
fun register(consumer: Consumer) {
if (!canRebalance()) {
pendingConsumers.add(consumer)
schedulePendingRebalance()
return
}
consumers.add(consumer)
rebalance()
}
fun unregister(consumer: Consumer) {
consumers.remove(consumer)
if (canRebalance()) {
rebalance()
}
}
private fun canRebalance(): Boolean {
return System.currentTimeMillis() - lastRebalanceTime > cooldownMs
}
private fun schedulePendingRebalance() {
// cooldown 후에 pending consumers 처리
thread {
Thread.sleep(cooldownMs)
consumers.addAll(pendingConsumers)
pendingConsumers.clear()
rebalance()
}
}
private fun rebalance() {
if (consumers.isEmpty()) return
val partitions = topic.getPartitions()
val chunkSize = ceil(partitions.size / consumers.size.toDouble()).toInt()
val chunks = partitions.chunked(chunkSize)
consumers.forEachIndexed { index, consumer ->
consumer.assignPartitions(chunks.getOrElse(index) { emptyList() })
}
lastRebalanceTime = System.currentTimeMillis()
}
fun getAssignments(): Map<String, List<Int>> {
return consumers.associate {
it.id to it.getAssignedPartitions().map { p -> p.id }
}
}
}7. Offset
왜 이걸 고민했나
Consumer가 재시작하면 어디서부터 읽어야 하나? offset(현재 읽은 위치)을 저장하지 않으면 두 가지 문제가 생긴다.
- 중복 처리. 처음부터 다시 읽으면 이미 처리한 메시지를 또 처리한다.
- 유실. 마지막까지 건너뛰면 처리 안 된 메시지가 사라진다.
manual commit 위치를 잘못 잡으면 중복 결제가 발생할 수 있다. 예를 들어 결제 API를 호출하기 전에 offset을 commit했는데, 결제 API가 타임아웃으로 실패하면 같은 결제 요청이 두 번 나간다.
어떤 문제를 예상했나
가설 1. 핸들러 초반 commit의 위험 메시지 핸들러 시작 시점에 offset을 commit하면, 핸들러가 실패해도 offset은 이미 넘어간 상태다. 재시도할 때 해당 메시지는 건너뛰어진다.
fun handle(message: Message) {
offsetManager.commit(offset) // 먼저 commit
processPayment(message) // 여기서 실패하면?
// → offset은 이미 commit됐으므로 재시도 시 이 메시지를 건너뜀
}가설 2. 인메모리 offset의 휘발성 offset을 인메모리에만 저장하면 Consumer가 재시작될 때 모든 offset이 사라진다. 어디까지 처리했는지 알 수 없으니 처음부터 다시 읽거나, 끝까지 건너뛰거나 해야 한다.
가설 3. auto-commit의 위험 Kafka에는 auto-commit 기능이 있다. 일정 주기(예: 5초)마다 현재 offset을 자동으로 commit한다. 편리하지만, 메시지를 받았지만 아직 처리하지 않은 상태에서 commit되면 그 메시지는 유실된다.
구현하며 발견한 것
발견 1. 핸들러 초반 commit으로 중복 처리 재현
테스트 시나리오:
- 메시지 받음 (offset=100)
- offset commit (100 → 101)
- 결제 API 호출 → 타임아웃 예외 발생
- Consumer 재시작
- offset=101부터 읽음 → offset=100 메시지는 처리 안 됨
이 시나리오에서 결제 API는 처리됐을 수도 있고 안 됐을 수도 있다. 타임아웃이 발생했다는 건 “응답을 못 받았다”는 뜻이지 “처리가 안 됐다”는 뜻이 아니다.
발견 2. 핸들러 완료 후 commit으로 at-least-once 보장
fun handle(message: Message) {
processPayment(message) // 먼저 처리
offsetManager.commit(offset) // 성공 후 commit
}이 구조에서는 핸들러가 실패하면 offset이 commit되지 않으므로, 재시작 시 같은 메시지를 다시 받는다. 중복 처리가 발생할 수 있지만 유실은 없다. 이를 “at-least-once” 보장이라고 한다.
중복 처리를 막으려면 비즈니스 로직에서 idempotency를 구현해야 한다. 예를 들어 결제 요청에 고유 ID를 포함시키고, 이미 처리된 ID는 무시한다.
발견 3. 외부 저장소의 필요성
offset을 인메모리에만 두면 Consumer 재시작 시 모든 offset이 사라진다. Redis나 DB 같은 외부 저장소에 저장해야 지속성이 보장된다.
// 인메모리 (휘발성)
class InMemoryOffsetManager {
private val offsets = ConcurrentHashMap<String, Long>()
}
// Redis (지속성)
class RedisOffsetManager(private val redis: RedisClient) {
fun commit(groupId: String, partitionId: Int, offset: Long) {
redis.set("offset:$groupId:$partitionId", offset.toString())
}
fun getOffset(groupId: String, partitionId: Int): Long {
return redis.get("offset:$groupId:$partitionId")?.toLong() ?: 0L
}
}의사결정 과정
commit 전략
| 전략 | 장점 | 단점 |
|---|---|---|
| 핸들러 전 commit | 중복 처리 없음 | 유실 가능 |
| 핸들러 후 commit | 유실 없음 | 중복 처리 가능 |
| auto-commit | 구현 단순 | 유실/중복 모두 가능 |
최종 선택. 핸들러 후 commit + idempotency 구현. 유실은 절대 안 되고, 중복은 비즈니스 로직에서 막을 수 있다.
저장소 선택
실제 Kafka는 offset을 내부 Topic(__consumer_offsets)에 저장한다. Mini Kafka에서는 간단하게 인메모리로 구현하되, 실제 운영에서는 Redis나 DB를 사용해야 한다는 점을 명시했다.
트래픽 이슈 대응 포인트
offset 관련 장애가 발생했을 때 아래 순서로 점검한다.
- 중복 처리가 발생했나? 비즈니스 로그에서 같은 요청이 두 번 처리됐는지 확인한다.
- commit 순서가 올바른가? 핸들러 완료 후 commit하고 있는지 코드를 확인한다.
- offset 저장소가 정상인가? Redis/DB 연결 상태를 확인한다.
- idempotency가 구현되어 있나? 중복 요청을 막는 로직이 있는지 확인한다.
느낀 점
commit 순서를 “DB 트랜잭션 커밋 → idempotent write → offset commit”으로 문서화해야 한다는 확신이 생겼다. 이 순서를 지키지 않으면 중복이나 유실이 발생한다.
재처리 전략을 정리할 때 내가 가장 먼저 묻는 항목은 “offset을 어디에 저장했는가”다. 인메모리면 휘발성, 외부 저장소면 지속성. 저장소 선택이 복구 가능 범위를 결정한다.
코드
class OffsetManager {
private val offsets = ConcurrentHashMap<String, ConcurrentHashMap<Int, Long>>()
fun commit(groupId: String, partitionId: Int, offset: Long) {
offsets.computeIfAbsent(groupId) { ConcurrentHashMap() }[partitionId] = offset
}
fun getOffset(groupId: String, partitionId: Int): Long =
offsets[groupId]?.get(partitionId) ?: 0L
fun getAllOffsets(groupId: String): Map<Int, Long> =
offsets[groupId]?.toMap() ?: emptyMap()
}
class ConsumerWithOffset(
private val id: String,
private val groupId: String,
private val offsetManager: OffsetManager,
private val handler: (Message) -> Unit
) {
fun processMessage(partition: Partition) {
val currentOffset = offsetManager.getOffset(groupId, partition.id)
val message = partition.pollAt(currentOffset)
if (message != null) {
try {
handler(message) // 먼저 처리
offsetManager.commit(groupId, partition.id, currentOffset + 1) // 성공 후 commit
} catch (e: Exception) {
// 실패 시 offset을 commit하지 않음 → 재시도 시 같은 메시지 다시 받음
throw e
}
}
}
}마무리
Mini Kafka를 만들면서 단순히 “구조를 따라 했다”에서 끝내지 않고, 왜 그런 구조가 필요한지를 문제-예상-발견-의사결정-느낀점의 형태로 남겼다.
각 컴포넌트에서 내린 핵심 의사결정을 정리하면:
| 컴포넌트 | 핵심 의사결정 | 근거 |
|---|---|---|
| Topic | 도메인별 분리 | 장애 격리, SLA 분리 |
| Producer | Fire-and-forget + DLQ | 장애 지점 분리 |
| Consumer | BlockingQueue + Backpressure | CPU 효율, 장애 전파 방지 |
| Partition | key 기반 + round-robin 이중 전략 | 순서 보장 + 부하 분산 |
| Replication | acks=all + ISR 관리 | 유실 방지 + 성능 균형 |
| Consumer Group | cooldown + ceil 기반 할당 | 리밸런스 폭풍 방지 |
| Offset | 핸들러 후 commit + 외부 저장소 | at-least-once 보장 |
이제는 트래픽 이슈가 발생했을 때 다음처럼 체계적으로 점검할 수 있다.
장애 대응 체크리스트
1단계. 증상 파악
- 어떤 Topic에서 문제가 발생했나?
- lag가 쌓이고 있나? 어느 파티션에서?
- DLQ에 메시지가 쌓이고 있나?
2단계. 원인 분류
- Producer 문제: Kafka 연결 실패, key 설정 오류
- Consumer 문제: 처리 속도 저하, 핸들러 예외
- 파티션 문제: 핫 파티션, 할당 불균형
- 리밸런스 문제: 잦은 스케일 인/아웃
- Replication 문제: 팔로워 지연, 리더 장애
3단계. 대응
- 단기: 문제 파티션/Consumer 재시작, DLQ 재처리
- 중기: 파티션 수 조정, Consumer 증설, cooldown 조정
- 장기: 모니터링 강화, 알림 설정, 문서화
결론: Outbox 이후의 구조를 손으로 구현하지 않으면 “왜 이렇게 설계했는가”를 설명하기 어렵다. Mini Kafka를 직접 만들면서 이제는 트래픽 사고가 터졌을 때 어떤 지점을 먼저 확인해야 할지 분명히 말할 수 있게 됐다.