Section 7: Sending Kafka Messages Without Loss or Duplication

亮子 2023-12-25 00:25:22

1. Add the dependency

        <!-- Kafka dependency -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2. Configuration file

spring:
  kafka:
    bootstrap-servers: 192.168.80.131:9092
    producer: # producer: durability and serialization settings
      retries: 10        # resend automatically on transient send failures
      acks: all          # the broker acknowledges only after all in-sync replicas have the record
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # consumer: group id and deserialization settings
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual   # offsets are committed only after the listener calls Acknowledgment#acknowledge()
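
The YAML above maps directly onto the standard Kafka client settings. If you prefer configuring the producer in Java instead of application.yml, a minimal sketch using the Kafka client's ProducerConfig constants and Spring Kafka's DefaultKafkaProducerFactory (the bean is optional and only shown to make the mapping explicit):

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.131:9092");
        // acks=all: wait for all in-sync replicas before the send is acknowledged
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // retries: resend automatically on transient failures
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }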

3. Inject the KafkaTemplate

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;
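
The send and receive methods below also reference a stringRedisTemplate, a logIdKey, a topicName and a tbLogService that the post does not show. A minimal sketch of how those fields might be declared, assuming the topic name comes from the same site.logTopic property used by the listener in step 5 and that logIdKey is simply a Redis key used both as the ID counter and as the de-duplication prefix (its value here is an illustrative assumption):

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Autowired
    TbLogService tbLogService;

    // Topic name, assumed to be bound from the site.logTopic property
    @Value("${site.logTopic}")
    String topicName;

    // Redis key used as the message-ID counter and as the prefix of the de-duplication keys
    String logIdKey = "log:id:";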

4. Send the message and resend on failure

    @PostMapping(value = "/send")
    public Result send(@RequestBody LogVo logVo) {
        // Generate a unique ID for the message (used by the consumer for de-duplication)
        Long msgId = stringRedisTemplate.opsForValue().increment(logIdKey);
        logVo.setLogId(msgId.intValue());

        // Serialize and send the message
        String msg = JSON.toJSONString(logVo);
        kafkaTemplate.send(topicName, msg)
                .addCallback(
                        new SuccessCallback<SendResult<String, String>>() {
                            @Override
                            public void onSuccess(SendResult<String, String> result) {
                                // Message was acknowledged by the broker
                                log.info("Message sent successfully: " + msg);
                            }
                        },
                        new FailureCallback() {
                            @Override
                            public void onFailure(Throwable throwable) {
                                // Log the failure
                                log.error("Failed to send message to Kafka: " + msg, throwable);
                                // Guard against message loss: resend on failure
                                kafkaTemplate.send(topicName, msg);
                            }
                        });
        return Result.SUCCESS();
    }
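
addCallback is the Spring Kafka 2.x API, where send() returns a ListenableFuture. On Spring Boot 3 / Spring Kafka 3.x, send() returns a CompletableFuture instead; a sketch of the equivalent callback under that assumption:

    // Spring Kafka 3.x variant: send() returns CompletableFuture<SendResult<String, String>>
    kafkaTemplate.send(topicName, msg).whenComplete((result, ex) -> {
        if (ex == null) {
            log.info("Message sent successfully: " + msg);
        } else {
            log.error("Failed to send message to Kafka: " + msg, ex);
            // Same idea as above: resend once on failure
            kafkaTemplate.send(topicName, msg);
        }
    });

Note that an unconditional resend can loop or create duplicates if the broker stays unreachable; in practice you would cap the number of retries or persist failed messages for later replay, and rely on the consumer-side check in step 5 to drop any duplicates.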

5. Receive the message and check for duplicates

    /**
     * @description http://edu.secby.cn/view/1472381950642249730
     * @author 军哥
     * @date 2023/12/14 17:59
     */
    @KafkaListener(topics = "#{'${site.logTopic}'}")
    public void recv(String msg, Acknowledgment ack) {
        log.info("Kafka message received: " + msg);
        LogVo logVo = JSON.parseObject(msg, LogVo.class);

        // Guard against duplicate consumption: skip if this message ID was already processed
        String key = logIdKey + logVo.getLogId();
        if (Boolean.TRUE.equals(stringRedisTemplate.hasKey(key))) {
            log.error("Kafka: duplicate message, skipping");
            ack.acknowledge();
            return;
        }

        // Save the log to the database
        TbLog tbLog = new TbLog();
        BeanUtils.copyProperties(logVo, tbLog);
        tbLog.setCreateTime(new Date());
        tbLog.setUpdateTime(tbLog.getCreateTime());
        tbLogService.add(tbLog);

        // Mark this message ID as consumed so redeliveries of the same message are skipped
        stringRedisTemplate.opsForValue().set(key, "1");

        // ack-mode is manual, so commit the offset only after processing has succeeded
        ack.acknowledge();
    }
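
One remaining gap in recv: hasKey and the later set are two separate Redis calls, so two concurrent deliveries of the same message (for example right after a rebalance) could both pass the check. A tighter variant marks and checks in a single atomic call with setIfAbsent (Redis SETNX); a sketch of how the duplicate check inside recv could look, with the 7-day marker TTL being an arbitrary assumption:

    // Atomically mark this message ID as seen; returns false if it was already marked
    Boolean firstTime = stringRedisTemplate.opsForValue()
            .setIfAbsent(key, "1", Duration.ofDays(7));
    if (!Boolean.TRUE.equals(firstTime)) {
        log.error("Kafka: duplicate message, skipping");
        ack.acknowledge();
        return;
    }

If the database insert can still fail after the marker is written, delete the marker in the failure path (or keep the TTL short) so the message can be retried later.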