最新消息:天气越来越冷,记得加一件厚衣裳

Flink落盘Parquet文件

Flink w3sun 4642浏览 0评论

前言

在实时数仓方面除了直接将数据写入Druid,Kafka等相关的消息队列以外,Flink还支持将数据写出到HDFS上并以Parquet格式进行存储,本案例简单地介绍了整个过程的实现,为后续实时数仓或者其他业务线提供参考。

依赖

除了Flink所依赖的常规Jar包以外,还需要提供以下依赖:

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>2.6.0-cdh5.5.0</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-parquet</artifactId>
 <version>1.7.1</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-avro</artifactId>
 <version>1.7.1</version>
</dependency>
<dependency>
 <groupId>org.apache.parquet</groupId>
 <artifactId>parquet-avro</artifactId>
 <version>1.10.0</version>
</dependency>

编码

Flink同时支持Java和Scala,本文中采用Scala进行示例:

package com.w3sun.flink.demo


import com.alibaba.fastjson.JSON
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
  * @author: w3sun
  * @date: 2019/3/3 20:49
  * @description:
  *
  */
object String2Parquet {
  //模拟生成数据
  def generateSource(out: SourceContext[String]): Unit = {
    val lb = new ListBuffer[String]
    lb += "{\"time\":\"2018-12-10\",\"id\":\"1\",\"content\":\"realtime\"}"
    lb += "{\"time\":\"2019-12-10\",\"id\":\"2\",\"content\":\"batch\"}"
    lb += "{\"time\":\"2020-12-10\",\"id\":\"3\",\"content\":\"so what\"}"
    while (true) {
      val index = Random.nextInt(3)
      Thread.sleep(200)
      out.collect(lb(index))
    }
  }

  def main(args: Array[String]): Unit = {
    //参数校验
    val params = ParameterTool.fromArgs(args)
    val output = params.get("output")
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
    val timeAssigner = new DateTimeBucketAssigner[LogEvent]("yyyy-MM-dd-HH-mm")
    val source: DataStream[LogEvent] = env.addSource(generateSource _)
      .map(line => {
        val event: LogEvent = JSON.parseObject(line, classOf[LogEvent])
        event
      })
    val sink = StreamingFileSink.forBulkFormat(new Path(output),
      ParquetAvroWriters.forReflectRecord(classOf[LogEvent])).withBucketAssigner(timeAssigner).build()
    source.addSink(sink).name("String2Parquet")
    env.execute(this.getClass.getSimpleName)
  }

  //case class 默认实现Serializable
  case class LogEvent(time: String, id: String, content: String) {
    override def toString: String = time + "\t" + id + "\t" + content
  }
}

效果

total 0
drwxr-xr-x    6 w3sun  staff   192 Mar  2 17:57 .
drwxr-xr-x    4 w3sun  staff   128 Mar  2 17:54 ..
drwxr-xr-x   16 w3sun  staff   512 Mar  2 17:55 2019-03-02-17-54
drwxr-xr-x  242 w3sun  staff  7744 Mar  2 17:56 2019-03-02-17-55
drwxr-xr-x  244 w3sun  staff  7808 Mar  2 17:57 2019-03-02-17-56
drwxr-xr-x   25 w3sun  staff   800 Mar  2 17:57 2019-03-02-17-57
-rw-r--r--   1 w3sun  staff  784 Mar  2 17:54 part-0-0
-rw-r--r--   1 w3sun  staff  838 Mar  2 17:54 part-0-1
-rw-r--r--   1 w3sun  staff  838 Mar  2 17:54 part-0-2
-rw-r--r--   1 w3sun  staff  867 Mar  2 17:55 part-0-3
-rw-r--r--   1 w3sun  staff  867 Mar  2 17:54 part-1-0
-rw-r--r--   1 w3sun  staff  787 Mar  2 17:54 part-1-1
-rw-r--r--   1 w3sun  staff  838 Mar  2 17:54 part-1-2
-rw-r--r--   1 w3sun  staff  867 Mar  2 17:54 part-2-0
-rw-r--r--   1 w3sun  staff  867 Mar  2 17:54 part-2-1
-rw-r--r--   1 w3sun  staff  784 Mar  2 17:54 part-2-2
-rw-r--r--   1 w3sun  staff  838 Mar  2 17:54 part-3-0
-rw-r--r--   1 w3sun  staff  858 Mar  2 17:54 part-3-1
-rw-r--r--   1 w3sun  staff  867 Mar  2 17:54 part-3-2
-rw-r--r--   1 w3sun  staff  867 Mar  2 17:55 part-3-3

可以看到,数据按照分钟级别生成文件夹,然后再生成多个文件。

注意事项

Streaming到Parquet数据的生成是由Checkpoint触发的,因此必须设置Checkpoint为Enable状态,至于数据刷盘时间根据业务线数据量不同自行决定。

转载请注明:雪后西塘 » Flink落盘Parquet文件

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址