当前位置:首页 > IT技术 > 编程语言 > 正文

KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)
2022-09-06 22:42:16


文章目录

一、基础集成
1. 技术选型

软件/框架

版本

jdk

1.8.0_202

springboot

2.5.4

kafka server

kafka_2.12-2.8.0

kafka client

2.7.1

zookeeper

3.7.0

2. 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3. kafka配置

properties版本

spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092

# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多方多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432

# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000

yml版本项目内部配置

server:
port: 8002
spring:
application:
# 应用名称
name: ly-kafka
profiles:
# 环境配置
active: dev
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: nacos.server.com:8848
config:
# 配置中心地址
server-addr: nacos.server.com:8848
# 配置文件格式
file-extension: yml
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

nacos-config 服务端配置

在这里插入代码片
4. auto-offset-reset 简述

关于
auto.offset.reset 配置有3个值可以设置,分别如下:

earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset时,从头开始消费;
latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据;
none: topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常;
默认建议用 earliest, 设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。

而 latest 这个设置容易丢失消息,假如 kafka 出现问题,还有数据往topic中写,这个时候重启kafka,这个设置会从最新的offset开始消费, 中间出问题的哪些就不管了。

none 这个设置没有用过,兼容性太差,经常出问题。

5. 新增一个订单类

模拟业务系统中,用户每下一笔订单,就发送一个消息,供其他服务消费

package com.gblfy.kafka.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 订单id
*/
private long orderId;
/**
* 订单号
*/
private String orderNum;
/**
* 订单创建时间
*/
private LocalDateTime createTime;
}
6. 生产者(异步)
package com.gblfy.lykafka.provider;

import com.alibaba.fastjson.JSONObject;
import com.gblfy.common.constant.KafkaTopicConstants;
import com.gblfy.common.entity.Order;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.time.LocalDateTime;

/**
* Kafka生产者
*
* @author gblfy
* @date 2021-09-28
*/
@Service
public class KafkaProvider {
private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
// 构建一个订单类
Order order = Order.builder()
.orderId(orderId)
.orderNum(orderNum)
.createTime(createTime)
.build();
// 发送消息,订单类的 json 作为消息体
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));

// 监听回调
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable e) {
log.info("发送消息失败: {}", e.getMessage());
}

@Override
public void onSuccess(SendResult<String, String> result) {
RecordMetadata metadata = result.getRecordMetadata();
log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
}
}
7. 消费者
package com.gblfy.lykafka.controller;

import com.gblfy.lykafka.provider.KafkaProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

@RestController
@RequestMapping("/kafka")
public class KafkaProviderController {

@Autowired
private KafkaProvider kafkaProvider;

@GetMapping("/sendMQ")
public String sendMQContent() {
kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());
return "OK";
}
}

通过 @KafkaListener注解,我们可以指定需要监听的 topic 以及 groupId, 注意,这里的 topics 是个数组,意味着我们可以指定多个 topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。

注意:消息发布者的 TOPIC 需要保持与消费者监听的 TOPIC 一致,否者消费不到消息。

8. kafka配置类
package com.gblfy.common.constant;

public class KafkaTopicConstants {
//kafka发送消息主题
public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";

// kafka消费者组需要和yml文件中的 kafka.consumer.group-id的值保持一致
public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02";
}
9.单元测试

新建单元测试,功能测试消息发布,以及消费。

package com.gblfy.kafka;

import com.gblfy.kafka.controller.KafkaProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@SpringBootTest
class KafkaSpringbootApplicationTests {

@Autowired
private KafkaProvider kafkaProvider;

@Test
public void sendMessage() throws InterruptedException {
// 发送 1000 个消息
for (int i = 0; i < 1000; i++) {
long orderId = i+1;
String orderNum = UUID.randomUUID().toString();
kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
}
TimeUnit.MINUTES.sleep(1);
}
}
9. 效果图

KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)_java


KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)_ide_02

10. 源码地址

​https://gitee.com/gb_90/kafka-parent​

11.微服务专栏

​https://gitee.com/gb_90/micro-service-parent​


本文摘自 :https://blog.51cto.com/g

开通会员,享受整站包年服务立即开通 >