1、添加依赖到pom文件中
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
2、发送Kafka消息代码
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaMessageProducer {
public static void main(String[] args) {
// 配置Kafka生产者的属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka broker的地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
try {
// 发送消息到指定topic
String topic = "test-topic";
String key = "message-key";
String value = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record);
System.out.println("消息发送成功!");
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close(); // 关闭Kafka生产者实例
}
}
}
3、接收Kafka消息
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// 创建消费者配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_consumer_group_id");
// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// 消费消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("接收到消息:key = " + record.key() + ", value = " + record.value());
// 处理接收到的消息
}
}
} finally {
consumer.close();
}
}
}