Preface
For a real-time data warehouse, besides writing data directly to Druid, Kafka, and similar message queues, Flink also supports writing data out to HDFS and storing it in Parquet format. This article briefly walks through the implementation of the whole process, as a reference for a real-time data warehouse or other business lines.
Dependencies
Besides the regular jars that Flink itself requires, the following dependencies are also needed:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>
Code
Flink supports both Java and Scala; this article uses Scala for the example:
package com.w3sun.flink.demo

import com.alibaba.fastjson.JSON
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
  * @author: w3sun
  * @date: 2019/3/3 20:49
  * @description:
  */
object String2Parquet {

  // Emit mock JSON events, roughly one every 200 ms.
  def generateSource(out: SourceContext[String]): Unit = {
    val lb = new ListBuffer[String]
    lb += "{\"time\":\"2018-12-10\",\"id\":\"1\",\"content\":\"realtime\"}"
    lb += "{\"time\":\"2019-12-10\",\"id\":\"2\",\"content\":\"batch\"}"
    lb += "{\"time\":\"2020-12-10\",\"id\":\"3\",\"content\":\"so what\"}"
    while (true) {
      val index = Random.nextInt(3)
      Thread.sleep(200)
      out.collect(lb(index))
    }
  }

  def main(args: Array[String]): Unit = {
    // Parameter validation: the output directory is required.
    val params = ParameterTool.fromArgs(args)
    val output = params.get("output")
    require(output != null, "Missing required argument: --output <path>")

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    // Parquet part files are only finished on checkpoints, so checkpointing must be enabled.
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)

    // Bucket the output into one directory per minute.
    val timeAssigner = new DateTimeBucketAssigner[LogEvent]("yyyy-MM-dd-HH-mm")

    val source: DataStream[LogEvent] = env
      .addSource(generateSource _)
      .map(line => JSON.parseObject(line, classOf[LogEvent]))

    val sink = StreamingFileSink
      .forBulkFormat(new Path(output), ParquetAvroWriters.forReflectRecord(classOf[LogEvent]))
      .withBucketAssigner(timeAssigner)
      .build()

    source.addSink(sink).name("String2Parquet")
    env.execute(this.getClass.getSimpleName)
  }

  // Case classes implement Serializable by default.
  case class LogEvent(time: String, id: String, content: String) {
    override def toString: String = time + "\t" + id + "\t" + content
  }
}
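To run the demo locally, pass an output directory as a program argument, for example --output file:///tmp/string2parquet (the path here is only an example). Because createLocalEnvironmentWithWebUI is used, Flink's web UI is also available while the job runs (by default at http://localhost:8081).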
Result
total 0
drwxr-xr-x 6 w3sun staff 192 Mar 2 17:57 .
drwxr-xr-x 4 w3sun staff 128 Mar 2 17:54 ..
drwxr-xr-x 16 w3sun staff 512 Mar 2 17:55 2019-03-02-17-54
drwxr-xr-x 242 w3sun staff 7744 Mar 2 17:56 2019-03-02-17-55
drwxr-xr-x 244 w3sun staff 7808 Mar 2 17:57 2019-03-02-17-56
drwxr-xr-x 25 w3sun staff 800 Mar 2 17:57 2019-03-02-17-57
-rw-r--r-- 1 w3sun staff 784 Mar 2 17:54 part-0-0
-rw-r--r-- 1 w3sun staff 838 Mar 2 17:54 part-0-1
-rw-r--r-- 1 w3sun staff 838 Mar 2 17:54 part-0-2
-rw-r--r-- 1 w3sun staff 867 Mar 2 17:55 part-0-3
-rw-r--r-- 1 w3sun staff 867 Mar 2 17:54 part-1-0
-rw-r--r-- 1 w3sun staff 787 Mar 2 17:54 part-1-1
-rw-r--r-- 1 w3sun staff 838 Mar 2 17:54 part-1-2
-rw-r--r-- 1 w3sun staff 867 Mar 2 17:54 part-2-0
-rw-r--r-- 1 w3sun staff 867 Mar 2 17:54 part-2-1
-rw-r--r-- 1 w3sun staff 784 Mar 2 17:54 part-2-2
-rw-r--r-- 1 w3sun staff 838 Mar 2 17:54 part-3-0
-rw-r--r-- 1 w3sun staff 858 Mar 2 17:54 part-3-1
-rw-r--r-- 1 w3sun staff 867 Mar 2 17:54 part-3-2
-rw-r--r-- 1 w3sun staff 867 Mar 2 17:55 part-3-3
As shown above, the output is bucketed into minute-level directories, each of which contains multiple part files.
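To spot-check what was written, a finished part file can be read back with parquet-avro's AvroParquetReader (parquet-avro and hadoop-client are already among the dependencies above). This is just a minimal verification sketch, not part of the Flink job; the file path is whichever part file you want to inspect:
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader

object ParquetCheck {
  def main(args: Array[String]): Unit = {
    // args(0): path to one finished part file, e.g. <output>/2019-03-02-17-54/part-0-0
    val reader = AvroParquetReader.builder[GenericRecord](new Path(args(0))).build()
    try {
      var record: GenericRecord = reader.read()
      while (record != null) {
        // GenericRecord.toString renders the record as JSON, e.g. {"time": "2018-12-10", ...}
        println(record)
        record = reader.read()
      }
    } finally {
      reader.close()
    }
  }
}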
Notes
When streaming to Parquet, finished files are only produced when a checkpoint is triggered, so checkpointing must be enabled; how often data is flushed to disk should be decided per business line based on its data volume.
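In practice that means the checkpoint interval is the knob to tune: a longer interval yields fewer but larger Parquet files, a shorter one yields fresher data but more small files. A minimal sketch adjusting the demo above (the 60-second value is only an assumption, tune it per business line):
// Roll finished Parquet part files roughly once per minute instead of once per second.
// 60000L ms is an assumed value; choose it based on the pipeline's data volume.
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)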
When reprinting, please credit the source: 雪后西塘 » Flink落盘Parquet文件