Spring-Boot集成Kafka
【生产者】配置
maven -- pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml
# 更多配置见:https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#appendix.application-properties
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
# 消息重发的次数
retries: 0
# 一个批次可占多少内存
# 批次(batch):生产者可以以批次的形式推送消息,一个批次包含多条消息;
# 可设置什么时候推送批次:批次中消息累计数量(500条)、时间间隔(100ms)、批次大小(64KB)等
batch-size: 16384
# 生产者内存缓冲区大小
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
【生产者】发送Kafka消息
Service类
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
// 构造器注入KafkaTemplate
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* 同步发送消息(阻塞等待结果)
*/
public void sendMessageSync(String topic, String key, String message) {
try {
// 发送消息并等待确认
SendResult<String, String> result = kafkaTemplate.send(topic, key, message).get();
System.out.println("发送成功 -> Topic: " + topic
+ ", Partition: " + result.getRecordMetadata().partition()
+ ", Offset: " + result.getRecordMetadata().offset());
} catch (Exception e) {
System.err.println("发送失败: " + e.getMessage());
// 可添加重试或补偿逻辑
}
}
/**
* 异步发送消息(非阻塞,回调处理)
*/
public void sendMessageAsync(String topic, String key, String message) {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("发送成功 -> Topic: " + topic
+ ", Partition: " + result.getRecordMetadata().partition());
}
@Override
public void onFailure(Throwable ex) {
System.err.println("发送失败: " + ex.getMessage());
// 可记录失败消息到数据库或重试队列
}
});
}
}
【生产者】配置负载均衡策略
【方案1】直接通过partition入参指定分区号
public void sendToPartition(String topic, int partition, String key, String message) {
// 直接使用带分区参数的 send 方法
kafkaTemplate.send(topic, partition, key, message)
.addCallback(
result -> handleSuccess(result),
ex -> handleFailure(ex)
);
}
【方案2】自定义分区策略(基于分区键)
application.yml
spring:
kafka:
producer:
properties:
partitioner.class: com.example.kafka.CustomPartitioner
实现自定义分区器:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 示例:根据键的哈希值计算分区
if (key != null) {
return Math.abs(key.hashCode()) % numPartitions;
}
// 没有键时使用随机分区
return ThreadLocalRandom.current().nextInt(numPartitions);
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
【消费者】配置
maven -- pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka服务器地址
consumer:
group-id: my-consumer-group # 消费者组ID
auto-offset-reset: earliest # 从最早的消息开始消费(可选:latest/earliest)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false # 关闭自动提交偏移量(推荐手动提交)
【消费者】消费监听类
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
// 监听指定Topic,并发消费者线程数为3(根据分区数调整)
@KafkaListener(
topics = "your-topic-name",
concurrency = "3",
containerFactory = "kafkaListenerContainerFactory"
)
public void consume(String message, Acknowledgment ack) {
try {
// 1. 处理消息逻辑
System.out.println("Received message: " + message);
// 模拟业务处理(如数据库操作、计算等)
processMessage(message);
// 2. 手动提交偏移量(确保消息处理成功后再提交)
ack.acknowledge();
} catch (Exception e) {
// 3. 处理异常(如重试、记录日志等)
System.err.println("消费失败: " + e.getMessage());
// 可根据业务决定是否提交偏移量(如不提交则下次重试)
}
}
private void processMessage(String message) {
// 实际业务逻辑
}
}
kafka的事务控制
- 假设你的是SpringBoot项目,需要application.properties处添加事务的配置
# 启用幂等性生产者(使用事务必须)
spring.kafka.producer.properties.enable.idempotence=true
# 隔离级别(默认值是read_uncommitted)
spring.kafka.producer.properties.isolation.level=read_committed
- 创建KafkaTemplate并添加事务管理器
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 其他配置...
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager() {
return new KafkaTransactionManager<>(producerFactory());
}
}
- 使用KafkaTemplate发送事务消息
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendTransactionalMessage() {
kafkaTemplate.executeInTransaction(operations -> {
try {
operations.send("topic1", "key1", "value1");
operations.send("topic2", "key2", "value2");
// 其他业务逻辑...
// 如果一切正常,事务会自动提交
} catch (Exception e) {
// 如果发生异常,事务会自动回滚
throw e;
}
});
}
}