(1). 概述

在这一小节,主要学习下Kafka生产者的使用与配置.

(2). 生产者ACK配置

“Kafka官网配置参考”

在同步发送的前提下,生产者在收到集群返回的ack之前会一直阻塞。集群何时返回ack由acks参数决定,它有三个可选值(0、1、-1/all):

(3). 生产者缓冲区配置

(4). 引入依赖

<!-- kafka-clients: using the same version as the installed Kafka brokers is recommended.
     Kafka clients and brokers are generally compatible across versions (KIP-97), so an exact
     match is not strictly required, but matching versions avoids feature mismatches. -->
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.8.1</version>
</dependency>

<!-- fastjson 1.2.4 is affected by the published autoType deserialization RCE advisories
     (e.g. CVE-2022-25845); 1.2.83 is the patched 1.x release and keeps the JSON.toJSONString API. -->
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
	<version>1.2.83</version>
</dependency>
<!-- junit 4.13.1+ fixes CVE-2020-15250 (TemporaryFolder); 4.13.2 is a drop-in replacement for 4.12. -->
<dependency>
	<groupId>junit</groupId>
	<artifactId>junit</artifactId>
	<version>4.13.2</version>
	<scope>test</scope>
</dependency>

(5). 生产者发送消息案例

package help.lixin.kafka.example.producer;

import com.alibaba.fastjson.JSON;
import com.sun.tools.corba.se.idl.StringGen;
import help.lixin.kafka.example.producer.model.User;
import org.apache.kafka.clients.producer.*;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Kafka producer examples run as JUnit tests against a local three-broker cluster
 * (127.0.0.1:9092, :9093, :9094), all writing to the topic {@code hello}.
 *
 * <p>The tests cover synchronous sends, asynchronous sends with a callback, sends pinned to
 * one partition, and a producer tuned with acks / retry / buffering settings.
 *
 * <p>Every test opens its own producer in try-with-resources, so the producer's network thread
 * and buffers are released when the test ends.
 */
public class TestProducer {
    private Properties kafkaProperties = null;
    private final String topic = "hello";

    /** Creates the connection and String key/value serializer settings shared by every test. */
    @Before
    public void initProperties() {
        kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    /**
     * Sends users 1-4 synchronously. {@code send(record).get()} blocks until the broker
     * acknowledges the record or the send finally fails.
     *
     * <p>Retry note: in kafka-clients 2.8 the default {@code retries} is {@code Integer.MAX_VALUE},
     * bounded by {@code delivery.timeout.ms}. Once that budget is exhausted, {@code get()} throws
     * {@link ExecutionException}.
     *
     * <p>Sample output:
     * <pre>
     * RESULT:topic:hello,partition:1,offset:8
     * RESULT:topic:hello,partition:0,offset:3
     * RESULT:topic:hello,partition:1,offset:9
     * RESULT:topic:hello,partition:1,offset:10
     * </pre>
     */
    @Test
    public void testSync() {
        try (Producer<String, String> producer = new KafkaProducer<>(kafkaProperties)) {
            for (int id = 1; id < 5; id++) {
                try {
                    RecordMetadata metadata = producer.send(newUserRecord(null, id)).get();
                    printResult(metadata);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it, and stop sending.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                } catch (ExecutionException e) {
                    // The send failed after the producer gave up retrying.
                    // Log it so the record can be handled manually, then continue with the next one.
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sends users 6-9 asynchronously. Each result is reported from the send callback.
     *
     * <p>Sample output:
     * <pre>
     * RESULT:topic:hello,partition:0,offset:2
     * RESULT:topic:hello,partition:1,offset:5
     * RESULT:topic:hello,partition:1,offset:6
     * RESULT:topic:hello,partition:1,offset:7
     * </pre>
     */
    @Test
    public void testAsync() {
        try (Producer<String, String> producer = new KafkaProducer<>(kafkaProperties)) {
            sendAsync(producer, 6, 10);
        }
    }

    /**
     * Sends users 1-4 to partition 1 explicitly. Because the partition is given, the key hash is
     * not used to choose it, and every record lands in partition 1.
     *
     * <p>Sample output:
     * <pre>
     * RESULT:topic:hello,partition:1,offset:11
     * RESULT:topic:hello,partition:1,offset:12
     * RESULT:topic:hello,partition:1,offset:13
     * RESULT:topic:hello,partition:1,offset:14
     * </pre>
     *
     * @throws Exception if a send fails ({@link ExecutionException}) or the thread is interrupted
     */
    @Test
    public void testPartition() throws Exception {
        try (Producer<String, String> producer = new KafkaProducer<>(kafkaProperties)) {
            for (int id = 1; id < 5; id++) {
                printResult(producer.send(newUserRecord(1, id)).get());
            }
        }
    }

    /**
     * Sends users 6-9 asynchronously with a producer tuned for durability (acks), retries and
     * client-side batching.
     */
    @Test
    public void testAckConfig() {
        // ***************************** acks *****************************
        // acks = 0  : the producer does not wait for any broker acknowledgement. Highest
        //             throughput, but records can be lost silently.
        // acks = 1  : wait until the partition leader has written the record to its log. If the
        //             leader fails before followers replicate it, the record is lost.
        // acks = -1 : ("all") wait until all in-sync replicas have the record. Strongest
        //             durability, lowest throughput.
        // acks is a STRING-typed config: the Integer -1 would make the KafkaProducer constructor
        // throw a ConfigException, so the value must be the String "-1" (or "all").
        kafkaProperties.put(ProducerConfig.ACKS_CONFIG, "-1");

        // ***************************** retries *****************************
        // Retry a failed send up to 3 times (applies to synchronous and asynchronous sends).
        kafkaProperties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Wait 3 seconds between retries.
        kafkaProperties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);

        // ***************************** buffering *****************************
        // send() does not transmit each record immediately. Records are appended to a local
        // in-memory buffer and grouped into batches, and a background sender thread ships
        // batches to the brokers. This batching is what gives the producer its throughput.
        // buffer.memory caps the memory used for that buffer (33554432 bytes = 32 MB).
        kafkaProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // batch.size: a batch is sent once it reaches this size (16384 bytes = 16 KB).
        kafkaProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms: a batch that has not filled up is sent anyway after waiting 10 ms, so records
        // are not delayed for too long.
        kafkaProperties.put(ProducerConfig.LINGER_MS_CONFIG, 10);

        try (Producer<String, String> producer = new KafkaProducer<>(kafkaProperties)) {
            sendAsync(producer, 6, 10);
        }
    }

    /** Builds the sample user with the given id. */
    private static User newUser(int id) {
        return User.newBuilder()
                .setId(id)
                .setName("张三")
                .setAge(25)
                .builder();
    }

    /**
     * Builds a record whose key is the user id and whose value is the user as JSON.
     *
     * @param partition target partition, or {@code null} to let the default partitioner choose one
     *                  from a hash of the key
     * @param id        user id
     */
    private ProducerRecord<String, String> newUserRecord(Integer partition, int id) {
        User user = newUser(id);
        return new ProducerRecord<>(topic, partition, user.getId().toString(), JSON.toJSONString(user));
    }

    /** Prints where the broker stored a record, as {@code RESULT:topic:T,partition:P,offset:O}. */
    private static void printResult(RecordMetadata metadata) {
        String format = String.format("topic:%s,partition:%d,offset:%d",
                metadata.topic(), metadata.partition(), metadata.offset());
        System.out.println("RESULT:" + format);
    }

    /**
     * Send-completion callback used by the asynchronous tests.
     *
     * <p>The exception is checked first because some kafka-clients versions pass placeholder
     * metadata (offset -1) together with an error. A non-null metadata alone does not mean the
     * send succeeded.
     */
    private static void onSendComplete(RecordMetadata metadata, Exception ex) {
        if (ex != null) {
            System.out.println(ex.getMessage());
        } else {
            printResult(metadata);
        }
    }

    /**
     * Sends users with ids in [firstId, endId) asynchronously, then waits for them to complete.
     *
     * <p>{@code flush()} blocks until every previously sent record has completed (acknowledged
     * or failed), so the callbacks have fired before this returns. This replaces the former
     * fixed 30-second sleep.
     */
    private void sendAsync(Producer<String, String> producer, int firstId, int endId) {
        for (int id = firstId; id < endId; id++) {
            producer.send(newUserRecord(null, id), TestProducer::onSendComplete);
        }
        producer.flush();
    }
}