본문 바로가기

개발

Spring Boot With Kafka Single Broker

728x90

Kafa 를 많이 사용한다고는 알고 있었지만, 일을 하면서 사용할 기회가 없어서(핑계!!)찾아 보지 못했다. 이번에 진행하는 프로젝트에서는 Kafka 를 사용할 일이 생겨서 Spring Boot 에서 Kafka 를 사용하는 방법을 정리해 보았습니다 :)

 

 

 

Kafka 란 분산 처리를 위해 개발된 Distributed streaming platform 으로 아주 아주 단순화 시키자면 고가용성이 보장되는 Message Queue 라고 이해하면 된다. 물론 Kafka 는 이외에도 더 많은 기능을 제공한다.

 

 

[Kafka 설치]

항상 그렇듯이 Kafka 설치 역시 Docker-Compose 로 쉽게 구성할 수 있다.

 

kafka 는 기본적으로 Kafka 의 Producer 와 Consumer 를 관리하는 Zookeeper 와 함께 구성하게 된다.

 

 

이렇게 구성하는 것은 단순하게 구성하는 것이고, 편하게 사용하기 위해서 아래의 서비스들을 Docker Compose 로 구성 되어 있습니다. 이번 구성에서는 kafka Broker 는 1개로만 구성합니다.

 

 

  • Schema Registry
    RESTful 인터페이스를 사용하여 스키마(Schema)를 관리하거나 조회하는 기능을 제공합니다.
  • Schema Registry UI
    schema-registry-ui는 스키마 레지스트리의 RESTful 기능을 UI 로 제공해주는 서비스입니다. Schema 조회, 생성, 수정, 삭제를 UI 로 제공합니다.

 

  • Kafka Rest Proxy
    Kafka 클러스터에 RESTful 인터페이스를 제공합니다. API를 사용하여 메시지를 생성해서 이용할 수 있습니다.
  • Kafka Topics UI
    Kafka Topic을 조회하고 Kafka Cluster 의 상태를 보여주는 UI 서비스로 Kafka Rest Proxy 를 위한 서비스입니다.

 

  • Ksqldb Server
    KSQL은 Kafka 클러스터와 연동 되며 Topic 에 대해 query 문을 작성해주는 서비스로 Schema Registry UI 와 연동 된다.

# Docker Compose yml 설정

version: '2.1'
services:
  zoo1:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./full-stack/zoo1/data:/data
      - ./full-stack/zoo1/datalog:/datalog
kafka1:
    image: confluentinc/cp-kafka:5.5.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./full-stack/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
kafka-schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: kafka-schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zoo1
      - kafka1
schema-registry-ui:
    image: landoop/schema-registry-ui:0.9.4
    hostname: kafka-schema-registry-ui
    ports:
      - "8001:8000"
    environment:
      SCHEMAREGISTRY_URL: http://kafka-schema-registry:8081/
      PROXY: "true"
    depends_on:
      - kafka-schema-registry
kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:5.5.0
    hostname: kafka-rest-proxy
    ports:
      - "8082:8082"
    environment:
      # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
      KAFKA_REST_HOST_NAME: kafka-rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry
kafka-topics-ui:
    image: landoop/kafka-topics-ui:0.9.4
    hostname: kafka-topics-ui
    ports:
      - "8000:8000"
    environment:
      KAFKA_REST_PROXY_URL: "http://kafka-rest-proxy:8082/"
      PROXY: "true"
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry
      - kafka-rest-proxy
ksqldb-server:
    image: confluentinc/cp-ksqldb-server:5.5.0
    hostname: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: ksqldb-server_
    depends_on:
      - zoo1
      - kafka1

 

[Spring Boot 에서 사용]

1. Spring Boot 에서 사용하기 위해 pom.xml 에 Kafka 관련 Dependency 를 추가합니다.

<!-- KAFKA -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

2. application.properties 에 Kafka에서 사용할 설정 값들을 추가합니다.

# KAFKA
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=new-topic
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.max-poll-records=1000
spring.kafka.template.default-topic=new-topic

 

3.Kafka 로 Topic 을 보내는 Producer 클래스를 추가합니다.

package ml.jjeaby.kafkaTest
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

public @Component @Slf4j class Producer {

    private KafkaProducer<String, String> producer = null;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.template.default-topic}")
    private String topicName;

    @PostConstruct
    public void build() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        producer = new KafkaProducer<>(properties);
    }

    public String send() {
        String result = "SEND FAIL";

        Date nowDate = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
        String message = "payLoad:" + dateFormat.format(nowDate);
        ProducerRecord<String, String> prd = new ProducerRecord<>(this.topicName, message);


        try {
            producer.send(prd, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception != null) {
                        log.info(exception.getMessage());
                    }
                }
            });
            result = "SEND SUCCESS";
        }
        catch(Exception e) {
            log.info(e.getMessage());
            e.printStackTrace();
        }
        finally {
            log.info(result + " : " + message);
            producer.flush();
        }

        return result + " : " + message ;

    }
}

 

4. Kafka 에서 Topic 을 받는 Consumer 클래스를 추가합니다.

package ml.jjeaby.kafkaTest
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

public @Slf4j
@Component
class Consumer {

    private KafkaConsumer<String, String> consumer = null;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupID;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String keyDeSerializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeSerializer;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;

    @Value("${spring.kafka.template.default-topic}")
    private String topicName;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @PostConstruct
    public void build() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeSerializer);
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeSerializer);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        consumer = new KafkaConsumer<>(properties);
    }


    @KafkaListener(topics = "${spring.kafka.template.default-topic}")
    public void consume(@Headers MessageHeaders headers, @Payload String payload) {
        log.info("CONSUME HEADERS : " + headers.toString());
        log.info("CONSUME PAYLOAD : " + payload);
    }

}

 

5. Producer 를 호출하여 Topic 을 Kafka 로 보내는 코드를 추가합니다.

public String getKafkaSendTest() {
      ....
      
      producer.send();
      ....
    }

 

이렇게 producer.send() 를 이용하여 Kafka 에 Topic 을 보내게 되면 Consumer 클래스에 추가한 어노테이션 " @KafkaListener(topics = “${spring.kafka.template.default-topic}”)" 에 의해 Consumer 가 호출 되어 Topic 이 로그에 보여집니다. (따로 Consumer 를 호출 하지 않아도 됩니다!)

 

 

요롷게 하면 생각보다 쉽게 kafka 를 Spring Boot 에서 사용할 수 있습니다 :)