博主
258
258
258
258
专辑

第五节 Maven项目如何收发Kafka消息

亮子 2023-08-02 07:55:12 5975 0 0 0

1、添加依赖到pom文件中

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

2、发送Kafka消息代码

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaMessageProducer {

    public static void main(String[] args) {
        // 配置Kafka生产者的属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // Kafka broker的地址和端口
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);
        
        try {
            // 发送消息到指定topic
            String topic = "test-topic";
            String key = "message-key";
            String value = "Hello, Kafka!";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            producer.send(record);
            System.out.println("消息发送成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();  // 关闭Kafka生产者实例
        }
    }
}

3、接收Kafka消息

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // 创建消费者配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_consumer_group_id");

        // 创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        // 消费消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("接收到消息:key = " + record.key() + ", value = " + record.value());
                    // 处理接收到的消息
                }
            }
        } finally {
            consumer.close();
        }
    }
}