Java整合Kafka实现生产及消费
admin
2024-03-13 15:14:34

文章目录

  • 前提条件
  • 项目环境
  • 创建Topic
  • 生产消息
    • 生产者参数配置
    • 生产自定义分区策略
    • 生产到指定分区
  • 消费消息
    • 消费参数配置
    • 设置offset

前提条件

  • 搭建Kafka环境,参考Kafka集群环境搭建及使用
  • Java环境:JDK1.8
  • Maven版本:apache-maven-3.8.6
  • 开发工具:IntelliJ IDEA

项目环境

  1. 创建maven项目。
  2. pom.xml文件中引入kafka依赖。
org.apache.kafkakafka_2.112.1.0

创建Topic

创建topic命名为testtopic并指定2个分区。

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2

生产消息

public class Producer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 生产参数配置Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer kafkaProducer = new KafkaProducer(properties);int i=0;while (true) {//生产消息Future future = kafkaProducer.send(new ProducerRecord("testtopic", "key"+i, "value"+i));//获取生产的数据信息RecordMetadata recordMetadata = future.get();System.out.println("time:"+recordMetadata.timestamp()+" key:"+i+" value:"+i+" partition:"+recordMetadata.partition()+" offset:"+recordMetadata.offset());Thread.sleep(1000);i+=1;}}
}

生产者参数配置

// ACK机制,默认为1 (0,1,-1)
properties.setProperty(ProducerConfig.ACKS_CONFIG, "");
// Socket发送消息缓冲区大小,默认为128K,设置为-1代表操作系统的默认值
properties.setProperty(ProducerConfig.SEND_BUFFER_CONFIG, ""); 
// Socket接收消息缓冲区大小,默认为32K,设置为-1代表操作系统的默认值
properties.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG, ""); 
// 生产者客户端发送消息的最大值,默认1M
properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, ""); 
// 发送消息异常时重试次数,默认为0
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "");   
// 重试间隔时间,默认100
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "");    
// 生产消息自定义分区策略类
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "");
// 开启幂等 ,默认true
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "");

更多配置信息查看ProducerConfig类

生产自定义分区策略

  1. 创建分区策略类,实现org.apache.kafka.clients.producer.Partitioner接口,编写具体策略。
public class PartitionPolicy implements Partitioner {private final ConcurrentMap topicCounterMap = new ConcurrentHashMap();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = this.nextValue(topic);List availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return ((PartitionInfo)availablePartitions.get(part)).partition();} else {return Utils.toPositive(nextValue) % numPartitions;}} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}@Overridepublic void close() {}@Overridepublic void configure(Map map) {}
}
  1. 参数配置。
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionPolicy.class.getName());

生产到指定分区

ProducerRecord有指定分区的构造方法,设置分区号
public ProducerRecord(String topic, Integer partition, K key, V value)

Future future = kafkaProducer.send(new ProducerRecord("testtopic", 1, "key"+i, "value"+i));

消费消息

public class Consumer {public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//约定的编解码properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group");//默认为自动提交properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");//当设置为自动提交时,默认5秒自动提交//properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");////properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "5000");KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);//订阅topickafkaConsumer.subscribe(Arrays.asList("testtopic"));Set assignment = kafkaConsumer.assignment();ConsumerRecords records = null;while (assignment.size() == 0) {records = kafkaConsumer.poll(Duration.ofMillis(100));assignment = kafkaConsumer.assignment();}/*//1.根据时间戳获取 offset,设置 offsetMap offsetsForTimes=new HashMap<>();for (TopicPartition topicPartition : assignment) {offsetsForTimes.put(topicPartition,1669972273941L);}Map offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(offsetsForTimes);offsetAndTimestampMap.forEach((tp,offsettime)->{kafkaConsumer.seek(tp,offsettime.offset());});*//*//2.指定从头开始消费kafkaConsumer.seekToBeginning(assignment);*//*//3.指定从某offset开始消费kafkaConsumer.seek(tp,0);*/while (true) {if (records.isEmpty()) {Thread.sleep(3000);} else {System.out.printf("records count:" + records.count());Iterator> iterator = records.iterator();while (iterator.hasNext()) {ConsumerRecord record = iterator.next();System.out.println(" time:" + record.timestamp() + " key:" + record.key() + " value:" + record.value() + " partition:" + record.partition() + " offset:" + record.offset());}kafkaConsumer.commitSync();}records = kafkaConsumer.poll(Duration.ofMillis(0));}}
}

消费参数配置

// 消费者必须指定一个消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
// 消费者每次最多POLL的数量
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
// 消费者POLL的时间间隔
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_DOC, "");
// 设置是否自动提交,默认为true
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");  
// 如果是自动提交,默认5s后提交,会发生丢失消息和重复消费情况
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");   
// 当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。

更多配置信息查看ConsumerConfig类

设置offset

如代码所示,设置offset的几种方式:

  • 指定 offset,需要自己维护 offset,方便重试。
  • 指定从头开始消费。
  • 指定 offset 为最近可用的 offset (默认)。
  • 根据时间戳获取 offset,设置 offset。

相关内容

热门资讯

国产C919飞抵迪拜 将亮相迪... 据中国南方航空公司提供的消息,中国南方航空一架C919飞机于当地时间14日凌晨1点15分顺利抵达阿联...
RUN动海棠·运动季即将开幕,... 记者从海南省三亚市海棠区获悉,“RUN动海棠·运动季”将于11月15日在华润海棠湾RUN运动公园启幕...
港科大突破:AI实现类人复杂推... 这项由香港科技大学(广州)深层互学科智能实验室的杨佳宇、范宇轩、赖松宁等研究员与英国剑桥大学、北京航...
内化AI能力成转型关键,百度想... 来源:猎云精选,文/王非 “生成未来”、“应用来了”之后,AI行业迎来“效果涌现”。 2024年百度...
科技巨头「偷偷借钱」搞AI,次... 作者 | 王晗玉 编辑 | 张帆 近日,Meta发布公告,确认公司将于2028年前在美国投资6000...