Preface
For real-time data warehousing, besides writing data directly into sinks such as Druid or message queues such as Kafka, Flink can also write data out to HDFS in Parquet format. This post walks through a simple end-to-end example of that process, as a reference for a real-time warehouse or other pipelines.
Dependencies
Besides the jars Flink itself depends on, the following dependencies are also required:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>
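If you build with sbt rather than Maven, the same coordinates and versions can be declared as below. The Cloudera resolver line is an assumption on my part: it is only needed because `hadoop-client 2.6.0-cdh5.5.0` is a CDH build that is not on Maven Central.

```scala
// build.sbt — sbt equivalent of the Maven dependencies above.
// The "cloudera" resolver is an assumption, required only for the
// CDH-specific hadoop-client build; adjust it to your repository setup.
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.hadoop"  % "hadoop-client" % "2.6.0-cdh5.5.0",
  "org.apache.flink"   % "flink-parquet" % "1.7.1",
  "org.apache.flink"   % "flink-avro"    % "1.7.1",
  "org.apache.parquet" % "parquet-avro"  % "1.10.0"
)
```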
Code
Flink supports both Java and Scala; this article uses Scala for the example:
package com.w3sun.flink.demo

import com.alibaba.fastjson.JSON
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
  * @author: w3sun
  * @date: 2019/3/3 20:49
  * @description:
  *
  */
object String2Parquet {

  // Generate mock data: emit one random JSON record every 200 ms.
  def generateSource(out: SourceContext[String]): Unit = {
    val lb = new ListBuffer[String]
    lb += "{\"time\":\"2018-12-10\",\"id\":\"1\",\"content\":\"realtime\"}"
    lb += "{\"time\":\"2019-12-10\",\"id\":\"2\",\"content\":\"batch\"}"
    lb += "{\"time\":\"2020-12-10\",\"id\":\"3\",\"content\":\"so what\"}"
    while (true) {
      val index = Random.nextInt(3)
      Thread.sleep(200)
      out.collect(lb(index))
    }
  }

  def main(args: Array[String]): Unit = {
    // Parameter validation: getRequired fails fast if --output is missing.
    val params = ParameterTool.fromArgs(args)
    val output = params.getRequired("output")
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
    // Bucket files into per-minute directories named by processing time.
    val timeAssigner = new DateTimeBucketAssigner[LogEvent]("yyyy-MM-dd-HH-mm")
    val source: DataStream[LogEvent] = env.addSource(generateSource _)
      .map(line => JSON.parseObject(line, classOf[LogEvent]))
    val sink = StreamingFileSink
      .forBulkFormat(new Path(output), ParquetAvroWriters.forReflectRecord(classOf[LogEvent]))
      .withBucketAssigner(timeAssigner)
      .build()
    source.addSink(sink).name("String2Parquet")
    env.execute(this.getClass.getSimpleName)
  }

  // Case classes implement Serializable by default.
  case class LogEvent(time: String, id: String, content: String) {
    override def toString: String = time + "\t" + id + "\t" + content
  }
}
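The per-minute directory names shown in the next section come from `DateTimeBucketAssigner` formatting each record's processing time with the pattern passed in above. A minimal, self-contained sketch of that naming rule (my own illustration, not Flink's actual implementation; the time zone is pinned to UTC here for determinism):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object BucketIdSketch {
  // Format a processing-time timestamp (ms since epoch) with the same
  // pattern given to DateTimeBucketAssigner to get a bucket directory name.
  def bucketId(timestampMs: Long, pattern: String = "yyyy-MM-dd-HH-mm"): String = {
    val fmt = new SimpleDateFormat(pattern)
    fmt.setTimeZone(TimeZone.getTimeZone("UTC")) // pinned for a deterministic example
    fmt.format(new Date(timestampMs))
  }

  def main(args: Array[String]): Unit = {
    // All records arriving within the same minute share one bucket directory.
    println(bucketId(0L)) // 1970-01-01-00-00
  }
}
```

Because the pattern only resolves down to the minute, every record collected in the same minute is appended to part files under the same directory.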
Result
total 0
drwxr-xr-x    6 w3sun  staff   192 Mar  2 17:57 .
drwxr-xr-x    4 w3sun  staff   128 Mar  2 17:54 ..
drwxr-xr-x   16 w3sun  staff   512 Mar  2 17:55 2019-03-02-17-54
drwxr-xr-x  242 w3sun  staff  7744 Mar  2 17:56 2019-03-02-17-55
drwxr-xr-x  244 w3sun  staff  7808 Mar  2 17:57 2019-03-02-17-56
drwxr-xr-x   25 w3sun  staff   800 Mar  2 17:57 2019-03-02-17-57
-rw-r--r--    1 w3sun  staff   784 Mar  2 17:54 part-0-0
-rw-r--r--    1 w3sun  staff   838 Mar  2 17:54 part-0-1
-rw-r--r--    1 w3sun  staff   838 Mar  2 17:54 part-0-2
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:55 part-0-3
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:54 part-1-0
-rw-r--r--    1 w3sun  staff   787 Mar  2 17:54 part-1-1
-rw-r--r--    1 w3sun  staff   838 Mar  2 17:54 part-1-2
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:54 part-2-0
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:54 part-2-1
-rw-r--r--    1 w3sun  staff   784 Mar  2 17:54 part-2-2
-rw-r--r--    1 w3sun  staff   838 Mar  2 17:54 part-3-0
-rw-r--r--    1 w3sun  staff   858 Mar  2 17:54 part-3-1
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:54 part-3-2
-rw-r--r--    1 w3sun  staff   867 Mar  2 17:55 part-3-3
As you can see, the data is bucketed into per-minute directories, and multiple part files are generated inside each one.
Notes
Writing the stream out as Parquet is triggered by checkpoints: with bulk formats, part files are only rolled and finalized when a checkpoint completes, so checkpointing must be enabled. How often to flush to disk depends on each pipeline's data volume and is up to you.
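For example, using the names from the code above, raising the checkpoint interval from 1 s to 60 s makes part files roll roughly once a minute, trading flush latency for fewer, larger files. The 60 s figure is purely illustrative, not a recommendation:

```scala
// Bulk formats such as Parquet roll part files on every completed checkpoint,
// so the checkpoint interval is effectively the flush-to-disk interval.
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE)
```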
When reposting, please credit: 雪后西塘 » Flink落盘Parquet文件