1、添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、application.properties配置文件
# 应用名称
spring.application.name=server-demo-kafka
# 应用服务 WEB 访问端口
server.port=8080
# kafka服务器配置
spring.kafka.bootstrap-servers=localhost:9092
# kafka生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka消费者配置
spring.kafka.consumer.group-id=kafka-demo-kafka-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3、生产者发送消息
package com.sm2005a.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
@RequestMapping(value = "/kafka")
public class TestKafkaController {
@Autowired
KafkaTemplate kafkaTemplate;
@RequestMapping(value = "/msg")
public String hello(@RequestParam("msg") String msg) {
kafkaTemplate.send("first", msg);
log.info("first:"+msg);
return msg;
}
}
4、消费者接收消息
package com.sm2005a.messag;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaMessage {
@KafkaListener(topics = "first")
public void recvKafkaMessage(String msg) {
System.out.println("recv:"+msg);
}
}