粉粉蕉的笔记本粉粉蕉的笔记本
  • JAVA

    • 代码笔记
    • Java8实战
    • 分布式事务实战(Seata)
    • 模板引擎(FreeMarker)
    • SpringSecurity
  • PYTHON

    • 概述
    • python3
    • python3(菜鸟教程)
    • pandas
    • numpy
    • matplotlib
  • 中间件

    • Kafka
    • RocketMQ
    • Redis
    • MongoDB
    • Elastic Search
  • 数据库

    • Mysql
  • 设计模式
  • 运维

    • linux命令速查
    • windows命令速查
    • Docker笔记
    • kubernetes学习笔记
    • kubernetes实操笔记
    • 运维工具大全
    • git操作宝典
  • 大数据

    • 概览
    • Hadoop
    • Hive
  • 机器学习

    • 机器学习概览
  • 概率论
  • 线性代数
  • 统计学
  • 金融知识学习
  • 聚宽
  • 因子分析
  • RSS
  • 资源导航
  • 医保
  • 健身

    • 笔记
    • 训练计划
  • 装修攻略
  • 读书笔记

    • 《深度学习》
我也想搭建这样的博客!
🚋开往
  • JAVA

    • 代码笔记
    • Java8实战
    • 分布式事务实战(Seata)
    • 模板引擎(FreeMarker)
    • SpringSecurity
  • PYTHON

    • 概述
    • python3
    • python3(菜鸟教程)
    • pandas
    • numpy
    • matplotlib
  • 中间件

    • Kafka
    • RocketMQ
    • Redis
    • MongoDB
    • Elastic Search
  • 数据库

    • Mysql
  • 设计模式
  • 运维

    • linux命令速查
    • windows命令速查
    • Docker笔记
    • kubernetes学习笔记
    • kubernetes实操笔记
    • 运维工具大全
    • git操作宝典
  • 大数据

    • 概览
    • Hadoop
    • Hive
  • 机器学习

    • 机器学习概览
  • 概率论
  • 线性代数
  • 统计学
  • 金融知识学习
  • 聚宽
  • 因子分析
  • RSS
  • 资源导航
  • 医保
  • 健身

    • 笔记
    • 训练计划
  • 装修攻略
  • 读书笔记

    • 《深度学习》
我也想搭建这样的博客!
🚋开往
    • 概述
    • 原理
    • 开发笔记
    • 部署&运维

Spring-Boot集成Kafka

【生产者】配置

maven -- pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml

# 更多配置见:https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#appendix.application-properties
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 消息重发的次数
      retries: 0
      # 一个批次可占多少内存
      # 批次(batch):生产者可以以批次的形式推送消息,一个批次包含多条消息;
      # 可设置什么时候推送批次:批次中消息累计数量(500条)、时间间隔(100ms)、批次大小(64KB)等
      batch-size: 16384
      # 生产者内存缓冲区大小
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all

【生产者】发送Kafka消息

Service类

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // 构造器注入KafkaTemplate
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * 同步发送消息(阻塞等待结果)
     */
    public void sendMessageSync(String topic, String key, String message) {
        try {
            // 发送消息并等待确认
            SendResult<String, String> result = kafkaTemplate.send(topic, key, message).get();
            System.out.println("发送成功 -> Topic: " + topic 
                + ", Partition: " + result.getRecordMetadata().partition() 
                + ", Offset: " + result.getRecordMetadata().offset());
        } catch (Exception e) {
            System.err.println("发送失败: " + e.getMessage());
            // 可添加重试或补偿逻辑
        }
    }

    /**
     * 异步发送消息(非阻塞,回调处理)
     */
    public void sendMessageAsync(String topic, String key, String message) {
        ListenableFuture<SendResult<String, String>> future = 
            kafkaTemplate.send(topic, key, message);

        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送成功 -> Topic: " + topic 
                    + ", Partition: " + result.getRecordMetadata().partition());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("发送失败: " + ex.getMessage());
                // 可记录失败消息到数据库或重试队列
            }
        });
    }
}

【生产者】配置负载均衡策略

【方案1】直接通过partition入参指定分区号

public void sendToPartition(String topic, int partition, String key, String message) {
    // 直接使用带分区参数的 send 方法
    kafkaTemplate.send(topic, partition, key, message)
        .addCallback(
            result -> handleSuccess(result),
            ex -> handleFailure(ex)
        );
}

【方案2】自定义分区策略(基于分区键)

application.yml

spring:
  kafka:
    producer:
      properties:
        partitioner.class: com.example.kafka.CustomPartitioner

实现自定义分区器:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // 示例:根据键的哈希值计算分区
        if (key != null) {
            return Math.abs(key.hashCode()) % numPartitions;
        }
        
        // 没有键时使用随机分区
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

【消费者】配置

maven -- pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka服务器地址
    consumer:
      group-id: my-consumer-group      # 消费者组ID
      auto-offset-reset: earliest       # 从最早的消息开始消费(可选:latest/earliest)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false         # 关闭自动提交偏移量(推荐手动提交)

【消费者】消费监听类

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // 监听指定Topic,并发消费者线程数为3(根据分区数调整)
    @KafkaListener(
            topics = "your-topic-name",
            concurrency = "3",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consume(String message, Acknowledgment ack) {
        try {
            // 1. 处理消息逻辑
            System.out.println("Received message: " + message);
            // 模拟业务处理(如数据库操作、计算等)
            processMessage(message);

            // 2. 手动提交偏移量(确保消息处理成功后再提交)
            ack.acknowledge();
        } catch (Exception e) {
            // 3. 处理异常(如重试、记录日志等)
            System.err.println("消费失败: " + e.getMessage());
            // 可根据业务决定是否提交偏移量(如不提交则下次重试)
        }
    }

    private void processMessage(String message) {
        // 实际业务逻辑
    }
}

kafka的事务控制

  1. 假设你的是SpringBoot项目,需要application.properties处添加事务的配置
# 启用幂等性生产者(使用事务必须)
spring.kafka.producer.properties.enable.idempotence=true
# 隔离级别(默认值是read_uncommitted‌)
spring.kafka.producer.properties.isolation.level=read_committed
  1. 创建KafkaTemplate并添加事务管理器
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 其他配置...

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}
  1. 使用KafkaTemplate发送事务消息
@Service
public class KafkaService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendTransactionalMessage() {
        kafkaTemplate.executeInTransaction(operations -> {
            try {
                operations.send("topic1", "key1", "value1");
                operations.send("topic2", "key2", "value2");
                // 其他业务逻辑...
                // 如果一切正常,事务会自动提交
            } catch (Exception e) {
                // 如果发生异常,事务会自动回滚
                throw e;
            }
        });
    }
}
Last Updated: 7/18/25, 10:21 AM
Contributors: dongyz8
Prev
原理
Next
部署&运维