Kafka 详解(三)------Producer生产者

在第一篇博客我们了解到一个kafka系统,通常是生产者Producer将消息发送到Broker,然后消费者Consumer去Broker获取,那么本篇博客我们来介绍什么是生产者Producer。1、生产者概览我们知道一个系统在运行过程中会有很多消息产生,比如前面说的对于一个购物网站,通常会记录用户的活动,网站的运行度量指标以及一些日志消息等等,那么产生这些消息的组件我们都可以称为生产者。而对于生产...

Kafka 详解(三)------Producer生产者

  在第一篇博客我们了解到一个kafka系统,通常是生产者Producer 将消息发送到 Broker,然后消费者 Consumer 去 Broker 获取,那么本篇博客我们来介绍什么是生产者Producer。

1、生产者概览

  我们知道一个系统在运行过程中会有很多消息产生,比如前面说的对于一个购物网站,通常会记录用户的活动,网站的运行度量指标以及一些日志消息等等,那么产生这些消息的组件我们都可以称为生产者。

  而对于生产者产生的消息重要程度又有不同,是否都很重要不允许丢失,是否允许丢失一部分?以及是否有严格的延迟和吞吐量要求?

  对于这些场景在 Kafka 中会有不同的配置,以及不同的 API 使用。

2、生产者发送消息步骤

  下图是生产者向 Kafka 发送消息的主要步骤:

  

  ①、首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic、分区Partition、键 Key以及值 Value,主题和值是必须要声明的,分区和键可以不用指定。

  ②、调用send() 方法进行消息发送。

  ③、因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key 和 value对象序列化成字节数组。

  ④、接下来数据传到分区器,如果之间的 ProducerRecord 对象指定了分区,那么分区器将不再做任何事,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区,选择好分区之后,生产者就知道该往哪个主题和分区发送记录了。

  ⑤、接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。

  ③、Broker成功接收到消息,表示发送成功,返回消息的元数据(包括主题和分区信息以及记录在分区里的偏移量)。发送失败,可以选择重试或者直接抛出异常。

3、Java Producer API

  首先在POM 文件中导入 kafka client。

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version></dependency>

  实例代码:

 1 package com.ys.utils; 2  3 import org.apache.kafka.clients.producer.*; 4 import java.util.Properties; 5  6 /** 7  * Create by YSOcean 8  */ 9 public class KafkaProducerUtils {10 11  public static void main(String[] args) {12Properties kafkaProperties = new Properties();13//配置broker地址信息14kafkaProperties.put("bootstrap.servers", "192.168.146.200:9092,192.168.146.201:9092,192.168.146.202:9092");15//配置 key 的序列化器16kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");17//配置 value 的序列化器18kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");1920//通过上面的配置文件生成 Producer 对象21Producer producer = new KafkaProducer(kafkaProperties);22//生成 ProducerRecord 对象,并制定 Topic,key 以及 value23ProducerRecord<String,String> record =24  new ProducerRecord<String, String>("testTopic","key1","hello Producer");25//发送消息26   producer.send(record);27  }28 }

  通过运行上述代码,我们向名为 testTopic 的主题中发送了一条键为 key1,值为 hello Producer 的消息。

  

4、属性配置

  在上面的实例中,我们配置了如下三个属性:

  ①、bootstrap.servers:该属性指定 brokers 的地址清单,格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其它 broker 的信息。——建议至少提供两个 broker 的信息,因为一旦其中一个宕机,生产者仍然能够连接到集群上。

  ②、key.serializer:将 key 转换为字节数组的配置,必须设定为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会用这个类把键对象序列化为字节数组。——kafka 默认提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。当然也可以自定义序列化器。

  ③、value.serializer:和 key.serializer 一样,用于 value 的序列化。

  以上三个属性是必须要配置的,下面还有一些别的属性可以不用配置,默认。

  ④、acks:此配置指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的,这个参数保障了消息发送的可靠性。默认值为 1。

    一、acks=0。生产者不会等待服务器的反馈,该消息会被立刻添加到 socket buffer 中并认为已经发送完成。也就是说,如果发送过程中发生了问题,导致服务器没有接收到消息,那么生产者也无法知道。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。好处就是由于生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

    二、acks=1。只要集群首领收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领还没有被选举出来),生产者会收到一个错误的响应,为了避免丢失消息,生产者会重发消息(根据配置的retires参数确定重发次数)。不过如果一个没有收到消息的节点成为首领,消息还是会丢失,这个时候的吞吐量取决于使用的是同步发送还是异步发送。

    三、acks=all。只有当集群中参与复制的所有节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,但是延迟最高。

  ⑤、buffer.memory:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。默认值为33554432 字节。如果应用程序发送消息的速度超过发送到服务器的速度,那么会导致生产者内存不足。这个时候,send() 方法会被阻塞,如果阻塞的时间超过了max.block.ms (在kafka0.9版本之前为block.on.buffer.full 参数)配置的时长,则会抛出一个异常。

  ⑥、compression.type:该参数用于配置生产者生成数据时可以压缩的类型,默认值为 none(不压缩)。还可以指定snappy、gzip或lz4等类型,snappy 压缩算法占用较少的 CPU,gzip 压缩算法占用较多的 CPU,但是压缩比最高,如果网络带宽比较有限,可以使用该算法,使用压缩可以降低网络传输开销和存储开销,这往往是 kafka 发送消息的瓶颈所在。

  ⑦、retires:该参数用于配置当生产者发送消息到服务器失败,服务器返回错误响应时,生产者可以重发消息的次数,如果达到了这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过 retry.backoff.on 参数来改变这个时间间隔。

  还有一些属性配置,可以参考官网:http://kafka.apachecn.org/documentation.html#producerconfigs

5、序列化器

  前面我们介绍过,消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。

  ①、默认序列化器

  Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)基本上能够满足大部分场景的需求。

  下面是Kafka 实现的字符串序列化器 StringSerializer:

//// Source code recreated from a .class file by IntelliJ IDEA// (powered by Fernflower decompiler)//package org.apache.kafka.common.serialization;import java.io.UnsupportedEncodingException;import java.util.Map;import org.apache.kafka.common.errors.SerializationException;public class StringSerializer implements Serializer<String> { private String encoding = "UTF8"; public StringSerializer() { } public void configure(Map<String, ?> configs, boolean isKey) {  String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";  Object encodingValue = configs.get(propertyName);  if (encodingValue == null) {encodingValue = configs.get("serializer.encoding");  }  if (encodingValue instanceof String) {this.encoding = (String)encodingValue;  } } public byte[] serialize(String topic, String data) {  
源文地址:https://www.guoxiongfei.cn/cntech/9866.html