Flink中KafkaConnector常用设置

2018年12月1日 0 条评论 3.54k 次阅读 0 人点赞

Flink中如果使用Apache Kafka需要注意下connector的版本,因为从Kafka0.9/0.10开始部分参数发生了较大变化。以开启自动提交偏移量为例,Apache Kafka 0.8.2之前的参数设置项为auto.commit.enable,从Kafka 0.9.0.0版本开始变更为enable.auto.commit

此外,Flink Kafka Consumer是对Apache Kafka进行了二次封装,支持发现动态创建的 Kafka 分区并使用精准一次的语义保证去消耗它们。在初始检索分区元数据之后(当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为flink.partition-discovery.interval-millis设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。

Kafka Connector

Maven Dependency
Supported since
Consumer and Producer Class name
Kafka version
Notes
flink-connector-kafka-0.8_2.11
1.0.0
FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x
内部使用SimpleConsumer API, 偏移量由Flink提交至ZK
flink-connector-kafka-0.9_2.11
1.0.0
FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x
使用新的KafkaConsumer API
flink-connector-kafka-0.10_2.11
1.2.0
FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x
生产和消费中带有时间戳的Kafka消息
flink-connector-kafka-0.11_2.11
1.4.0
FlinkKafkaConsumer011
FlinkKafkaProducer011
0.11.x
 Kafka0.11.x起不再支持scala 2.10。此连接器支持Kafka Producer事务性消息传递,并提供exactly once语义

Producer常用设置

Connsumer常用设置

常见问题

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
原因是消费者心跳线程必须在此超时之前向服务端发送心跳,否则认为该消费者不再存活被提出消费者组,此时需要增大session.timeout.ms配置项超时时间。

勇敢,和生活的艰难无关。

文章评论(0)