Connecting to Kafka with Custom Configuration in Spring Boot


1. Add the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
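
No version is specified here; this assumes the project inherits Spring Boot's dependency management (for example via spring-boot-starter-parent), which resolves a compatible spring-kafka version automatically.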

2. Configure application.yml

spring:
  kafka:
    consumer:
      group-id: lihanfeng-admin
      # Kafka client auto-commit; if set to false, Spring's listener container commits offsets for us
      enable-auto-commit: true
      # If a partition has a committed offset, consume from that offset;
      # if it has no committed offset, consume from the latest data
      auto-offset-reset: latest
      max-poll-records: 1 # return at most one record per poll
      # Auto-commit interval (ms)
      auto-commit-interval: 1000
      # Heartbeat interval (ms)
      heartbeat-interval: 3000
      # The background heartbeat thread must send a heartbeat within 20 seconds,
      # otherwise the group coordinator triggers a rebalance
      session:
        timeout:
          ms: 20000
    # Producer configuration
    producer:
      # The leader and all in-sync replicas must confirm receipt
      acks: all
      # Number of retries for retriable errors such as "connection error" or
      # "no leader and none elected yet"; 0 disables retries
      retries: 0
      # Maximum size of a single batch in bytes (default 16KB)
      batch-size: 4096
      # Producer memory buffer size in bytes
      buffer-memory: 40960
      # Even if batch-size is not reached, messages are sent once this many milliseconds have passed
      linger:
        ms: 1000
    # Broker address list, ip:port
    bootstrap-servers: 127.0.0.1:9092
  jackson:
    time-zone: Asia/Shanghai
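
Note that the nested keys consumer.session.timeout.ms and producer.linger.ms are not standard Spring Boot properties; they exist here only so the @Value bindings in the configuration classes below can read them, which is also why this post builds the consumer and producer factories by hand instead of relying on auto-configuration.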

3. Initialize the Kafka connection in code

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer configuration class
 *
 * @author lihanfeng
 * @date 2023/5/24
 */
@Configuration
@EnableKafka
@DependsOn("cache") // app-specific: ensure the author's "cache" bean is initialized first
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${spring.kafka.consumer.session.timeout.ms}")
    private String sessionTimeoutMs;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
       ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(consumerFactory());
       factory.getContainerProperties().setPollTimeout(1500); // max time (ms) poll() blocks waiting for records
       return factory;
    }

    private Map<String, Object> consumerConfigs() {
       Map<String, Object> propsMap = new HashMap<>(10);
       propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
       propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
       propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
       propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
       propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
       propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
       propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       return propsMap;
    }

    private ConsumerFactory<String, String> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaConsumerListener consumer() {
       // register the listener class shown in step 6
       return new KafkaConsumerListener();
    }
}
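
Because the factory bean is named kafkaListenerContainerFactory, which is the default name @KafkaListener looks up, the listener in step 6 uses it without having to set containerFactory explicitly.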

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer configuration class
 *
 * @author lihanfeng
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.linger.ms}")
    private String lingerMs;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    private Map<String, Object> producerConfigs() {
       Map<String, Object> props = new HashMap<>(10);
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
       props.put(ProducerConfig.ACKS_CONFIG, acks);
       props.put(ProducerConfig.RETRIES_CONFIG, retries);
       props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
       props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
       props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       return props;
    }

    private ProducerFactory<String, String> producerFactory() {
       return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> producer() {
       return new KafkaTemplate<>(producerFactory());
    }
}
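
If the target topic does not exist yet, it can be declared as a bean and Spring Boot's auto-configured KafkaAdmin will create it on startup. A minimal sketch, assuming spring-kafka 2.3 or later for TopicBuilder; the partition and replica counts are illustrative values, not recommendations:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    /**
     * Declares the topic so KafkaAdmin creates it on startup if it is missing.
     */
    @Bean
    public NewTopic lihanfengTopic() {
        return TopicBuilder.name("lihanfeng-topic")
                .partitions(1)
                .replicas(1)
                .build();
    }
}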

4. Message entity class

import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Custom Kafka message format
 *
 * @author lihanfeng
 * @date 2023/05/24
 */
@Data
@NoArgsConstructor
public class TaskMessage<T> {
    /**
     * Message payload entity
     */
    private T data;

    /**
     * Timestamp (milliseconds)
     */
    private Long timestamp;

    public TaskMessage(T data) {
        this.data = data;
        this.timestamp = System.currentTimeMillis();
    }

    public TaskMessage(T data, Long timestamp) {
        this.data = data;
        this.timestamp = timestamp;
    }
}
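
Because TaskMessage is generic over its payload, callers get a type-checked data field. A quick usage sketch, where Order is a hypothetical payload class:

// Order is a hypothetical payload type used only for illustration
TaskMessage<Order> message = new TaskMessage<>(new Order()); // timestamp is filled in automatically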

5. Sending messages

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

/**
 * Kafka message sending service
 */
@Slf4j
@Service
public class KafkaMessageService {

    @Resource
    private KafkaTemplate<String, String> producer;

    /**
     * Send a Kafka message
     *
     * @param taskMessage message body
     * @param topic       target topic
     */
    public void sendKafkaMessage(TaskMessage<?> taskMessage, String topic) {
        try {
            // the returned future can be used to check the send result (see the callback sketch below)
            ListenableFuture<SendResult<String, String>> future =
                    producer.send(topic, new ObjectMapper().writeValueAsString(taskMessage));
        } catch (JsonProcessingException e) {
            log.error("Invalid data format, cannot serialize to JSON", e);
        } catch (NullPointerException e) {
            log.error("Kafka template or message is null, please check", e);
        }
    }
}

/**
 * Example of calling the send method (from another bean that has
 * KafkaMessageService injected as kafkaMessageService)
 */
public void sendKafkaMessage() {
    TaskMessage<Integer> taskMessage = new TaskMessage<>();
    taskMessage.setData(123);           // payload entity
    taskMessage.setTimestamp(1234566L); // timestamp in milliseconds
    kafkaMessageService.sendKafkaMessage(taskMessage, "lihanfeng-topic");
}
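
The send is asynchronous, and the code above ignores the returned future. A minimal sketch of registering a callback to log the outcome, placed inside the try block right after the send; this assumes spring-kafka 2.x, where send() returns a ListenableFuture (in 3.x it returns a CompletableFuture instead). ListenableFutureCallback comes from org.springframework.util.concurrent:

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        log.info("Message sent to topic {} partition {} offset {}",
                result.getRecordMetadata().topic(),
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        log.error("Failed to send Kafka message", ex);
    }
});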

6. Receiving messages

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import javax.transaction.Transactional;

// the listener class registered as a bean in step 3
@Slf4j
public class KafkaConsumerListener {

    @KafkaListener(topics = {"lihanfeng-topic"})
    @Transactional(rollbackOn = Exception.class)
    public void listenAgentMessage(ConsumerRecord<?, ?> record) {
        try {
            String body = record.value().toString();
            TaskMessage taskMessage = new ObjectMapper().readValue(body, TaskMessage.class);
            log.info("Received agent task queue message: {}", taskMessage);
        } catch (Exception e) {
            log.error("Error consuming task queue message", e);
        }
    }
}
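
Deserializing into the raw TaskMessage leaves data as a generic map. When the payload type is known, Jackson's TypeReference (from com.fasterxml.jackson.core.type) recovers it; a sketch assuming the same hypothetical Order payload as above:

TaskMessage<Order> taskMessage = new ObjectMapper()
        .readValue(body, new TypeReference<TaskMessage<Order>>() {});
Order order = taskMessage.getData();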