<Kafka> 4. 카프카 프로듀서 (Producer)
by BFine출처&참고 : https://product.kyobobook.co.kr/detail/S000201464167
가. 프로듀서(Producer) 알아보기
a. 언제 쓸까?
- 사용자의 행동 기록, 성능 메트릭 기록, 로그 메세지 저장, 스마트 가전에서 정보 수집, 다른 어플리케이션과 비동기적 통신, DB 저장하기전 버퍼링 등이 있다.
=> 유실이 용납되지 않는지, 허용되는지, 중복이 허용되도 상관없는지, 반드시 지켜야할 지연or처리율이 있는지 등 요구조건도 다양하다.
b. 카프카에 데이터를 전송할 때 수행되는 주요 단계
1. 발생처에서 메세지를 발행
=> ProducerRecord 객체를 생성 / 토픽과 value 지정은 필수, 키와 파티션 지정은 선택
2. 카프카 API 내의 시리얼라이저가 네트워크 상에 전송될 수 있도록 직렬화
3. 파티션을 명시적으로 지정하지 않은 경우 파티셔너가 파티션을 결정
4. 메세지가 전송 될 토픽과 파티션이 확정되면 레코드를 같은 토픽 파티션으로 전송될 레코드들을 모은 레코드 배치에 추가(버퍼)
5. 별도 스레드가 레코드 배치를 적절한 카프카 브로커에게 전송한다. 성공적으로 저장되었을 경우 RecordMetaData 객체를 수신
c. 메세지 전송 방식
- Fire and Forget : 메세지를 서버에 전송만 하고 성공 혹은 실패여부는 신경쓰지 않는 방식
- Sync send : 카프카 프로듀서는 언제나 비동기적으로 작동하는데 다음 메세지를 전송하기 전에 작업이 완료되길 기다리고 성공여부를 확인하는 방식
- Async send : 메세지를 전송하고 응답받는 시점에 지정한 콜백 함수를 실행하는 방식
나. 프로듀서 예제 만들기
a. 기본예제
implementation("org.apache.kafka:kafka-clients:3.8.0")
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.Serdes
import java.util.*
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
fun main() {
val props = Properties()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = Serdes.String().serializer().javaClass.name
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = Serdes.String().serializer().javaClass.name
val producer = KafkaProducer<String, String>(props)
val record1 = ProducerRecord<String, String>("score", "Hello")
val record2 = ProducerRecord<String, String>("score", "World")
// Sync
val result1 : Future<RecordMetadata> = producer.send(record1)
val result2 : Future<RecordMetadata> = producer.send(record2)
println("[sync] ${result1.get()}")
println("[sync] ${result2.get()}")
// Async
producer.send(record1) { metadata, _ -> println("[async] $metadata") }
producer.send(record2) { metadata, _ -> println("[async] $metadata") }
TimeUnit.SECONDS.sleep(1)
}
// 결과
[sync] score-0@11
[sync] score-0@12
[async] score-0@13
[async] score-0@14
- 동기식으로 전송하는 경우 Future<RecordMetedata> 객체를 반환한다. (get할때 블로킹이 발생하기 때문에 주의가 필요하다)
- 비동기식 같은 경우 Callback를 지정하여 사용할 수 있다. 여기서 Callback 인터페이스는 카프카에서 제공하는 것이 사용된다.
다. 다양한 프로듀서 설정들
- 대부분의 매개변수는 합리적인 기본값을 가지고 있기 떄문에 딱히 변경할 필요는 없다고 한다. 물론 도메인이나 상황에 따라서 적절하게 판단해야할것 같다.
a. client.id, acks
- client.id : 카프카 브로커가 프로듀서가 보낸 메세지를 서로 구분하기 위한 값
- acks : 쓰기작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야하는지 결정한다.
1. acks=0일때 : 성공했다고 간주하고 브로커 응답을 기다리지 않음
2. acks=1일때 : 리더 레플리카가 메세지를 받는 순간 브로커로부터 성공했다는 응답을 받음
3. acks=all 일때 : 메세지가 모든 인-싱크 레플리카에 전달된 뒤에야 브로커로부터 성공했다는 응답을 받음
#. 컨슈머가 읽을때까지의 지연시간은 세값이 모두 똑같다. ( 카프카는 일관성 유지 때문에 인-싱크 레플리카까지 완료 이후 컨슈머가 읽기 가능하다.)
b. max.block.ms, delivery.timeout.ms , request.timeout.ms, linger.ms
- max.block.ms : 데이터 전송이 지연되거나 블로킹될수 있는 최대시간 설정이다. 기본값은 1분
=> 버퍼가 가득차면 새 메세지가 버퍼에 공간이 생길 때까지 대기해야한다.
- delivery.timeout.ms : 레코드가 배치에 저장된 시점부터 브로커에게 전달되기 까지의 제한시간을 결정한다. 기본값은 2분
- request.timeout.ms : 프로듀서가 데이터를 전송할때 서버로부터 응답을 받는데까지 걸리는 제한시간을 결정한다. 기본값은 30초
- linger.ms : 배치를 전송하기 전까지 대기하는 시간을 결정한다. 기본값은 0ms ㅇ다.
=> 배치가 가득 차거나 제한시간이 되었을 때 메세지 배치를 전송한다.
c. retries, retry.backoff.ms
- retries : 프로듀서가 메세지 전송을 포기하고 에러를 발생시킬때까지 메세지를 재전송하는 횟수이다. 기본값은 0
- retry.backoff.ms : 메세지 전송 실패시 재시도 간의 대기 시간을 설정한다. 기본값은 100ms
d. buffer.memory, batch.size, compression.type
- buffer.memory : 프로듀서가 메세지를 전송하기 전에 메세지를 대기시키는 버퍼의 크기를 결정한다. 기본값 32MB
- batch.size : 같은 파티션에 다수의 레코드가 전송될 경우 배치단위로 모아서 한꺼번에 전송한다. 기본값 16KB
- compression.type : 메세지를 전송할때 사용하는 압축 알고리즘 유형을 설정한다. 기본값은 NONE
=> snappy 알고리즘은 CPU 부하가 작으면서도 성능이 좋다. gzip은 CPU&시간을 더 쓰지만 압축율이 더좋다.
e. max.in.flight.requests.per.connection
- max.in.flight.requests.per.connection : 서버로 부터 응답받지 못한 상태에서 전송할수 있는 최대 메세지 수를 결정한다. 기본값은 5
=> 값을 올리면 메모리 사용량이 증가하지만 처리량도 증가한다.
f. enable.idempotence
- enalde.idempotence : 메세지 전송의 일관성과 실뢰성 보장하기 위해 사용하는 옵션 기본값 false
=> 이 기능이 활성화 되면 프로듀서는 레코드를 보낼때마다 순차적인 번호를 붙여서 보내며 브로커는 번호 중복이 있어도 하나만 저장한다.
다. 그 외
a. 아파치 에이브로(Avro)
- 만약 특정필드의 타입을 변경하는 경우(ex. int -> long) 기존 형식과 새형식 사이의 직렬화 호환성을 유지해야하는 경우가 발생할 것이다.
=> 이런 문제점을 해결하기 위해 커스텀 시리얼라이저를 사용하는데 다양한 범용화 직렬화 라이브러리가 있다. (Avro, Thrift, Protobuf 등)
- 에이브로는 언어 중립적인 데이터 직렬화 형식을 사용하며 직렬화 할때 에이프로 파일 자체에 스키마를 내장하는 방법을 쓴다.
=> 새로운 스키마로 전환 하더라도 기존 스키마와 호환성을 유지한다.
b. 스키마 레지스트리(Schema Registry)
- 스키마 레지스트리는 아파치 카프카의 일부가 아니고 일반적으로 여러 오픈소스 구현체 중 하나를 사용한다.
- 카프카에 데이터를 쓰기 위해 사용되는 모든 스키마를 레지스트리에 저장한다
=> 카프카 브로커에 저장되는 레코드에는 스키마 ID만을 가진다. (고유 식별자정보)
- 컨슈머는 위의 식별자 정보를 스키마 레지스트리에서 가져와서 데이터를 역직렬화 할 수 있다.
=> 위의 작업은 모두 시리얼라이저,디시리얼라이저에서 이루어진다.
c. 파티션(Partition) 결정하기
- 레코드안에 Key의 역할은 두가지로 추가적인 정보 역할과 하나의 토픽에 속한 파티션 중 저장될 파티션을 결정 짓는 기준역할을 한다.
- 기본 파티셔너는 키값이 null 인 경우 sticky 처리를 하기 위해 라운드 로빈 알고리즘을 사용한다.
=> sticky 처리는 같은 파티션에 저장하는 레코드들을 접착시켜서 한번에 배치처리 하는것을 의미한다
- 저장되는 파티션이 변경되어서는 안되는 경우 가장 쉬운 방법은 파티션을 크게 잡아서 더 이상 파티션을 추가하지 않는 방법이 있다.
- 항상 Key값 해시를 통해 파티션을 결정할 필요는 없다. 커스텀 파티셔서를 구현해서 다른 방법으로도 파티션을 결정해줄 수 있다.
d. 헤더, 인터셉터, 쿼터&스로틀링
- 레코드에 헤더 설정도 해줄 수 있는데 키/밸류값을 건드리지 않고 추가 메타데이터를 심을 때 사용한다.
=> 주된 용도는 메세지의 전달 내역을 기록하여 이를 이용해 라우팅하거나 출처를 추적하는 역할로 사용한다.
- 인터셉터 설정도 가능한데 일반적인 사례로 모니터링, 정보 추적, 표준헤더 삽입 등 용도로 사용한다.
- 카프카 브로커는 쓰기/읽기 속도를 제한 할 수 있는 쿼터 기능이 있다. (client.id를 이용해 특정사용자만 제한도 가능)
=> 쓰기쿼터/읽기쿼터/요청쿼터 3가지 쿼터타입에 대해 설정이 가능하다.
- 카프카 브로커는 클라이언트가 할당량을 다 채운 경우 요청에 대한 스로틀링을 시작하여 할당량을 초과하지 않도록 한다.
'공부 > Kafka' 카테고리의 다른 글
<Kafka> 5. 카프카 내부 메커니즘 살펴보기 (3) | 2024.11.03 |
---|---|
<Kafka> 3. 카프카 컨슈머 (Consumer) (0) | 2024.10.20 |
<Kafka> 2. 카프카 설치 및 예제 만들기 (3) | 2024.10.09 |
<Kafka> 1. 카프카 기초 알아보기 (4) | 2024.10.06 |
블로그의 정보
57개월 BackEnd
BFine