最新消息:天气越来越冷,记得加一件厚衣裳

Flink中KafkaConnector常用设置

Flink w3sun 4560浏览 0评论

Flink中如果使用Apache Kafka需要注意下connector的版本,因为从Kafka0.9/0.10开始部分参数发生了较大变化。以开启自动提交偏移量为例,Apache Kafka 0.8.2之前的参数设置项为auto.commit.enable,从Kafka 0.9.0.0版本开始变更为enable.auto.commit。

此外,Flink Kafka Consumer是对Apache Kafka进行了二次封装,支持发现动态创建的 Kafka 分区并使用精准一次的语义保证去消耗它们。在初始检索分区元数据之后(当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为flink.partition-discovery.interval-millis设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。

Kafka Connector

Maven Dependency
Supported since
Consumer and Producer Class name
Kafka version
Notes
flink-connector-kafka-0.8_2.11
1.0.0
FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x
内部使用SimpleConsumer API, 偏移量由Flink提交至ZK
flink-connector-kafka-0.9_2.11
1.0.0
FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x
使用新的KafkaConsumer API
flink-connector-kafka-0.10_2.11
1.2.0
FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x
生产和消费中带有时间戳的Kafka消息
flink-connector-kafka-0.11_2.11
1.4.0
FlinkKafkaConsumer011
FlinkKafkaProducer011
0.11.x
 Kafka0.11.x起不再支持scala 2.10。此连接器支持Kafka Producer事务性消息传递,并提供exactly once语义

Producer常用设置

/**
   * Producer setting.
   *
   * @return Producer Properties
   */
  public static Properties getProducerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092");
    /**
     * Producer希望leader返回接受消息后的确认信息. 可选值 all, -1, 0 1. 默认值为1.
     * 1.> acks=0 不需要等待leader尽心确认. 此时retries设置无效. 响应里来自服务端的offset总是-1.
     *     Producer只管发不管发送成功与否。延迟低,容易丢失数据。
     * 2.> acks=1 表示leader写入成功(但是并没有刷新到磁盘)后即向Producer响应。延迟中等,但是一旦
     *     leader副本挂了,就会丢失数据。
     * 3.> acks=all 等待数据完成副本的复制, 等同于-1. 假如需要保证消息不丢失, 需要使用该设置. 同时
     *     需要设置unclean.leader.election.enable为true, 保证当ISR列表为空时, 选择其他存活的副本作为新的leader.
     */
    props.put("acks", "all");
    /**
     * 设置大于零的值时,Producer会发送失败后会进行重试。
     */
    props.put("retries", 0);
    /**
     * Producer批量发送同一个partition消息以减少请求的数量从而提升客户端和服务端的性能,默认大小是16348 byte(16k).
     * 发送到broker的请求可以包含多个batch, 每个batch的数据属于同一个partition,太小的batch会降低吞吐.太大会浪费内存.
     */
    props.put("batch.size", 16384);
    /**
     * batch.size和liner.ms配合使用,前者限制大小后者限制时间。前者条件满足的时候,同一partition的消息会立即发送,
     * 此时linger.ms的设置无效,假如要发送的消息比较少, 则会等待指定的时间以获取更多的消息,此时linger.ms生效.
     * 默认设置为0ms(没有延迟).
     */
    props.put("linger.ms", 1);
    /**
     * Producer可以使用的最大内存来缓存等待发送到server端的消息.默认值33554432 byte(32m)
     */
    props.put("buffer.memory", 33554432);
    props.put("compression.type", "snappy");
    props.put("max.request.size", 10485760);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    return props;
  }

Connsumer常用设置

/**
   * Connsumer setting.
   *
   * @return Connsumer Properties
   */
  public static Properties getConsumerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092");
    props.put("group.id", "test");
    /**
     * Kafka08
     */
    props.put("auto.commit.enable", "true");
    /**
     * kafka010
     */
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    /**
     * 用来限制每次consumer fetch数据的大小限制,只是限制partition的,无法限制到一次拉取的总量。
     */
    props.put("max.partition.fetch.bytes", 10485760);
    props.put("fetch.message.max.bytes", 3072000);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    return props;
  }

常见问题

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
原因是消费者心跳线程必须在此超时之前向服务端发送心跳,否则认为该消费者不再存活被提出消费者组,此时需要增大session.timeout.ms配置项超时时间。

转载请注明:雪后西塘 » Flink中KafkaConnector常用设置

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址