카프카 스트림즈(Kafka Streams)란?
- Kafka 에서 공식적으로 제공하는 Kafka 클라이언트 자바 라이브러리 (JVM 기반의 언어(자바/스칼라/코틀린 등) 중 하나를 선택해서 개발 가능)
- 토픽(topic)에 있는 데이터를 낮은 지연과 빠른 속도로 처리
- 토픽에 적재된 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리
Kafka Consumer 와 차이가 잘 안느껴질수 있지만, 그림으로 보자면 아래와 같이 차이가 있다.
Kafka Consumer | Kafka Streams |
![]() |
![]() |
카프카 스트림즈(Kafka Streams) 장점은?
- 카프카와 완벽한 호환
- Kafka에서 공식적으로 제공되는 라이브러리임에 따라 로그스태시, 스파크와 같은 오픈소스툴과 다르게 매 Kafka버전에 맞춰서 호환을 제공
- 딱 1번만(Exactly-once) 처리 보장
- 일반적으로 유실&중복처리를 보장하기 위해선 중복 방지 처리가 같이 구현되어하고 이는 서비스 로직에 집중하지 못하고 로직이 복잡해진다는 문제점 발생
- 스케쥴링 도구가 별도로 필요없다.
- 일반적으로 Kafka와 연동하여 마이크로 배치처리를 하는 이벤트 데이터 애플리케이션을 만들기위해 스파크 스트리밍을 주로 사용하지만, 이를 위해 클러스터 관리자, 리소스 관리자가 필요하고 또한 서버들도 필요합니다. 하지만 Kafka Streams는 다른 것 필요없이 스트림즈 어플리케이션만 가지고 사용할 수 있습니다.
- (아직 까진 정확히 와닿지는 않는다..)
- Streams DSL(Domain Specific Language), Processor API 제공
- 이벤트 기반 데이터 처리에 필요한 기능들을 제공하기 때문에 스트림즈를 구현하기 편하다. (쉽게 말하자면 우리가 주로 사용하려는 기능들이 이미 만들어져있다.)
- Streams DSL
- 미리 제공되는 함수들을 이용하여 토폴로지를 정의하는 방식
- 대부분의 변환 로직을 어렵지 않게 개발할 수 있도록 스트림 프로세싱에 쓰일만한 다양한 기능들을 자체 API로 제공
- 이벤트 기반 데이터 처리를 할 때 필요한 다양한 기능들(map, join, window 등)을 대부분 제공
- Stream DSL은 Processor API보다 비교적 추상적이며 사용하기 쉬움
- Stream DSL만의 독특한 스트림 처리 개념 3가지가 있음
- 요약 : 카프카를 스트림 데이터 처리 뿐만 아니라 대규모 key-value 저장소로도 사용할 수 있는 기능 제공
레코드 흐름의 추상화 개념 3가지 (KStream, KTable, GlobalKTable)
- 요약 : 카프카를 스트림 데이터 처리 뿐만 아니라 대규모 key-value 저장소로도 사용할 수 있는 기능 제공
- Processor API
- Streams DSL에 없는 기능이 있다면 프로세서 API 사용
- Processor API는 Streams DSL보다 복잡한 코드를 가지지만, 데이터 처리를 토폴로지를 기반으로 수행한다는 면에서 같은 역할
- Processor API를 사용할 때는 정의된 함수를 사용하는 것이 아니라 직접 구현해야 하기 때문에 사용하기 어려움
- KStream, KTable, GlobalKTable 개념이 없음
- Stream DSL보다 더 정교한 로직을 구현할 수 있는 장점이 있음
- 자체 로컬 상태 저장소 사용
- 상태 기반 처리를 위해 RocksDB를 로컬에서 사용하여 상태를 저장
- 로컬 DB에 저장한 상태에 대한 변환 정보는 카프카 변경로그에 저장
- 이를 통해 프로세스에 장애가 발생하더라도 상태가 모두 안전하게 저장되기 때문에 장애 복구 가능
카프카 스트림즈(Kafka Streams) 구조
카프카 스트림즈의 구조는 토폴로지로 구성이 된다.
토폴리지란 2개 이상의 노드들과 선으로 이루어진 집합을 뜻함.![]() 토폴로지에 카프카 스트림즈의 개념을 접목시키면, 노드는 프로세서(Processor), 선은 스트림(Stream)을 의미합니다. 즉, 데이터를 처리하는 프로세서(Processor)가 노드이고 다음 노드로 넘어가는 데이터(Stream)를 선이라고 보시면 될 것 같습니다. |
따라서, 카프카 스트림즈의 구조를 도식화 해보면 아래와 같이 표현 할 수 있다.

큰 흐름으로 봤을때 아래와 같은 흐름으로 구성된다.
- Source Processor가 데이터를 가져오면
- Stream Processor가 데이터에 대한 처리를 진행하고
- 처리된 데이터는 Sink Processor가 토픽에 저장
|
카프카 스트림즈(Kafka Streams) 구현 방법
카프카 스트림즈는 위에서 언급한 Streams DSL, Processor API 로 구현할 수 있다.
1. 스트림즈 DSL (Streams DSL)
레코드의 흐름을 추상화한 3가지 개념인 Kstream, Ktable, GlobalKTable 등이 있다. (스트림즈 DSL 한정)

- KStream
- KStream으로 데이터를 조회하면 토픽에 존재하는 모든 데이터가 출력.
- 토픽에 존재하는 데이터(레코드)의 key가 동일해도 key의 중복을 허용하며 데이터를 모두 가져옴.
- 일반적인 consumer 와 비슷한 느낌
- KTable
- 주어진 메세지 키에 대한 최신 값을 보유한다. 즉, 동일한 메세지 키가 들어오면 제일 최신값으로 대체.
- GlobalKTable
- KTable 과 동일하게 메시지 키를 기준으로 묶어서 사용.
- 그러나 KTable로 선언된 토픽은 1개 파티션 -> 1개 태스크에 할당
- GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용되는 차이점이 존재.
- KStream-KTable 조인 시 코파티셔닝이 되어 있지 않은 경우 Global KTable을 이용하면 해결 가능 → 이건 밑에서 좀더 설명
2. 프로세서 DSL (Processor DSL)
앞서 사용하였던 스트림즈DSL에 존재했던 데이터 흐름 추상화 개념인 KStream, KTable, GlobalKTable을 사용하지 않음.
스트림즈 DSL에서 제공하는 메서드외에 추가적인 작업이 필요할 때 활용 (많이 쓰일경우는 없는것으로 보임, 웬만하면 스트림DSL로 구현이 가능)
카프카 스트림즈(Kafka Streams) 구현 예제
KStream 예제
public class Main {
static final String KAFKA_BROKER_IP = "127.0.0.1:9092";
public static void main(String[] args) {
topic1Stream();
topic2Stream();
producerTest();
}
private static Properties getKafkaStreamsProperties(String appId) {
// streams config
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP);
//메시지의 키, 값의 직렬화/역직렬화를 위한 설정
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
private static void topic1Stream() {
//스트림 토폴로지 정의
//StreamsBuilder를 build 하면 Topology 반환
StreamsBuilder builder = new StreamsBuilder();
//Source Processor
KStream<String, String> stream = builder.stream("streamTopic");
stream
//Stream Processor
.mapValues(String::trim)
.filter((String v1, String v2) -> (v1 + v2).length() > 5)
.mapValues((String v1) -> v1 + " 토픽1 처리완료 ")
//Sink Processor
.to("streamTopic2");
stream.foreach((key, value) -> System.out.println("토픽1 출력: " + key + " - " + value)); // 출력문 추가
//스트림 생성
KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("SecondId"));
streams.start();
}
private static void topic2Stream() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("streamTopic2");
stream
.filter((key, value) -> value.contains("3"))
.mapValues((String v1) -> v1 + " 토픽2 처리완료 ")
.to("streamTopic3");
stream.foreach((key, value) -> System.out.println("토픽2 출력: " + key + " - " + value)); // 출력문 추가
KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("LastId"));
streams.start();
}
private static void producerTest() {
//1. Properties 만들기
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP);
kafkaProducerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.put(ProducerConfig.ACKS_CONFIG, "-1");
// 2. Producer 생성
Producer<String, String> producer = new KafkaProducer<>(kafkaProducerProperties);
// 3. 전송
for (int index = 0; index < 3; index++) {
// 해당 토픽은 stream1에 이어지도록 설정
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("streamTopic", "stream", "stream-test-text : " + index);
System.out.println("Produced message: " + producerRecord.value()); // 출력문 추가
producer.send(producerRecord);
}
// 4. 닫기
producer.close();
}
}
오후 4:10:00: Executing ':Main.main()'...
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE
> Task :Main.main()
Produced message: stream-test-text : 0
Produced message: stream-test-text : 1
Produced message: stream-test-text : 2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
토픽1 출력: stream - stream-test-text : 0
토픽1 출력: stream - stream-test-text : 1
토픽1 출력: stream - stream-test-text : 2
토픽2 출력: stream - stream-test-text : 0 토픽1 처리완료
토픽2 출력: stream - stream-test-text : 1 토픽1 처리완료
토픽2 출력: stream - stream-test-text : 2 토픽1 처리완료
KTable 예제
public class Main {
static final String KAFKA_BROKER_IP = "127.0.0.1:9092";
public static void main(String[] args) {
countStream();
producerTest();
}
private static Properties getKafkaStreamsProperties(String appId) {
// streams config
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP);
//메시지의 키, 값의 직렬화/역직렬화를 위한 설정
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
private static void countStream() {
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("plaintext-input", Consumed.with(stringSerde, stringSerde));
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count();
wordCounts.toStream().to("word-count-output", Produced.with(stringSerde, longSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("LastId3"));
streams.start();
}
private static void producerTest() {
//1. Properties 만들기
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP);
kafkaProducerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.put(ProducerConfig.ACKS_CONFIG, "-1");
// 2. Producer 생성
Producer<String, String> producer = new KafkaProducer<>(kafkaProducerProperties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("plaintext-input", "stream2", "Hello World");
ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("plaintext-input", "stream2", "Hello NEXON");
ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("plaintext-input", "stream2", "Hello BYE");
ProducerRecord<String, String> producerRecord4 = new ProducerRecord<>("plaintext-input", "stream2", "HI NEXON");
System.out.println("Produced message: " + producerRecord.value()); // 출력문 추가
System.out.println("Produced2 message: " + producerRecord2.value()); // 출력문 추가
System.out.println("Produced3 message: " + producerRecord3.value()); // 출력문 추가
System.out.println("Produced4 message: " + producerRecord4.value()); // 출력문 추가
producer.send(producerRecord);
producer.send(producerRecord2);
producer.send(producerRecord3);
producer.send(producerRecord4);
// 4. 닫기
producer.close();
}
}
#실행 결과 Consumer 조회값
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-consumer.bat --topic word-count-output --bootstrap-server 127.0.0.1:9092 --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
world 1
hello 3
bye 1
hi 1
nexon 2
카프카 스트림즈(Kafka Streams) 조인(join)
대부분의 DB는 정적으로 저장된 데이터를 조인하지만 카프카 스트림즈에서는 실시간으로 들어오는 데이터를 조인할 수 있다.
예를 들어, 이름을 메시지 키, 주소를 메시지 값으로 갖고 있는 KTable과 이름을 메시지 키, 주문한 물품을 메시지 값으로 가지고 있는 KStream이 존재한다고 하면 사용자가 물품을 주문하면 이미 토픽에 저장된 이름:주소 형식의 KTable과 조인하여 물품과 주소가 결합된 데이터를 새로 생성할 수 있다.
즉, 사용자의 이벤트 데이터를 데이터베이스에 저장하지 않고도 조인하여 스트리밍 처리할 수 있다.
이를 통해 이벤트 기반 스트리밍 데이터 파이프라인을 구성할 수 있다.
- 코파티셔닝(co-partitioning) 되어 있는 토픽 join
KTable과 KStream로 join 테스트
토픽 생성
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic address
Created topic address.
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic order
Created topic order.
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic order_join
Created topic order_join.
예제코드
public class Main {
private static String APPLICATION_NAME = "order-join-application";
private static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
private static String ADDRESS_TABLE = "address";
private static String ORDER_STREAM = "order";
private static String ORDER_JOIN_STREAM = "order_join";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// KTable과 KStream 인스턴스 생성
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(addressTable, // (1)
(order, address) -> order + " send to " + address) // (2)
.to(ORDER_JOIN_STREAM); // (3)
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
데이터 produce
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic address --property "parse.key=true" --property "key.separator=:"
>backtony:seoul
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic order --property "parse.key=true" --property "key.separator=:"
>backtony:iphone
데이터 consume
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic order_join --property print.key=true --property key.separator=":" --from-beginning
backtony:iphone send to seoul
- 코파티셔닝(co-partitioning) 되어 있지 않은 토픽 join
코파티셔닝되지 않은 데이터를 조인하는 방법은 두 가지가 있다.
- 리파티셔닝을 수행한 이후에 코파티셔닝이 된 상태로 조인 처리 → 이 경우 복잡해짐
- KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용
- GlobalKTable는 데이터가 스트림즈 어플리케이션의 모든 태스크에 동일하게 공유되어있기 때문
- GlobalKTable로 정의된 모든 데이터를 저장하고 사용하는 것은 브로커에 많은 부하를 유발하므로 작은 용량의 데이터일 경우에만 사용하는 것을 권장
- 많은 양의 데이터를 가진 토픽을 조인하는 경우 리파티셔닝을 통해 KTable을 사용하는 것을 권장.
GlobalKTable과 KStream로 join 테스트
토픽 생성 (파티션 크기를 다르게 생성하는 것이 포인트)
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 2 --topic address2
Created topic address.
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic order
Created topic order.
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic order_join
Created topic order_join.
예제 코드
public class Main {
private static String APPLICATION_NAME = "global-table-join-application";
private static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
private static String ADDRESS_GLOBAL_TABLE = "address2";
private static String ORDER_STREAM = "order";
private static String ORDER_JOIN_STREAM = "order_join";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// GlobalKTable 인스턴스 생성
GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(addressGlobalTable, // (1)
(orderKey, orderValue) -> orderKey, // (2)
(order, address) -> order + " send to " + address) // (3)
.to(ORDER_JOIN_STREAM); // (4)
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
데이터 produce
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic address2 --property "parse.key=true" --property "key.separator=:"
>kim:gangnam
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic order --property "parse.key=true" --property "key.separator=:"
>kim:card
데이터 consume
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic order_join --property print.key=true --property key.separator=":" --from-beginning
kim:card send to gangnam
참고
https://kafka.apache.org/26/documentation/streams/core-concepts
https://marrrang.tistory.com/63
https://2zzangho.tistory.com/88
https://unit-15.tistory.com/146
https://www.devkuma.com/docs/apache-kafka/strems/
https://findmypiece.tistory.com/342
https://gunju-ko.github.io/kafka/kafka-stream/2018/05/28/Stream-DSL.html
https://developer.confluent.io/courses/kafka-streams/ktable/
https://journal-home.s3.ap-northeast-2.amazonaws.com/site/2023w/abs/0640-NUKEC.pdf