본문 바로가기

게으른개발자

카프카 스트림즈(Kafka Streams) 설명과 간단 예제

카프카 스트림즈(Kafka Streams)란?

  • Kafka 에서 공식적으로 제공하는 Kafka 클라이언트 자바 라이브러리  (JVM 기반의 언어(자바/스칼라/코틀린 등) 중 하나를 선택해서 개발 가능)
  • 토픽(topic)에 있는 데이터를 낮은 지연빠른 속도로 처리
  • 토픽에 적재된 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리

 

Kafka Consumer 와 차이가 잘 안느껴질수 있지만, 그림으로 보자면 아래와 같이 차이가 있다.

Kafka Consumer Kafka Streams

 

 

카프카 스트림즈(Kafka Streams) 장점은?

  1. 카프카와 완벽한 호환
    • Kafka에서 공식적으로 제공되는 라이브러리임에 따라 로그스태시, 스파크와 같은 오픈소스툴과 다르게 매 Kafka버전에 맞춰서 호환을 제공
  2. 딱 1번만(Exactly-once)  처리 보장
    • 일반적으로 유실&중복처리를 보장하기 위해선 중복 방지 처리가 같이 구현되어하고 이는 서비스 로직에 집중하지 못하고  로직이 복잡해진다는 문제점 발생
  3. 스케쥴링 도구가 별도로 필요없다.
    • 일반적으로 Kafka와 연동하여 마이크로 배치처리를 하는 이벤트 데이터 애플리케이션을 만들기위해 스파크 스트리밍을 주로 사용하지만, 이를 위해 클러스터 관리자, 리소스 관리자가 필요하고 또한 서버들도 필요합니다. 하지만 Kafka Streams는 다른 것 필요없이 스트림즈 어플리케이션만 가지고 사용할 수 있습니다.
    • (아직 까진 정확히 와닿지는 않는다..)  
  4. Streams DSL(Domain Specific Language), Processor API 제공
    • 이벤트 기반 데이터 처리에 필요한 기능들을 제공하기 때문에 스트림즈를 구현하기 편하다.  (쉽게 말하자면 우리가 주로 사용하려는 기능들이 이미 만들어져있다.)
    • Streams DSL
      • 미리 제공되는 함수들을 이용하여 토폴로지를 정의하는 방식
      • 대부분의 변환 로직을 어렵지 않게 개발할 수 있도록 스트림 프로세싱에 쓰일만한 다양한 기능들을 자체 API로 제공
        • 이벤트 기반 데이터 처리를 할 때 필요한 다양한 기능들(map, join, window 등)을 대부분 제공
      • Stream DSL은 Processor API보다 비교적 추상적이며 사용하기 쉬움
      • Stream DSL만의 독특한 스트림 처리 개념 3가지가 있음
        • 요약 : 카프카를 스트림 데이터 처리 뿐만 아니라 대규모 key-value 저장소로도 사용할 수 있는  기능 제공
          레코드 흐름의 추상화 개념 3가지 (KStream, KTable, GlobalKTable)
    • Processor API
      • Streams DSL에 없는 기능이 있다면 프로세서 API 사용
      • Processor API는 Streams DSL보다 복잡한 코드를 가지지만, 데이터 처리를 토폴로지를 기반으로 수행한다는 면에서 같은 역할
      • Processor API를 사용할 때는 정의된 함수를 사용하는 것이 아니라 직접 구현해야 하기 때문에 사용하기 어려움
        • KStream, KTable, GlobalKTable 개념이 없음
      • Stream DSL보다 더 정교한 로직을 구현할 수 있는 장점이 있음
  5. 자체 로컬 상태 저장소 사용
    • 상태 기반 처리를 위해 RocksDB를 로컬에서 사용하여 상태를 저장
    • 로컬 DB에 저장한 상태에 대한 변환 정보는 카프카 변경로그에 저장
    • 이를 통해 프로세스에 장애가 발생하더라도 상태가 모두 안전하게 저장되기 때문에 장애 복구 가능

 

 

카프카 스트림즈(Kafka Streams) 구조

카프카 스트림즈의 구조는 토폴로지로 구성이 된다.

토폴리지란 2개 이상의 노드들과 선으로 이루어진 집합을 뜻함.


토폴로지에 카프카 스트림즈의 개념을 접목시키면, 
노드는 프로세서(Processor), 선은 스트림(Stream)을 의미합니다.
즉, 데이터를 처리하는 프로세서(Processor)가 노드이고 다음 노드로 넘어가는 데이터(Stream)를 선이라고 보시면 될 것 같습니다.

따라서, 카프카 스트림즈의 구조를 도식화 해보면 아래와 같이 표현 할 수 있다.

 

큰 흐름으로 봤을때 아래와 같은 흐름으로 구성된다.

  1. Source Processor가 데이터를 가져오면
  2. Stream Processor가 데이터에 대한 처리를 진행하고
  3. 처리된 데이터는 Sink Processor가 토픽에 저장
  • 소스 프로세서 (Source Processor)
    1. 소스 프로세서는 토폴로지의 시작 노드
    2. 데이터를 처리하기 위해 최초로 선언해야 하는 노드
    3. 카프카와 연결된 프로세서이며, 하나 이상의 토픽에서 데이터를 가져오는 역할
  • 스트림 프로세서 (Stream Processor)
    1. 다른 프로세서(소스 프로세서, 스트림 프로세서)가 반환한 데이터를 처리하는 역할
  • 싱크 프로세서 (Sink Processor)
    1. 토폴로지의 마지막 노드
    2. 데이터 전달을 위해 마지막에 선언해야 하는 노드
    3. 카프카와 연결된 프로세서이며, 데이터를 카프카의 특정 토픽으로 저장하는 역할

 

카프카 스트림즈(Kafka Streams) 구현 방법

카프카 스트림즈는 위에서 언급한 Streams DSL, Processor API 로 구현할 수 있다.

1. 스트림즈 DSL (Streams DSL)

레코드의 흐름을 추상화한 3가지 개념인 Kstream, Ktable, GlobalKTable 등이 있다. (스트림즈 DSL 한정)

  • KStream
    • KStream으로 데이터를 조회하면 토픽에 존재하는 모든 데이터가 출력.
    • 토픽에 존재하는 데이터(레코드)의 key가 동일해도 key의 중복을 허용하며 데이터를 모두 가져옴.
    • 일반적인 consumer 와 비슷한 느낌
  • KTable
    • 주어진 메세지 키에 대한 최신 값을 보유한다. 즉, 동일한 메세지 키가 들어오면 제일 최신값으로 대체.
  • GlobalKTable
    • KTable 과 동일하게 메시지 키를 기준으로 묶어서 사용.
    • 그러나 KTable로 선언된 토픽은 1개 파티션 -> 1개 태스크에 할당
    • GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용되는 차이점이 존재.
    • KStream-KTable 조인 시 코파티셔닝이 되어 있지 않은 경우 Global KTable을 이용하면 해결 가능  → 이건 밑에서 좀더 설명

 

2. 프로세서 DSL (Processor DSL)

앞서 사용하였던 스트림즈DSL에 존재했던 데이터 흐름 추상화 개념인 KStream, KTable, GlobalKTable을 사용하지 않음.
스트림즈 DSL에서 제공하는 메서드외에  추가적인 작업이 필요할 때 활용  (많이 쓰일경우는 없는것으로 보임, 웬만하면 스트림DSL로 구현이 가능)

 

 

 

카프카 스트림즈(Kafka Streams) 구현 예제

 

KStream 예제

public class Main {
    static final String KAFKA_BROKER_IP = "127.0.0.1:9092";

    public static void main(String[] args) {
        topic1Stream();
        topic2Stream();
        producerTest();
    }

    private static Properties getKafkaStreamsProperties(String appId) {
// streams config
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP);
        //메시지의 키, 값의 직렬화/역직렬화를 위한 설정
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        return props;
    }

    private static void topic1Stream() {

        //스트림 토폴로지 정의
        //StreamsBuilder를 build 하면 Topology 반환
        StreamsBuilder builder = new StreamsBuilder();

        //Source Processor
        KStream<String, String> stream = builder.stream("streamTopic");

        stream
                //Stream Processor
                .mapValues(String::trim)
                .filter((String v1, String v2) -> (v1 + v2).length() > 5)
                .mapValues((String v1) -> v1 + " 토픽1 처리완료 ")
                //Sink Processor
                .to("streamTopic2");

        stream.foreach((key, value) -> System.out.println("토픽1 출력: " + key + " - " + value)); // 출력문 추가

        //스트림 생성
        KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("SecondId"));
        streams.start();
    }

    private static void topic2Stream() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("streamTopic2");
        stream
                .filter((key, value) -> value.contains("3"))
                .mapValues((String v1) -> v1 + " 토픽2 처리완료 ")
                .to("streamTopic3");

        stream.foreach((key, value) -> System.out.println("토픽2 출력: " + key + " - " + value)); // 출력문 추가


        KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("LastId"));

        streams.start();
    }


    private static void producerTest() {
        //1. Properties 만들기
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP);
        kafkaProducerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProperties.put(ProducerConfig.ACKS_CONFIG, "-1");

        // 2. Producer 생성
        Producer<String, String> producer = new KafkaProducer<>(kafkaProducerProperties);

        // 3. 전송
        for (int index = 0; index < 3; index++) {
            // 해당 토픽은 stream1에 이어지도록 설정
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("streamTopic", "stream", "stream-test-text : " + index);

            System.out.println("Produced message: " + producerRecord.value()); // 출력문 추가
            producer.send(producerRecord);
        }

        // 4. 닫기
        producer.close();
    }
}
오후 4:10:00: Executing ':Main.main()'...

> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE

> Task :Main.main()
Produced message: stream-test-text : 0
Produced message: stream-test-text : 1
Produced message: stream-test-text : 2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
토픽1 출력: stream - stream-test-text : 0
토픽1 출력: stream - stream-test-text : 1
토픽1 출력: stream - stream-test-text : 2
토픽2 출력: stream - stream-test-text : 0 토픽1 처리완료 
토픽2 출력: stream - stream-test-text : 1 토픽1 처리완료 
토픽2 출력: stream - stream-test-text : 2 토픽1 처리완료

 

 

KTable 예제

public class Main {
    static final String KAFKA_BROKER_IP = "127.0.0.1:9092";

    public static void main(String[] args) {
        countStream();
        producerTest();
    }


    private static Properties getKafkaStreamsProperties(String appId) {
		// streams config
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP);
        //메시지의 키, 값의 직렬화/역직렬화를 위한 설정
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        return props;
    }

    private static void countStream() {
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("plaintext-input", Consumed.with(stringSerde, stringSerde));

        KTable<String, Long> wordCounts = textLines
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .count();
        wordCounts.toStream().to("word-count-output", Produced.with(stringSerde, longSerde));
        KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("LastId3"));
        streams.start();
    }


    private static void producerTest() {
        //1. Properties 만들기
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP);
        kafkaProducerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProperties.put(ProducerConfig.ACKS_CONFIG, "-1");

        // 2. Producer 생성
        Producer<String, String> producer = new KafkaProducer<>(kafkaProducerProperties);

        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("plaintext-input", "stream2", "Hello World");
        ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("plaintext-input", "stream2", "Hello NEXON");
        ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("plaintext-input", "stream2", "Hello BYE");
        ProducerRecord<String, String> producerRecord4 = new ProducerRecord<>("plaintext-input", "stream2", "HI NEXON");

        System.out.println("Produced message: " + producerRecord.value()); // 출력문 추가
        System.out.println("Produced2 message: " + producerRecord2.value()); // 출력문 추가
        System.out.println("Produced3 message: " + producerRecord3.value()); // 출력문 추가
        System.out.println("Produced4 message: " + producerRecord4.value()); // 출력문 추가
        producer.send(producerRecord);
        producer.send(producerRecord2);
        producer.send(producerRecord3);
        producer.send(producerRecord4);

        // 4. 닫기
        producer.close();
    }
}
#실행 결과 Consumer 조회값


PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-consumer.bat --topic word-count-output --bootstrap-server 127.0.0.1:9092       --from-beginning --formatter kafka.tools.DefaultMessageFormatter       --property print.key=true --property print.value=true       --property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer      --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
world   1
hello   3
bye     1
hi      1
nexon   2

 

 

 

카프카 스트림즈(Kafka Streams) 조인(join)

대부분의 DB는 정적으로 저장된 데이터를 조인하지만 카프카 스트림즈에서는 실시간으로 들어오는 데이터를 조인할 수 있다.
예를 들어, 이름을 메시지 키, 주소를 메시지 값으로 갖고 있는 KTable과 이름을 메시지 키, 주문한 물품을 메시지 값으로 가지고 있는 KStream이 존재한다고 하면 사용자가 물품을 주문하면 이미 토픽에 저장된 이름:주소 형식의 KTable과 조인하여 물품과 주소가 결합된 데이터를 새로 생성할 수 있다.
즉, 사용자의 이벤트 데이터를 데이터베이스에 저장하지 않고도 조인하여 스트리밍 처리할 수 있다.
이를 통해 이벤트 기반 스트리밍 데이터 파이프라인을 구성할 수 있다.

 

 

- 코파티셔닝(co-partitioning) 되어 있는 토픽 join

 

KTable과 KStream로  join  테스트

토픽 생성

PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic address
Created topic address.
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic order
Created topic order.
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic order_join
Created topic order_join.

 

예제코드

public class Main {
    private static String APPLICATION_NAME = "order-join-application";
    private static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
    private static String ADDRESS_TABLE = "address";
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // KTable과 KStream 인스턴스 생성
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(addressTable, // (1)
                        (order, address) -> order + " send to " + address) // (2)
                .to(ORDER_JOIN_STREAM); // (3)

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }
}

데이터 produce

PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic address --property "parse.key=true" --property "key.separator=:"
>backtony:seoul

PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic order --property "parse.key=true" --property "key.separator=:"
>backtony:iphone

 

데이터 consume

PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic order_join --property print.key=true --property key.separator=":" --from-beginning
backtony:iphone send to seoul

 

 

- 코파티셔닝(co-partitioning) 되어 있지 않은 토픽 join

코파티셔닝되지 않은 데이터를 조인하는 방법은 두 가지가 있다.

  1. 리파티셔닝을 수행한 이후에 코파티셔닝이 된 상태로 조인 처리   → 이 경우 복잡해짐
  2. KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용
    • GlobalKTable는 데이터가 스트림즈 어플리케이션의 모든 태스크에 동일하게 공유되어있기 때문
    • GlobalKTable로 정의된 모든 데이터를 저장하고 사용하는 것은 브로커에 많은 부하를 유발하므로 작은 용량의 데이터일 경우에만 사용하는 것을 권장
    • 많은 양의 데이터를 가진 토픽을 조인하는 경우 리파티셔닝을 통해 KTable을 사용하는 것을 권장.

 

GlobalKTable과 KStream로  join  테스트

 

토픽 생성  (파티션 크기를 다르게 생성하는 것이 포인트)

PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 2 --topic address2
Created topic address.
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic order
Created topic order.
PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic order_join
Created topic order_join.

 

예제 코드

public class Main {
    private static String APPLICATION_NAME = "global-table-join-application";
    private static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
    private static String ADDRESS_GLOBAL_TABLE = "address2";
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // GlobalKTable 인스턴스 생성
        GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(addressGlobalTable, // (1)
                        (orderKey, orderValue) -> orderKey, // (2)
                        (order, address) -> order + " send to " + address) // (3)
                .to(ORDER_JOIN_STREAM); // (4)

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

 

데이터 produce

PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic address2 --property "parse.key=true" --property "key.separator=:"
>kim:gangnam

PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic order --property "parse.key=true" --property "key.separator=:"
>kim:card

 

데이터 consume

PS D:\develop\kafka_2.13-3.5.0\bin\windows> .\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic order_join --property print.key=true --property key.separator=":" --from-beginning
kim:card send to gangnam

 

 

 

 

 

 

 

참고

https://kafka.apache.org/26/documentation/streams/core-concepts

https://marrrang.tistory.com/63

https://2zzangho.tistory.com/88

https://unit-15.tistory.com/146

https://velog.io/@ehdrms2034/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%8A%A4%ED%8A%B8%EB%A6%BC%EC%A6%88-DSL-%EA%B0%9C%EB%85%90

https://www.devkuma.com/docs/apache-kafka/strems/

https://findmypiece.tistory.com/342

https://gunju-ko.github.io/kafka/kafka-stream/2018/05/28/Stream-DSL.html

https://velog.io/@jwpark06/Kafka-%EC%8A%A4%ED%8A%B8%EB%A6%BC%EC%A6%88-%EC%95%8C%EC%95%84%EB%B3%B4%EA%B8%B0

https://developer.confluent.io/courses/kafka-streams/ktable/

https://journal-home.s3.ap-northeast-2.amazonaws.com/site/2023w/abs/0640-NUKEC.pdf