目录
Flink中如果使用Apache Kafka需要注意下connector的版本,因为从Kafka0.9/0.10开始部分参数发生了较大变化。以开启自动提交偏移量为例,Apache Kafka 0.8.2之前的参数设置项为auto.commit.enable,从Kafka 0.9.0.0版本开始变更为enable.auto.commit。
此外,Flink Kafka Consumer是对Apache Kafka进行了二次封装,支持发现动态创建的 Kafka 分区并使用精准一次的语义保证去消耗它们。在初始检索分区元数据之后(当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为flink.partition-discovery.interval-millis设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。
Kafka Connector
Maven Dependency
|
Supported since
|
Consumer and Producer Class name
|
Kafka version
|
Notes
|
flink-connector-kafka-0.8_2.11
|
1.0.0
|
FlinkKafkaConsumer08
FlinkKafkaProducer08
|
0.8.x
|
内部使用SimpleConsumer API, 偏移量由Flink提交至ZK
|
flink-connector-kafka-0.9_2.11
|
1.0.0
|
FlinkKafkaConsumer09
FlinkKafkaProducer09
|
0.9.x
|
使用新的KafkaConsumer API
|
flink-connector-kafka-0.10_2.11
|
1.2.0
|
FlinkKafkaConsumer010
FlinkKafkaProducer010
|
0.10.x
|
生产和消费中带有时间戳的Kafka消息
|
flink-connector-kafka-0.11_2.11
|
1.4.0
|
FlinkKafkaConsumer011
FlinkKafkaProducer011
|
0.11.x
|
Kafka0.11.x起不再支持scala 2.10。此连接器支持Kafka Producer事务性消息传递,并提供exactly once语义
|
Producer常用设置
/** * Producer setting. * * @return Producer Properties */ public static Properties getProducerProps() { Properties props = new Properties(); props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092"); /** * Producer希望leader返回接受消息后的确认信息. 可选值 all, -1, 0 1. 默认值为1. * 1.> acks=0 不需要等待leader尽心确认. 此时retries设置无效. 响应里来自服务端的offset总是-1. * Producer只管发不管发送成功与否。延迟低,容易丢失数据。 * 2.> acks=1 表示leader写入成功(但是并没有刷新到磁盘)后即向Producer响应。延迟中等,但是一旦 * leader副本挂了,就会丢失数据。 * 3.> acks=all 等待数据完成副本的复制, 等同于-1. 假如需要保证消息不丢失, 需要使用该设置. 同时 * 需要设置unclean.leader.election.enable为true, 保证当ISR列表为空时, 选择其他存活的副本作为新的leader. */ props.put("acks", "all"); /** * 设置大于零的值时,Producer会发送失败后会进行重试。 */ props.put("retries", 0); /** * Producer批量发送同一个partition消息以减少请求的数量从而提升客户端和服务端的性能,默认大小是16348 byte(16k). * 发送到broker的请求可以包含多个batch, 每个batch的数据属于同一个partition,太小的batch会降低吞吐.太大会浪费内存. */ props.put("batch.size", 16384); /** * batch.size和liner.ms配合使用,前者限制大小后者限制时间。前者条件满足的时候,同一partition的消息会立即发送, * 此时linger.ms的设置无效,假如要发送的消息比较少, 则会等待指定的时间以获取更多的消息,此时linger.ms生效. * 默认设置为0ms(没有延迟). */ props.put("linger.ms", 1); /** * Producer可以使用的最大内存来缓存等待发送到server端的消息.默认值33554432 byte(32m) */ props.put("buffer.memory", 33554432); props.put("compression.type", "snappy"); props.put("max.request.size", 10485760); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); return props; }
Connsumer常用设置
/** * Connsumer setting. * * @return Connsumer Properties */ public static Properties getConsumerProps() { Properties props = new Properties(); props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092"); props.put("group.id", "test"); /** * Kafka08 */ props.put("auto.commit.enable", "true"); /** * kafka010 */ props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); /** * 用来限制每次consumer fetch数据的大小限制,只是限制partition的,无法限制到一次拉取的总量。 */ props.put("max.partition.fetch.bytes", 10485760); props.put("fetch.message.max.bytes", 3072000); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); return props; }
常见问题
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
原因是消费者心跳线程必须在此超时之前向服务端发送心跳,否则认为该消费者不再存活被提出消费者组,此时需要增大session.timeout.ms配置项超时时间。
转载请注明:雪后西塘 » Flink中KafkaConnector常用设置