Kafa 를 많이 사용한다고는 알고 있었지만, 일을 하면서 사용할 기회가 없어서(핑계!!)찾아 보지 못했다. 이번에 진행하는 프로젝트에서는 Kafka 를 사용할 일이 생겨서 Spring Boot 에서 Kafka 를 사용하는 방법을 정리해 보았습니다 :)
Kafka 란 분산 처리를 위해 개발된 Distributed streaming platform 으로 아주 아주 단순화 시키자면 고가용성이 보장되는 Message Queue 라고 이해하면 된다. 물론 Kafka 는 이외에도 더 많은 기능을 제공한다.
[Kafka 설치]
항상 그렇듯이 Kafka 설치 역시 Docker-Compose 로 쉽게 구성할 수 있다.
kafka 는 기본적으로 Kafka 의 Producer 와 Consumer 를 관리하는 Zookeeper 와 함께 구성하게 된다.
이렇게 구성하는 것은 단순하게 구성하는 것이고, 편하게 사용하기 위해서 아래의 서비스들을 Docker Compose 로 구성 되어 있습니다. 이번 구성에서는 kafka Broker 는 1개로만 구성합니다.
- Schema Registry
RESTful 인터페이스를 사용하여 스키마(Schema)를 관리하거나 조회하는 기능을 제공합니다. - Schema Registry UI
schema-registry-ui는 스키마 레지스트리의 RESTful 기능을 UI 로 제공해주는 서비스입니다. Schema 조회, 생성, 수정, 삭제를 UI 로 제공합니다.
- Kafka Rest Proxy
Kafka 클러스터에 RESTful 인터페이스를 제공합니다. API를 사용하여 메시지를 생성해서 이용할 수 있습니다. - Kafka Topics UI
Kafka Topic을 조회하고 Kafka Cluster 의 상태를 보여주는 UI 서비스로 Kafka Rest Proxy 를 위한 서비스입니다.
- Ksqldb Server
KSQL은 Kafka 클러스터와 연동 되며 Topic 에 대해 query 문을 작성해주는 서비스로 Schema Registry UI 와 연동 된다.
# Docker Compose yml 설정
version: '2.1'
services:
zoo1:
image: zookeeper:3.4.9
restart: unless-stopped
hostname: zoo1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888
volumes:
- ./full-stack/zoo1/data:/data
- ./full-stack/zoo1/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:5.5.0
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./full-stack/kafka1/data:/var/lib/kafka/data
depends_on:
- zoo1
kafka-schema-registry:
image: confluentinc/cp-schema-registry:5.5.0
hostname: kafka-schema-registry
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
depends_on:
- zoo1
- kafka1
schema-registry-ui:
image: landoop/schema-registry-ui:0.9.4
hostname: kafka-schema-registry-ui
ports:
- "8001:8000"
environment:
SCHEMAREGISTRY_URL: http://kafka-schema-registry:8081/
PROXY: "true"
depends_on:
- kafka-schema-registry
kafka-rest-proxy:
image: confluentinc/cp-kafka-rest:5.5.0
hostname: kafka-rest-proxy
ports:
- "8082:8082"
environment:
# KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
KAFKA_REST_HOST_NAME: kafka-rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
depends_on:
- zoo1
- kafka1
- kafka-schema-registry
kafka-topics-ui:
image: landoop/kafka-topics-ui:0.9.4
hostname: kafka-topics-ui
ports:
- "8000:8000"
environment:
KAFKA_REST_PROXY_URL: "http://kafka-rest-proxy:8082/"
PROXY: "true"
depends_on:
- zoo1
- kafka1
- kafka-schema-registry
- kafka-rest-proxy
ksqldb-server:
image: confluentinc/cp-ksqldb-server:5.5.0
hostname: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
KSQL_LISTENERS: http://0.0.0.0:8088/
KSQL_KSQL_SERVICE_ID: ksqldb-server_
depends_on:
- zoo1
- kafka1
[Spring Boot 에서 사용]
1. Spring Boot 에서 사용하기 위해 pom.xml 에 Kafka 관련 Dependency 를 추가합니다.
<!-- KAFKA -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. application.properties 에 Kafka에서 사용할 설정 값들을 추가합니다.
# KAFKA
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=new-topic
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.max-poll-records=1000
spring.kafka.template.default-topic=new-topic
3.Kafka 로 Topic 을 보내는 Producer 클래스를 추가합니다.
package ml.jjeaby.kafkaTest
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
public @Component @Slf4j class Producer {
private KafkaProducer<String, String> producer = null;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Value("${spring.kafka.template.default-topic}")
private String topicName;
@PostConstruct
public void build() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
producer = new KafkaProducer<>(properties);
}
public String send() {
String result = "SEND FAIL";
Date nowDate = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
String message = "payLoad:" + dateFormat.format(nowDate);
ProducerRecord<String, String> prd = new ProducerRecord<>(this.topicName, message);
try {
producer.send(prd, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
log.info(exception.getMessage());
}
}
});
result = "SEND SUCCESS";
}
catch(Exception e) {
log.info(e.getMessage());
e.printStackTrace();
}
finally {
log.info(result + " : " + message);
producer.flush();
}
return result + " : " + message ;
}
}
4. Kafka 에서 Topic 을 받는 Consumer 클래스를 추가합니다.
package ml.jjeaby.kafkaTest
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
public @Slf4j
@Component
class Consumer {
private KafkaConsumer<String, String> consumer = null;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;
@Value("${spring.kafka.consumer.group-id}")
private String groupID;
@Value("${spring.kafka.consumer.value-deserializer}")
private String keyDeSerializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeSerializer;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offsetReset;
@Value("${spring.kafka.template.default-topic}")
private String topicName;
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
@PostConstruct
public void build() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeSerializer);
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeSerializer);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
consumer = new KafkaConsumer<>(properties);
}
@KafkaListener(topics = "${spring.kafka.template.default-topic}")
public void consume(@Headers MessageHeaders headers, @Payload String payload) {
log.info("CONSUME HEADERS : " + headers.toString());
log.info("CONSUME PAYLOAD : " + payload);
}
}
5. Producer 를 호출하여 Topic 을 Kafka 로 보내는 코드를 추가합니다.
public String getKafkaSendTest() {
....
producer.send();
....
}
이렇게 producer.send() 를 이용하여 Kafka 에 Topic 을 보내게 되면 Consumer 클래스에 추가한 어노테이션 " @KafkaListener(topics = “${spring.kafka.template.default-topic}”)" 에 의해 Consumer 가 호출 되어 Topic 이 로그에 보여집니다. (따로 Consumer 를 호출 하지 않아도 됩니다!)
요롷게 하면 생각보다 쉽게 kafka 를 Spring Boot 에서 사용할 수 있습니다 :)
'개발' 카테고리의 다른 글
How to fix Ethernet Network not showing(Ethernet Network not showing in Ubuntu 18.04) Tip (0) | 2020.06.09 |
---|---|
Ubuntu Unicode Error, Locale Setting (0) | 2020.06.08 |
UBUNTU NO_PUBKEY 7EA0A9C3F273FCD8 문제 해결 (0) | 2020.06.08 |
How to use Flutter’s Rest API (0) | 2020.06.08 |
Mecab-ko 설치 방법 정리 (0) | 2020.06.07 |