准备工作
- 安装kafka,版本为2.6.0
- 安装zookeeper,版本为3.6.2
- 启动kafka
- 启动zookeeper
- SpringBoot版本2.3.1.RELEASE
核心目录结构
- KafkaConsumer.java 消费者
- KafkaProducerController 生产者
- application.yaml 主配置文件
- application-kafka.properties kafka配置文件
代码
pom依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.6.1</version> </dependency>
|
application-kafka.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.batch-size=16
spring.kafka.producer.retries=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.max-poll-records=1000
topicName.topic1=test1 topicName.topic2=test2
|
KafkaProducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j @RestController @RequestMapping("/kafka") public class KafkaProducerController {
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/send") public ResponseEntity<?> send(@RequestParam String topic, @RequestParam String msg) { kafkaTemplate.send(topic, msg); log.info("[Producer] topic: {}; msg: {}", topic, msg); return new ResponseEntity<>(HttpStatus.OK); } }
|
KafkaConsumer
1 2 3 4 5 6 7 8 9 10
| @Slf4j @Component public class KafkaConsumer {
@KafkaListener(topics = "${topicName.topic1}", groupId = "${spring.kafka.consumer.group-id}") @KafkaListener(topics = "${topicName.topic2}", groupId = "${spring.kafka.consumer.group-id}") public void listen(ConsumerRecord<?, ?> record) { log.info("[Consumer] topic: {}; msg: {}; offset: {}", record.topic(), record.value(), record.offset()); } }
|
或者下面的这种写法,两种写法都能同时监听到多个topic的消息
1 2 3 4 5 6 7 8 9 10 11 12
| @Slf4j @Component public class KafkaConsumer {
@KafkaListeners({ @KafkaListener(topics = "${topicName.topic1}"), @KafkaListener(topics = "${topicName.topic2}") }) public void listen(ConsumerRecord<?, ?> record) { log.info("[Consumer] topic: {}; msg: {}; offset: {}", record.topic(), record.value(), record.offset()); } }
|
测试
使用postman向test1主题与test2主题分别发送一条消息,查看日志。
日志:
测试成功!
消费者成功的接收到了生产者发送的消息。