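Preface
For a real-time data warehouse, besides writing data directly to Druid or to a message queue such as Kafka, Flink can also write data to HDFS and store it in Parquet format. This example briefly walks through the whole process, as a reference for later real-time data warehouse work or for other business lines.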
Dependencies
In addition to the usual JARs that Flink depends on, the following dependencies are required:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>
Code
Flink supports both Java and Scala; this article uses Scala for the example:
package com.w3sun.flink.demo

import com.alibaba.fastjson.JSON
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
  * @author: w3sun
  * @date: 2019/3/3 20:49
  * @description:
  *
  */
object String2Parquet {

  // Generate mock data: emit one of three JSON strings every 200 ms, forever.
  def generateSource(out: SourceContext[String]): Unit = {
    val lb = new ListBuffer[String]
    lb += "{\"time\":\"2018-12-10\",\"id\":\"1\",\"content\":\"realtime\"}"
    lb += "{\"time\":\"2019-12-10\",\"id\":\"2\",\"content\":\"batch\"}"
    lb += "{\"time\":\"2020-12-10\",\"id\":\"3\",\"content\":\"so what\"}"
    while (true) {
      val index = Random.nextInt(3)
      Thread.sleep(200)
      out.collect(lb(index))
    }
  }

  def main(args: Array[String]): Unit = {
    // Parameter validation: getRequired fails fast when --output is missing
    // (the original used params.get, which returns null in that case).
    val params = ParameterTool.fromArgs(args)
    val output = params.getRequired("output")

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    // Checkpoint every 1000 ms; the sink finalizes part files on checkpoints.
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)

    // One bucket (directory) per minute.
    val timeAssigner = new DateTimeBucketAssigner[LogEvent]("yyyy-MM-dd-HH-mm")

    // Parse each JSON line into a LogEvent.
    val source: DataStream[LogEvent] = env.addSource(generateSource _)
      .map(line => JSON.parseObject(line, classOf[LogEvent]))

    // Bulk-format sink that writes LogEvent records as Parquet via Avro reflection.
    val sink = StreamingFileSink
      .forBulkFormat(new Path(output), ParquetAvroWriters.forReflectRecord(classOf[LogEvent]))
      .withBucketAssigner(timeAssigner)
      .build()

    source.addSink(sink).name("String2Parquet")
    env.execute(this.getClass.getSimpleName)
  }

  // A case class implements Serializable by default.
  case class LogEvent(time: String, id: String, content: String) {
    override def toString: String = time + "\t" + id + "\t" + content
  }
}
Result
total 0
drwxr-xr-x    6 w3sun  staff   192 Mar  2 17:57 .
drwxr-xr-x    4 w3sun  staff   128 Mar  2 17:54 ..
drwxr-xr-x   16 w3sun  staff   512 Mar  2 17:55 2019-03-02-17-54
drwxr-xr-x  242 w3sun  staff  7744 Mar  2 17:56 2019-03-02-17-55
drwxr-xr-x  244 w3sun  staff  7808 Mar  2 17:57 2019-03-02-17-56
drwxr-xr-x   25 w3sun  staff   800 Mar  2 17:57 2019-03-02-17-57
-rw-r--r--    1 w3sun  staff   784 Mar  2 17:54 part-0-0
-rw-r--r--    1 w3sun  staff   838 Mar  2 17:54 part-0-1
-rw-r--r--    1 w3sun  staff   838 Mar  2 17:54 part-0-2
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:55 part-0-3
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:54 part-1-0
-rw-r--r--    1 w3sun  staff   787 Mar  2 17:54 part-1-1
-rw-r--r--    1 w3sun  staff   838 Mar  2 17:54 part-1-2
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:54 part-2-0
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:54 part-2-1
-rw-r--r--    1 w3sun  staff   784 Mar  2 17:54 part-2-2
-rw-r--r--    1 w3sun  staff   838 Mar  2 17:54 part-3-0
-rw-r--r--    1 w3sun  staff   858 Mar  2 17:54 part-3-1
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:54 part-3-2
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:55 part-3-3
As shown, the data is bucketed into one directory per minute, and multiple part files are then written inside each directory.
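To make the per-minute directory names concrete, here is a minimal sketch of how a pattern such as "yyyy-MM-dd-HH-mm" turns a timestamp into a bucket name. It uses only java.time and does not call Flink; the object BucketIdSketch, the helper bucketId, and the fixed UTC zone are hypothetical names and assumptions made for illustration, and the real DateTimeBucketAssigner derives the time and zone on its own.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object BucketIdSketch {
  // Same pattern string that the job passes to DateTimeBucketAssigner.
  val Pattern = "yyyy-MM-dd-HH-mm"

  // Hypothetical helper (not a Flink API): formats an epoch-millisecond
  // timestamp into a directory name using the pattern above.
  def bucketId(epochMillis: Long, zone: ZoneId): String =
    DateTimeFormatter.ofPattern(Pattern).withZone(zone).format(Instant.ofEpochMilli(epochMillis))

  def main(args: Array[String]): Unit = {
    val utc = ZoneId.of("UTC") // assumption: a fixed zone, for reproducibility
    val t0 = Instant.parse("2019-03-02T17:54:10Z").toEpochMilli
    println(bucketId(t0, utc))          // 2019-03-02-17-54
    println(bucketId(t0 + 30000L, utc)) // 2019-03-02-17-54 (same minute, same bucket)
    println(bucketId(t0 + 60000L, utc)) // 2019-03-02-17-55 (next minute, new bucket)
  }
}
```

Dropping "-mm" from the pattern would, under the same assumptions, give one directory per hour instead of one per minute.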
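Notes
When streaming to Parquet, the generation of output files is triggered by checkpoints, so checkpointing must be enabled. How often data is flushed to disk (that is, the checkpoint interval) should be chosen according to the data volume of each business line.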
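Because files are produced on checkpoints, the checkpoint interval also drives how many small files each bucket ends up with. The sketch below is a back-of-the-envelope estimate, not a Flink API: the object PartFileEstimate and the helper approxFilesPerBucket are hypothetical, and the estimate assumes every sink subtask receives at least one record between consecutive checkpoints. The parallelism of 4 is an assumption read off the part-0 to part-3 file names in the listing above.

```scala
object PartFileEstimate {
  // Rough number of part files per bucket: one file per sink subtask per
  // checkpoint that falls inside the bucket's time span. Checkpoints that do
  // not align with bucket boundaries can add about one more file per subtask.
  def approxFilesPerBucket(sinkParallelism: Int,
                           bucketMillis: Long,
                           checkpointIntervalMillis: Long): Long = {
    require(sinkParallelism > 0 && bucketMillis > 0 && checkpointIntervalMillis > 0)
    val checkpointsPerBucket = math.max(1L, bucketMillis / checkpointIntervalMillis)
    sinkParallelism * checkpointsPerBucket
  }

  def main(args: Array[String]): Unit = {
    // Settings of this example: 1 s checkpoints, one-minute buckets, 4 subtasks (assumed).
    println(approxFilesPerBucket(4, 60000L, 1000L))  // 240
    // Same job with a one-minute checkpoint interval.
    println(approxFilesPerBucket(4, 60000L, 60000L)) // 4
  }
}
```

A longer interval means fewer and larger files, but data becomes visible to downstream readers later, which is the trade-off to weigh per business line.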
Please credit the source when reposting: 雪后西塘 » Writing Parquet Files to Disk with Flink