<Kafka> 3. 카프카 컨슈머 (Consumer)
by BFine출처&참고 : https://product.kyobobook.co.kr/detail/S000201464167
가. 컨슈머 알아보기
a. 컨슈머와 컨슈머그룹 (Group Consumer)
- 프로듀서가 어플리케이션이 검사할수 있는 속도보다 더 빠른 속도로 토픽에 메세지를 쓰게 된다면 컨슈머의 메세지의 처리가 뒤로 밀리게 될 것 이다.
=> 그렇기 때문에 토픽으로부터 데이터를 읽어오는 작업을 확장(scale-out)할 수 있어야 하는데 이를 컨슈머그룹을 통해 할 수 있다.
- 컨슈머그룹에 컨슈머를 추가해서 카프카 토픽에 읽어오는 데이터 양을 확장 할 수 있다.
- 카프카 컨슈머는 보통 컨슈머 그룹의 일부로서 작동한다.
=> 동일 컨슈머그룹에 속한 여러 개의 컨슈머들이 동일토픽을 구독할경우 각각의 컨슈머는 서로 다른 피티션의 메세지를 받는다.
- 토픽의 파티션 수보다 더 많은 컨슈머가 있거나 추가한다면 몇몇은 유휴 상태가 되어 메세지를 전혀 받지 못할 수 있다.
- 일반적으로 토픽에 쓰여진 데이터를 전체 조직안에서 여러 용도로 사용할수 있도록 만드는 것이기 때문에 컨슈머그룹을 추가해
동일 데이터를 여러곳에서 받을 수 있도록 디자인 되었다.
b. 파티션 리밸런스 (Partition Rebalance)
- 컨슈머는 컨슈머그룹의 그룹코디네이터 역할을 하는 브로커에 하트비트를 전송하여 파티션에 대한 소유권을 유지 한다.
=> 하트비트는 컨슈머의 백그라운드 스레드에 의해 전송되며 일정한 간격을 두고 연결을 유지한다.
- 컨슈머그룹에 컨슈머를 추가하거나 컨슈머에 문제가 생겨 다운되거나 하는 경우 컨슈머의 파티션을 재할당하는 작업을 리밸런스라고 한다.
=> 그룹코디네이터에 하트비트가 몇 초 이상 들어오지 않는 것을 컨슈머가 죽었다고 판단한 뒤 리밸런스를 실행시킨다.
- eager 리밸런스는 모든 컨슈머는 읽기 작업을 멈추고 모든 파티션에 대한 소유권을 포기한뒤에 다시 그룹에 참여하여 새로운 파티션을 할당하는 방식이다.
=> 주의할점은 컨슈머 그룹의 모든 컨슈머들이 중단 되기 때문에 전체 작업이 중단되는 부분이 발생한다.
- copperative 리밸런스는 한 컨슈머에게 할당되어 있던 파티션만 다른 컨슈머에 재할당하는 방식이다. (점진적)
1. 컨슈머 그룹 리더가 파티션이 재할당 될것이라고 통보하면 컨슈머들은 해당 파티션에 대한 작업을 멈추고 소유권을 포기한다.
2. 컨슈머 그룹 리더가 이 포기한 파티션들을 새로운 컨슈머에게 할당한다.
- 3.1 버전 이후로는 copperative 리밸런스가 기본값이 되었다고 한다.
- 번외) 컨슈머가 그룹에 참여하고 싶을때는 그룹 코디네이터에게 JoinGrop 요청을 보내며 가장 먼저 참여한 컨슈머가 그룹리더가 된다.
리더는 그룹코디네이터로부터 그룹 안에 모든 컨슈머 목록을 받아서 각 컨슈머에 파티션을 할당하는 역할을 한다.
c. 정적 그룹 멤버십 (Static Group Membership)
- 기본적으로 컨슈머가 가지는 컨슈머그룹의 멤버로서 자격(멤버십)은 일시적이다.
=> 컨슈머가 컨슈머그룹을 떠나는 순간 할당된 파티션들은 해제되고 다시 참여하면 새로운 멤버 ID가 발급되면서 새로운 파티션들이 할당된다.
- 정적 멤버로서 컨슈머가 컨슈머그룹에 참여하면 꺼지더라도 자동으로 그룹을 떠나지는 않는다. 다시 조인하면 멤버십이 유지되어 할당받았던
파티션들을 그대로 재할당 받는다. (세션 타임아웃이 경과될떄까지 여전히 그룹멤버로 남아있게 된다)
=> 그룹코디네이터가 각 멤버에 대한 파티션 할당을 캐시해두고 있기 때문에 리밸런싱이 발생하지 않는다.
- group.instance.id를 설정하면 정적 멤버로서 컨슈머그룹에 컨슈머로서 참여할 수 있다.
=> 같은 값을 가진 두 개의 컨슈머가 같은 그룹에 조인하는 경우 에러가 발생한다.
- session.timeout.ms 설정으로 정적멤버를 종료시킬수도 있다.
=> 어플리케이션 재시작이 리밸런스를 작동시키지 않을만큼 충분히 크면서 실제로 멈춘경우 리밸런싱이 발생할수 있는 작은값으로 설정하는 것이 좋다.
나. 컨슈머 만들기
a. 기본예제
implementation("org.apache.kafka:kafka-clients:3.8.0")
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.*
fun main(args: Array<String>) {
val props = Properties()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[ConsumerConfig.GROUP_ID_CONFIG] = "group-id"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
val consumer = KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singleton("score"))
while (true) {
val records : ConsumerRecords<String, String> = consumer.poll(Duration.ofSeconds(1))
for (recode: ConsumerRecord<String, String> in records) {
println(recode.value())
}
}
}
- 카프카 클라이언트 의존성을 추가하고 간단한 설정으로 토픽의 메세지를 구독하는 컨슈머를 만들수 있다
- 반드시 지정해야하는 속성은 bootstrap.servers, key.deserializer, value.deserializer 3개 이다.
=> deserializer는 바이트 배열을 자바 객체로 변환하는 클래스를 지정한다.
- 반드시 지정해야하는 것은 아니지만 일반적으로 사용되는 속성으로 group.id는 컨슈머그룹 id 를 지정하는 설정이다.
=> 어떤 컨슈머그룹에도 속하지 않는 컨슈머를 생성하는 것도 가능하지만 일반적이지 않다.
- 카프카API를 통해서 간단하게 토픽을 구독할 수 있는데 정규식으로도 가능하다. 토픽이 많은 경우에는 클라이언트에서 필터링해야한다.
=> 정규식으로 지정하는 경우에는 전체토픽과 파티션에 대한 정보를 브로커에 일정한 간격으로 요청해 오버헤드가 발생할수 있기 때문이다.
- 설정하고 실행해보면 카프카서버에 위에서 설정한 컨슈머그룹ID가 표시되는것을 볼 수 있다.
- 메세지를 생성해서 테스트 해보면 발행한 메세지를 작성한 어플리케이션에서 소비하는 것을 볼 수 있다.
b. 폴링
- 컨슈머는 카프카를 계속해서 폴링하지 않으면 죽은 것으로 간주되어 이 컨슈머가 읽던 파티션들을 다른 그룹 내의 컨슈머에게 넘긴다.
=> max.poll.interval.ms 에 지정된 시간 설정으로 관리된다.
- poll() 메서드에 전달하는 타임아웃 시간은 컨슈머 버퍼에 데이터가 없을 경우 poll()이 block 될 수 있는 최대 시간을 결정한다.
=> 0으로 지정되거나 버퍼 안에 레코드가 이미 있는 경우 poll()은 즉시 리턴 된다.
c. 스레드 안정성
- 하나의 스레드에 동일한 그룹 내의 여러 개의 컨슈머는 생성할 수 없고 같은 컨슈머를 다수의 스레드가 안전하게 사용할 수 도 없다.
=> 하나의 스레드당 하나의 컨슈머가 원칙이다.!
- 동일한 그룹에 속하는 여러 개의 컨슈머를 운용하고 싶다면 스레드를 여러개 생성해 각각의 컨슈머를 하나씩 돌리는 수 밖에 없다.
- 또 다른 방법으로는 이벤트를 받아서 큐에 넣는 컨슈머 하나와 이 큐에서 이벤트를 꺼내서 처리하는 여러 개의 워커 스레드를 사용하는 방법이 있다.
다. 다양한 컨슈머 설정들
- 대부분의 매개변수는 합리적인 기본값을 가지고 있기 떄문에 딱히 변경할 필요는 없다고 한다. 물론 도메인이나 상황에 따라서 적절하게 판단해야할것 같다.
a. fetch.min.bytes , fetch.max.bytes , fetch.max.wait.ms
- fetch.min.bytes는 컨슈머가 브로커로부터 레코드를 얻어올때 받는 데이터의 최소량을 지정한다. 기본값은 1byte 이다.
- fetch.max.bytes는 카프카가 리턴하는 최대 바이트 수를 지정한다. 기본값은 50MB 이다.
- fetch.max.wait.ms 는 카프카가 컨슈머에게 응답하기 전 충분한 데이터가 모일때까지 시간을 지정한다. 기본값은 500ms 이다.
=> fetch.min.bytes 두 조건중 하나가 만족되는 대로 리턴한다.
b. sesstion.timeout.ms, heartbeat.interval.ms, max.poll.interval.ms
- sesstion.timeout.ms 는 컨슈머와 브로커가 신호를 주고받지 않고도 살아있는 것으로 판정되는 최대 시간이다. 기본값은 45s 이다.
=> 기본값이 10초 -> 45초로 변경되었는데 이는 온프레미스 기준이었던것이 요즘은 클라우드 환경많이 사용하여 그에 대한 비용이 반영된것이다.
- heartbeat.interval.ms는 컨슈머가 브로커에 신호를 보내는 주기 값이다. 기본값은 3s 이다.
- max.poll.interval.ms는 컨슈머가 폴링하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간이다. 기본값은 5m 이다.
=> 하트비트는 백그라운드 스레드에서 돌아가기 때문에 하트비트는 정상이지만 처리하는 스레드에 문제가 생길 경우가 있기 때문에 필요하다.
d. enable.auto.commit, auto.commit.interval.ms, auto.offset.reset
- enable.auto.commit은 컨슈머가 자동으로 오프셋을 커밋할지 여부를 결정한다. 기본값은 true 이다.
=> true일때 auto.commit.interval.ms가 마지막으로 처리한 오프셋을 주기적으로 커밋하는 설정이다. 기본값은 5s이다.
- auto.offset.reset은 컨슈머가 오프셋을 커밋한적이 없거나 커밋된 오프셋이 유효하지 않을때의 작동을 정의한다. 기본값은 lastest 이다.
=> 유효하지 않은 오프셋을 만났을때 lastest는 가장 최신값을 읽고 earliest는 파티션 맨 처음부터 읽고 none은 예외를 발생시킨다.
e. partition.assignment.stragy
- partition.assignment.stragy는 어느 컨슈머에게 어느 파티션이 할당될지에 대한 전략을 결정한다. 기본값은 Range 이다.
1. Range는 각 토픽의 파티션들을 연속된 그룹으로 나눠서 할당한다.
=> consumer1 : partition0, partion1 / consumer2 : partition2, partion3
2. RoundRobin은 모든 파티션을 순차적으로 하나씩 컨슈머에게 할당한다
=> consumer1 : partition0, partion2 / consumer2 : partition1, partion3
3. Sticky은 가능한 균등하게 할당하면서 리밸런싱이 발생할때 파티션 재배치를 최소화 하는 전략으로 할당한다.
=> Cooperative Sticky 는 기본적으로 동일하지만 Cooperative 리밸런싱 기능을 지원한다.
f. client.id , client.rack
- client.id는 브로커가 요청을 보낸 클라이언트(프로듀서 or 컨슈머)를 식별하는데 쓰인다. 로깅 모니터링,지표, 쿼터에도 사용된다.
=> 여기서 쿼터는 카프카에서 사용되는 클라이언트가 사용할수 있는 리소스를 제한을 설정하는데 쓰인다.
- client.rack은 클라이언트가 속한 랙이나 가용영역을 지정하는 설정이다.
=> 기본적으로 컨슈머는 파티션의 리더 레플리카로부터 읽어오는데 같은 영역에 있는 레플리카로부터 메세지를 읽는게 성능,비용에서 더 유리하다.
g. group.instance.id , offsets.retention.minutes
- group.instance.id 는 컨슈머에 정적그룹멤버십 기능을 적용하기 위해 사용되는 설정이다.
- offsets.retention.minutes 는 브로커 설정으로 오프셋정보를 얼마동안 유지하는지에 사용되는 설정이다.
=> 더이상 그룹에 컨슈머가 없거나 더이상 토픽을 읽지 않는 경우
라. 기타
a. 오프셋과 커밋(Offset, Commit)
- 카프카의 고유 특성 중 하나는 컨슈머가 카프카를 사용해서 각 파티션에서의 위치를 추적할수 있는 부분이다.
- 카프카에서는 파티션에서의 현재 위치를 업데이트 하는 작업을 오프셋 커밋이라고 부르며 카프카는 레코드를 개별적으로 커밋하지 않는다.
=> 컨슈머는 파티션에서 성공적으로 처리한 마지막 메세지를 커밋함으로써 그 앞의 모든 메세지들 역시 성공적으로 처리된것을 암묵적으로 나타낸다.
- 오프셋 커밋 방법은 카프카에 특수 토픽인 __comsumer_offsets 토픽에 각 파티션별로 커밋된 오프셋을 업데이트 하도록 메세지를 보내서 이루어진다.
- 만약 커밋된 오프셋이 처리한 마지막 메세지의 오프셋보다 작을 경우 마지막 처리된 오프셋과 커밋된 오프셋 사이의 메세지들은 두번 처리된다.
=> 실제로 업무하면서 카프카 관련 모듈을 재배포 할때 중복이 발생하는 이유였다.
- 반대로 커밋된 오프셋이 마지막 처리된 오프셋 보다 클 경우 사이에 있는 모든 메세지들은 컨슈머 그룹에서 누락 된다!
- 오프셋 커밋을 수동으로 관리하는 경우 commitSync()가 가장 간단하고 실뢰성 있는 커밋 API 이다.
=> 모든 레코드가 처리되기 전에 호출하는 경우 장애가 발생했을때 처리되지 않은 메세지들이 누락될수 있는 위험이 있다.
- 수동으로 관리 단점 중 하나는 브로커가 커밋 요청에 응답할때까지 어플리케이션이 블록 된다는 점이 있다.
=> commitAsync()를 사용해 비동기로 처리할 수 있다. 응답을 받지 않기 때문에 재시도는 하지 않는다. (콜백 존재)
- commitAsync()과 commitSync() 를 같이 사용하는 방법도 있다. (개별 루프에서는 Async, 루프 종료에서는 Sync)
b. 리밸런스 리스너 (Rebalance Listener)
- 컨슈머는 종료하기 전이나 리밸런싱이 시작되기전에 정리하는 작업이 필요하다. 예를들어 마지막 처리한 이벤트의 오프셋 커밋 등
- 컨슈머 API는 컨슈머에 파티션이 할당되거나 해제될 때 사용자의 코드가 실행되도록하는 메커니즘을 리스너 설정을 통해 제공한다.
c. 디시리얼라이저(Deserializer)
- 컨슈머는 카프카로부터 받을 바이트배열을 자바객체로 변환하기 위해 디시리얼라이저를 필요로 한다.
- 같은 타입의 시리얼라이저와 디시리얼라이저를 묶어놓은 것을 제공해주는 클래스가 Serdes 이고 이것을 활용하면 좋을것 같다.
//props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = Serdes.String().serializer()
//props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = Serdes.String().serializer()
'공부 > Kafka' 카테고리의 다른 글
<Kafka> 5. 카프카 내부 메커니즘 살펴보기 (3) | 2024.11.03 |
---|---|
<Kafka> 4. 카프카 프로듀서 (Producer) (1) | 2024.10.28 |
<Kafka> 2. 카프카 설치 및 예제 만들기 (3) | 2024.10.09 |
<Kafka> 1. 카프카 기초 알아보기 (4) | 2024.10.06 |
블로그의 정보
57개월 BackEnd
BFine