You will be fine

<Kafka> 3. 카프카 컨슈머 (Consumer)

by BFine
반응형

출처&참고 : https://product.kyobobook.co.kr/detail/S000201464167

 

카프카 핵심 가이드 | 그웬 샤피라 - 교보문고

카프카 핵심 가이드 | 카프카를 창시한 사람들이 쓰고, 카프카 개발에 참여한 한국인 개발자가 옮긴 핵심 실무서모든 엔터프라이즈 애플리케이션은 로그 메시지, 지표, 사용자 행동 혹은 외부로

product.kyobobook.co.kr

가. 컨슈머 알아보기

 a. 컨슈머와 컨슈머그룹 (Group Consumer)

  -  프로듀서가 어플리케이션이 검사할수 있는 속도보다 더 빠른 속도로 토픽에 메세지를 쓰게 된다면 컨슈머의 메세지의 처리가 뒤로 밀리게 될 것 이다. 

      => 그렇기 때문에 토픽으로부터 데이터를 읽어오는 작업을 확장(scale-out)할 수 있어야 하는데 이를 컨슈머그룹을 통해 할 수 있다.

  -  컨슈머그룹에 컨슈머를 추가해서 카프카 토픽에 읽어오는 데이터 양을 확장 할 수 있다.

  -  카프카 컨슈머는 보통 컨슈머 그룹의 일부로서 작동한다.

      => 동일 컨슈머그룹에 속한 여러 개의 컨슈머들이 동일토픽을 구독할경우 각각의 컨슈머는 서로 다른 피티션의 메세지를 받는다.

  -  토픽의 파티션 수보다 더 많은 컨슈머가 있거나 추가한다면 몇몇은 유휴 상태가 되어 메세지를 전혀 받지 못할 수 있다.

  -  일반적으로 토픽에 쓰여진 데이터를 전체 조직안에서 여러 용도로 사용할수 있도록 만드는 것이기 때문에 컨슈머그룹을 추가해

      동일 데이터를 여러곳에서 받을 수 있도록 디자인 되었다. 

 

 b. 파티션 리밸런스 (Partition Rebalance)

  -  컨슈머는 컨슈머그룹의 그룹코디네이터 역할을 하는 브로커에 하트비트를 전송하여 파티션에 대한 소유권을 유지 한다. 

      => 하트비트는 컨슈머의 백그라운드 스레드에 의해 전송되며 일정한 간격을 두고 연결을 유지한다.

  -  컨슈머그룹에 컨슈머를 추가하거나 컨슈머에 문제가 생겨 다운되거나 하는 경우 컨슈머의 파티션을 재할당하는 작업을 리밸런스라고 한다.  

       => 그룹코디네이터에 하트비트가 몇 초 이상 들어오지 않는 것을 컨슈머가 죽었다고 판단한 뒤 리밸런스를 실행시킨다. 

  -  eager 리밸런스는 모든 컨슈머는 읽기 작업을 멈추고 모든 파티션에 대한 소유권을 포기한뒤에 다시 그룹에 참여하여 새로운 파티션을 할당하는 방식이다.

       => 주의할점은 컨슈머 그룹의 모든 컨슈머들이 중단 되기 때문에 전체 작업이 중단되는 부분이 발생한다.

  -  copperative 리밸런스는 한 컨슈머에게 할당되어 있던 파티션만 다른 컨슈머에 재할당하는 방식이다. (점진적)

      1.  컨슈머 그룹 리더가 파티션이 재할당 될것이라고 통보하면 컨슈머들은 해당 파티션에 대한 작업을 멈추고 소유권을 포기한다.

      2. 컨슈머 그룹 리더가 이 포기한 파티션들을 새로운 컨슈머에게 할당한다. 

  -  3.1 버전 이후로는 copperative 리밸런스가 기본값이 되었다고 한다.

  -  번외) 컨슈머가 그룹에 참여하고 싶을때는 그룹 코디네이터에게 JoinGrop 요청을 보내며 가장 먼저 참여한 컨슈머가 그룹리더가 된다.

             리더는 그룹코디네이터로부터 그룹 안에 모든 컨슈머 목록을 받아서 각 컨슈머에 파티션을 할당하는 역할을 한다. 

 

 c. 정적 그룹 멤버십 (Static Group Membership)

  -  기본적으로 컨슈머가 가지는 컨슈머그룹의 멤버로서 자격(멤버십)은 일시적이다.

      => 컨슈머가 컨슈머그룹을 떠나는 순간 할당된 파티션들은 해제되고 다시 참여하면 새로운 멤버 ID가 발급되면서 새로운 파티션들이 할당된다.

  -  정적 멤버로서 컨슈머가 컨슈머그룹에 참여하면 꺼지더라도 자동으로 그룹을 떠나지는 않는다. 다시 조인하면 멤버십이 유지되어 할당받았던

      파티션들을 그대로 재할당 받는다. (세션 타임아웃이 경과될떄까지 여전히 그룹멤버로 남아있게 된다)

      => 그룹코디네이터가 각 멤버에 대한 파티션 할당을 캐시해두고 있기 때문에 리밸런싱이 발생하지 않는다.

  -  group.instance.id를 설정하면 정적 멤버로서 컨슈머그룹에 컨슈머로서 참여할 수 있다.

      => 같은 값을 가진 두 개의 컨슈머가 같은 그룹에 조인하는 경우 에러가 발생한다. 

  -  session.timeout.ms 설정으로 정적멤버를 종료시킬수도 있다. 

      => 어플리케이션 재시작이 리밸런스를 작동시키지 않을만큼 충분히 크면서 실제로 멈춘경우 리밸런싱이 발생할수 있는 작은값으로 설정하는 것이 좋다.

 

나. 컨슈머 만들기

 a. 기본예제

implementation("org.apache.kafka:kafka-clients:3.8.0")
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.*

fun main(args: Array<String>) {
    val props = Properties()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    props[ConsumerConfig.GROUP_ID_CONFIG] = "group-id"
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name

    val consumer = KafkaConsumer<String, String>(props)
    consumer.subscribe(Collections.singleton("score"))

    while (true) {
        val records : ConsumerRecords<String, String> = consumer.poll(Duration.ofSeconds(1))
        for (recode: ConsumerRecord<String, String> in records) {
            println(recode.value())
        }
    }
}

  -  카프카 클라이언트 의존성을 추가하고 간단한 설정으로 토픽의 메세지를 구독하는 컨슈머를 만들수 있다

  -  반드시 지정해야하는 속성은 bootstrap.servers, key.deserializer, value.deserializer 3개 이다.

      => deserializer는 바이트 배열을 자바 객체로 변환하는 클래스를 지정한다.

  -  반드시 지정해야하는 것은 아니지만 일반적으로 사용되는 속성으로 group.id는 컨슈머그룹 id 를 지정하는 설정이다. 

      => 어떤 컨슈머그룹에도 속하지 않는 컨슈머를 생성하는 것도 가능하지만 일반적이지 않다. 

  -  카프카API를 통해서 간단하게 토픽을 구독할 수 있는데 정규식으로도 가능하다. 토픽이 많은 경우에는 클라이언트에서 필터링해야한다.

      => 정규식으로 지정하는 경우에는 전체토픽과 파티션에 대한 정보를 브로커에 일정한 간격으로 요청해 오버헤드가 발생할수 있기 때문이다.  

  -  설정하고 실행해보면 카프카서버에 위에서 설정한 컨슈머그룹ID가 표시되는것을 볼 수 있다.

  -  메세지를 생성해서 테스트 해보면 발행한 메세지를 작성한 어플리케이션에서 소비하는 것을 볼 수 있다.

 

 b. 폴링

  -  컨슈머는 카프카를 계속해서 폴링하지 않으면 죽은 것으로 간주되어 이 컨슈머가 읽던 파티션들을 다른 그룹 내의 컨슈머에게 넘긴다.

      => max.poll.interval.ms 에 지정된 시간 설정으로 관리된다. 

  -  poll() 메서드에 전달하는 타임아웃 시간은 컨슈머 버퍼에 데이터가 없을 경우 poll()이 block 될 수 있는 최대 시간을 결정한다.

      => 0으로 지정되거나 버퍼 안에 레코드가 이미 있는 경우 poll()은 즉시 리턴 된다.

 

 c. 스레드 안정성

  -  하나의 스레드에 동일한 그룹 내의 여러 개의 컨슈머는 생성할 수 없고 같은 컨슈머를 다수의 스레드가 안전하게 사용할 수 도 없다.

      => 하나의 스레드당 하나의 컨슈머가 원칙이다.! 

  -  동일한 그룹에 속하는 여러 개의 컨슈머를 운용하고 싶다면 스레드를 여러개 생성해 각각의 컨슈머를 하나씩 돌리는 수 밖에 없다. 

  -  또 다른 방법으로는 이벤트를 받아서 큐에 넣는 컨슈머 하나와 이 큐에서 이벤트를 꺼내서 처리하는 여러 개의 워커 스레드를 사용하는 방법이 있다. 

 

다. 다양한 컨슈머 설정들 

  -  대부분의 매개변수는 합리적인 기본값을 가지고 있기 떄문에 딱히 변경할 필요는 없다고 한다. 물론 도메인이나 상황에 따라서 적절하게 판단해야할것 같다.

 

 a. fetch.min.bytes , fetch.max.bytes , fetch.max.wait.ms 

  -  fetch.min.bytes는 컨슈머가 브로커로부터 레코드를 얻어올때 받는 데이터의 최소량을 지정한다. 기본값은 1byte 이다. 

  -  fetch.max.bytes는 카프카가 리턴하는 최대 바이트 수를 지정한다. 기본값은 50MB 이다.

  -  fetch.max.wait.ms 는 카프카가 컨슈머에게 응답하기 전 충분한 데이터가 모일때까지 시간을 지정한다. 기본값은 500ms 이다.

      => fetch.min.bytes 두 조건중 하나가 만족되는 대로 리턴한다. 

 

 b. sesstion.timeout.ms, heartbeat.interval.ms, max.poll.interval.ms

  -  sesstion.timeout.ms 는 컨슈머와 브로커가 신호를 주고받지 않고도 살아있는 것으로 판정되는 최대 시간이다. 기본값은 45s 이다.

      => 기본값이 10초 -> 45초로 변경되었는데 이는 온프레미스 기준이었던것이 요즘은 클라우드 환경많이 사용하여 그에 대한 비용이 반영된것이다. 

  -  heartbeat.interval.ms는 컨슈머가 브로커에 신호를 보내는 주기 값이다. 기본값은 3s 이다.

  -  max.poll.interval.ms는 컨슈머가 폴링하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간이다. 기본값은 5m 이다.

      => 하트비트는 백그라운드 스레드에서 돌아가기 때문에 하트비트는 정상이지만 처리하는 스레드에 문제가 생길 경우가 있기 때문에 필요하다.

 

 d. enable.auto.commit, auto.commit.interval.ms, auto.offset.reset

  -  enable.auto.commit은 컨슈머가 자동으로 오프셋을 커밋할지 여부를 결정한다. 기본값은 true 이다.

       => true일때 auto.commit.interval.ms가 마지막으로 처리한 오프셋을 주기적으로 커밋하는 설정이다. 기본값은 5s이다.

  -  auto.offset.reset은 컨슈머가 오프셋을 커밋한적이 없거나 커밋된 오프셋이 유효하지 않을때의 작동을 정의한다. 기본값은 lastest 이다.

      => 유효하지 않은 오프셋을 만났을때 lastest는 가장 최신값을 읽고 earliest는 파티션 맨 처음부터 읽고 none은 예외를 발생시킨다.

 

 e. partition.assignment.stragy

  -  partition.assignment.stragy는 어느 컨슈머에게 어느 파티션이 할당될지에 대한 전략을 결정한다. 기본값은 Range 이다.

      1. Range는 각 토픽의 파티션들을 연속된 그룹으로 나눠서 할당한다.

          => consumer1 : partition0, partion1 / consumer2 : partition2, partion3

      2. RoundRobin은 모든 파티션을 순차적으로 하나씩 컨슈머에게 할당한다

         => consumer1 : partition0, partion2 / consumer2 : partition1, partion3

      3. Sticky은 가능한 균등하게 할당하면서 리밸런싱이 발생할때 파티션 재배치를 최소화 하는 전략으로 할당한다.

         => Cooperative Sticky 는 기본적으로 동일하지만 Cooperative 리밸런싱 기능을 지원한다.

 

 f. client.id , client.rack 

  -  client.id는 브로커가 요청을 보낸 클라이언트(프로듀서 or 컨슈머)를 식별하는데 쓰인다. 로깅 모니터링,지표, 쿼터에도 사용된다.

      => 여기서 쿼터는 카프카에서 사용되는 클라이언트가 사용할수 있는 리소스를 제한을 설정하는데 쓰인다. 

  -  client.rack은 클라이언트가 속한 랙이나 가용영역을 지정하는 설정이다.

      =>  기본적으로 컨슈머는 파티션의 리더 레플리카로부터 읽어오는데 같은 영역에 있는 레플리카로부터 메세지를 읽는게 성능,비용에서 더 유리하다.

 

 g. group.instance.id , offsets.retention.minutes

  -  group.instance.id 는 컨슈머에 정적그룹멤버십 기능을 적용하기 위해 사용되는 설정이다.

  -  offsets.retention.minutes 는 브로커 설정으로 오프셋정보를 얼마동안 유지하는지에 사용되는 설정이다. 

      => 더이상 그룹에 컨슈머가 없거나 더이상 토픽을 읽지 않는 경우

 

라. 기타

 a. 오프셋과 커밋(Offset, Commit)

  -  카프카의 고유 특성 중 하나는 컨슈머가 카프카를 사용해서 각 파티션에서의 위치를 추적할수 있는 부분이다.

  -  카프카에서는 파티션에서의 현재 위치를 업데이트 하는 작업을 오프셋 커밋이라고 부르며 카프카는 레코드를 개별적으로 커밋하지 않는다.

       => 컨슈머는 파티션에서 성공적으로 처리한 마지막 메세지를 커밋함으로써 그 앞의 모든 메세지들 역시 성공적으로 처리된것을 암묵적으로 나타낸다.

  -  오프셋 커밋 방법은 카프카에 특수 토픽인 __comsumer_offsets 토픽에 각 파티션별로 커밋된 오프셋을 업데이트 하도록 메세지를 보내서 이루어진다.

  -  만약 커밋된 오프셋이 처리한 마지막 메세지의 오프셋보다 작을 경우 마지막 처리된 오프셋과 커밋된 오프셋 사이의 메세지들은 두번 처리된다.

      => 실제로 업무하면서 카프카 관련 모듈을 재배포 할때 중복이 발생하는 이유였다.

  -  반대로 커밋된 오프셋이 마지막 처리된 오프셋 보다 클 경우 사이에 있는 모든 메세지들은 컨슈머 그룹에서 누락 된다!

  -  오프셋 커밋을 수동으로 관리하는 경우 commitSync()가 가장 간단하고 실뢰성 있는 커밋 API 이다.

      => 모든 레코드가 처리되기 전에 호출하는 경우 장애가 발생했을때 처리되지 않은 메세지들이 누락될수 있는 위험이 있다.

  -  수동으로 관리 단점 중 하나는 브로커가 커밋 요청에 응답할때까지 어플리케이션이 블록 된다는 점이 있다.

      => commitAsync()를 사용해 비동기로 처리할 수 있다. 응답을 받지 않기 때문에 재시도는 하지 않는다. (콜백 존재)

  -  commitAsync()과 commitSync() 를 같이 사용하는 방법도 있다. (개별 루프에서는 Async, 루프 종료에서는 Sync) 

      

 b. 리밸런스 리스너 (Rebalance Listener)

  -  컨슈머는 종료하기 전이나 리밸런싱이 시작되기전에 정리하는 작업이 필요하다. 예를들어 마지막 처리한 이벤트의 오프셋 커밋 등

  -  컨슈머 API는 컨슈머에 파티션이 할당되거나 해제될 때 사용자의 코드가 실행되도록하는 메커니즘을 리스너 설정을 통해 제공한다.

 

 c. 디시리얼라이저(Deserializer) 

  -  컨슈머는 카프카로부터 받을 바이트배열을 자바객체로 변환하기 위해 디시리얼라이저를 필요로 한다.

  -  같은 타입의 시리얼라이저와 디시리얼라이저를 묶어놓은 것을 제공해주는 클래스가 Serdes 이고 이것을 활용하면 좋을것 같다. 

//props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = Serdes.String().serializer()
//props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = Serdes.String().serializer()

 

반응형

블로그의 정보

57개월 BackEnd

BFine

활동하기