第四节 Spring Boot 集成 Kafka 消息队列

亮子 2023-01-16 08:48:28 17253 0 0 0

1、添加依赖(无需指定 version,版本由 Spring Boot 的依赖管理统一提供)

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、application.properties配置文件

# 应用名称
spring.application.name=server-demo-kafka

# 应用服务 WEB 访问端口
server.port=8080

# kafka服务器配置
spring.kafka.bootstrap-servers=localhost:9092

# kafka生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# kafka消费者配置
spring.kafka.consumer.group-id=kafka-demo-kafka-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3、生产者发送消息

package com.sm2005a.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that publishes request-supplied text to a Kafka topic.
 *
 * <p>All routes of this controller are mounted under {@code /kafka}.
 */
@RestController
@Slf4j
@RequestMapping(value = "/kafka")
public class TestKafkaController {

    /** Name of the topic that {@link #hello(String)} publishes to. */
    private static final String TOPIC = "first";

    /**
     * Template used to publish records.
     *
     * <p>Parameterized as {@code <String, String>} instead of the raw type so
     * that {@code send} calls are type-checked. This assumes the producer is
     * configured with String key/value serializers; confirm against the
     * application's Kafka producer configuration.
     */
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes the given text to the {@code first} topic and echoes it back.
     *
     * <p>The value returned by {@code send} is not inspected here, so a failed
     * publish is not reported to the HTTP caller.
     *
     * @param msg text taken from the {@code msg} request parameter
     * @return the same {@code msg} value that was received
     */
    @RequestMapping(value = "/msg")
    public String hello(@RequestParam("msg") String msg) {
        kafkaTemplate.send(TOPIC, msg);
        // Placeholder form avoids building the message string when INFO is disabled.
        log.info("first:{}", msg);

        return msg;
    }

}

4、消费者接收消息

package com.sm2005a.messag;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer component that echoes received payloads to standard output.
 */
@Component
public class KafkaMessage {

    /**
     * Receives a record from the {@code first} topic and prints its payload,
     * prefixed with {@code recv:}, to standard output.
     *
     * @param msg payload of the consumed record
     */
    @KafkaListener(topics = "first")
    public void recvKafkaMessage(String msg) {
        final String line = "recv:" + msg;
        System.out.println(line);
    }
}