Spring Boot 集成 Kafka:消费者批量消费消息

在这里插入图片描述

1、消费者

设置批量接收消息

package com.power.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Kafka consumer whose listener method receives a list of records from
 * {@code batchTopic} per invocation, as consumer group {@code batchGroup}.
 */
@Component
public class EventConsumer {

    /**
     * Prints the number of records in the received batch followed by the batch itself.
     *
     * @param records the records handed to the listener in a single invocation
     */
    @KafkaListener(groupId = "batchGroup", topics = {"batchTopic"})
    public void onEvent(List<ConsumerRecord<String, String>> records) {
        final int batchSize = records.size();
        final StringBuilder line = new StringBuilder("批量消费:records.size() = ")
                .append(batchSize)
                .append(", records = ")
                .append(records);
        System.out.println(line);
    }

}

2、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
 * Producer that publishes a fixed number of JSON-encoded {@link User} messages
 * to the {@code batchTopic} topic.
 */
@Component
public class EventProducer {

    /** Topic every message is sent to. */
    private static final String TOPIC = "batchTopic";

    /** Number of messages published by one call to {@link #sendEvent()}. */
    private static final int MESSAGE_COUNT = 125;

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends {@code MESSAGE_COUNT} messages, keyed {@code k0}, {@code k1}, ...,
     * each carrying the JSON form of a freshly built user.
     */
    public void sendEvent() {
        int seq = 0;
        while (seq < MESSAGE_COUNT) {
            // The JSON payload is produced before send() is invoked, one message at a time.
            kafkaTemplate.send(TOPIC, "k" + seq, toUserJson(seq));
            seq++;
        }
    }

    /**
     * Builds the user for the given sequence number and converts it to JSON.
     *
     * @param seq sequence number used as the user id and appended to the phone prefix
     * @return the result of {@code JSONUtils.toJSON} for the built user
     */
    private String toUserJson(int seq) {
        User payload = User.builder()
                .id(seq)
                .phone("1567676767" + seq)
                .birthday(new Date())
                .build();
        return JSONUtils.toJSON(payload);
    }

}

3、application.yml配置文件

spring:
  application:
    # Application name
    name: spring-boot-03-kafka-base

  # Kafka connection address (ip:port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092

    # Message listener configuration
    listener:
      # Consume messages in batches; the default is one message at a time (single)
      type: batch

    # Consumer configuration
    consumer:
      # Maximum number of records fetched per batch
      max-poll-records: 20
      # Start receiving from the first message
      # NOTE(review): Kafka applies this only when the group has no committed offset — confirm
      auto-offset-reset: earliest

4、实体类

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

/**
 * Simple user data holder with an id, a phone number and a birthday.
 *
 * <p>The builder, the constructors and the accessor methods are generated by
 * the Lombok annotations on this class.
 */
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    /** Numeric identifier of the user. */
    private Integer id;

    /** Phone number, kept as text. */
    private String phone;

    /** Birthday of the user. */
    private Date birthday;

}

5、生产者发送消息测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Spring Boot test class used to trigger the producer manually.
 */
@SpringBootTest
public class SpringBoot03KafkaBaseApplication {

    // Producer under test, injected from the Spring context.
    @Resource
    private EventProducer eventProducer;

    /**
     * Invokes {@code EventProducer.sendEvent()}; the test makes no assertions.
     */
    @Test
    void sendEvent3() {
        eventProducer.sendEvent();
    }

}

6、测试

6.1、测试启动生产者

在这里插入图片描述

6.2、测试启动消费者

每次最多接收20条消息(由配置项 max-poll-records: 20 决定)

在这里插入图片描述