1、添加依赖
<!-- kafka依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置文件
spring:
kafka:
bootstrap-servers: 192.168.80.131:9092
producer: #生产者序列化器
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
ack-mode: manual
consumer: #消费者序列化器
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual
3、注入模板引擎
@Autowired
KafkaTemplate kafkaTemplate;
4、发送消息,发送失败后重新发送
@PostMapping(value = "/send")
public Result send(@RequestBody LogVo logVo) {
// 生成消息的唯一ID
Long msgId = stringRedisTemplate.opsForValue().increment(logIdKey);
logVo.setLogId(msgId.intValue());
// 发送消息
String msg = JSON.toJSONString(logVo);
kafkaTemplate.send(topicName, msg)
.addCallback(
new SuccessCallback() {
@Override
public void onSuccess(Object o) {
// 消息发送成功
log.info("消息发送成功:" + msg);
}
},
new FailureCallback() {
@Override
public void onFailure(Throwable throwable) {
// 记录发送失败日志
log.error("kafka发送消息失败:" + msg);
// 保障消息不丢失:发送失败再次发送
}
});
return Result.SUCCESS();
}
5、接收消息,并判断是否重复消息
/***
* @description http://edu.secby.cn/view/1472381950642249730
* @return void
* @author 军哥
* @date 2023/12/14 17:59
*/
@KafkaListener(topics = "#{'${site.logTopic}'}")
public void recv(String msg) {
log.info("kafka收到消息:"+msg);
LogVo logVo = JSON.parseObject(msg, LogVo.class);
// 保障消息不重复消费:检查是否已经消费过了
String key = logIdKey + logVo.getLogId();
if(stringRedisTemplate.hasKey(key)) {
log.error("kafka:消息已经重复~~~~");
return;
}
// 保存到数据库
TbLog tbLog = new TbLog();
BeanUtils.copyProperties(logVo, tbLog);
tbLog.setCreateTime(new Date());
tbLog.setUpdateTime(tbLog.getCreateTime());
tbLogService.add(tbLog);
}