Flink

概述

概念

简介

  • 是一个框架分布式处理引擎,用于对无界和有界数据流进行有状态计算

    Flink 框架处理流程

核心特性

  • 高吞吐和低延迟:每秒处理数百万个事件,毫秒级延迟
  • 结果的准确性:提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果
  • 精确一次(exactly-once)的状态一致性保证
  • 可以连接到最常用的存储系统:如Apache Kafka、Apache Cassandra、Elasticsearch、JDBC、Kinesis和(分布式)文件系统,如HDFS和S3
  • 高可用,支持动态扩展:本身高可用的设置,加上与K8s,YARN和Mesos的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink能做到以极少的停机时间7×24全天候运行
  • 能够更新应用程序代码并将作业(jobs)迁移到不同的Flink集群,而不会丢失应用程序的状态

分层API

分层API

  • 越顶层越抽象,表达含义越简明,使用越方面
  • 越底层越具体,表达能力越丰富,使用越灵活

Spark

  • 以批处理为根本,并尝试在批处理之上支持流计算;在Spark的世界观中,万物皆批次,离线数据是一个大批次,而实时数据则是由一个一个无限的小批次组成的
  • 底层数据模型是弹性分布式数据集(RDD),Spark Streaming进行微批处理的底层接口DStream,实际上处理的也是一组组小批数据RDD的集合
  • 需要将任务对应的DAG划分阶段(Stage),一个完成后经过shuffle再进行下一阶段的计算

Spark Streaming流处理示意

  • 在Flink的世界观中,万物皆流,实时数据是标准的、没有界限的流,而离线数据则是有界限的流
  • 基本数据模型是数据流(DataFlow),以及事件(Event)序列
  • 一个事件在一个节点处理完后可以直接发往下一个节点进行处理

有界流与无界流

快速上手

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.5</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.5</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.5</version>
</dependency>
1
2
3
4
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

代码

批处理

  • 这种代码的实现方式,是基于DataSet API

  • 官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理

    1
    $ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从文件读取
DataSource<String> lineDS = env.readTextFile("data/flink/input/words.txt");
// 将每行数据进行分词,转换成二元组类型
FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDS
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
for (String word : line.split(" ")) {
// 将每个单词转换成二元组输出
out.collect(Tuple2.of(word, 1L));
}
// 解决泛型擦除
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 按word进行分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
// 分组内进行聚合
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
sum.print();

有界流处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取文件
DataStreamSource<String> lineDSS = env.readTextFile("data/flink/input/words.txt");
// 转换计算
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDSS
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
for (String word : line.split(" ")) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOneTuple.keyBy(data -> data.f0);
// 聚合
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKS.sum(1);
// 打印
sum.print();
// 启动执行
env.execute();

无界流处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从参数中提取主机名和端口号: --host [主机名] --port [端口号]
ParameterTool params = ParameterTool.fromArgs(args);
String host = params.get("host");
String port = params.get("port");
// 读取文本流
DataStreamSource<String> lineDSS = env.socketTextStream("localhost", 8080);
// 转换计算
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDSS
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
for (String word : line.split(" ")) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOneTuple.keyBy(data -> data.f0);
// 聚合
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKS.sum(1);
// 打印
sum.print();
// 启动执行
env.execute();

部署

概述

关键组件

  • 客户端(Client):获取代码并做转换后提交给JobManger
  • 作业管理器(JobManager):对作业进行中央调度管理(进一步处理转换)后分发任务给众多的TaskManager
  • 任务管理器(TaskManager):数据的处理

Flink集群中的主要组件

入门操作

本地模式

集群模式

  • 角色分配

    节点服务器 bigdata1 bigdata2 Bigdata3
    角色 JobManager TaskManager TaskManager
  • 配置文件

    • 修改conf/flink-conf.yaml文件

      1
      2
      # JobManager节点地址.
      jobmanager.rpc.address: bigdata1
    • 修改conf/workers文件

      1
      2
      bigdata2
      bigdata3
    • 分发

  • 启动集群

    • 在bigdata1节点服务器上执行start-cluster.sh启动Flink集群

      1
      $ bin/start-cluster.sh
  • Web UIhttp://bigdata1:8081

提交作业

  • 程序打包

    • 为方便自定义结构和定制依赖,可引入插件maven-assembly-plugin进行打包
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
    <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    </configuration>
    <executions>
    <execution>
    <id>make-assembly</id>
    <phase>package</phase>
    <goals>
    <goal>single</goal>
    </goals>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>
  • Web提交作业

  • CLI提交作业

    1
    2
    3
    4
    $ bin/flink run -m bigdata1:8081 -c com.mousse.flink.WordCount ./Flink-1.0-SNAPSHOT.jar

    # 取消作业
    $ bin/flink cancel [Job ID]
    • –m:指定提交到的JobManager
    • -c:指定入口类

部署模式

概述

  • 在一些应用场景中对于集群资源分配和占用的方式可能会有特定的需求,Flink为各种场景提供了不同的部署模式
  • 主要区别
    • 集群的生命周期以及资源的分配方式
    • 应用的main方法执行位置
  • 会话模式:集群的生命周期独立于集群上运行的任何作业的生命周期,提交的所有作业共享资源
  • 单作业模式:为每个提交的作业创建一个集群,带来了更好的资源隔离,这时集群的生命周期与作业的生命周期绑定
  • 应用模式:为每个应用程序创建一个会话集群,在JobManager上直接调用应用程序的main()方法

会话模式

  • 需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业
  • 集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源
  • 作业结束了就释放资源,资源不够时提交新的作业就会失败
  • 同一个TaskManager上可能运行了很多作业,如果其中一个发生故障导致TaskManager宕机,那么所有作业都会受到影响
  • 适合单个规模小、执行时间短的大量作业

会话模式

单作业模式

  • 为每个提交的作业启动一个集群,更好地隔离资源
  • 客户端运行应用程序后启动集群,作业被提交给JobManager,进而分发给TaskManager执行。作业作业完成后集群就会关闭,所有资源也会释放
  • 每个作业都有它自己的JobManager管理,占用独享的资源。即使发生故障,它的TaskManager宕机也不会影响其他作业
  • 单作业模式在生产环境运行更加稳定,是实际应用的首选模式
  • Flink本身无法直接这样运行,一般需要借助一些资源管理框架来启动集群,如YARN、Kubernetes

单作业模式

应用模式

  • 前两种模式下,应用代码都在客户端上执行,然后由客户端提交给JobManager。客户端需要占用大量网络带宽去下载依赖和把二进制数据发送给JobManager;加上很多情况下提交作业用的是同一个客户端,会加重客户端所在节点的资源消耗
  • 应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群
  • Jar包与集群1对1

应用模式

独立模式

  • 所需要的所有Flink组件,都只是操作系统上运行的一个JVM进程
  • 不依赖任何外部的资源管理平台
  • 如果资源不足或出现故障,没有自动扩展或重分配资源的保证,必须手动处理
  • 可以将独立模式的集群放在容器中运行。Flink提供了独立模式的容器化部署方式,可在Docker或Kubernetes上进行部署
  • 无法直接以单作业方式启动集群

Yarn模式

概念

  • 客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上Flink会部署JobManager和TaskManager的实例,从而启动集群
  • Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源

配置

  • 环境变量

    • 必须保证设置了环境变量HADOOP_CLASSPATH
    1
    2
    3
    4
    5
    # HADOOP_HOME
    export HADOOP_HOME=/hadoop
    export PATH=$PATH:$HADOOP_HOME/bin
    export PATH=$PATH:$HADOOP_HOME/sbin
    export HADOOP_CLASSPATH=`hadoop classpath`
  • 启动Hadoop集群,包括HDFS和YARN

部署

会话模式

  • 启动集群

    • 启动hadoop集群(HDFS,YARN)

    • 执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群

      1
      $ bin/yarn-session.sh -nm test
      • -d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行
      • -jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB
      • -nm(--name):配置在YARN UI界面上显示的任务名
      • -qu(--queue):指定YARN队列名
      • -tm(--taskManager):配置每个TaskManager所使用内存
    • Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的

    • YARN Session启动之后会给出一个web UI地址以及一个YARN application ID,用户可以通过web UI或者命令行两种方式提交作业

  • 提交作业

    • 通过Web UI提交

    • 通过命令行提交

      1. 将JAR包上传至集群

      2. 将该务提交到已经开启的Yarn-Session中运行

        1
        $ bin/flink run -c cc.mousse.WordCount Flink-1.0-SNAPSHOT.jar
        • 客户端可自行确定JobManager的地址,也可以通过-m-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到
    • 创建的Yarn-Session实际上是一个Yarn的Application,并且有唯一的Application ID

单作业模式

  • 提交

    1
    $ bin/flink run -d -t yarn-per-job -c cc.mousse.WordCount Flink-1.0-SNAPSHOT.jar
    1
    2
    # 通过参数-m yarn-cluster指定向YARN集群提交任务
    $ bin/flink run -m yarn-cluster -c cc.mousse.WordCount Flink-1.0-SNAPSHOT.jar
  • 取消

    1
    2
    3
    4
    # application_XXXX_YY是当前应用的ID,<jobId>是作业的ID
    # 如果取消作业,整个Flink集群也会停掉
    $ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
    $ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

应用模式

  • 提交

    1
    $ bin/flink run-application -t yarn-application -c cc.mousse.WordCount Flink-1.0-SNAPSHOT.jar
  • 查看

    1
    $ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
  • 取消

    1
    $ ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
  • 可以通过yarn.provided.lib.dirs配置选项指定位置,将Jar上传到远程,这种方式下Jar可以预先上传到HDFS,而不需要单独发送到集群,使作业提交更加轻量

    1
    $ ./bin/flink run-application -t yarn-application	-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"	hdfs://myhdfs/jars/my-application.jar

高可用

K8S模式

架构

系统架构

整体构成

整体构成

JobManager

概念

  • 作业管理器是一个Flink集群中任务管理和调度的核心,是控制应用执行的主进程
  • 每个应用都应该被唯一的JobManager所控制执行

JobMaster

  • JobManager中最核心的组件,负责处理单独的作业(Job)(JobMaster和Job一一对应)
  • 在作业提交时JobMaster会先接收到要执行的应用
    • 应用一般是客户端提交来的,包括:Jar包,数据流图(dataflow graph),和作业图(JobGraph)
  • JobMaster把JobGraph转换成一个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),包含了所有可以并发执行的任务
  • JobMaster会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上
  • 在运行过程中,JobMaster会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调

ResourceManager

  • 资源管理器主要负责资源的分配和管理,在Flink集群中只有一个
  • 资源主要指TaskManager的任务槽(task slots)
  • 任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行

Dispatcher

  • 分发器主要负责提供一个REST接口,用来提交应用
  • 会启动一个Web UI,用来方便地展示和监控作业执行的信息
  • 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉

TaskManager

  • 任务管理器是Flink中的工作进程,通常会有多个TaskManager运行,每一个TaskManager都包含了一定数量的任务槽(task slots)。Slot是资源调度的最小单位,slot的数量限制了TaskManager能够并行处理的任务数量
  • 启动之后,TaskManager会向资源管理器注册它的slots;收到资源管理器的指令后,TaskManager就会将一个或者多个槽位提供给JobMaster调用,JobMaster就可以分配任务来执行
  • 在执行过程中,TaskManager可以缓冲数据,还可以跟其他运行同一应用的TaskManager交换数据

作业提交流程

抽象流程

作业提交流程

  1. 由客户端通过分发器提供的REST接口,将作业提交给JobManager
  2. 由分发器启动JobMaster,并将作业(包含JobGraph)提交给JobMaster
  3. JobMaster将JobGraph解析为可执行的ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)
  4. 资源管理器判断当前是否由足够的可用资源;如果没有,启动新的TaskManager
  5. TaskManager启动之后,向ResourceManager注册自己的可用任务槽(slots)
  6. 资源管理器通知TaskManager为新的作业提供slots
  7. TaskManager连接到对应的JobMaster,提供slots
  8. JobMaster将需要执行的任务分发给TaskManager
  9. TaskManager执行任务,互相之间可以交换数据

独立模式

独立模式作业提交流程

Yarn模式

会话模式

Yarn会话模式作业提交流程

  1. 客户端通过REST接口,将作业提交给分发器
  2. 分发器启动JobMaster,并将作业(包含JobGraph)提交给JobMaster
  3. JobMaster向资源管理器请求资源(slots)
  4. 资源管理器向YARN的资源管理器请求container资源
  5. YARN启动新的TaskManager容器
  6. TaskManager启动之后,向Flink的资源管理器注册自己的可用任务槽
  7. 资源管理器通知TaskManager为新的作业提供slots
  8. askManager连接到对应的JobMaster,提供slots
  9. JobMaster将需要执行的任务分发给TaskManager,执行任务

单作业模式

Yarn单作业模式作业提交流程

  1. 客户端将作业提交给YARN的资源管理器,这一步中会同时将Flink的Jar包和配置上传到HDFS,以便后续启动Flink相关组件的容器
  2. YARN的资源管理器分配Container资源,启动Flink JobManager,并将作业提交给JobMaster。这里省略了Dispatcher组件
  3. JobMaster向资源管理器请求资源(slots)
  4. 资源管理器向YARN的资源管理器请求container资源
  5. YARN启动新的TaskManager容器
  6. TaskManager启动之后,向Flink的资源管理器注册自己的可用任务槽
  7. 资源管理器通知TaskManager为新的作业提供slots
  8. TaskManager连接到对应的JobMaster,提供slots
  9. JobMaster将需要执行的任务分发给TaskManager,执行任务

重要概念

数据流图

  • 所有的Flink程序都可以归纳为由三部分构成
    • Source:源算子,负责读取数据源
    • Transformation:转换算子,利用各种算子进行处理加工
    • Sink:下沉算子,负责数据的输出
  • 在运行时,Flink程序会被映射成所有算子按照逻辑顺序连接在一起的一张图,称为“数据流图”(dataflow graph),它包含了这三部分
  • 类似于任意的有向无环图(DAG),每条数据流以一个或多个source算子开始,以一个或多个sink算子结束
  • 在大部分情况下,dataflow中的算子和程序中的转换运算是一一对应的关系

并行度

  • 每个算子可以包含一或多个子任务,这些子任务在不同的线程/容器/物理机中完全独立地运行
  • 并行度:一个特定算子的子任务的个数
  • 并行度设置优先级:算子 > 全局 > 参数 > 集群配置文件

并行度

算子链

数据传输形式

  • 一个程序中,不同的算子可能具有不同的并行度
  • 算子之间传输数据的形式可以是一对一(one-to-one)的直通(forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类
  • 一对一
    • 数据流维护着分区以及元素的顺序(如sourcemap算子),source算子读取数据之后,可以直接发送给map算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着map算子的子任务,看到的元素个数和顺序跟source算子的子任务产生的完全一样,保证着“一对一”的关系
    • mapfilterflatMap等算子都是这种one-to-one的对应关系
    • 类似于Spark中的窄依赖
  • 重分区
    • 数据流的分区会发生改变
    • 每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务
    • 比如从并行度为2的window算子,要传递到并行度为1的Sink算子,这时的数据传输方式是再平衡(rebalance),会把数据均匀地向下游子任务分发出去
    • 这些传输方式都会引起重分区(redistribute)的过程
    • 类似于Spark中的shuffle,宽依赖

合并算子链

  • 并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分

  • 如果要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置

    1
    2
    3
    4
    // 禁用算子链
    .map(word -> Tuple2.of(word, 1L)).disableChaining();
    // 从当前算子开始新链
    .map(word -> Tuple2.of(word, 1L)).startNewChain();

执行图

  • 逻辑流图:StreamGraph,根据用户通过DataStream API编写的代码生成的最初的DAG图,用来表示程序的拓扑结构
  • 作业图:JobGraph,StreamGraph经过优化后生成的就是作业图JobGraph,提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。主要的优化为将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链
  • 执行图:ExecutionGraph,JobMaster收到JobGraph后,会根据它来生成执行图。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构
  • 物理图:Physical Graph,JobMaster生成执行图后, 会将它分发给TaskManager;各个TaskManager会根据执行图部署任务,最终的物理执行过程也会形成一张物理图。这只是具体执行层面的图,并不是一个具体的数据结构

四层调度图的演变过程

任务槽

任务与任务槽

  • 每个TaskManager都是一个JVM进程,可以启动多个独立的线程,来并行执行多个子任务

  • Taskmanager通过任务槽来控制该TaskManager能接收多少任务

  • 主要限定内存

  • 默认允许子任务共享任务槽,一个任务槽可以保存作业的整个管道。如果希望某个算子对应的任务完全独占一个slot,或只有某一部分算子共享slot,可通过设置slot共享组手动指定

    1
    2
    3
    // 只有属于同一个slot共享组的子任务,才会开启slot共享;不同组之间的任务是完全隔离的,必须分配到不同的slot上
    // 在这种场景下,总共需要的slot数量,就是各个slot共享组最大并行度的总和
    .map(word -> Tuple2.of(word, 1L)).slotSharingGroup(“1”);
  • 可以自行分配资源密集型和资源非密集型任务对资源的占比,保证最终的活平均分配给所有TaskManager

任务与任务槽

并行度与任务槽

  • 任务槽
    • 静态概念,指TaskManager具有的并发执行能力
    • 通过taskmanager.numberOfTaskSlots设定
  • 并行度
    • 动态概念,指TaskManager运行程序实际使用的并发能力
    • 通过parallelism.default设定
  • 用任务最大的并行度作为任务槽的数量(作业本身的并行度是它占据的任务槽的数量)

基础API

概念

  • 一个Flink程序就是对DataStream的各种转换
  • 代码基本上都由以下几部分构成
    • 获取执行环境(execution environment)
    • 读取数据源(source)
    • 定义基于数据的转换操作(transformations)
    • 定义计算结果的输出位置(sink)
    • 触发程序执行(execute)

环境

创建

getExecutionEnvironment

  • 根据当前运行的上下文直接得到正确的结果
  • 会根据当前运行的方式自行决定该返回什么样的运行环境
    • 如果程序是独立运行的,就返回一个本地执行环境
    • 如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境
1
STREAMEXECUTIONENVIRONMENT ENV = STREAMEXECUTIONENVIRONMENT.GETEXECUTIONENVIRONMENT();

createLocalEnvironment

  • 返回一个本地执行环境
  • 可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数
1
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

createRemoteEnvironment

  • 方法返回集群执行环境
  • 需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包
1
2
3
4
5
6
7
8
9
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
// JobManager主机名
"host",
// JobManager进程端口号
1234,
// 提交给JobManager的JAR包
"path/to/jarFile.jar"
);

执行模式

流执行模式

  • 一般用于需要持续实时处理的无界数据流
  • 默认

批执行模式

  • 这种模式下,Flink处理作业的方式类似于MapReduce框架

  • 命令行配置

    1
    bin/flink run -Dexecution.runtime-mode=BATCH ...
  • 代码配置

    1
    2
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

自动模式

  • 由程序根据输入数据源是否有界,来自动选择执行模式

触发

  • Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”(lazy execution)
  • 需要显式地调用执行环境的execute()方法来触发程序执行
  • execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)

源算子

有界流

文件

1
DataStreamSource<String> fromFile = env.readTextFile("data/flink/input/clicks.csv");

集合

1
2
3
4
5
List<Event> nums = Arrays.asList(
new Event("Tom", "./home", 100L),
new Event("Jerry", "./cart", 200L)
);
DataStreamSource<Event> fromCollection = env.fromCollection(nums);

元素

1
2
3
4
DataStreamSource<Event> fromElements = env.fromElements(
new Event("Tom", "./home", 100L),
new Event("Jerry", "./cart", 200L)
);

无界流

Socket

1
DataStreamSource<String> fromSocketTextStream = env.socketTextStream("localhost", 8080);

Kafka

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.5</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
DataStreamSource<String> fromKafka = env.fromSource(
KafkaSource
.<String>builder()
.setBootstrapServers("bigdata1:9092")
.setTopics("clicks")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.build(),
// 事件时间和水印
WatermarkStrategy.noWatermarks(),
// 数据源名称
"Kafka Source"
);

自定义

基础

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static class ClickSource implements SourceFunction<Event> {

// 标志位
private Boolean isRunning = true;

/**
* 当前Source任务对应线程的执行方法
*
* @param ctx 当前Source源任务的上下文,向下游任务传输数据
*/
@Override
public void run(SourceContext<Event> ctx) {
// 循环生成数据
while (isRunning) {
ctx.collect(getRandomEvent());
}
}

// 随机生成数据
private static Event getRandomEvent() {
// 在指定的数据集中随机选取数据
Random random = new Random();
String[] users = {"Mary", "Alice", "Bob", "Cary"};
String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
return new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()
);
}

/**
* 取消当前任务
*/
@Override
public void cancel() {
this.isRunning = false;
}
}
1
DataStreamSource<Event> fromCustomSource = env.addSource(new ClickSource());

并行

  • 可以给当前数据源指定更高的并行度
1
2
3
static public class ClickParallelSource implements ParallelSourceFunction<Event> {
// 与SourceFunction无异
}
1
DataStreamSource<Event> fromCustomParallelSource = env.addSource(new ClickParallelSource()).setParallelism(2);

支持的数据类型

类型系统

  • Flink使用“类型信息”(TypeInformation)来统一表示数据类型
  • TypeInformation类是Flink中所有类型描述符的基类,它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器

支持类型

  • 基本类型
    • 所有Java基本类型及其包装类
    • voidStringDateBigDecimalBigInteger
  • 数组类型
    • 基本类型数组和对象数组
  • 复合数据类型
    • Java元组类型(TUPLE)
      • Flink内置的元组类型,是Java API的一部分
      • 最多25个字段,从Tuple0~Tuple25,不支持空字段
    • Scala样例类及Scala元组
      • 不支持空字段
    • 行类型(ROW)
      • 可以认为是具有任意个字段的元组,并支持空字段
    • POJO
      • Flink自定义的类似于Java bean模式的类
  • 辅助类型
    • Option、Either、List、Map等
  • 泛型类型
    • Flink支持所有的Java类和Scala类。如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化
    • 在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型
      • 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类)
      • 类有一个公共的无参构造方法
      • 类中的所有字段是public且非final的;或者有一个公共的gettersetter方法,这些方法需要符合Java bean的命名规范
      • 所以我们看到,之前的UserBehavior,就是我们创建的符合Flink POJO定义的数据类型

类型提示

  • Flink具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器

  • 由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息不够精细

  • Flink专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息

  • 可以通过returns()方法,明确地指定转换之后的DataStream里元素的类型

    1
    2
    returns(Types.TUPLE(Types.STRING, Types.LONG));
    returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

转换算子

基本算子

Map

  • 映射
1
2
3
4
5
6
7
8
9
10
11
// 从元素中读取数据
// 方式1
SingleOutputStreamOperator<String> map = dss.map(new MapFunction<Event, String>() {
// 自定义mapFunction
@Override
public String map(Event event) throws Exception {
return event.getUser();
}
});
// 方式2
map = dss.map(Event::getUser);

Fliter

  • 过滤
  • 通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断
    • true:元素正常输出
    • false:元素被过滤
1
2
// 只保留Mary数据
SingleOutputStreamOperator<Event> filter = dss.filter(event -> "Marry".equals(event.getUser()));

FlatMap

  • 扁平映射
1
2
3
4
5
6
7
8
9
SingleOutputStreamOperator<String> flatMap = dss
.flatMap((Event event, Collector<String> collector) -> {
collector.collect(event.getUser());
collector.collect(event.getUrl());
collector.collect(event.getTimestamp().toString());
})
// 解决泛型擦出
.returns(new TypeHint<String>() {
});

集合算子

KeyBy

  • 按键分区
  • 返回KeyedStream(指定Key的数据源)
  • KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(如sumreduce
  • 它可以将当前算子任务的状态(state)也按照key进行划分、限定为仅对当前key有效

简单聚合

  • sum():在输入流上,对指定的字段做叠加求和的操作
  • min():在输入流上,对指定的字段求最小值
  • max():在输入流上,对指定的字段求最大值
  • minBy():与min()类似,在输入流上针对指定字段求最小值
    • min()只计算指定字段的最小值,其他字段会第一条数据的值
    • minBy()返回包含字段最小值的整条数据
  • maxBy():与max()类似,在输入流上针对指定字段求最大值
    • 两者区别与min()/minBy()完全一致
1
2
3
4
5
6
// 按键分组后进行聚合,提取当前用户最近一次访问数据
dss
.keyBy(Event::getUser)
// 字段名
.maxBy("timestamp")
.print("maxBy: ");

reduce

  • 归约聚合
1
2
3
4
5
6
7
8
9
10
11
12
13
// 获取访问量最大的用户
dss
// 转换数据结构(用户名, 1)
.map(event -> Tuple2.of(event.getUser(), 1L))
// 根据用户名分组
.keyBy(tuple -> tuple.f0)
// 求和
.reduce((tuple1, tuple2) -> Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1))
// 把所有记录放入一个分区
.keyBy(tuple -> "key")
// 获取访问量最大的用户
.reduce((tuple1, tuple2) -> tuple1.f1 > tuple2.f1 ? tuple1 : tuple2)
.print();

富函数类

  • “富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的
  • 典型的生命周期方法
    • open()方法:的初始化方法,会开启一个算子的生命周期
      • 当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。所以像文件IO的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在open()方法中完成
    • close()方法:生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static class MyRichMapper extends RichMapFunction<Event, Integer> {

@Override
public Integer map(Event event) throws Exception {
return event.getUrl().length();
}

/**
* 当前任务实例创建时调用的方法
* @param parameters The configuration containing the parameters attached to the contract.
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("open: task[" + getRuntimeContext().getIndexOfThisSubtask() + "]start");
}

@Override
public void close() throws Exception {
super.close();
System.out.println("close: task[" + getRuntimeContext().getIndexOfThisSubtask() + "]stop");
}

}

物理分区

概念

  • 将数据进行重新分布,传递到不同的流分区去进行下一步处理
  • 真正控制分区策略,精准地调配数据,告诉每个数据到底去哪里
  • keyBy逻辑分区是“软分区”

随机

  • 通过调用DataStream的shuffle()方法,将数据随机地分配到下游算子的并行任务中去
  • 随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区
  • 因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同
1
dss.shuffle().print().setParallelism(4);

轮询

  • 默认
  • 按照先后顺序将数据做依次分发
  • 通过调用DataStream的rebalance()方法,可以实现轮询重分区
  • 使用Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去
1
2
// TODO 轮询分区
dss.rebalance().print().setParallelism(4);

重缩放

  • 重缩放分区和轮询分区非常相似
  • 当调用rescale()方法时底层使用Round-Robin算法进行轮询,但只将数据轮询发送到下游并行任务的一部分
  • “发牌人”如果有多个,那么rebalance的方式是每个发牌人都面向所有人发牌;而rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌
  • 由于rebalance是所有分区数据的“重新平衡”,当TaskManager数据量较多时,这种跨节点的网络传输必然影响效率;如果配置的task slot数量合适,用rescale的方式进行“局部重缩放”,可以让数据只在当前TaskManager的多个slot之间重新分配,从而避免了网络传输带来的损耗

重缩放分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
env
.addSource(new RichParallelSourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for (int i = 0; i < 8; i++) {
// 将奇偶数分别发送到0号/1号分区
if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
ctx.collect(i);
}
}
}

@Override
public void cancel() {

}
})
.setParallelism(2)
.rescale()
.print()
.setParallelism(4);

广播

  • 经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理
  • 通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去
1
dss.broadcast().print().setParallelism(4);

全局

  • 通过调用global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去,就相当于强行让下游任务并行度变成了1
  • 所可能对程序造成很大的压力

自定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 按奇偶数进行分区
env
.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
.partitionCustom(
// 分区器
new Partitioner<Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key % 2;
}
},
// Key选择器
new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws Exception {
return value;
}
}
)
.print()
.setParallelism(4);

输出算子

文件

  • StreamingFileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink的静态方法
    • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)
    • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)
  • 在创建行或批量编码Sink时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder或bulkWriterFactory)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
dss
.map(Event::toString)
.addSink(
StreamingFileSink
.<String>forRowFormat(
new Path("data/flink/output"),
new SimpleStringEncoder<>(StandardCharsets.UTF_8.toString()))
// 文件滚动保存策略
.withRollingPolicy(
DefaultRollingPolicy
.builder()
// 最大文件
.withMaxPartSize(1024 * 1024 * 1024)
// 滚动周期
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(15))
// 不活跃的时间间隔
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
.build()
)
.build()
);

Kafka

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.5</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
env
// 从Kafka读取
.fromSource(
KafkaSource
.<String>builder()
.setBootstrapServers("bigdata1:9092")
.setTopics("clicks")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.build(),
// 事件时间和水印
WatermarkStrategy.noWatermarks(),
// 数据源名称
"Kafka Source"
)
// 进行转换
.map((MapFunction<String, String>) value -> {
String[] fields = value.split(",");
return new Event(fields[0], fields[1], Long.parseLong(fields[2])).toString();
})
// 写入Kafka
.sinkTo(
KafkaSink
.<String>builder()
.setBootstrapServers("bigdata1:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema
.builder()
.setTopic("events")
// 数据类型的编码
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build()
);

Redis

1
2
3
4
5
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
env
.addSource(new ClickSource())
.addSink(
new RedisSink<>(
// 创建Jedis连接配置
new FlinkJedisPoolConfig.Builder().setHost("localhost").build(),
// 指定命令
new RedisMapper<Event>() {
/**
* 返回当前Redis操作的描述
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(
RedisCommand.HSET,
"click"
);
}

@Override
public String getKeyFromData(Event event) {
return event.getUser();
}

@Override
public String getValueFromData(Event event) {
return event.getUrl();
}
}
)
);

Elasticsearch

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.14.5</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
dss
.addSink(
new ElasticsearchSink
.Builder<>(
// 定义hosts列表
Collections.singletonList(new HttpHost("localhost", 9200)),
// 定义ElasticsearchSinkFunction
(ElasticsearchSinkFunction<Event>) (element, ctx, indexer) -> {
HashMap<String, String> map = new HashMap<>();
map.put(element.getUser(), element.getUrl());
// 构建IndexRequest
IndexRequest request = Requests.indexRequest().index("clicks").source(map);
indexer.add(request);
})
.build()
);

MySQL(JDBC)

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.5</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
dss
.addSink(
JdbcSink.sink(
"INSERT INTO clicks (user, url) VALUES (?, ?)",
(preparedStatement, event) -> {
preparedStatement.setString(1, event.getUser());
preparedStatement.setString(2, event.getUrl());
},
new JdbcConnectionOptions
.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/test")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("root")
.build()
)
);

自定义

  • 在实现SinkFunction的时候,需要重写的一个关键方法invoke(),这个方法可以实现将流里的数据发送出去的逻辑
  • 用到生命周期的概念使就需要使用了SinkFunction的富函数版本,创建HBase的连接以及关闭HBase的连接需要分别放在open()方法和close()方法中
  • 可能无法保证数据一致性

时间/窗口

时间

处理时间

  • 执行处理操作的机器的系统时间

事件时间

  • 每个事件在对应的设备上发生的时间,也就是数据生成的时间
  • 数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”
  • “先产生的数据先被处理”。由于分布式系统应用中数据流往往是乱序的,不能简单地把数据自带的时间戳当作时钟,需要用另外的标志来表示事件时间进展,在Flink中把它叫作事件时间的“水位线”(Watermarks)
  • 更关心事件时间

水位线

概念

概念

  • 水位线是数据流中插入的一个标记,用来表示事件时间的进展,它会随着数据一起在任务间传递
  • 主要内容是一个时间戳,用来指示当前的事件时间
  • 本质:流当中传输的一条特殊的数据,一个时间戳
  • 作用:处理乱序数据
  • 作用机制:通过延迟关窗
  • 传递
    • 广播方式传输,与Key无关
    • 当前任务中WaterMark取决于上游最小的WaterMark值
    • Watermark单调递增,只有Watermark比上次的大才会向下游传输

事件时间和窗口

  • 在事件时间语义下,不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进靠数据的时间戳来驱动
  • 水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后

有序流中的水位线

  • 为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳就是当前最新数据的时间戳

有序流中的水位线

乱序流中的水位线

  • 可以周期性地生成水位线,这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时就直接以它作为时间戳生成新的水位线
  • 为了遗漏迟到数据,需要多等几秒(把时钟调得更慢一些),让窗口能够把所有迟到数据都收进来得到正确的计算结果,需要保证当前时间已经进展到了这个时间戳,之后不可能再有迟到数据到来

乱序流中的水位线

特性

  • 是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 基于数据时间戳生成
  • 时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据
  • 水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合完成对乱序数据的正确处理

生成

总体原则

  • Flink中的水位线是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员
  • 若希望计算结果能更加准确,可将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。实时性降低,增加了延迟
  • 若希望处理得更快、实时性更强,可以将水位线延迟设得低一些。会导致窗口遗漏数据,计算结果不准确。对于这些 “漏网之鱼”,Flink另外提供了窗口处理迟到数据的方法。如果对准确性完全不考虑,可以直接使用处理时间语义,得到最低的延迟

内置生成器

  • 生成间隔

    1
    env.getConfig().setAutoWatermarkInterval(100);
  • 有序流

    1
    2
    3
    4
    5
    6
    .assignTimestampsAndWatermarks(
    WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    // 指定如何从数据中提取时间戳
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
    )
  • 乱序流

    1
    2
    3
    4
    5
    6
    .assignTimestampsAndWatermarks(
    WatermarkStrategy
    // 最大乱序程度
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
    )

传递

  • 当有一个新的水位线上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大说明事件时间有了进展
  • 当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务
  • 而当前任务自己的时钟是所有分区时钟里最小的那个

水位线的传递

窗口

概念

  • 将无限数据切割成有限的“数据块”
  • 在Flink中窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下该把窗口理解成一个“桶”。窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理

框与桶

  1. 第一个数据时间戳为2,判断之后创建第一个窗口[0, 10),并将2秒数据保存进去
  2. 后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口
  3. 11秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将11秒的数据保存进去。由于水位线设置延迟时间为2秒,所以现在的时钟是9秒,第一个窗口也没有到关闭时间
  4. 之后又有9秒数据到来,同样进入[0, 10)窗口中
  5. 12秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了10秒,所以 [0, 10)窗口应该关闭了。第一个窗口收集到了所有的7个数据,进行处理计算后输出结果,并将窗口关闭销毁
  6. 同样的,之后的数据依次进入第二个窗口,遇到20秒的数据时会创建第三个窗口[20, 30)并将数据保存进去;遇到22秒数据时,水位线达到了20秒,第二个窗口触发计算,输出结果并关闭

分类

按照驱动类型

  • 时间窗口
    • 以时间点来定义窗口的开始和结束所截取出的数据
    • “定点发车”
  • 计数窗口
    • 基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口
    • “人满就发车”

按窗口分配数据规则

  • 滚动窗口
    • 有固定的大小,是一种对数据进行“均匀切片”的划分方式
    • 窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态
  • 滑动窗口
    • 与滚动窗口类似,滑动窗口的大小也是固定的
    • 窗口之间并不是首尾相接的,而是可以“错开”一定的位置
  • 会话窗口
    • 基于会话对数据进行分组
    • 类似Web应用中session的概念,数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭
    • 只能基于时间窗口
  • 全局窗口
    • 全局有效,把相同key的所有数据都分配到同一个窗口中
    • 默认不做触发计算。如果希望能对数据进行计算处理,需要自定义触发器

API

按键分区/非按键分区

按键分区窗
  • 经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流KeyedStream
  • 基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行
  • 相同key的数据会被发送到同一个并行子任务,窗口操作会基于每个key进行单独的处理
  • 每个key上都定义了一组窗口,各自独立地进行统计计算
1
stream.keyBy().window()
非按键分区
  • 如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1
  • 实际应用中一般不推荐使用这种方式
1
stream.windowAll()

窗口分配器

概念
  • 用来在指定窗口的类型
  • 定义数据应该被“分配”到哪个窗口
时间窗口
1
2
3
4
5
6
// TODO 滚动时间窗口(间隔时间,偏移量/时差)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
// TODO 滑动时间窗口
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(1)))
// TODO 会话窗口
.window(EventTimeSessionWindows.withGap(Time.seconds(2)))
计数窗口
1
2
// TODO 计数窗口,单参滚动,多惨滑动
.countWindow(10, 2)

窗口函数

概念
  • 定义窗口如何进行计算的操作
  • 经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是WindowedStream。这个类型并不是DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到DataStream

流的转换

增量聚合函数
  • 每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态。等到窗口到了结束时间需要输出计算结果的时候,只需要拿出之前聚合的状态直接输出

  • 流处理思路

  • 提高了程序运行的效率和实时性

  • 归约函数ReduceFunction

    • 聚合状态的类型、输出结果的类型都必须和输入数据类型一样
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.getConfig().setAutoWatermarkInterval(100);
    env
    .addSource(new ClickSource())
    .assignTimestampsAndWatermarks(
    WatermarkStrategy
    // 最大乱序程度
    .<Event>forBoundedOutOfOrderness(Duration.ZERO)
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
    )
    .map((MapFunction<Event, Tuple2<String, Long>>) value -> new Tuple2<>(value.getUser(), 1L))
    .returns(new TypeHint<Tuple2<String, Long>>() {})
    .keyBy(value -> value.f0)
    // TODO 滚动时间窗口(间隔时间,偏移量/时差)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
    .print();
    env.execute();
  • 聚合函数AggregateFunction

    • 增量聚合函数的核心机制
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.getConfig().setAutoWatermarkInterval(100);
    env
    .addSource(new ClickSource())
    .assignTimestampsAndWatermarks(
    WatermarkStrategy
    // 最大乱序程度
    .<Event>forBoundedOutOfOrderness(Duration.ZERO)
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
    )
    .keyBy(value -> "key")
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // 用Long保存PV数,Set做UV去重
    .aggregate(new AggregateFunction<Event, Tuple2<Long, Set<String>>, Double>() {
    @Override
    public Tuple2<Long, Set<String>> createAccumulator() {
    return Tuple2.of(0L, new HashSet<>());
    }

    @Override
    public Tuple2<Long, Set<String>> add(Event value, Tuple2<Long, Set<String>> accumulator) {
    // 每来一条数据将PV+1,将User放入Set
    accumulator.f1.add(value.getUser());
    return Tuple2.of(accumulator.f0 + 1, accumulator.f1);
    }

    @Override
    public Double getResult(Tuple2<Long, Set<String>> accumulator) {
    // 窗口触发时输出比值
    return (double) accumulator.f0 / accumulator.f1.size();
    }

    @Override
    public Tuple2<Long, Set<String>> merge(Tuple2<Long, Set<String>> a, Tuple2<Long, Set<String>> b) {
    a.f1.addAll(b.f1);
    return Tuple2.of(a.f0 + b.f0, a.f1);
    }
    })
    .print();
    env.execute();
全窗口函数
  • 需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算

  • 批处理思路

  • 窗口函数WindowFunction

  • 处理窗口函数ProcessWindowFunction

    • 是Window API中最底层的通用窗口函数接口
    • 可以获取到上下文对象”。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间和事件时间水位线
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.getConfig().setAutoWatermarkInterval(100);
    env
    .addSource(new ClickSource())
    .assignTimestampsAndWatermarks(
    WatermarkStrategy
    // 最大乱序程度
    .<Event>forBoundedOutOfOrderness(Duration.ZERO)
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
    )
    .keyBy(value -> "key")
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // TODO 输出一条统计信息
    .process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
    @Override
    public void process(String s, ProcessWindowFunction<Event, String, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
    Set<String> set = new HashSet<>();
    for (Event event : elements) {
    set.add(event.getUser());
    }
    int uv = set.size();
    long start = context.window().getStart();
    long end = context.window().getEnd();
    out.collect("窗口" + "[" + new Timestamp(start) + "-" + new Timestamp(end) + "] uv: " + uv);
    }
    })
    .print();
    env.execute();
结合使用
  • 增量聚合函数处理计算会更高效
  • 全窗口函数提供了更多信息
  • 在实际应用中往往把它们结合在一起使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(100);
env
.addSource(new ClickSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
// 最大乱序程度
.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
)
.keyBy(value -> "key")
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 用Long保存PV数,Set做UV去重
.aggregate(
// TODO 增量聚合计算UV值
new AggregateFunction<Event, Set<String>, Long>() {
@Override
public Set<String> createAccumulator() {
return new HashSet<>();
}

@Override
public Set<String> add(Event value, Set<String> accumulator) {
accumulator.add(value.getUser());
return accumulator;
}

@Override
public Long getResult(Set<String> accumulator) {
return (long) accumulator.size();
}

@Override
public Set<String> merge(Set<String> a, Set<String> b) {
a.addAll(b);
return a;
}
},
// TODO 处理窗口函数包装窗口信息输出
new ProcessWindowFunction<Long, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Long, String, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
long uv = elements.iterator().next();
long start = context.window().getStart();
long end = context.window().getEnd();
out.collect("窗口 " + new Timestamp(start) + "-" + new Timestamp(end) + " uv: " + uv);
}
}
)
.print();
env.execute();

其它API

触发器
  • 用来控制窗口什么时候触发计算
  • 触发计算本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程
  • 方法
    • onElement():窗口中每到来一个元素,都会调用这个方法
    • onEventTime():当注册的事件时间定时器触发时,将调用这个方法
    • onProcessingTime():当注册的处理时间定时器触发时,将调用这个方法
    • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态
  • 响应事件
    • CONTINUE:什么都不做
    • FIRE:触发计算,输出结果
    • PURG:清空窗口中的所有数据,销毁窗口
    • FIRE_AND_PURGE:触发计算输出结果,并清除窗口
移除器
  • 用来定义移除某些数据的逻辑
  • 方法
    • evictBefore():定义执行窗口函数之前的移除数据操作
    • evictAfter():定义执行窗口函数之后的以处数据操作
允许延迟
  • 可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口
  • 水位线的延迟:向前调表
  • 允许延迟:推迟发车时间
迟到的数据放入侧输出流
  • 将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理
  • 相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据

处理迟到数据

水位线与允许延迟

区别

  • 水位线
    • 用于解决乱序问题保证数据的完整性
    • 设定的时间是第一次触发窗口计算的时间
  • 允许延迟
    • 允许延迟的出现是因为如果水位线加大会导致窗口计算延迟
    • 设定的时间是触发窗口计算以后,还可以再等多久的迟到数据
    • 每次符合条件的数据到达都会再次触发一次窗口计算
  • 允许延迟是在水位线基础上再做了一层迟到数据的保证

流程

  • WaterMark到达之前,窗口在攒数据,不会触发计算
  • WaterMark等于windowEndTime时,第一次触发窗口计算
  • WaterMark到达之后,allowlateness之前,如果来了数据每条数据都会触发窗口计算。
  • 超过了allowlateness之后到达的迟到数据会丢弃(可用侧输出流解决)

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// TODO 设置延迟时间
.allowedLateness(Time.minutes(1))
// TODO 定义侧输出流
.sideOutputLateData(new OutputTag<Event>("late"){})
.aggregate(
new AggregateFunction<Event, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(Event value, Long accumulator) {
return accumulator + 1;
}

@Override
public Long getResult(Long accumulator) {
return accumulator;
}

@Override
public Long merge(Long a, Long b) {
return a + b;
}
},
new ProcessWindowFunction<Long, String, String, TimeWindow>() {

@Override
public void process(String s, ProcessWindowFunction<Long, String, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
out.collect(new UrlViewCount(
s,
elements.iterator().next(),
context.window().getStart(),
context.window().getEnd()
).toString());
}

}
)

处理函数

概念

  • 一个统一的“处理”(process)操作
  • 是所有转换算子的一个概括性的表达,可以自定义处理逻辑
  • 处理有状态流
  • 处理函数中,直面的数据流中最基本的元素:数据事件、状态及时间
  • 提供了一个“定时服务”,可以通过它访问流中的事件、时间戳、水位线,甚至可以注册“定时事件”
  • 继承了AbstractRichFunction抽象类,拥有富函数类的所有特性,同样可以访问状态和其他运行时信息
  • 可直接将数据输出到侧输出流中

分类

ProcessFunction

  • 基本处理函数
  • 基于DataStream直接调用.process()时作为参数传入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
env
.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
)
.process(new ProcessFunction<Event, String>() {
// TODO 每来一次数据都会被调用
@Override
public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(value.toString());
log.info("task: {}", getRuntimeContext().getIndexOfThisSubtask());
log.info("timestamp: {}", ctx.timestamp());
log.info("watermark: {}", ctx.timerService().currentWatermark());
}
});

KeyedProcessFunction

  • 对流按键分区后的处理函数

  • 基于KeyedStream调用.process()时作为参数传入

  • 可以使用定时器

  • 处理时间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
    .addSource(new ClickSource())
    .keyBy(Event::getUser)
    .process(new KeyedProcessFunction<String, Event, String>() {
    @Override
    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
    // 获取当前处理时间
    long processingTime = ctx.timerService().currentProcessingTime();
    out.collect(ctx.getCurrentKey() + "数据到达时间: " + new Timestamp(processingTime));
    // 注册10秒后的定时器
    ctx.timerService().registerProcessingTimeTimer(processingTime + 10 * 1000L);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
    out.collect(ctx.getCurrentKey() + "定时器触发时间: " + new Timestamp(timestamp));
    }
    })
    .print();
    env.execute();
  • 事件事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
    .addSource(new ClickSource())
    .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()))
    .keyBy(Event::getUser)
    .process(new KeyedProcessFunction<String, Event, String>() {
    @Override
    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
    // 获取当前处理时间
    long processingTime = ctx.timestamp();
    out.collect(ctx.getCurrentKey() + "数据到达时间: " + new Timestamp(processingTime) + " watermark: " + ctx.timerService().currentWatermark());
    // 注册10秒后的定时器
    ctx.timerService().registerProcessingTimeTimer(processingTime + 10 * 1000L);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
    out.collect(ctx.getCurrentKey() + "定时器触发时间: " + new Timestamp(timestamp));
    }
    })
    .print();
    env.execute();

ProcessWindowFunction

  • 开窗之后的处理函数,也是全窗口函数的代表
  • 基于WindowedStream调用.process()时作为参数传入

ProcessAllWindowFunction

  • 同样是开窗之后的处理函数
  • 基于AllWindowedStream调用.process()时作为参数传入

CoProcessFunction

  • 合并(connect)两条流之后的处理函数
  • 基于ConnectedStreams调用.process()时作为参数传入

ProcessJoinFunction

  • 间隔连接(interval join)两条流之后的处理函数
  • 基于IntervalJoined调用.process()时作为参数传入

BroadcastProcessFunction

  • 广播连接流处理函数
  • 基于BroadcastConnectedStream调用.process()时作为参数传入
  • “广播连接流”BroadcastConnectedStream是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物

KeyedBroadcastProcessFunction

  • 按键分区的广播连接流处理函数
  • 基于BroadcastConnectedStream调用.process()时作为参数传入
  • 与BroadcastProcessFunction不同的是,这时的广播连接流是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物

多流转换

分流

  • 使用处理函数(process function)的侧输出流(side output)
  • 处理函数本身可以认为是一个转换算子,它的输出类型是单一的,处理之后得到的仍然是一个DataStream
  • 侧输出流则不受限制,可以任意自定义输出数据,它们就像从“主流”上分叉出的“支流”
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary") {
};
OutputTag<Tuple3<String, String, Long>> bobTag = new OutputTag<Tuple3<String, String, Long>>("Bob") {
};
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> processedStream = env
.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
)
.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
switch (value.getUser()) {
// TODO 定义输出标签
case "Mary":
ctx.output(maryTag, Tuple3.of(value.getUser(), value.getUrl(), value.getTimestamp()));
break;
case "Bob":
ctx.output(bobTag, Tuple3.of(value.getUser(), value.getUrl(), value.getTimestamp()));
break;
default:
out.collect(value);
}
}
});
processedStream.print("main");
processedStream.getSideOutput(maryTag).print("mary");
processedStream.getSideOutput(bobTag).print("bob");
env.execute();

合流

基本

联合

  • 直接将多条流合在一起
  • 流中的数据类型必须相同,合并之后的新流会包括所有流中的元素
  • 合流之后的水位线以最小的为准
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream1 = env
.socketTextStream("localhost", 8080)
.map(value -> {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.parseLong(fields[2].trim()));
})
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
SingleOutputStreamOperator<Event> stream2 = env
.socketTextStream("localhost", 8081)
.map(value -> {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.parseLong(fields[2].trim()));
})
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
// TODO 合并两条流
stream1
.union(stream2)
.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
out.collect("watermark: " + ctx.timerService().currentWatermark());
}
})
.print();
env.execute();

连接

  • 直接把两条流像接线一样对接起来
  • 连接操作允许流的数据类型不同
  • 连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中,内部仍保持各自的数据形式不变,彼此之间是相互独立的
  • 要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型

连接流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 来自APP的支付日志
SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env
.fromElements(
Tuple3.of("order-1", "app", 1000L),
Tuple3.of("order-2", "app", 2000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((element, recordTimestamp) -> element.f2)
);
// TODO 来自第三方支付平台的支付日志
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdPartyStream = env
.fromElements(
Tuple4.of("order-1", "third-party", "success", 3000L),
Tuple4.of("order-3", "third-party", "success", 4000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((element, recordTimestamp) -> element.f3)
);

// TODO 检测同一支付单在两条流中是否匹配
appStream
.connect(thirdPartyStream)
.keyBy(value -> value.f0, value -> value.f0)
.process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>() {
// 进入当前处理实例的数据的Key相同
// 定义状态变量保存已经到达的时间
private ValueState<Tuple3<String, String, Long>> appEventState;
private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

@Override
public void open(Configuration parameters) {
this.appEventState = getRuntimeContext()
.getState(new ValueStateDescriptor<>(
"app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)
));
this.thirdPartyEventState = getRuntimeContext()
.getState(new ValueStateDescriptor<>(
"third-party-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)
));
}

// TODO appEvent
@Override
public void processElement1(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
if (thirdPartyEventState.value() != null) {
out.collect("对账成功: " + value + "::" + thirdPartyEventState.value());
// 清空状态
thirdPartyEventState.clear();
} else {
// 更新状态
appEventState.update(value);
// 注册定时器等待另一条流的输出时间
ctx.timerService().registerProcessingTimeTimer(value.f2 + 5000);
}
}

// TODO thirdPartyEvent
@Override
public void processElement2(Tuple4<String, String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
if (appEventState.value() != null) {
out.collect("对账成功: " + appEventState.value() + "::" +value);
// 清空状态
appEventState.clear();
} else {
// 更新状态
thirdPartyEventState.update(value);
// 注册定时器等待另一条流的输出时间
ctx.timerService().registerProcessingTimeTimer(value.f3 + 5000);
}
}

// TODO 定时器触发
@Override
public void onTimer(long timestamp, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
// 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来
if (appEventState.value() != null) {
out.collect("对账失败: " + appEventState.value() + " " + "第三方支付平台信息未到");
}
if (thirdPartyEventState.value() != null) {
out.collect("对账失败: " + thirdPartyEventState.value() + " " + "app信息未到");
}
appEventState.clear();
thirdPartyEventState.clear();
}
})
.print();
env.execute();

双流联结

窗口

  • 基于时间的操作
  • 主要是针对单一数据流在某些时间段内的处理计算
1
2
3
4
5
6
stream1
.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Long>> stream1 = env
.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.f1)
);
DataStream<Tuple2<String, Integer>> stream2 = env
.fromElements(
Tuple2.of("a", 3000),
Tuple2.of("b", 3000),
Tuple2.of("a", 4000),
Tuple2.of("b", 4000)
)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.f1)
);
stream1
.join(stream2)
.where(value -> value.f0)
.equalTo(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply((JoinFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>) (first, second) -> first + "->" + second)
.print();
env.execute();

间隔

  • 针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配

  • 给定两个时间点分别叫作间隔的“上界”和“下界”;于是对于一条流A中的任意一个数据元素a,就可以开辟一段以a的时间戳为中心,下至下界点、上至上界点的一个闭区间,把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流B中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

    间隔联结

1
2
3
4
5
6
7
8
9
10
stream1
.keyBy(<KeySelector>)
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(left + "," + right);
}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
Tuple3.of("Mary", "order-1", 5000L),
Tuple3.of("Alice", "order-2", 5000L),
Tuple3.of("Bob", "order-3", 20000L),
Tuple3.of("Alice", "order-4", 20000L),
Tuple3.of("Cary", "order-5", 51000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple3<String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.f2)
);
SingleOutputStreamOperator<Event> clickStream = env.fromElements(
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Alice", "./prod?id=200", 3500L),
new Event("Bob", "./prod?id=2", 2500L),
new Event("Alice", "./prod?id=300", 36000L),
new Event("Bob", "./home", 30000L),
new Event("Bob", "./prod?id=1", 23000L),
new Event("Bob", "./prod?id=3", 33000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
orderStream
.keyBy(value -> value.f0)
.intervalJoin(clickStream.keyBy(Event::getUser))
// 下订单前5秒到下订单后10秒
.between(Time.seconds(-5), Time.seconds(10))
.process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
@Override
public void processElement(Tuple3<String, String, Long> left, Event right, ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(right + "->" + left);
}
})
.print();
env.execute();

窗口同组

  • 用法跟window join非常类似,将两条流合并之后开窗处理匹配的元素
  • 区别在于调用apply()方法定义具体操作时传入的是一个CoGroupFunction
  • 同样有三个参数,分别代表两条流中的数据以及用于输出的收集器(Collector)。不同的是,这里的前两个参数不再是单独的每一组“配对”数据了,而是传入了可遍历的数据集合
  • 不会再去计算窗口中两条流数据集的笛卡尔积,而是直接把收集到的所有数据一次性传入,至于要怎样配对完全是自定义的
1
2
3
4
5
stream1.coGroup(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(<CoGroupFunction>)
1
2
3
4
5
6
7
8
...
stream1
.coGroup(stream2)
.where(value -> value.f0)
.equalTo(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply((CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>) (first, second, out) -> out.collect(first + "->" + second))
...

状态编程

概念

状态

  • 处理机制的核心就是“有状态的流式计算”
  • 在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时可基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护并用来计算输出结果的所有数据,叫作这个任务的状态

有状态算子

  • 无状态的算子任务:只需要观察每个独立事件,根据当前输入的数据直接转换输出结果
  • 有状态的算子任务:除当前数据之外,还需要一些其他数据来得到计算结果
  • 状态算子的一般处理流程
    1. 算子任务接收到上游发来的数据
    2. 获取当前状态
    3. 根据业务逻辑进行计算,更新状态
    4. 得到计算结果,输出发送到下游任务

有状态算子处理流程

管理

  • 将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量
  • 状态可以认为是子任务实例上的一个本地变量,能够被任务的业务逻辑访问和修改
  • Flink有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态的高效存储和访问、持久化保存和故障恢复,以及资源扩展时的调整

分类

原始状态

  • Raw State
  • 自定义的,相当于开辟了一块内存,需要自己管理,实现状态的序列化和故障恢复
  • Flink不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储

托管状态

  • Managed State

  • 由Flink的运行时(Runtime)来托管,由Flink统一管理

  • Flink提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构

  • Keyed State和Operator State都在本地实例上维护,每个并行子任务维护着对应的状态,算子的子任务之间状态不共享

  • 算子状态

    • 状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效
    • 不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效
    • 可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别

    算子状态

  • 按键分区状态

    • 状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就keyBy之后才可以使用
    • 聚合的结果是以Keyed State的形式保存的

    按键分区状态

按键分区状态

概念

Keyed State

  • 任务按照键(key)来访问和维护,以key为作用范围进行隔离
  • 底层类似于分布式的map数据结构,所有的状态会根据key保存成键值对的形式。当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的key,从map存储中读取出对应的状态值
  • 使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream即使转换算子实现了对应的富函数类也不能通过运行时上下文访问Keyed State

Key Groups

  • 在应用的并行度改变时,状态也需要随之进行重组
  • 不同key对应的Keyed State可以进一步组成所谓的键组(Key Groups),每一组都对应着一个并行子任务
  • 键组是Flink重新分配Keyed State的单元,键组的数量就等于定义的最大并行度
  • 当算子并行度发生改变时,Keyed State就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同

结构

ValueState

  • 状态中只保存一个“值”(value)
  • T value():获取当前状态的值
  • update(T value):对状态进行更新,传入的参数value就是要覆写的状态值
  • 需要状态描述器:ValueStateDescriptor
    • 有了这个描述器,运行时环境就可以获取到状态的控制句柄(handler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
stream
// TODO 统计每个用户的PV
.keyBy(Event::getUser)
.process(new KeyedProcessFunction<String, Event, String>() {
// TODO 定义状态保存当前用户PV值和定时器
ValueState<Long> pvCountState;
ValueState<Long> timerTsState;

@Override
public void open(Configuration parameters) {
pvCountState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
}

@Override
public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
// TODO 更新PV值和定时器
Long count = pvCountState.value();
pvCountState.update(count == null ? 1 : count + 1);
if (timerTsState.value() == null) {
long ts = value.timestamp + 10 * 1000L;
ctx.timerService().registerEventTimeTimer(ts);
timerTsState.update(ts);
}
}

@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey() + " PV: " + pvCountState.value());
// TODO 更新状态
long ts = timestamp + 10 * 1000L;
ctx.timerService().registerEventTimeTimer(ts);
timerTsState.update(ts);
}
})
.print();

ListState

  • Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>
  • update(List<T> values):传入一个列表values,直接对状态进行覆盖
  • add(T value):在状态列表中添加一个元素value
  • addAll(List<T> values):向列表中添加多个元素,以列表values形式传入
  • 需要状态描述器:ListStateDescriptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// TODO 自定义列表状态进行全外联结
stream1
.connect(stream2)
.keyBy(value -> value.f0, value -> value.f0)
.process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
// TODO 定义列表状态保存两条六中已经大道的所有数据
private ListState<Tuple2<String, Long>> listState1;
private ListState<Tuple2<String, Long>> listState2;

@Override
public void open(Configuration parameters) {
listState1 = getRuntimeContext()
.getListState(new ListStateDescriptor<>(
"listState1",
Types.TUPLE(Types.STRING, Types.LONG)
));
listState2 = getRuntimeContext()
.getListState(new ListStateDescriptor<>(
"listState2",
Types.TUPLE(Types.STRING, Types.LONG)
));
}

@Override
public void processElement1(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
// TODO 获取另一条流中所有数据配对输出
listState2.get().forEach(value2 -> out.collect(value.f0 + " " + value.f2 + "->" + value2.f0 + " " + value2.f1));
listState1.add(Tuple2.of(value.f0, value.f2));
}

@Override
public void processElement2(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
// TODO 获取另一条流中所有数据配对输出
listState1.get().forEach(value1 -> out.collect(value1.f0 + " " + value1.f1 + "->" + value.f0 + " " + value.f2));
listState2.add(Tuple2.of(value.f0, value.f2));
}
})
.print();

MapState

  • UV get(UK key):传入一个key作为参数,查询对应的value值
  • put(UK key, UV value):传入一个键值对,更新key对应的value值
  • putAll(Map<UK, UV> map):将传入的映射map中所有的键值对,全部添加到映射状态中
  • remove(UK key):将指定key对应的键值对删除
  • boolean contains(UK key):判断是否存在指定的key,返回一个boolean值
  • Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对
  • Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型
  • Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;
  • boolean isEmpty():判断映射是否为空,返回一个boolean值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static class FakeWindowCount extends KeyedProcessFunction<String, Event, String> {
// TODO 窗口大小
private final Long size;
// TODO 保存每个窗口中统计的count
private MapState<Long, Long> mapState;

public FakeWindowCount(long size) {
this.size = size;
}

@Override
public void open(Configuration parameters) {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("count", Long.class, Long.class));
}

@Override
public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
// TODO 根据时间戳判断窗口
long currIdx = value.getTimestamp() / size * size;
long nextIdx = currIdx + size;
// TODO 注册定时器
ctx.timerService().registerEventTimeTimer(nextIdx - 1);
// TODO 更新状态进行增量聚合
if (!mapState.contains(currIdx)) {
mapState.put(currIdx, 0L);
}
mapState.put(currIdx, mapState.get(currIdx) + 1);
}

@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
// TODO 定时器触发,直接输出统计的pv结果
long windowEnd = timestamp + 1;
long windowStart = windowEnd - size;
long count = mapState.get(windowStart);
out.collect("URL: " + ctx.getCurrentKey()
+ " 窗口: " + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd)
+ " 访问量: " + count
);
// TODO 模拟窗口的销毁,清除map中的key
mapState.remove(windowStart);
}
}

ReducingState

  • 类似于值状态,不过需要对添加进来的所有数据进行归约,保存归约聚合之后的值
  • 接口调用的方法类似于ListState,不过保存的只是一个聚合值
  • add():直接把新数据和之前的状态进行归约,并用得到的结果更新状态
  • 需要状态描述器:ReducingStateDescriptor
    • 其中第二个参数就是定义了归约聚合逻辑的ReduceFunction

AggregatingState

  • 与归约状态类似,是一个用来保存添加进来的所有数据的聚合结果的值
  • 它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数来定义,里面通过一个累加器(Accumulator)来表示状态
  • 所合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活
  • add():直接使用指定的AggregateFunction进行聚合并更新状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public static class AvgTsResult extends RichFlatMapFunction<Event, String> {
// TODO 定义聚合状态计算平均时间戳
private AggregatingState<Event, Long> avgAggState;
// TODO 定义值状态保存当前用户访问频次
ValueState<Long> countState;
private final Long size;

public AvgTsResult(long size) {
this.size = size;
}

@Override
public void open(Configuration parameters) {
avgAggState = getRuntimeContext()
.getAggregatingState(new AggregatingStateDescriptor<>(
"avgTs",
new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
@Override
public Tuple2<Long, Long> createAccumulator() {
return Tuple2.of(0L, 0L);
}

@Override
public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {
return Tuple2.of(accumulator.f0 + value.getTimestamp(), accumulator.f1 + 1);
}

@Override
public Long getResult(Tuple2<Long, Long> accumulator) {
return accumulator.f0 / accumulator.f1;
}

@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
},
Types.TUPLE(Types.LONG, Types.LONG)
));
countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
}

@Override
public void flatMap(Event value, Collector<String> out) throws Exception {
Long currCount = countState.value();
currCount = currCount == null ? 1L : currCount + 1;
// TODO 更新状态
countState.update(currCount);
avgAggState.add(value);
// TODO 达到size就输出结果
if (currCount.equals(size)) {
out.collect(value.getUser() + " 平均时间戳: " + new Timestamp(avgAggState.get()));
countState.clear();
}
}
}

生存时间

  • 在实际应用中很多状态会随着时间的推移逐渐增长,如果不加以限制最终就会导致存储空间的耗尽
  • 配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时就将它清除
  • 配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的enableTimeToLive()方法启动TTL功能
  • 目前的TTL设置只支持处理时间
  • 所有集合类型的状态在设置TTL时,都是针对每一项(per-entry)元素的。一个列表状态中的每一个元素都会以自己的失效时间来进行清理,而不是整个列表一起清理
1
2
3
4
5
6
7
8
9
10
11
12
ValueStateDescriptor<Event> valueStateDescriptor = new ValueStateDescriptor<>("valueState", Event.class);
valueState = getRuntimeContext().getState(valueStateDescriptor);
// TODO 配置状态的TTL
StateTtlConfig ttlConfig = StateTtlConfig
// 设置TTL时间
.newBuilder(Time.hours(1))
// 设置更新条件
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 设置状态的可见性
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
valueStateDescriptor.enableTimeToLive(ttlConfig);

算子状态

概念

  • 只针对当前算子并行任务有效,不需要考虑不同key的隔离
  • 不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State
  • 一般用在Source或Sink等与外部系统连接的算子上
  • 当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同

类型

ListState

  • 不会按key分别处理状态,每一个并行子任务上只会保留一个“列表”,是当前并行子任务上所有状态项的集合
  • 列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立
  • 不会存在“键组”(key group),为了方便重组分配就把它直接定义成了“列表”(list)
  • 当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”

UnionListState

  • 与常规列表状态的区别在于算子并行度进行缩放调整时对于状态的分配方式不同
  • 联合列表状态的重点就在于“联合”(union)。在并行度调整时,联合列表状态的算子则会直接广播状态的完整列表。并行子任务就获取到了联合后完整的“大列表”后可自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)
  • 如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式

BroadcastState

  • 所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样
  • 在并行度调整时只要复制一份到新的并行任务就可以实现扩展;对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉(状态都是复制出来的,并不会丢失)
  • 底层类似map的键值对,必须基于一个“广播流”(BroadcastStream)来创建

代码

  • Flink无法直接判断该怎样保存和恢复状态,而是提供了接口,让我们根据业务需求自行设计状态的快照保存(snapshot)和恢复(restore)逻辑
1
2
3
4
5
6
public interface CheckpointedFunction {
// 保存状态快照到检查点时,调用这个方法
void snapshotState(FunctionSnapshotContext context) throws Exception
// 初始化状态时调用这个方法,也会在恢复状态时调用
void initializeState(FunctionInitializationContext context) throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {

private final int size;
private final List<Event> buffer;
// TODO 定义算子状态
private ListState<Event> checkpointState;

public BufferingSink(int size) {
this.size = size;
this.buffer = new ArrayList<>();
}

@Override
public void invoke(Event value, Context context) {
// TODO 缓存到列表
buffer.add(value);
// TODO 如果达到阈值就批量写入
if (buffer.size() == this.size) {
buffer.forEach(System.out::println);
System.out.println("输出完毕");
buffer.clear();
}
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// TODO 对状态进行持久化
// TODO 清空状态
checkpointState.clear();
// TODO 复制缓存列表到列表状态
checkpointState.addAll(buffer);

}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// TODO 定义算子状态
ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>("buffer", Event.class);
checkpointState = context.getOperatorStateStore().getListState(descriptor);
// TODO 故障恢复把状态恢复到列表中
if (context.isRestored()) {
checkpointState.get().forEach(buffer::add);
}
}

}

广播状态

概念

  • 让所有并行子任务都持有同一份状态,也就意味着一旦状态有变化,所以子任务上的实例都要更新
  • 可以将这动态的配置数据看作一条流,将这条流和本身要处理的数据流进行连接(connect),就可以实时地更新配置进行计算
  • 其他算子状态的列表(list)结构不同,底层是以键值对(key-value)形式描述的,所以其实就是一个映射状态(MapState)

代码案例

  • 考虑在电商应用中,往往需要判断用户先后发生的行为的“组合模式”,比如“登录-下单”或者“登录-支付”,检测出这些连续的行为进行统计,就可以了解平台的运用状况以及用户的行为习惯
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 读取用户行为事件流
DataStreamSource<Action> actionStream = env
.fromElements(
new Action("Alice", "login"),
new Action("Alice", "pay"),
new Action("Bob", "login"),
new Action("Bob", "buy")
);
// TODO 定义行为模式流代表要检测的标准
DataStreamSource<Pattern> patternStream = env
.fromElements(
new Pattern("login", "pay"),
new Pattern("login", "buy")
);
// TODO 连接两条流进行处理
actionStream
.keyBy(Action::getUserId)
.connect(patternStream
// TODO 定义广播状态描述器
.broadcast(new MapStateDescriptor<>("pattern", Void.class, Pattern.class))
)
.process(new KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>() {
// TODO 定义KeyedState保存用户上次行为
private ValueState<String> prevActionState;

@Override
public void open(Configuration parameters) {
prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<>("last-action", String.class));
}

@Override
public void processElement(Action value, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.ReadOnlyContext ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
// TODO 从广播状态中获取匹配模式
ReadOnlyBroadcastState<Void, Pattern> patternState = ctx.getBroadcastState(new MapStateDescriptor<>("pattern", Void.class, Pattern.class));
Pattern pattern = patternState.get(null);
// TODO 获取用户上次行为
String prevAction = prevActionState.value();
// TODO 判断是否匹配
if (pattern != null && prevAction != null) {
if (pattern.getAction1().equals(prevAction) && pattern.getAction2().equals(value.getAction())) {
out.collect(Tuple2.of(ctx.getCurrentKey(), pattern));
}
} else {
// TODO 更新状态
prevActionState.update(value.getAction());
}
}

@Override
public void processBroadcastElement(Pattern value, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.Context ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
// TODO 从上下文中获取并更新广播状态
BroadcastState<Void, Pattern> patternState = ctx.getBroadcastState(new MapStateDescriptor<>("pattern", Void.class, Pattern.class));
patternState.put(null, value);
}
})
.print();
env.execute();

持久化

检查点

概念

  • 所有任务的状态在某个时间点的一个快照(一份拷贝)
  • 在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的id和状态;如果发生故障,Flink就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样
  • 如果保存检查点之后又处理了一些数据后发生故障,重启恢复状态后这些数据带来的状态改变会丢失。为了让最终处理结果正确,需要让源(Source)算子重新读取这些数据再次处理一遍
  • 检查点默认被禁用,要在代码中手动开启。直接调用执行环境的enableCheckpointing()方法开启检查点
  • Flink还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于保存点是自定义的镜像保存,所以不会由Flink自动创建,而需要用户手动触发
  • 检查点的保存离不开JobManager和TaskManager以及外部存储系统的协调

流程

  1. 在应用进行检查点保存时,由JobManager向所有TaskManager发出触发检查点的命令
  2. TaskManger收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中
  3. 完成之后向JobManager返回确认信息(这个过程是分布式的)
  4. 当JobManger收到所有TaskManager的返回信息后,会确认当前检查点成功保存

检查点的保存

状态后端

概念

  • 状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)
  • 主要负责
    • 本地的状态管理
    • 将检查点(checkpoint)写入远程的持久化存储
      • 当应用程序 checkpoint 时,状态后端会在将状态发给JobManager前进行快照
  • 是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置

分类

  • HashMapStateBackend
    • 默认
    • 直接把状态当作对象,保存在Taskmanager的JVM堆上(将本地状态全部放入内存)
    • 普通的状态,窗口中收集的数据和触发器都会以键值对的形式存储,底层是哈希表
    • 检查点的保存一般放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)指定
    • 适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的
  • 内嵌RocksDB状态后端EmbeddedRocksDBStateBackend
    • 一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘
    • 默认存储在TaskManager的本地数据目录里
    • 数据被存储为序列化的字节数组(Byte Arrays)
      • 读写操作需要序列化/反序列化,访问性能要差一些
      • key的比较也会按照字节进行,而不是直接调用hashCode()equals()方法
    • 对于检查点同样会写入到远程的持久化文件系统中,还提供增量式保存检查点的机制,可以大大提升保存效率
    • EmbeddedRocksDBStateBackend始终是异步快照,不会因为保存检查点而阻塞数据的处理
    • 在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效

选择策略

  • 两种状态后端最大的区别在于本地状态存放位置
    • HashMap:内存
    • RocksDB后:RocksDB
  • 在实际应用中主要需要根据业务需求在处理性能和应用的扩展性上做一个选
    • HashMapStateBackend
      • 内存计算,读写速度非常快
      • 状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源
    • RocksDB
      • 硬盘存储,所以可以根据可用的磁盘空间进行扩展,支持增量检查点的状态后端,非常适合于超级海量状态的存储
      • 由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,平均读写性能比HashMapStateBackend慢一个数量级

配置

  • 从配置文件配置

    • 在flink-conf.yaml中,使用state.backend来配置默认状态后端
      • hashmap:HashMapStateBackend
      • rocksdb:EmbeddedRocksDBStateBackend
      • 可以是一个实现了状态后端工厂StateBackendFactory的类的完全限定类名
    1
    2
    3
    4
    # 默认状态后端
    state.backend: hashmap
    # 存放检查点的文件路径
    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
  • 从代码配置

    • Flink发行版默认就包含RocksDB,只要代码中没有使用RocksDB的相关内容就不需要引入依赖。即在flink-conf.yaml配置文件中设定了state.backendrocksdb,也可以使用RocksDB作为状态后端正常运行

    • 依赖

      1
      2
      3
      4
      5
      <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
      <version>1.13.0</version>
      </dependency>
    • 配置

      1
      2
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExaecutionEnvironment();
      env.setStateBackend(new EmbeddedRocksDBStateBackend());

容错机制

检查点

保存

概念

  • 检查点是Flink容错机制的核心
  • 故障恢复之后继续处理的结果应该与发生故障前完全一致
  • 检查点的保存周期性触发,间隔时间可以进行设置
  • 隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按一定的逻辑结构放在一起持久化保存起来,构成检查点
  • 当所有任务都处理完一个相同的输入数据的时将它们的状态保存下来
    • 避免除状态之外其他额外信息的存储,提高检查点保存的效率
    • 一个数据要么就是被所有任务完整地处理完,状态得到保存;要么就是没处理完,状态全部没保存(相当于构建了一个事务
    • 如果出现故障恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理。只需要让源任务向数据源重新提交偏移量、请求重放数据。需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量

具体流程

  • 输入:“hello”“world”“hello”“flink”“hello”“world”“hello”“flink”

检查点保存的具体流程

  • 需要保存检查点时,在所有任务处理完同一条数据后对状态做个快照保存下来
  • 如上图已经处理了3条数据:“hello”“world”“hello”,Source算子的偏移量为3;后面的Sum算子处理完第三条数据“hello”之后所对应的状态为“hello”-> 2,“world”-> 1(这里KeyedState底层会以key-value形式存储)
  • 此时所有任务都已经处理完了前三个数据,可把当前的状态保存成一个检查点写入外部存储中
  • 至体保存位置由状态后端的配置项“检查点存储”(CheckpointStorage)决定
    • 有作业管理器的堆内存(JobManagerCheckpointStorage)
    • 文件系统(FileSystemCheckpointStorage)
    • 一般会将检查点写入持久化的分布式文件系统

恢复

概念

  • 运行流处理程序时会周期性地保存检查点。发生故障时需要找到最近一次成功保存的检查点来恢复状态
  • 所以故障恢复的过程需要JobManager的参与
    • 要正确地从检查点中读取并恢复状态,必须知道每个算子任务状态的类型和它们的先后顺序(拓扑结构);因此为了可以从之前的检查点中恢复状态,
    • 状态的拓扑结构在JobManager上可以由JobGraph分析得到,而检查点保存的定期触发也是由JobManager控制的

流程

  • 发生错误

    • 在处理第五个数据“hello”时发生了故障
      • Source任务已经处理完毕,偏移量为5
      • Map任务也处理完成
      • Sum任务在处理中发生了故障,此时状态并未保存

    发生错误

  1. 重启应用

    • 重启后所有任务的状态会清空

    重启应用

  2. 读取检查点,重置状态

    • 找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中

    读取检查点,重置状态

  3. 重放数据

    • 重放数据,保存检查点后开始重新读取数据,通过Source任务向外部数据源重新提交偏移量来实现

    image-20220822162804456

  4. 继续处理数据

    • 首先重放第4、5个数据,然后继续读取后面的数据
    • 处理到第5个数据时就已经追上了发生故障时的系统状态。之后继续处理,就好像没有发生过故障一样
    • 没有丢掉数据也没有重复计算数据,保证了计算结果的正确性。在分布式系统中叫作实现了“精确一次”(exactly-once)的状态一致性保证

    继续处理数据

算法

检查点分界线

  • 在不暂停流处理的前提下,让每个任务“认出”触发检查点保存的那个数据
  • 在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存
  • 由于数据流是保持顺序依次处理的,遇到这个标识就代表之前的数据都处理完,可以保存一个检查点;在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点
  • 这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的“分界线”(Checkpoint Barrier)
  • 与水位线类似,检查点分界线也是一条特殊的数据,由Source算子注入到常规的数据流中,它的位置是限定好的,不能超过其他数据,也不能被后面的数据超过
  • 检查点分界线中带有一个检查点ID,这是当前要保存的检查点的唯一标识

检查点分界线

分布式快照算法

  • watermark指示的是“之前的数据全部到齐了”,而barrier指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准
  • Flink使用了Chandy-Lamport算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法
  • 算法的核心是两个原则
    • 当上游任务向多个并行下游任务发送barrier时,需要广播出去
    • 当多个上游任务向同一个下游任务传递barrier时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,需要等到所有并行分区的barrier都到齐才可以开始状态的保存

分布式快照算法

流程

  1. 触发检查点保存

    • JobManager发送指令,触发检查点的保存
      • JobManager 会周期性地向每个 TaskManager发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点
    • Source任务保存状态,插入分界线
      • 收到指令后,TaskManger会在所有Source 任务中插入一个分界线(barrier),并将偏移量保存到远程的持久化存储中

    触发检查点保存

  2. 分界线向下游传递

    • 状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后像数据一样把barrier向下游任务传递

    分界线向下游传递

  3. 执行分界线对齐

    • Sum任务可能收到来自上游两个并行Map任务的barrier,所以需要执行“分界线对齐”操作(所有的barrier都到达时才进行保存)
    • 如果分界线尚未到达的分区任务Map 1又传来了数据,需要保存到检查点,Sum任务应该正常继续处理数据,状态更新为3
    • 如果分界线已经到达的分区任务Map 2又传来数据,这已经是下一个检查点要保存的内容,要缓存起来、等到状态保存之后再做处理

    执行分界线对齐

  4. 保存状态到持久化存储

    • 各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成之后,同样将barrier向下游继续传递,并通知JobManager保存完毕
    • 这个过程中,每个任务保存自己的状态都是相对独立的,互不影响

    保存状态到持久化存储

  5. 处理缓存数据后继续运行

    • 完成检查点保存后任务就可以继续正常处理数据
    • 如果有等待分界线对齐时缓存的数据,需要先做处理;再按照顺序依次处理新到的数据
    • 当JobManager收到所有任务成功保存状态的信息就可以确认当前检查点成功保存,之后遇到故障就可以从这里恢复
    • 由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压(backpressure)时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕
    • Flink 1.11后提供了不对齐的检查点保存方式,可将未处理的缓冲数据(in-flight data)也保存进检查点。这样遇到一个分区barrier时就不需等待对齐,而是可以直接启动状态的保存

配置

启用

  • 默认Flink禁用检查点。如果想要为Flink应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的enableCheckpointing()方法
  • 检查点的间隔时间是对处理性能和故障恢复速度的一个权衡
    • 如果希望对性能的影响更小,可以调大间隔时间
    • 如果希望故障重启后迅速赶上实时的数据处理,需要将间隔时间设小一些
1
2
3
4
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);

存储

  • 持久化存储位置取决于“检查点存储”(CheckpointStorage)的设置
    • 默认,检查点存储在JobManager的堆内存中
    • 对于大状态的持久化保存,提供在其他存储位置进行保存的接口CheckpointStorage
  • 具体可以通过调用检查点配置的setCheckpointStorage()配置
    • 需要传入一个CheckpointStorage的实现类
    • Flink主要提供了两种CheckpointStorage
      • 作业管理器的堆内存JobManagerCheckpointStorage
      • 文件系统FileSystemCheckpointStorage
1
2
3
4
// 配置存储检查点到JobManager堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

其他高级配置

  • 可以通过获取检查点配置(CheckpointConfig)进行设置

    1
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  • 检查点模式CheckpointingMode

    • 设置检查点一致性的保证级别
    • exactly-once:精确一次,默认
    • at-least-once:至少一次,适用于大多数低延迟的流处理程序,处理效率更高
  • 超时时间checkpointTimeout

    • 指定检查点保存的超时时间,超时没完成就会被丢弃掉
    • 参数为长整型毫秒数作,表示超时时间
  • 最小间隔时间minPauseBetweenCheckpoints

    • 指定在上一个检查点完成之后,检查点协调器(checkpoint coordinator)最快等多久可以出发保存下一个检查点的指令,为正常处理数据留下了充足的间隙
    • 着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就不能开启下一次检查点的保存
    • 指定参数时,maxConcurrentCheckpoints的值强制为1
  • 最大并发检查点数量maxConcurrentCheckpoints

    • 指定运行中的检查点最多可以有多少个,限制同时进行的最大数量
    • 由于每个任务的处理进度不同,可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了
    • 如果前面设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints不起作用
  • 开启外部持久化存储enableExternalizedCheckpoints

    • 指定是否开启检查点的外部持久化
    • DELETE_ON_CANCELLATION:作业取消时自动删除外部检查点,但如果是作业失败退出则会保留检查点
    • RETAIN_ON_CANCELLATION:作业取消时保留外部检查点,默认,如果想释放空间需要自己手工清理
  • 检查点异常时是否让整个任务失败failOnCheckpointingErrors

    • 指定在检查点发生异常的时是否应该让任务直接失败退出
    • true:默认
    • false:任务会丢弃掉检查点然后继续运行
  • 不对齐检查点enableUnalignedCheckpoints

    • 不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间
    • 这要求检查点模式必须为exctly-once,且并发的检查点个数为1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// 启用检查点,间隔时间1秒
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 设置精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔时间500毫秒
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// 超时时间1分钟
checkpointConfig.setCheckpointTimeout(60000);
// 同时只能有一个检查点
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 开启检查点的外部持久化保存,作业取消后依然保留
checkpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 启用不对齐的检查点保存方式
checkpointConfig.enableUnalignedCheckpoints();
// 设置检查点存储,可以直接传入一个String,指定文件系统的路径
checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")

保存点

概念

  • 也是一个存盘的备份
  • 原理和算法与检查点完全相同,只是多了一些额外的元数据
  • 保存点就是通过检查点的机制来创建流式作业状态的一致性镜像的
  • 保存点与检查点最大的区别是触发的时机
    • 检查点
      • 由Flink自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能
      • 主要用来做故障恢复,是容错机制的核心
    • 保存点
      • 不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了
      • 保存点则更加灵活,可以用来做有计划的手动备份和恢复
        • 版本管理和归档存储
        • 更新Flink版本
        • 更新应用程序
        • 调整并行度
        • 暂停应用程序

使用

前提

  • 保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变

  • 保存点中状态都是以算子ID-状态名称这样的key-value组织起来的,算子ID可以在代码中直接调用SingleOutputStreamOperator的uid()方法来进行指定

    1
    2
    3
    4
    5
    6
    DataStream<String> stream = env
    .addSource(new StatefulSource())
    .uid("source-id")
    .map(new StatefulMapper())
    .uid("mapper-id")
    .print();
  • 对于没有设置ID的算子,Flink默认会自动进行设置,所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态

  • 强烈建议在程序中为每一个算子手动指定ID

创建

  • 在命令行中为运行的作业创建一个保存点镜像

    • jobId:要做镜像保存的作业ID
    • targetDirectory:目标路径,可选,保存点存储的路径
    1
    bin/flink savepoint :jobId [:targetDirectory]
  • 通过配置文件flink-conf.yaml中的state.savepoints.dir项设定保存点的默认路径

    1
    state.savepoints.dir: hdfs:///flink/savepoints
  • 在程序代码中通过执行环境来设置单独的作业的路径

    1
    env.setDefaultSavepointDir("hdfs:///flink/savepoints");
  • 在停掉一个作业时直接创建保存点

    1
    bin/flink stop --savepointPath [:targetDirectory] :jobId

从保存点重启

1
bin/flink run -s :savepointPath [:runArgs]
  • -s:指定保存点的路径
  • 其他启动时的参数还是完全一样

状态一致性

概念

  • 结果的正确性
  • 不漏掉任何一个数据,而且也不会重复处理同一个数据
  • 发生故障时保证状态恢复后结果的正确

级别

最多一次AT-MOST-ONCE

  • 任务发生故障时直接重启,别的什么都不干
  • 既不恢复丢失的状态,也不重放丢失的数据
  • 每个数据在正常情况下会被处理一次,遇到故障时就会丢掉

至少一次AT-LEAST-ONCE

  • 在实际应用中一般希望至少不要丢掉数据
  • 所有数据都不会丢,肯定被处理了
  • 不能保证只处理一次,会可能被重复处理

精确一次EXACTLY-ONCE

  • 最严格的一致性保证
  • 所有数据不仅不会丢失,而且只被处理一次,不会重复处理
  • 对于每一个数据,最终体现在状态和输出结果上只能有一次统计
  • Flink使用检查点保证exactly-once语义

端到端的一致性

概念

  • 完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分
  • 完整应用的一致性,就叫作“端到端(end-to-end)的状态一致性”
  • 取决于三个组件中最弱的那一环

精确一次

内部保证

  • 检查点

Source端

  • 可重设数据的读取位置

Sink端

概念
  • 从故障恢复时数据不会重复写入外部系统
幂等写入
  • 一个操作可以重复执行很多次,但只导致一次结果更改(如HashMap)
  • 遇到故障进行恢复时,有可能会出现短暂的不一致
    • 短时间内,结果会突然“跳回”到之前的某个值,然后“重播”一段之前的数据
    • 当数据的重放逐渐超过发生故障的点的时候,最终的结果还是一致的
事务写入
  • 构建的事务对应着检查点,等到检查点真正完成时才把所有对应的结果写入Sink系统中
  • 具体在项目中的选型最终还应该是一致性级别和处理性能的权衡考量
  • 预写日志WAL
    • 把结果数据当成状态保存,收到检查点完成的通知后一次性写入Sink系统
    • 由于数据提前在状态后端中做了缓存,所以无论什么外部存储系统,理论上都能用这种方式一批搞定
    • 提供了一个模板类GenericWriteAheadSink实现
    • 可能会写入失败
  • 两阶段提交2PC
    • Sink会为每个检查点启动一个事务,将接下来所有接受的数据添加到事务里
    • 将数据写入外部Sink系统但不提交,是“预提交”的状态
    • 收到检查点完成的通知后提交事务,实现结果的真正写入
    • 真正意义上实现了exactly-once,需要一个提供事务支持的外部Sink系统
    • 提供了TwoPhaseCommitSinkFunction接口自定义实现两阶段提交的SinkFunction的实现
    • 对外部Sink系统的要求
      • 外部系统必须提供事务支持,或者Sink任务必须能够模拟外部系统上的事务
      • 在检查点的间隔期间里,必须能够开启一个事务并接受数据写入
      • 在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失
      • Sink任务必须能够在进程失败后恢复事务
      • 提交事务必须是幂等操作。事务的重复提交应该是无效的

Source/Sink一致性

Sink \ Source 不可重置 可重置
任意 AT-MOST-ONCE AT-LEAST-ONCE
幂等性 AT-MOST-ONCE EXACTLY-ONCE
(故障恢复时可能出现暂时的不一致)
预写日志 AT-MOST-ONCE AT-LEAST-ONCE
两阶段提交 AT-MOST-ONCE EXACTLY-ONCE

Flink和Kafka的精确一次

整体介绍

  • Flink内部:Fl可以通过检查点机制保证状态和处理结果的exactly-once语义
  • 输入端:Kafka可以对数据进行持久化保存,并可以重置偏移量(offset)
  • 输出端:写入Kafka的过程实际上是一个两阶段提交(2PC)

整体流程

流程

  1. 启动检查点保存

    • JobManager通知各个TaskManager启动检查点保存,Source任务会将检查点分界线(barrier)注入数据流
    • barrier可以将数据流中的数据分为进入当前检查点的集合和进入下一个检查点的集合

    启动检查点保存

  2. 算子任务对状态做快照

    • 分界线会在算子间传递下去
    • 每个算子收到barrier时会将当前的状态做个快照保存到状态后端

    算子任务对状态做快照

  3. Sink任务开启事务,进行预提交

    • 分界线传到了Sink任务时,Sink任务会开启一个事务
    • 对于Kafka而言,提交的数据会被标记为“未确认”。这个过程就是预提交

    Sink任务开启事务,进行预提交

  4. 检查点保存完成,提交事务

    • 所有算子的快照都完成,也就是这次的检查点保存最终完成时,JobManager向所有任务发确认通知,告诉大家当前检查点已成功保存
    • Sink任务收到确认通知后会正式提交之前的事务,把之前“未确认”的数据标为“已确认”,接下来就可以正常消费了
    • 在任务运行中的任何阶段失败,都会从上一次的状态恢复,所有没有正式提交的数据也会回滚

    检查点保存完成,提交事务

配置

  • 启用检查点
  • 在FlinkKafkaProducer的构造函数中传入参数Semantic.EXACTLY_ONCE
  • 配置Kafka读取数据的消费者的隔离级别为read_committed
    • 默认的隔离级别isolation.levelread_uncommitted
  • 事务超时配置
    • Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认1小时
    • Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟
    • 在检查点保存时间很长时可能出现Kafka已经认为事务超时,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了
    • 前者应小于等于后者

TableAPI&SQL

概念

概念

  • 提供了对于“表”处理的支持,是更高层级的应用API,在Flink中被称为Table API和SQL
  • Table API是基于“表”(Table)的一套API,它是内嵌在Java、Scala等语言中的一种声明式领域特定语言(DSL),也就是专门为处理表而设计的
  • 在此基础上,Flink还基于Apache Calcite实现了对SQL的支持。这样一来就可以在Flink程序中直接写SQL来实现处理需求

入门

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.14.5</version>
</dependency>

<!-- 桥接器主要负责Table API和下层DataStream API的连接支持 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.14.5</version>
</dependency>

<!-- 计划器是Table API的核心组件,负责提供运行时环境,并生成程序的执行计划-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.14.5</version>
</dependency>

<!-- 在Table API的内部实现上,部分相关的代码是用Scala实现 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.14.5</version>
</dependency>

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(1);
SingleOutputStreamOperator<Event> eventStream = streamEnv
.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
// TODO 创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
// TODO 将DataStream转换为Table
Table eventTable = tableEnv.fromDataStream(eventStream);
// TODO 使用SQL查询,转换为DataStream并打印
Table resTable1 = tableEnv.sqlQuery("select user, url, `timestamp` from " + eventTable);
tableEnv.toDataStream(resTable1).print("result1");
// TODO 基于Table查询,转换为DataStream并打印
Table resTable2 = eventTable.select($("user"), $("url")).where($("user").isEqual("Alice"));
tableEnv.toDataStream(resTable2).print("result2");
streamEnv.execute();

基础API

概念

  • Table API和SQL可以看作联结在一起的一套API,这套API的核心概念就是表
  • 程序的整体处理流程与DataStream API非常相似,也可以分为读取数据源、转换、输出数据三部分
  • 输入输出操作不需要额外定义,只需要将用于输入和输出的表定义出来,然后进行转换查询
1
2
3
4
5
6
7
8
9
10
11
12
// 创建表环境
TableEnvironment tableEnv = ...;
// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )");
// 注册一个表,连接到外部系统,用于输出
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// 执行SQL对表进行查询转换,得到一个新的表
Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");
// 使用Table API对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("inputTable").select(...);
// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");

创建环境

概念

  • 对于流处理框架来说数据流和表在结构上还是有所区别的
  • 使用Table API和SQL需要一个特别的运行时环境,这就是所谓的“表环境”(TableEnvironment)
  • 主要负责
    • 注册Catalog和表
      • 目录:与标准SQL中的概念一致,主要用来管理所有数据库和表的元数据
      • 默认叫作default_catalog
    • 执行 SQL 查询
    • 注册用户自定义函数(UDF)
    • DataStream和表之间的转换

基于流执行环境创建

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv1 = StreamTableEnvironment.create(env);

基于环境配置创建

1
2
3
4
5
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnv2 = TableEnvironment.create(settings);

创建表

概念

  • 为了方便地查询表,表环境中会维护一个目录(Catalog)和表的对应关系。所以表都是通过Catalog来进行注册创建的

  • 表在环境中有一个唯一的ID,由三部分组成

    • 目录名:默认为default_catalog
    • 数据库名:默认为default_database
    • 表名
  • 可以在环境中进行设置自定义的默认目录名和库名

    1
    2
    tableEnv.useCatalog("custom_catalog");
    tableEnv.useDatabase("custom_database");

Connector Tables

  • 在表环境中读取这张表,连接器就会从外部系统读取数据并进行转换
  • 向这张表写入数据,连接器就会将数据输出到外部系统中
1
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");
1
2
3
4
5
6
7
8
9
10
String createDDL = "CREATE TABLE clickTable (" +
" user_name STRING, " +
" url STRING, " +
" ts BIGINT " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'data/flink/input/clicks.csv', " +
" 'format' = 'csv' " +
")";
tableEnv1.executeSql(createDDL);

Virtual Tables

  • 与SQL语法中的视图非常类似,也叫作创建“虚拟视图”(createTemporaryView)
  • 并不会直接保存这个表的内容,并没有“实体”
  • 用到这张表的时候会将它对应的查询语句嵌入到SQL中
1
tableEnv.createTemporaryView("NewTable", newTable);

表环境与表对象转换

  • 环境转对象

    1
    Table eventTable = tableEnv.from("EventTable");
  • 对象转环境

    1
    tableEnv.createTemporaryView("EventTable", eventTable);

查询表

使用SQL

1
2
3
4
5
Table urlCountTable = tableEnv.sqlQuery(
"SELECT user, COUNT(url) " +
"FROM EventTable " +
"GROUP BY user "
);
  • 将Table对象名eventTable直接以字符串拼接的形式添加到SQL语句中,在解析时会自动注册一个同名的虚拟表到环境中,省略创建虚拟视图的步骤

    1
    Table clickTable = tableEnvironment.sqlQuery("select url, user from " + eventTable);

使用TableAPI

  • Table API基于Table的Java实例进行调用,首先要得到表的Java对象
1
2
3
Table maryClickTable = eventTable
.where($("user").isEqual("Alice"))
.select($("url"), $("user"));

输出表

1
2
3
4
5
6
// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// 经过查询转换,得到结果表
Table result = ...
// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");
1
2
3
4
5
6
7
8
9
// 创建一张用于控制台打印输出的表
String createPrintOutDDL = "CREATE TABLE printOutTable (" +
" user_name STRING, " +
" cnt BIGINT " +
") WITH (" +
" 'connector' = 'print' " +
")";
tableEnv2.executeSql(createPrintOutDDL);
resultTable2.executeInsert("printOutTable");

表流转换

表转流

  • 仅插入流toDataStream

    • 无法更新数据(如聚合函数)
    1
    tableEnv.toDataStream(aliceVisitTable).print();
  • 更新日志流toChangelogStream

    1
    tableEnv.toDataStream(urlCountTable).print();

流转表

  • fromDataStream

    • DataStream转Table对象

    • 将流转换成表后,每行数据就对应着一个当前类实例,表中列名对应着实例中的属性

      1
      2
      3
      4
      5
      6
      // 获取表环境
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
      // 读取数据源
      SingleOutputStreamOperator<Event> eventStream = env.addSource(...)
      // 将数据流转换成表
      Table eventTable = tableEnv.fromDataStream(eventStream);
    • 可以在方法中增加参数,指定提取哪些属性作为表中的字段名,并可以任意指定位置

      1
      2
      3
      4
      // 提取Event中的timestamp和url作为表中的列
      Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp"), $("url"));
      // 将timestamp字段重命名为ts
      Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"), $("url"));
  • createTemporaryView

    • DataStream转Table环境
    • 参数与fromDataStream相似
    1
    tableEnv.createTemporaryView("EventTable", eventStream, $("timestamp").as("ts"),$("url"));
  • **fromChangelogStream **

    • ChangelogStream转Table对象
    • 流中的数据类型只能是Row,而且每一个数据都需要指定当前行的更新类型(RowKind)

支持类型

  • 原子类型
    • 基础数据类型(Integer、Double、String)
    • 通用数据类型(也就是不可再拆分的数据类型)
    • 将原子类型看作了一元组Tuple1的处理结果
    • 转换后是只有一列的Table,列字段(field)的数据类型可以由原子类型推断出
    • 默认字段名是“f0”
    • 可以在fromDataStream()方法里增加参数重新命名列字段
  • Tuple类型
    • 对应在表中字段名默认就是元组中元素的属性名f0、f1、f2…
    • 所有字段都可以被重新排序,也可以提取其中的一部分字段
    • 可以通过调用表达式的as()方法来进行重命名
  • POJO类型
    • 如果不指定字段名称就会直接使用原始 POJO 类型中的字段名称
    • POJO中的字段同样可以被重新排序、提却和重命名
  • Row类型
    • 是Table中数据的基本组织形式
    • 复合类型,长度固定,无法直接推断出每个字段的类型,在使用时必须指明具体的类型信息
    • 在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)
    • 附加了一个属性RowKind,用来表示当前行在更新操作中的类型(可以用来表示更新日志流中的数据)

流处理&表

概念

  • 将Table转换成DataStream时,有“仅插入流”和“更新日志流”两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作

  • 关系型数据库的查询和流查询的对比

    关系型表/SQL 流处理
    处理的数据对象 字段元组的有界集合 字段元组的无限序列
    查询 对数据的访问 可以访问到完整的数据输入 无法访问到所有数据,必须“持续”等待流式输入
    查询终止条件 生成固定大小的结果集后终止 永不停止,根据持续收到的数据不断更新查询结果

动态表&持续查询

动态表

  • Dynamic Tables
  • 当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的SQL查询,就应该在之前的基础上更新结果,得到的表就会不断地动态变化
  • 是Flink在Table API和SQL中的核心概念
  • 借鉴了物化视图的思想,它的更新其实就是不停地处理更新日志流的过程

持续查询

  • Continuous Query
  • 对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表
  • 对输入动态表做“快照”,当作有限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,构成“持续查询”

持续查询

查询限制

  • 状态大小

    • 用持续查询做流处理,往往会运行至少几周到几个月;所以持续查询处理的数据总量可能非常大,要维护的状态也将逐渐增长,最终可能会耗尽存储空间导致查询失败
  • 更新计算

    • 对于有些查询来说,更新计算的复杂度可能很高。每来一条新的数据,更新结果的时候可能需要全部重新计算,并且对很多已经输出的行进行更新

    • 一个典型的例子就是RANK()函数会基于一组数据计算当前值的排名,一个用户的排名发生改变时其他用户的排名也会改变;更新操作代价巨大,还会随着用户的增多越来越严重

动态表转流

仅追加流

  • Append-only
  • 可以直接转换为“仅追加”流,流中发出的数据是动态表中新增的每一行

撤回流

  • Retract
  • 添加消息:add
    • 插入操作
  • 撤回消息:retract
    • 删除操作
    • 更新操作:两个消息
      • 前数据的撤回(删除)
      • 新数据的插入

更新插入流

  • Upsert:是“update”和“insert”的合成词
  • 需要动态表中必须有唯一的键,外部系统也需要知道这唯一的键
  • 插入消息:upsert
    • 插入/更新操作:通过表中键判断
  • 删除消息:delete
    • 删除操作

时间属性&窗口

时间属性

  • 时间属性,就是每个表模式结构的一部分
  • 可以在创建表的DDL里直接定义为一个字段,也可以在DataStream转换成表时定义
  • 一旦定义了时间属性,就可以作为一个普通字段引用,并且可以在基于时间的操作中使用
  • 数据类型为TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算
  • 按照时间语义的不同,可把时间属性的定义分成事件时间和处理时间

事件时间

概念

  • 处理乱序事件或者延迟事件的场景
  • 允许表处理程序根据每个数据中包含的时间戳(事件发生时间)生成结果

创建表DDL中定义

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE EventTable(
user STRING,
url STRING,
# TIMESTAMP([时间戳的精度])
# 3: 精确到毫秒
ts TIMESTAMP(3),
# 基于ts设置了5秒的水位线延迟
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
...
);
1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE events (
user STRING,
url STRING,
ts BIGINT,
# 带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIME ZONE)
# 如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性,类型定义为TIMESTAMP_LTZ会更方便
# 把长整形的值转换成TIMESTAMP_LTZ
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
  • 水位线延迟以“时间间隔”的形式定义

    1
    INTERVAL '5' SECOND

数据流转表时定义

  • 只负责指定时间属性,时间戳的提取和水位线的生成应该之前就在DataStream上定义好了
  • 由于DataStream中没有时区概念,因此会将事件时间属性解析成不带时区的TIMESTAMP类型,所有的时间值都被当作UTC标准时间
1
2
3
4
5
6
7
8
9
10
11
// 方法一:
// 流中数据类型为二元组Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime());

// 方法二:
// 流中数据类型为三元组Tuple3,最后一个字段就是事件时间戳
DataStream<Tuple3<String, String, Long>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 不再声明额外字段,直接用最后一个字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime());

处理时间

概念

  • 使用时不需要提取时间戳和生成水位线
  • 必须要额外声明一个字段专门用来保存当前的处理时间

创建表DDL中定义

  • 这里的时间属性以“计算列”(computed column)的形式定义出来
    • 所计算列是Flink SQL中引入的特殊概念
    • 可以用一个AS语句来在表中产生数据中不存在的列
    • 可以利用原有的列、各种运算符及内置函数
    • 在事件时间属性的定义中,将ts字段转换成TIMESTAMP_LTZ类型的ts_ltz,也是计算列的定义方式
1
2
3
4
5
6
7
8
CREATE TABLE EventTable(
user STRING,
url STRING,
# 指定处理时间
ts AS PROCTIME()
) WITH (
...
);

数据流转表时定义

1
2
3
DataStream<Tuple2<String, String>> stream = ...;
// 声明一个额外的字段作为处理时间属性字段
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime());

窗口

分组窗口

  • 老版本
  • 调用TUMBLE()HOP()SESSION(),传入时间属性字段、窗口大小等参数就可以
1
TUMBLE(ts, INTERVAL '1' HOUR)
1
2
3
4
5
6
7
8
9
10
Table result = tableEnv.sqlQuery(
"SELECT " +
"user, " +
"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +
"COUNT(url) AS cnt " +
"FROM EventTable " +
"GROUP BY " + // 使用窗口和用户名进行分组
"user, " +
"TUMBLE(ts, INTERVAL '1' HOUR)" // 定义1小时滚动窗口
);

窗口表值函数

  • 窗口表值函数是Flink定义的多态表函数(PTF),将表进行扩展后返回

  • 表函数(table function)可以看作是返回一个表的函数

  • 在窗口TVF的返回值中增加了用来描述窗口的额外3个列

    • 窗口起始点window_start
    • 窗口结束点window_end
      • 窗口中的时间属性
      • 值为window_end - 1ms
      • 是窗口中能够包含数据的最大时间戳
      • 能够触发窗口计算的时刻
    • 窗口时间window_time
  • 滚动窗口Tumbling Windows

    1
    2
    # 基于时间字段ts,对表EventTable中的数据开了大小为1小时的滚动窗口。窗口会将表中的每一行数据,按照它们ts的值分配到一个指定的窗口中
    TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)
  • 滑动窗口/跳跃窗口Hop Windows

    1
    2
    # 第三个参数是步长(slide),第四个参数才是窗口大小(size)
    HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS))
  • 累积窗口Cumulate Windows

    • 核心参数
      • 最大窗口长度max window size:统计周期
      • 累积步长step
    1
    CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

    累积窗口

  • 会话窗口Session Windows

    • 目前尚未完全支持

聚合查询

分组聚合

概念

  • SQL中的分组聚合可以对应DataStream API中keyBy之后的聚合转换,它们都是按照某个key对数据进行了划分,各自维护状态来进行聚合统计的

  • 在流处理中分组聚合同样是一个持续查询,更新查询,得到的是一个动态表,每当流中有一个新的数据到来时,都会导致结果表的更新操作

  • 将结果表转换成流或输出到外部系统,必须采用撤回流或更新插入流的编码方式

  • 为了防止状态无限增长耗尽资源,Flink Table API和SQL可以在表环境中配置状态的生存时间(TTL)

    • 表环境中配置

      1
      2
      3
      4
      5
      TableEnvironment tableEnv = ...
      // 获取表环境的配置
      TableConfig tableConfig = tableEnv.getConfig();
      // 配置状态保持时间
      tableConfig.setIdleStateRetention(Duration.ofMinutes(60));
    • 设置配置项

      1
      2
      3
      TableEnvironment tableEnv = ...
      Configuration configuration = tableEnv.getConfig().getConfiguration();
      configuration.setString("table.exec.state.ttl", "60 min");

案例

1
2
val aggTable = tableEnv.sqlQuery("SELECT user, COUNT(1) FROM clickTable GROUP BY user");
tableEnv.toChangelogStream(aggTable).print("agg");

窗口聚合

概念

  • 窗口本身返回的是就是一个表,所以窗口会出现在FROM后面
  • GROUP BY后面的则是窗口新增的字段window_startwindow_end

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 滚动窗口
val tumbleWindowResultTable = tableEnv
.sqlQuery("SELECT user, COUNT(1) AS cnt, " +
" window_end AS endT " +
"FROM TABLE( " +
" TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND)" +
") " +
"GROUP BY user, window_start, window_end "
);
tableEnv.toChangelogStream(tumbleWindowResultTable).print("tumble");
// 滑动窗口
val hopWindowResultTable = tableEnv
.sqlQuery("SELECT user, COUNT(url) AS cnt, " +
" window_end AS endT " +
"FROM TABLE( " +
" HOP( TABLE clickTable, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)" +
") " +
"GROUP BY user, window_start, window_end "
);
tableEnv.toChangelogStream(hopWindowResultTable).print("hop");
// 累积窗口
val cumulateWindowResultTable = tableEnv
.sqlQuery("SELECT user, COUNT(url) AS cnt, " +
" window_end AS endT " +
"FROM TABLE( " +
" CUMULATE( TABLE clickTable, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)" +
") " +
"GROUP BY user, window_start, window_end "
);
tableEnv.toChangelogStream(cumulateWindowResultTable).print("cumulate");

开窗聚合

概念

  • 比较特殊的聚合方式,可以针对每一行计算一个聚合值
  • 就好像是在每一行上打开了一扇窗户、收集数据进行统计一样
    • 比如以每一行数据为基准,计算它之前1小时内所有数据的平均值
  • 分组聚合、窗口TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果
  • 开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系
1
2
3
4
5
6
7
SELECT
<聚合函数> OVER (
[PARTITION BY <字段1>[, <字段2>, ...]]
ORDER BY <时间属性字段>
<开窗范围>),
...
FROM ...
  • PARTITION BY(可选)
    • 用来指定分区的键(key),类似于GROUP BY的分组
  • ORDER BY
    • OVER窗口选择的标准可以基于时间也可以基于数量,不论那种定义数据都应该是以某种顺序排列好的,而表中的数据本身是无序的
    • OVER子句中必须用ORDER BY明确地指出数据基于那个字段排序
    • Flink的流处理中,只支持按照时间属性的升序排列,ORDER BY后面的字段必须是定义好的时间属性
  • 开窗范围
    • 到底要扩展多少行来做聚合
    • 这个范围是由BETWEEN <下界> AND <上界> 定义,是“从下界到上界”的范围
    • 上界只能是CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围
    • 一般形式为BETWEEN ... PRECEDING AND CURRENT ROW
    • 应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)
  • 范围间隔
    • 范围间隔以RANGE为前缀,就是基于ORDER BY指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间
      • 例如开窗范围选择当前行之前1小时的数据
  • 行间隔
    • 行间隔以ROWS为前缀,就是直接确定要选多少行,由当前行出发向前选取

案例

  • 方式1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    val overWindowResultTable = tableEnv
    .sqlQuery("SELECT user, " +
    " avg(ts) OVER (" +
    " PARTITION BY user " +
    " ORDER BY et " +
    " ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" +
    ") AS avg_ts " +
    "FROM clickTable");
    tableEnv.toChangelogStream(overWindowResultTable).print("over");
  • 方式2

    1
    2
    3
    4
    5
    6
    7
    8
    SELECT user, ts,
    COUNT(url) OVER w AS cnt,
    MAX(CHAR_LENGTH(url)) OVER w AS max_url
    FROM EventTable
    WINDOW w AS (
    PARTITION BY user
    ORDER BY ts
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

TopN

普通实现

  • 通过将一个特殊的聚合函数ROW_NUMBER()应用到OVER窗口上,统计出每一行排序后的行号,作为一个字段提取出来;然后再用WHERE子句筛选行号小于等于N的那些行返回
  • 针对Top N这样一个经典应用场景,Flink SQL专门用OVER聚合做了优化实现
  • 要想实现Top N,必须按格式进行定义,否则优化器将无法正常解析
1
2
3
4
5
6
7
8
9
SELECT ...
FROM (
SELECT ...,
ROW_NUMBER() OVER (
[PARTITION BY <字段1>[, <字段1>...]]
ORDER BY <排序字段1> [asc|desc][, <排序字段2> [asc|desc]...]
) AS row_num
FROM ...)
WHERE row_num <= N [AND <其它条件>]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 选取所有用户中浏览量最大的两个
val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
val tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE clickTable (" +
" `user` STRING, " +
" url STRING, " +
" ts BIGINT, " +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'data/flink/input/clicks.csv', " +
" 'format' = 'csv' " +
")"
);
// TODO 普通实现
val resTable1 = tableEnv.sqlQuery("SELECT user, cnt, row_num FROM( " +
"SELECT *, ROW_NUMBER() OVER( " +
"ORDER BY cnt DESC " +
") AS row_num " +
"FROM(SELECT user, COUNT(url) AS cnt FROM clickTable GROUP BY user) " +
") where row_num <= 2 "
);
tableEnv.toChangelogStream(resTable1).print("top2");

窗口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 统计一段时间内的前2名活跃用户
val t1 = "SELECT user, COUNT(url) AS cnt, window_start, window_end " +
"FROM TABLE( " +
"TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND) " +
") GROUP BY user, window_start, window_end ";

val resTable2 = tableEnv.sqlQuery("SELECT user, cnt, row_num FROM( " +
"SELECT *, ROW_NUMBER() OVER( " +
"PARTITION BY window_start, window_end " +
"ORDER BY cnt DESC " +
") AS row_num " +
"FROM(" + t1 + ") " +
") where row_num <= 2 "
);
tableEnv.toDataStream(resTable2).print("top2");

联结查询

常规

概念

  • 在两个动态表的联结中,任何一侧表的插入(INSERT)或更改(UPDATE)操作都会让联结的结果表发生改变
  • 常规联结查询一般是更新(Update)查询
  • 目前仅支持“等值条件”作为联结条件,也就是关键字ON后面必须是判断两表中字段相等的逻辑表达式

等值内联结

1
2
3
4
SELECT *
FROM Order
INNER JOIN Product
ON Order.product_id = Product.id

等值外联结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SELECT *
FROM Order
LEFT JOIN Product
ON Order.product_id = Product.id

SELECT *
FROM Order
RIGHT JOIN Product
ON Order.product_id = Product.id

SELECT *
FROM Order
FULL OUTER JOIN Product
ON Order.product_id = Product.id

间隔

间隔联结

  • 只支持具有时间属性的“仅追加”表
  • 返回的是符合约束条件的两条中数据的笛卡尔积
  • “约束条件”除了常规的联结条件外,还多了一个时间间隔的限制
1
2
3
4
SELECT *
FROM Order o, Shipment s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
  • 时间间隔定义方式
    • ltime = rtime
    • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
    • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

TTL

环境

  • 默认不会过期
1
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));

内连接

  • 左表:OnCreateAndWrite
  • 右表:OnCreateAndWrite
1
2
3
4
tableEnv
.sqlQuery("select t1.id,t1.vc,t2.id,t2.name from t1 join t2 on t1.id=t2.id")
.execute()
.print();

左外连接

  • 左表:OnReadAndWrite
  • 右表:OnCreateAndWrite
1
2
3
4
tableEnv
.sqlQuery("select t1.id,t1.vc,t2.id,t2.name from t1 left join t2 on t1.id=t2.id")
.execute()
.print();

全外连接

  • 左表:OnReadAndWrite
  • 右表:OnReadAndWrite
1
2
3
4
tableEnv
.sqlQuery("select t1.id,t1.vc,t2.id,t2.name from t1 full join t2 on t1.id=t2.id")
.execute()
.print();

LookUp表

概念

  • 源表后面进来的数据只会关联当时维表的最新信息,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 构建LookUp表(维度表)
tableEnv.executeSql(
""
+ "create TEMPORARY table base_dic( "
+ " `dic_code` String, "
+ " `dic_name` String, "
+ " `parent_code` String, "
+ " `create_time` String, "
+ " `operate_time` String "
+ ") WITH ( "
+ " 'connector' = 'jdbc', "
+ " 'url' = 'jdbc:mysql://bigdata1:3306/gmall', "
+ " 'table-name' = 'base_dic', "
+ " 'driver' = 'com.mysql.cj.jdbc.Driver', "
// 维表数据不变or会改变,但是数据的准确度要求不高
// 默认不缓存
// 缓存10条&1小时(需要一起写)
+ " 'lookup.cache.max-rows' = '10', "
+ " 'lookup.cache.ttl' = '1 hour', "
+ " 'username' = 'root', "
+ " 'password' = 'root' "
+ ")");

// 构建实时表
val waterSensorDS =
env.socketTextStream("hadoop102", 8888)
.map(
line -> {
String[] split = line.split(",");
return new WaterSensor(
split[0], Double.parseDouble(split[1]), Long.parseLong(split[2]));
});
Table table =
tableEnv.fromDataStream(waterSensorDS, $("id"), $("vc"), $("ts"), $("pt").proctime());

tableEnv.createTemporaryView("t1", table);

// 使用事实表关联维表并打印结果
tableEnv
.sqlQuery(
""
+ "select "
+ " t1.id, "
+ " t1.vc, "
+ " dic.dic_name "
+ "from t1 "
// 需要处理时间
+ "join base_dic FOR SYSTEM_TIME AS OF t1.pt as dic "
+ "on t1.id=dic.dic_code")
.execute()
.print();

窗口联结

  • Flink SQL1.13还不支持窗口联结

函数

内置

标量函数

  • Comparison Functions

    • 比较表达式,返回布尔类型
    • </>/=等符号连接两个值,也可用关键字定义的某种判断
      • value1 = value2:判断两个值相等
      • value1 <> value2:判断两个值不相等
      • value IS NOT NULL:判断value不为空
  • Logical Functions

    • 逻辑表达式,返回布尔类型
    • AND/``OR/NOT将布尔类型的值连接起来,也可用判断语句IS/IS NOT进行判断
  • Arithmetic Functions

    • 算术计算,包括用算术符号连接的运算和复杂的数学运算
      • numeric1 + numeric2:两数相加
      • POWER(numeric1, numeric2):幂运算,取数numeric1的numeric2次方
      • RAND():返回(0.0, 1.0)区间内的一个double类型的伪随机数
  • String Functions

    • 字符串处理
      • string1 || string2:两个字符串的连接
      • UPPER(string):将字符串string转为全部大写
      • CHAR_LENGTH(string):计算字符串string的长度
  • Temporal Functions

    • 时间相关操作
      • DATE string:按格式”yyyy-MM-dd“解析字符串string

        • 返回类型为SQL Date
      • TIMESTAMP string:按格式”yyyy-MM-dd HH:mm:ss[.SSS]“解析

        • 返回类型为SQL timestamp
      • CURRENT_TIME:返回本地时区的当前时间

        • 类型为SQL time(与LOCALTIME等价)
      • INTERVAL string range:返回一个时间间隔

        • string表示数值
        • range可以是DAY/MINUTE/DAT TO HOUR等单位,也可以是YEAR TO MONTH这样的复合单位
          • “2年10个月”可以写成:INTERVAL '2-10' YEAR TO MONTH

聚合函数

  • COUNT(*):统计个数,返回所有行的数量
  • SUM([ ALL | DISTINCT ] expression):对某个字段进行求和操作
    • 默认情况下省略了关键字ALL,表示对所有行求和
    • 如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次
  • RANK() :返回当前值在一组值中的排名
  • ROW_NUMBER():对一组值排序后,返回当前值的行号
    • RANK()的功能相似

自定义

流程

  • 注册函数

    1
    tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
  • 使用Table API调用函数

    • 调用注册函数

      1
      tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
    • 调用不注册函数

      1
      tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField")))
  • 使用SQL中调用函数

    1
    tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

标量函数

  • Scalar Functions:将输入的标量值转换成一个新的标量值
  • 必须定义的方法
    • eval():求值方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) throws Exception {
val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
val tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE clickTable (" +
" `user` STRING, " +
" url STRING, " +
" ts BIGINT, " +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'data/flink/input/clicks.csv', " +
" 'format' = 'csv' " +
")"
);
// TODO 注册自定义标量函数
tableEnv.createTemporarySystemFunction("hashFun", HashFunctionDemo.class);
// TODO 调用UDF查询
val resTable = tableEnv.sqlQuery("SELECT user, hashFun(user) FROM clickTable");
tableEnv.toDataStream(resTable).print("hash");
env.execute();
}

// TODO 自定义实现ScalarFunction
public static class HashFunctionDemo extends ScalarFunction {

// 求值方法
public int eval(String s) {
return s.hashCode();
}

}

表函数

  • Table Functions:将标量值转换成一个或多个新的行数据,也就是扩展成一个表
  • 可以认为就是返回一个表的函数,这是一个“一对多”的转换关系
  • 泛型T:表函数返回数据的类型
  • 必须定义的方法
    • eval():求值方法
  • 使用LATERAL TABLE(<TableFunction>)生成扩展的“侧向表”,然后与原始表进行联结
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception {
...
// TODO 注册自定义表函数
tableEnv.createTemporarySystemFunction("splitFun", SplitFun.class);
// TODO 调用UDF查询
val resTable = tableEnv.sqlQuery("SELECT user, url, word, length " +
"FROM clickTable, LATERAL TABLE(splitFun(url)) AS T(word, length)"
);
tableEnv.toDataStream(resTable).print("split");
env.execute();
}

// TODO 自定义实现TableFunction
public static class SplitFun extends TableFunction<Tuple2<String, Integer>> {

// 求值方法
public void eval(String s) {
val fields = s.split("\\?");
for (val field : fields) {
collect(Tuple2.of(field, field.length()));
}
}

}

聚合函数

  • Aggregate Functions:将多行数据里的标量值转换成一个新的标量值
  • 泛型
    • T:聚合输出的结果类型
    • ACC:聚合的中间状态类型
  • 必须定义的方法
    • createAccumulator():创建累加器
    • accumulate():聚合计算
    • getValue():返回最终结果
  • 可选的方法
    • merge():对会话窗口进行聚合时必须要有
    • retract():数据撤回,聚合函数用在OVER窗口聚合时必须要有
    • resetAccumulator():重置累加器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public static void main(String[] args) throws Exception {
...
// TODO 注册自定聚合函数
tableEnv.createTemporarySystemFunction("weightedAvgFun", WeightedAvgFun.class);
// TODO 调用UDF查询
val resTable = tableEnv.sqlQuery("SELECT user, weightedAvgFun(ts, 1) as w_avg " +
"FROM clickTable GROUP BY user"
);
tableEnv.toChangelogStream(resTable).print("split");
env.execute();
}

// TODO 定义累加器类型
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class WeightedAvgAcc {
private long sum = 0;
private int count = 0;
}

// TODO 自定义实现AggregateFunction
// 计算加权平均值
public static class WeightedAvgFun extends AggregateFunction<Long, WeightedAvgAcc> {

@Override
public Long getValue(WeightedAvgAcc accumulator) {
return accumulator.getCount() == 0 ? null : accumulator.getSum() / accumulator.getCount();
}

@Override
public WeightedAvgAcc createAccumulator() {
return new WeightedAvgAcc();
}

// TODO 累加器计算方法
public void accumulate(WeightedAvgAcc accumulator, Long iValue, Integer iWeight) {
accumulator.setSum(accumulator.getSum() + iValue * iWeight);
accumulator.setCount(iWeight);
}

}

表聚合函数

  • Table Aggregate Functions:将多行数据里的标量值转换成一个或多个新的行数据

  • 泛型

    • T:聚合输出的结果类型
    • ACC:聚合的中间状态类型
  • 必须定义的方法

    • createAccumulator():创建累加器
    • accumulate():聚合计算
    • emitValue():返回最终结果
  • 可选的方法

    • merge():对会话窗口进行聚合时必须要有
    • retract():数据撤回,聚合函数用在OVER窗口聚合时必须要有
    • resetAccumulator():重置累加器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public static void main(String[] args) throws Exception {
val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
val tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE clickTable (" +
" `user` STRING, " +
" url STRING, " +
" ts BIGINT, " +
" et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +
" WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'data/flink/input/clicks.csv', " +
" 'format' = 'csv' " +
")"
);
// TODO 注册自定表聚合函数
tableEnv.createTemporarySystemFunction("top2Fun", Top2Fun.class);
// TODO 调用UDF查询
val windowAggQuery = "SELECT user, COUNT(url) AS cnt, window_start, window_end " +
"FROM TABLE( " +
"TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND) " +
") GROUP BY user, window_start, window_end ";
val aggTable = tableEnv.sqlQuery(windowAggQuery);
// 使用Table API进行查询
val resTable = aggTable
.groupBy($("window_end"))
.flatAggregate(call("top2Fun", $("cnt")).as("value", "rank"))
.select($("window_end"), $("value"), $("rank"));
tableEnv.toChangelogStream(resTable).print("tableAgg");
env.execute();
}

// TODO 单独定义累加器类型, 包含Top2数据
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Top2Acc {
private Long first;
private Long second;
}

// TODO 自定义实现TableAggregateFunction
public static class Top2Fun extends TableAggregateFunction<Tuple2<Long, Integer>, Top2Acc> {

@Override
public Top2Acc createAccumulator() {
return new Top2Acc(Long.MIN_VALUE, Long.MIN_VALUE);
}

public void accumulate(Top2Acc accumulator, Long value) {
if (value > accumulator.getFirst()) {
accumulator.setSecond(accumulator.getFirst());
accumulator.setFirst(value);
} else if (value > accumulator.getSecond()) {
accumulator.setSecond(value);
}
}

// TODO 输出结果
public void emitValue(Top2Acc accumulator, Collector<Tuple2<Long, Integer>> out) {
if (accumulator.getFirst() != Long.MIN_VALUE) {
out.collect(Tuple2.of(accumulator.getFirst(), 1));
}
if (accumulator.getSecond() != Long.MIN_VALUE) {
out.collect(Tuple2.of(accumulator.getSecond(), 2));
}
}

}

SQL客户端

概念

  • 提供了一个命令行交互界面(CLI)工具来进行Flink程序的编写、测试和提交

流程

  1. 启动本地集群

    1
    ./bin/start-cluster.sh
  2. 启动Flink SQL客户端

    1
    ./bin/sql-client.sh
  3. 设置运行模式

    1. 设置表环境的运行时模式

      • 流处理,默认
      • 批处理
      1
      Flink SQL> SET 'execution.runtime-mode' = 'streaming';
    2. 设置客户端的执行结果模式

      • table,默认
      • changelog
      • tableau:经典的可视化表模式
      1
      Flink SQL> SET 'sql-client.execution.result-mode' = 'table';
    3. 其他设置,如空闲状态生存时间

      1
      Flink SQL> SET 'table.exec.state.ttl' = '1000';
  4. 执行SQL查询

连接到外部系统

控制台

1
2
3
4
5
6
CREATE TABLE ResultTable (
user STRING,
cnt BIGINT
WITH (
'connector' = 'print'
);

Kafka

依赖

  • 连接器

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
  • 对应格式

    • 根据Kafka连接器中配置的格式,可能需要引入对应的依赖支持
    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
    </dependency>

创建表

  • 只能输出仅追加模式的流
  • 如果将有更新操作(比如分组聚合)的结果表写入Kafka,就会因为Kafka无法识别撤回(retract)或更新插入(upsert)消息而导致异常
1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE KafkaTable (
`user` STRING,
`url` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

Upsert Kafka

  • 支持以更新插入(UPSERT)的方式向Kafka的topic中读写数据
  • Upsert Kafka连接器处理的是更新日志(changlog)流
  • 字段中需要用PRIMARY KEY指定主键,并且在WITH子句中分别指定key和value的序列化格式
  • 作为TableSource
    • 读取topic中的数据(key, value)
      • Value不为空:对当前key的数据值的更新/插入
      • Value为空:对当前key的数据值的删除
  • 作为TableSink
    • TableSource的反向操作
1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro'
);

文件系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE MyTable (
column_name1 INT,
column_name2 STRING,
...
part_name1 INT,
part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
-- 连接器类型
'connector' = 'filesystem',
-- 文件路径
'path' = '...',
-- 文件格式
'format' = '...'
)

JDBC

依赖

  • 连接器

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
  • 驱动器

    1
    2
    3
    4
    5
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
    </dependency>

创建表

  • 作为TableSink向数据库写入数据时,运行的模式取决于创建表的DDL是否定义了主键
    • 有主键:以更新插入(Upsert)模式运行
    • 无主键:以仅追加(Append)模式运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 创建一张连接到 MySQL的 表
# MyTable: Flink表环境中的表
CREATE TABLE MyTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
# users: MySQL中的表
'table-name' = 'users'
);

Elasticsearch

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

创建表

  • 作为TableSink向ES写入数据时,运行的模式取决于创建表的DDL是否定义了主键
    • 有主键:以更新插入(Upsert)模式运行
    • 无主键:以仅追加(Append)模式运行
1
2
3
4
5
6
7
8
9
10
11
12
-- 创建一张连接到 Elasticsearch的 表
CREATE TABLE MyTable (
user_id STRING,
user_name STRING
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
);

HBase

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

创建表

  • 除了所有ROW类型的字段(对应着HBase中的family),还应有一个原子类型的字段,会被识别为HBase的rowkey
  • 在表中这个字段可以任意取名,不一定非要叫rowkey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 创建一张连接到 HBase的 表
CREATE TABLE MyTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);

-- 假设表T的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO MyTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;

Hive

概念

  • Flink提供了“Hive目录”(HiveCatalog)功能,允许使用Hive的“元存储”(Metastore)来管理Flink的元数据
  • Metastore可以作为一个持久化的目录使用HiveCatalog可以跨会话存储Flink特定的元数据
    • 在HiveCatalog中执行执行创建Kafka表或者ElasticSearch表,可以把元数据持久化存储在Hive的Metastore中
    • 对于不同的作业会话就不需要重复创建,直接在SQL查询中重用就可以
  • Flink可以作为读写Hive表的替代分析引擎
    • 在Hive中进行批处理会更加高效
    • 有了连续在Hive中读写数据、进行流处理的能力
  • HiveCatalog被设计为“开箱即用”,与现有的Hive配置完全兼容,不需要做任何的修改与调整就可以直接使用
  • 只有Blink的计划器(planner)提供了Hive集成的支持

环境变量

1
export HADOOP_CLASSPATH=`hadoop classpath`

依赖

  • 不建议把这些依赖打包到结果jar文件中
  • 建议在运行时的集群环境中为不同的Hive版本添加不同的依赖支持
  • 具体版本对应的依赖关系可以查询官网
1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- Flink 的Hive连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Hive 依赖 -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>

连接

  • 代码方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    String name = "myhive";
    String defaultDatabase = "mydatabase";
    String hiveConfDir = "/opt/hive-conf";
    // 创建一个HiveCatalog,并在表环境中注册
    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    tableEnv.registerCatalog("myhive", hive);
    // 使用HiveCatalog作为当前会话的catalog
    tableEnv.useCatalog("myhive");
  • SQL客户端方式

    1
    2
    3
    4
    5
    Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
    [INFO] Execute statement succeed.

    Flink SQL> use catalog myhive;
    [INFO] Execute statement succeed.

方言设置

  • SQL方式

    1
    set table.sql-dialect=hive;
  • 配置文件方式:sql-cli-defaults.yaml

    1
    2
    3
    4
    5
    6
    7
    execution:
    planner: blink
    type: batch
    result-mode: table

    configuration:
    table.sql-dialect: hive
  • TableAPI方式

    1
    2
    3
    4
    // 配置hive方言
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    // 配置default方言
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

创建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- 设置SQL方言为hive,创建Hive表
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- 设置SQL方言为default,创建Kafka表
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND – 定义水位线
) WITH (...);

-- 将Kafka中读取的数据经转换后写入Hive
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

CEP

概念

复杂事件处理

概念

  • Complex Event Processing,Flink提供专门用于处理复杂事件的库,主要目的是在无界流中检测出特定的数据组合
  • 可在事件流里检测到特定的事件组合并进行处理,如连续登录失败,或订单支付超时等
  • 把事件流中的一个个简单事件通过一定的规则匹配组合起来,基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出
  • CEP是针对流处理而言的,分析的是低延迟、频繁产生的事件流
  • 步骤
    1. 定义一个匹配规则
    2. 将匹配规则应用到事件流上,检测满足规则的复杂事件
    3. 对检测到的复杂事件进行处理,得到结果进行输出

模式

  • CEP所定义的匹配规则,主要内容
    • 每个简单事件的特征
    • 简单事件之间的组合关系
  • CEP其实是在流上进行模式匹配
  • 根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件
  • 模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,会导致模式匹配超时(timeout)

应用场景

  • 风险控制
  • 用户画像
  • 运维监控

入门案例

依赖

  • 为了精简和避免依赖冲突,Flink会保持尽量少的核心依赖。所以核心依赖中并不包括任何的连接器(conncetor)和库
  • 如果想要在Flink集群中提交运行CEP作业,应该向Flink SQL那样将依赖的jar包放在/lib目录下
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>1.14.5</version>
</dependency>

案例

  • 需求:检测用户行为,如果连续三次登录失败,就输出报警信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 获取登录数据流
val loginEventStream = env
.fromElements(
new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
// TODO 定义模式
val pattern = Pattern
// 第一次登录失败事件
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) {
return value.getEventType().equals("fail");
}
})
// 第二次登录失败事件
.next("second")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) {
return value.getEventType().equals("fail");
}
})
// 第三次登录失败事件
.next("third")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) {
return value.getEventType().equals("fail");
}
});
// TODO 将模式应用到数据流,检测复杂事件
val patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), pattern);
// TODO 将检测到的复杂事件提取出来,进行处理得到报警信息输出
patternStream
.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> pattern) {
// 提取事件
val first = pattern.get("first").get(0);
val second = pattern.get("second").get(0);
val third = pattern.get("third").get(0);
return first.getUserId() + " 连续三次登录失败!登录时间:" +
first.getTimestamp() + ", " +
second.getTimestamp() + ", " +
third.getTimestamp();
}
}).
print();
env.execute();

模式API

个体模式

概念

  • 每个简单事件的匹配规则
  • 个体模式可包括
    • 单例模式singleton:默认,匹配接收一个事件
    • 循环模式looping:对同样特征的事件可以匹配多次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Pattern
// 第一次登录失败事件
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) {
return value.getEventType().equals("fail");
}
})
// 第二次登录失败事件
.next("second")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) {
return value.getEventType().equals("fail");
}
})

量词

  • 指定个体模式循环的次数
  • oneOrMore()
    • 匹配事件出现一次或多次
    • a.oneOrMore()表示可匹配1个或多个a的事件组合
    • 有时会用a+来简单表示
  • times(times)
    • 匹配事件发生特定次数(times)
    • a.times(3)表示aaa
  • times(fromTimes, toTimes)
    • 指定匹配事件出现的次数范围
      • fromTimes:最小次数
      • toTimes:最大次数
    • a.times(2, 4)可匹配aaaaaaaaa
  • greedy()
    • 只能用在循环模式后,总是尽可能多地去匹配
    • a.times(2, 4).greedy()若出现了连续4个a,会直接把aaaa检测出来进行处理,其他任意2个a不算匹配事件
  • optional()
    • 使当前模式成为可选的,可以满足这个匹配条件,也可以不满足
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 匹配事件出现4次
pattern.times(4);
// 匹配事件出现4次,或者不出现
pattern.times(4).optional();
// 匹配事件出现2, 3 或者4次
pattern.times(2, 4);
// 匹配事件出现2, 3 或者4次,并且尽可能多地匹配
pattern.times(2, 4).greedy();
// 匹配事件出现2, 3, 4次,或者不出现
pattern.times(2, 4).optional();
// 匹配事件出现2, 3, 4次,或者不出现;并且尽可能多地匹配
pattern.times(2, 4).optional().greedy();
// 匹配事件出现1次或多次
pattern.oneOrMore();
// 匹配事件出现1次或多次,并且尽可能多地匹配
pattern.oneOrMore().greedy();
// 匹配事件出现1次或多次,或者不出现
pattern.oneOrMore().optional();
// 匹配事件出现1次或多次,或者不出现;并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();
// 匹配事件出现2次或多次
pattern.timesOrMore(2);
// 匹配事件出现2次或多次,并且尽可能多地匹配
pattern.timesOrMore(2).greedy();
// 匹配事件出现2次或多次,或者不出现
pattern.timesOrMore(2).optional()
// 匹配事件出现2次或多次,或者不出现;并且尽可能多地匹配
pattern.timesOrMore(2).optional().greedy();

条件

  • 选取事件的规则

  • CEP会按照这个规则对流中的事件进行筛选,判断是否接受当前的事件

  • 限定子类型

    • 为当前模式增加子类型限制条件
    1
    2
    3
    // SubEvent是流中数据类型Event的子类型
    // 只有当事件是SubEvent类型时,才可以满足当前模式pattern的匹配条件
    pattern.subtype(SubEvent.class);
  • 简单条件

    • 只能基于当前事件做判断
    • 本质是filter操作
    1
    2
    3
    4
    5
    6
    pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
    return value.user.startsWith("A");
    }
    });
  • 迭代条件

    • 依靠之前事件做判断
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    middle.oneOrMore()
    .where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
    // 事件中的user必须以A开头
    if (!value.user.startsWith("A")) {
    return false;
    }

    int sum = value.amount;
    // 获取当前模式之前已经匹配的事件,求所有事件amount之和
    for (Event event : ctx.getEventsForPattern("middle")) {
    sum += event.amount;
    }
    // 在总数量小于100时,当前事件满足匹配规则,可以匹配成功
    return sum < 100;
    }
    });
  • 组合条件

    • 独立定义多个条件,然后在外部把它们连接起来,构成“组合条件”
    1
    2
    3
    4
    5
    6
    7
    pattern.subtype(SubEvent.class)
    .where(new SimpleCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
    return ... // some condition
    }
    });
  • 终止条件

    • 遇到某个特定事件时当前模式就不再继续循环匹配
    • 调用模式对象的until()方法,传入一个IterativeCondition作为参数
    • 终止条件只与oneOrMore()oneOrMore().optional()结合使用

组合模式

概念

  • 将多个个体模式组合起来的完整模式,也叫作“模式序列”
1
2
3
4
5
Pattern<Event, ?> pattern = Pattern
.<Event>begin("start").where(...)
.next("next").where(...)
.followedBy("follow").where(...)
...

初始模式

  • 所有的组合模式都必须以一个“初始模式”开头
  • 传入的String类型的参数就是模式的名称
  • begin方法传入的类型参数是模式要检测流中事件的基本类型
1
Pattern<Event, ?> start = Pattern.<Event>begin("start");

近邻条件

  • 模式之间的组合是通过一些“连接词”方法实现,这些连接词指明了先后事件之间有着怎样的近邻关系

  • 严格近邻

    • 匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件
    • 代码中对应的就是Pattern的next()方法
  • 宽松近邻

    • 默认
    • 只关心事件发生的顺序
    • 代码中对应followedBy()方法

    严格近邻与宽松近邻

  • 非确定性宽松近邻

    • 可以重复使用之前已经匹配过的事件
    • 代码中对应followedByAny()方法

    非确定性宽松近邻

其他限制条件

  • notNext()
    • 前一个模式匹配到的事件后面,不能紧跟着某种事件
  • notFollowedBy()
    • 前一个模式匹配到的事件后面,不会出现某种事件
    • 流数据不停地到来,永远不能保证之后“不会出现某种事件”
    • 一个模式序列不能以notFollowedBy()结尾,这个限定条件主要用来表示“两个事件中间不会出现某种事件”
  • within()
    • 传入时间参数
    • 模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才有效
    • 一个模式序列中只能有一个时间限制,位置不限
    • 多次调用则会以最小的那个时间间隔为准
1
2
3
4
5
6
7
8
9
10
11
12
// 严格近邻条件
Pattern<Event, ?> strict = start.next("middle").where(...);
// 宽松近邻条件
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// 非确定性宽松近邻条件
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// 不能严格近邻条件
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// 不能宽松近邻条件
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// 时间限制条件
middle.within(Time.seconds(10));

循环模式中的近邻条件

  • 默认内部采用的是宽松近邻

  • consecutive()

    • 为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的
    • 需要与循环量词times()oneOrMore()配合使用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 1. 定义Pattern,登录失败事件,循环检测3次
    Pattern<LoginEvent, LoginEvent> pattern = Pattern
    .<LoginEvent>begin("fails")
    .where(new SimpleCondition<LoginEvent>() {
    @Override
    public boolean filter(LoginEvent loginEvent) throws Exception {
    return loginEvent.eventType.equals("fail");
    }
    }).times(3).consecutive();
  • allowCombinations()

    • 为循环模式中的事件指定非确定性宽松近邻条件
    • 需要调用allowCombinations()方法实现,效果与followedByAny()相同

模式组

概念

  • 一般来说,代码中定义的模式序列就是在业务逻辑中匹配复杂事件的规则
  • 在有些非常复杂的场景中可能需要划分多个“阶段”,每个“阶段”又有一连串的匹配规则
  • Flink CEP允许以“嵌套”的方式来定义模式

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 以模式序列作为初始模式
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start_start").where(...)
.followedBy("start_middle").where(...)
);
// 在start后定义严格近邻的模式序列,并重复匹配两次
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...)
.followedBy("next_middle").where(...)
).times(2);
// 在start后定义宽松近邻的模式序列,并重复匹配一次或多次
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...)
.followedBy("followedby_middle").where(...)
).oneOrMore();
//在start后定义非确定性宽松近邻的模式序列,可以匹配一次,也可以不匹配
Pattern<Event, ?> nonDeterminRelaxed = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...)
.followedBy("followedbyany_middle").where(...)
).optional();

匹配后跳过

概念

  • 由于有循环模式和非确定性宽松近邻的存在,同一个事件有可能会重复利用,被分配到不同的匹配结果中

  • CEP中,提供了模式的“匹配后跳过策略”(After Match Skip Strategy),专门用来精准控制循环模式的匹配结果

  • 可以在Pattern的初始模式定义中,作为begin()的第二个参数传入

    1
    2
    Pattern.begin("start", AfterMatchSkipStrategy.noSkip())
    .where(...)

策略

  • 设事件序列“a a a b”
  • 若检测到6个匹配结果:(a1 a2 a3 b)(a1 a2 b)(a1 b)(a2 a3 b)(a2 b),(a3 b)
  • 不跳过NO_SKIP
    • 默认,所有可能的匹配都会输出
  • 跳至下一个SKIP_TO_NEXT
    • 找到一个a1开始的最大匹配之后,跳过a1开始的所有其他匹配,直接从下一个a2开始匹配
    • 得到(a1 a2 a3 b)(a2 a3 b)(a3 b)
    • greedy()效果是相同的
  • 跳过所有子匹配SKIP_PAST_LAST_EVENT
    • 这是最为精简的跳过策略
    • 找到a1开始的匹配(a1 a2 a3 b)后,直接跳过所有a1直到a3开头的匹配
    • 相当于把这些子匹配都跳过
    • 得到(a1 a2 a3 b)
  • 跳至第一个SKIP_TO_FIRST[a]
    • 传入一个参数,指明跳至哪个模式的第一个匹配事件
    • 找到a1开始的匹配(a1 a2 a3 b)后,跳到以最开始一个a1为开始的匹配
    • 相当于只留下a1开始的匹配
    • 得到(a1 a2 a3 b)(a1 a2 b)(a1 b)
  • 跳至最后一个SKIP_TO_LAST[a]
    • 传入一个参数,指明跳至哪个模式的最后一个匹配事件
    • 找到a1开始的匹配(a1 a2 a3 b)后,跳过所有a1a2开始的匹配,跳到以最后一个a3为开始的匹配
    • 得到(a1 a2 a3 b)(a3 b)

模式的检测处理

模式应用到流

  • 调用CEP类的静态方法pattern(),将数据流(DataStream)和模式(Pattern)作为两个参数传入

    1
    2
    3
    4
    DataStream<Event> inputStream = ...
    Pattern<Event, ?> pattern = ...

    PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
  • 默认情况下采用事件时间语义,事件以各自的时间戳进行排序

    1
    2
    3
    // 可选的事件比较器
    EventComparator<Event> comparator = ...
    PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

处理匹配事件

Select

  • 匹配事件的选择提取
  • 从PatternStream中直接把匹配的复杂事件提取出来
  • PatternSelectFunction
    • 返回事件的列表(List)
  • PatternFlatSelectFunction
    • 没有返回值,通过调用收集器collet()方法实现多次发送输出数据

Process

  • 官方推荐
  • 匹配事件的通用处理
  • 可以访问一个上下文(Context),进行更多的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// TODO 定义模式
val pattern = Pattern
// 第一次登录失败事件
.<LoginEvent>begin("fail")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) {
return value.getEventType().equals("fail");
}
})
// 严格紧邻的三次登录失败
.times(3)
.consecutive();
// TODO 将模式应用到数据流,检测复杂事件
val patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), pattern);
// TODO 将检测到的复杂事件提取出来,进行处理得到报警信息输出
patternStream
.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out) throws Exception {
// TODO 提取三次登录失败事件
val first = match.get("fail").get(0);
val second = match.get("fail").get(1);
val third = match.get("fail").get(2);
out.collect(
first.getUserId() + " 连续三次登录失败!登录时间:" +
first.getTimestamp() + ", " +
second.getTimestamp() + ", " +
third.getTimestamp()
);
}
})
.print();
env.execute();

处理超时事件

概念

  • “超时失败”跟真正的“匹配失败”不同,其实是“部分成功匹配”
  • 往往不应该直接丢弃,而要输出一个提示或报警信息。这就需要捕获并处理超时事件

使用PatternProcessFunction侧输出流

  • 实现CEP提供专门捕捉超时的部分匹配事件的接口TimedOutPartialMatchHandler
  • 将超时的部分匹配事件输出到标签所标识的侧输出流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public static void main(String[] args) throws Exception {
val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取订单事件流,并提取时间戳、生成水位线
val orderEventStream = env
.fromElements(
new OrderEvent("user_1", "order_1", "create", 1000L),
new OrderEvent("user_2", "order_2", "create", 2000L),
new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<OrderEvent>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
// TODO 定义模式
val pattern = Pattern
.<OrderEvent>begin("create")
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent value) throws Exception {
return value.getEventType().equals("create");
}
})
.followedBy("pay")
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent value) throws Exception {
return value.getEventType().equals("pay");
}
})
// TODO 限制在15分钟之内
.within(Time.minutes(15));
// TODO 将Pattern应用到流上
val patternStream = CEP.pattern(orderEventStream.keyBy(OrderEvent::getOrderId), pattern);
// TODO 定义侧输出流
val timeoutTag = new OutputTag<String>("timeout") {
};
// TODO 提取出完全匹配和超市部分匹配的事件
val res = patternStream.process(new OrderPayMatch());
res.print("payed");
res.getSideOutput(timeoutTag).print("timeout");
env.execute();
}

public static class OrderPayMatch extends PatternProcessFunction<OrderEvent, String> implements TimedOutPartialMatchHandler<OrderEvent> {

@Override
public void processMatch(Map<String, List<OrderEvent>> match, Context ctx, Collector<String> out) throws Exception {
// 获取当前支付事件
val payEvent = match.get("pay").get(0);
out.collect("用户 " + payEvent.getUserId() + " 订单 " + payEvent.getOrderId() + " 已支付");
}

@Override
public void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context ctx) throws Exception {
val createEvent = match.get("create").get(0);
val timeoutTag = new OutputTag<String>("timeout") {};
ctx.output(timeoutTag, "用户 " + createEvent.getUserId() + " 订单 " + createEvent.getOrderId() + " 未支付");
}

}

使用PatternTimeoutFunction

  • 老版本
  • 无法直接处理超时事件
  • 通过调用PatternStream的select()方法时多传入一个PatternTimeoutFunction参数来实现

处理迟到数据

概念

  • CEP中沿用了通过设置水位线(watermark)延迟来处理乱序数据的做法
  • 一个事件到来时并不会立即做检测匹配处理,而是先放入一个缓冲区(buffer)
    • 缓冲区内的数据,会按照时间戳由小到大排序
  • 当一个水位线到来时,就会将缓冲区中所有时间戳小于水位线的事件依次取出,进行检测匹配
  • 可以基于PatternStream直接调用sideOutputLateData()方法,传入一个OutputTag,将迟到数据放入侧输出流另行处理

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// 定义一个侧输出流的标签
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
// 将迟到数据输出到侧输出流
.sideOutputLateData(lateDataOutputTag)
// 处理正常匹配数据
.select(new PatternSelectFunction<Event, ComplexEvent>() {...}
);

// 从结果中提取侧输出流
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

状态机

概念

  • CEP的底层工作原理其实与正则表达式是一致的,是一个“非确定有限状态自动机”(Nondeterministic Finite Automaton,NFA)

状态机

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public static void main(String[] args) throws Exception {
val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取登录事件流,这里与时间无关,就不生成水位线了
val stream = env
.fromElements(
new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
)
.keyBy(LoginEvent::getUserId);
// TODO 将数据依次输入状态机进行处理
stream
.flatMap(new StateMachineMapper())
.print("warning");
env.execute();
}

// TODO 实现自定义RichFlatMapFunction
public static class StateMachineMapper extends RichFlatMapFunction<LoginEvent, String> {
// 声明状态机当前的状态
private ValueState<State> currState;

@Override
public void open(Configuration parameters) throws Exception {
currState = getRuntimeContext().getState(new ValueStateDescriptor<>("state", State.class));
}

@Override
public void flatMap(LoginEvent value, Collector<String> out) throws Exception {
// 如果状态为空进行初始化
State state = currState.value();
if (state == null) {
state = State.Initial;
}
val nextState = state.transition(value.getEventType());
switch (nextState) {
case Matched: {
// 判断当前状态的特殊情况,直接跳转
// 检测到匹配,输出报警
out.collect(value.getUserId() + "连续三次登录失败");
break;
}
case Terminal: {
// 直接将状态更新为初始状态
currState.update(State.Initial);
break;
}
default: {
// 状态覆盖跳转
currState.update(nextState);
}
}
}

}

// TODO 实现状态机
public enum State {

// 匹配失败,终止
Terminal,
// 匹配成功
Matched,
// TODO 传入基于该状态可以进行的一系列状态转移
// S2状态
S2(new Transition("fail", Matched), new Transition("success", Terminal)),
// S1状态
S1(new Transition("fail", S2), new Transition("success", Terminal)),
// 初始状态
Initial(new Transition("fail", S1), new Transition("success", Terminal));

// 当前状态的转移规则
private final Transition[] transitions;

State(Transition... transitions) {
this.transitions = transitions;
}

// TODO 状态转移方法
public State transition(String eventType) {
for (Transition transition : transitions) {
if (transition.getEventType().equals(eventType)) {
return transition.getTargetState();
}
}
// 回到初始状态
return Initial;
}

}

// TODO 定义状态转移类,包括两个属性:当前事件类型和目标状态
@Data
@AllArgsConstructor
public static class Transition {
// 触发状态转移的当前事件类型
private final String eventType;
// 转移的目标状态
private final State targetState;
}

调优

资源配置调优

概念

  • 在一定范围内,增加资源的分 配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略

  • 提交方式主要是yarn-per-job,资源的分配在使用脚本提交Flink任务时进行指定

  • 标准的Flink任务提交脚本(Generic CLI模式)

    1
    2
    3
    4
    5
    bin/flink run \
    -t yarn-per-job \ -d \
    -p 5 \ 指定并行度
    -Dyarn.application.queue=test \ 指定 yarn 队列 -Djobmanager.memory.process.size=1024mb \ 指定 JM 的总进程大小 -Dtaskmanager.memory.process.size=1024mb \ 指定每个 TM 的总进程大小 -Dtaskmanager.numberOfTaskSlots=2 \ 指定每个 TM 的 slot 数
    -c com.atguigu.flink.tuning.UvDemo \ /opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

内存

  • 1Core : 4G
  • jobmanager.memory.process.size:2~4G
  • taskmanager.memory.process.size:2~8G
    • Yarn中有限定容器最大内存为8G,容器总内存超过8G需要修改该配置
  • taskmanager.numberOfTaskSlots:1Core : 1~2Slot

并行度

全局

  • 开发完成后先进行压测。任务并行度给 10 以下,测试单个并行度的处理上限
  • 总QPS/单并行度的处理能力 = 并行度
  • 最好根据高峰期的 QPS 压测,并行度*1.2倍

Source端

  • Kafka:Source 的并行度设置为Kafka对应Topic的分区数
  • 如果消费速度仍跟不上数据生产速度,考虑扩大Kafka分区,同时调大并行度等于分区数

Transform端

  • Keyby前:并行度可以和source保持一致
  • Keyby后
    • max(128, 比(并行度*1.5)大的2^n)
    • 并发较大:设置并行度为2的整数次幂,如128、256、512
    • 并发较小:并行度不一定需要设置成2的整数次幂

Sink端

  • 可根据Sink端的数据量及下游的服务抗压能力进行评估
  • 如果Sink端是Kafka,可以设为Kafka对应Topic的分区数

Kafka Source

  • 动态发现分区
  • 从Kafka生成wartmark
  • 设置空闲等待
  • 设置offset消费策略

状态&Checkpoint

RocksDB大状态

概念

  • RocksDB基于LSM Tree实现的(类似 HBase),写数据都是先缓存到内存中, 所以 RocksDB的写请求效率比较高。RocksDB使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中blockcache中查找,如果内存中没有再去磁盘中查询
  • 状态大小仅受可用磁盘空间量的限制,性能瓶颈主要在于RocksDB对磁盘的读请求,每次读写操作都必须对数据进行反序列化或者序列化
  • 当处理性能不够时,仅需要横向扩展并行度即可提高整个Job的吞吐量

优化

  • 通过调整配置项taskmanager.memory.managed.sizetaskmanager.memory.managed.fraction以增加Flink的托管内存
  • 开启增量检查点和本地恢复
  • 调整预定义选项
  • 增大block缓存

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dstate.backend.incremental=true \
-Dstate.backend.local-recovery=true \ -Dstate.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM \ -Dstate.backend.rocksdb.block.cache-size=64m \ -Dstate.backend.rocksdb.writebuffer.size=128m \ -Dstate.backend.rocksdb.compaction.level.max-size-level-base=320m \ -Dstate.backend.rocksdb.writebuffer.count=5 \
-Dstate.backend.rocksdb.thread.num=4 \ -Dstate.backend.rocksdb.writebuffer.number-to-merge=3 \ -Dstate.backend.rocksdb.memory.partitioned-index-filters=true \ -Dstate.backend.latency-track.keyed-state-enabled=true \
-c com.atguigu.flink.tuning.RocksdbTuning \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

Checkpoint

  • 一般需求:设置为分钟级别(1~5分钟)
  • 状态很大的任务
    • 每次Checkpoint访问HDFS 比较耗时,设置为5~10分钟
    • 调大两次Checkpoint之间的暂停间隔,如4~8分钟
  • 也需要考虑时效性的要求,需要在时效性和性能之间做一个平衡,如果时效性要求高,结合end- to-end时长,设置秒级或毫秒级

ParameterTool

  • 可以通过使用ParameterTool类读取配置,它可以读取环境变量、运行参数、配置文件
  • ParameterTool是可序列化的,可以将它当作参数进行传递给算子的自定义函数类

反压处理

概念

概念

  • Flink拓扑中每个节点(Task)间的数据都以阻塞队列的方式传输,下游来不及消费导致队列被占满后,上游的生产也会被阻塞,最终导致数据源的摄入被阻塞
  • 通常产生于:短时间的负载高峰导致系统接收数据的速率远高于它处理数据的速率
  • 许多日常问题都会导致反压,如垃圾回收停顿可能会导致流入的数据快速堆积,或遇到大促、秒杀活动导致流量陡增

危害

  • 影响checkpoint时长
    • barrier不会越过普通数据,数据处理被阻塞也会导致checkpoint barrier流经整个数据管道的时长变长,导致checkpoint总体时间(End to End Duration)变长
  • 影响state大小
    • barrier对齐时,接受到较快的输入管道的barrier后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的barrier也到达,这些被缓存的数据会被放到state里,导致checkpoint变大
  • 这两个影响对于生产环境的作业来说是十分危险的,因为checkpoint是保证数据一致性的关键,checkpoint时间变长有可能导致checkpoint超时失败,而state大小同样可能拖慢 checkpoint甚至导致OOM(使用Heap-based StateBackend)或者物理内存使用超出容器资源(使用RocksDBStateBackend)的稳定性问题

定位

概念

  • 解决反压首先要做的是定位到造成反压的节点
  • 排查的时先把operator chain禁用,方便定位到具体算子
1
2
3
4
5
6
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \ -Dyarn.application.queue=test \ -Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=2048mb \ -Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \ /opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

Flink Web UI

  • 如果处于反压状态,那么有两种可能性
    1. 该节点的发送速率跟不上它的产生数据速率
      • 这一般会发生在一条输入多条输出的Operator(如flatmap)
      • 这种情况该节点是反压的根源节点,是从Source Task到Sink Task的第一个出现反压的节点
    2. 下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率
      • 这种情况需要继续排查下游节点,一直找到第一个为OK的一般就是根源节点
  • 如果找到第一个出现反压的节点,反压根源要么是就这个节点,要么是它紧接着的下游节点。通常第二种情况更常见。如果无法确定,还需要结合Metrics进一步判断

原因&处理

概念

  • 反压可能是暂时的,可能是由于负载高峰、CheckPoint或作业重启引起的数据积压而导致反压
    • 如果反压是暂时的,应该忽略它
    • 断断续续的反压会影响分析和解决问题
  • 定位到反压节点后,分析造成原因的办法主要是观察Task Thread。按照顺序 一步一步排查

查看是否数据倾斜

  • 可以通过Web UI各个SubTask的Records Sent和Record Received来确认
  • Checkpoint detail里不同SubTask的State size也是一个分析数据倾斜的有用指标

使用火焰图分析

分析GC情况

外部组件交互

  • 如果发现Source端数据读取性能比较低或者Sink端写入性能较差,需要检查第三方组件是否遇到瓶颈,还有就是做维表join时的性能问题
    • Kafka集群是否需要扩容,Kafka 接器是否并行度较低
    • HBase的rowkey是否遇到热点问题,是否请求处理不过来
    • ClickHouse并发能力较弱,是否达到瓶颈
    • ……
  • 关于第三方组件的性能问题,需要结合具体的组件来分析,最常用的思路
    • 异步io + 热缓存优化读写性能
    • 先攒批再读写

数据倾斜

判断

  • 相同Task的多个Subtask中,个别Subtask接收到的数据量明显大于其他Subtask接收到的数据量,通过Flink Web UI可以精确地看到每个Subtask处理了多 少数据,即可判断Flink任务是否存在数据倾斜。通常数据倾斜也会引起反压
  • 有时Checkpoint detail里不同SubTask的State size也是一个分析数据倾斜的有用指标

解决

keyBy前

  • 上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因Kafka的topic中某些partition的数据量较大,某些partition的数据量较少
  • 对于不存在keyBy的Flink任务也会出现该情况
  • 需要让Flink任务强制进行shuffle。使用shuffle、rebalance或rescale算子将数据均匀分配,解决数据倾斜的问题

keyBy后的直接聚合

  • 加随机数实现双重聚合:不可以,会造成重复计算和数据错误
  • 预聚合:定时器 + 状态(普通的算子状态)

keyBy后的开窗聚合

  • 加随机数实现双重聚合

    1
    2
    3
    4
    5
    6
    7
    8
    ds
    .map(id -> id + 随机数)
    .keyBy(id)
    .window()
    .reduce()
    .map(id + 随机数 -> id + 窗口时间)
    .keyBy(id)
    .reduce()
  • 预聚合:不可以,会丢失数据原始时间

Group Aggregate

MiniBatch

  • 微批处理,原理是缓存一定的数据后再触发处理,以减少对State的访问, 从而提升吞吐并减少数据的输出量
  • 通过增加延迟换取高吞吐
    • 如果有超低延迟的要求,不建议开启微批处理
    • 通常对于聚合的场景,微批处理可以显著的提升系统性能,建议开启
  • 默认关闭
1
2
3
4
5
6
7
8
9
10
// 初始化 table environment TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启
miniBatch configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");

LocalGlobal

  • 将原先的Aggregate分成Local+Global两阶段聚合,即MapReduce模型中的 Combine+Reduce处理模式
  • 需要先开启MiniBatch,依赖于MiniBatch的参数
  • 需要UDAF实现Merge方法
  • table.optimizer.agg-phase-strategy:聚合策略
    • AUTO:默认
    • TWO_PHASE:使用LocalGlobal两阶段聚合
    • ONE_PHASE:仅使用Global一阶段聚合
1
2
3
4
5
6
7
8
9
10
11
12
13
// 初始化
table environment TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启
miniBatch configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启
LocalGlobal configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

Split Distinct

  • LocalGlobal优化针对普通聚合(如SUM、COUNT、MAX、MIN和AVG)有较好效果,对于DISTINCT聚合(如COUNT DISTINCT)收效不明显
  • 提供了COUNT DISTINCT自动打散功能,通过HASH_CODE(distinct_key) % BUCKET_NUM打散,不需要手动重写

LocalGlobal与Split Distinct对比

AGG WITH FILTER

  • 提升大量CUOUNT DOSTINCT场景性能
1
2
3
4
5
6
7
8
9
10
11
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b, COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T GROUP BY a
# 改为
SELECT a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b
FROM T GROUP BY a

TopN

使用最优算法

  • UpdateFastRank:最优算法
    • 需要具备2个条件
      • 输入流有PK (Primary Key) 信息,例如Group BY AVG
      • 排序字段的更新是单调的,且单调方向与排序方向相反
        • 如ORDER BY,COUNT/COUNT_ DISTINCT/SUM(正数)DESC
      • 如果要获取到优化Plan,需要在使用ORDER BY SUM DESC时添加SUM为正
        数的过滤条件
  • AppendFast:结果只追加,不更新
  • RetractRank:普通算法,性能差
    • 不建议在生产环境使用该算法

无排名优化

  • 解决数据膨胀问题

增加TopN的Cache大小

Partition By中加时间字段

  • 每天的排名要带上Day字段,否则TopN结果到最会由于State TTL错乱

去重

  • 保留首行/末行

内置函数


Flink
http://docs.mousse.cc/Flink/
作者
Mocha Mousse
发布于
2025年5月26日
许可协议