Flink-State
Flink 中 State 用于保存 Task 的状态,Checkpoint 的时候,会将 State 保存到外存中。
State 有两种,Keyed State 和 Operator State,每一种则可以有两种形式存在:Managed 和 Raw。其中 Keyed State 只能引用在 Keyed Stream 上,在 Flink 中使用 keyBy() 创建一个 keyed Stream. Flink 保证同一个 key 的 Tuple 会被发送到同一个 task 进行处理,Operator State 使用 ListState(用于 rescale),上层是一个 HashMap
State 的入口在 AbstractStateBackend,AbstractStateBackend 的子类有
- MemoryStateBackend
- RocksDBStateBackend
- FsStateBackend
本文主要分析 MemoryStateBackend 和 RocksDBStateBackend。如无特殊说明,本文所有代码均使用附[1]
MemoryStateBackend
MemoryStateBackend 使用内存作为存储主要包括一些配置项(内存大小 等)以及创建 operator/keyed state 的接口,本文会创建一个 HeapKeyedStateBackend
HeapKeyedStateBackend 会提供创建各种 State 的接口(ValueState,ListState,MapState,SortedMapState,ReducingState,FoldingState,AggregatingState,RawQueueState,RawSecodaryState,RawStortedSecondaryState 等),返回当前 state 的 snapshot(checkpoint 会用到) 以及从指定的 KeyedStateHandles 恢复的接口。
其中 createValueState 则创建一个 HeapValueState
一些重要的变量和方法如下所示:
- 变量
- entries – 记录具体的值,类型为 Map
>> 每个 value 由 唯一确定 - defaultValue – 表示 StateValue 的初始值,没有则为 null
- backend – 当前 state 所属的 stateBackend
- stateDescriptor – 当前 state 的描述信息,主要包括当前 state 的 namespace、value 已经面向用户的 descriptors 等
- entries – 记录具体的值,类型为 Map
- 方法
- createNewMap() – 创建一个 map,跟进是否为 Queryable 的 state,选择是否创建 ConcurrentHashMap
- snapshot(int, keyGroup, DataOutputView outputView) – 将指定 keyGroup 的 key 进行一次 snapshot,写入到指定的 outputView 中
- restore(int keyGroup, DataInputView intputView) – 从 inputView 中恢复
- get(int keyGroup, K key, N namespace) – 获取特定的 value
- clear(int keyGroup, K key, N namespace) – 清空特定的 value
- clear(Set
> stateKeys) – 清空特定的 value - value() – 返回特定的 value
- update(V value) – 更新特定的 value
- update(int keyGroup, K key, N namespace, V value)
程序流程解释
程序和 State 相关的流程如下:
- 50 行创建一个 MemoryState,且设置 DefaultValue 为 (0, 0)
- 每次处理一个 Tuple 的时候,会首先读取当前 key 对应的 value(第 25 行)
- 然后进行处理后,更新 state 值(28 - 34 行)
- 最后跟进 state 的值判断是否进行相应处理 – 往下有发送平均值,以及清空 state (37 - 40 行)
state 的值变化如下所示(红色的 0 为调用 clear 后生成)
f0 | f1 |
---|---|
0 | 0 |
1 | 3 |
2 | 8 |
0 | 0 |
1 | 7 |
2 | 11 |
0 | 0 |
1 | 2 |
RocksDBStateBackend
RocksDBStateBackend 和 MemoryStateBackend 的区别主要在于,使用 RocksDB 替代 Memory 来存储 State。
由于 RocksDB 是一个 Key-Value Store,因此存储数据结构,和 MemoryStateBackend 稍微不一样。其中 key 为 serialized(keyGroup, key, namespace), value 为 serialized(value)
自问自答
- 有哪些 StateBackend 实现,区别都是什么,每一种的优劣是什么
- StateBackend 的实现在文首给出来,其中 MemoryStateBackend 会很快,但是不仅行持久化,RocksDBStateBackend 使用 RocksDB 进行 State 存储,速度快,且会存储到持久化介质上
- MemoryStateBackend 的局限:
- 每个 state 的大小有上限限制,默认 5M,可配置
- state 大小不能超过 akka frame size(其他的 statebackend 是否可以呢?)
- JobManager 的内存需要能够存放所有的 state
- 建议使用场景:开发和调试阶段;state 不大的场景
- RocksDBStateBackend
- RocksDBStateBackend 将 in-flight 数据存在 RocksDB 中,会存放在 TaskManager 的 data 目录下
- 经常使用异步 Snapshot
- 局限如下:
- 每个 key value 的大小不能超过 2^31(因为RocksDB 的 JNI bridge API 使用 byte[] 格式)
- 建议使用场景:
- state 量很大,window 窗口很长的 job
- 其他需要 high-available 的场景
- 支持增量 checkpoint
- FsStateBackend
- 将 in-flight 数据存储在 TaskManager 的内存中,checkpoint 的时候将数据存储到外存
- 默认使用异步 snapshot
- 建议使用场景:
- 大 state,window 窗口很长的 job
- 其他需要 high-available 的场景
- 作业选择不同 StateBackend 的标准是什么
- 跟进速度和是否需要持久化到外存选择?
- 不同 StateBackend 保存的数据结构是什么样的
- MemoryStateBackend 包括多种 State,其中 HeapValueState 的数据结构为 Map
>> - RocksDBStateBackend 则存储的是 serialized(keyGroup, key, namespace) <-> serialized(value) 的形式
- MemoryStateBackend 包括多种 State,其中 HeapValueState 的数据结构为 Map
- 不同 StateBackend 中内存占用怎么估计/计算
- 暂时还不知道
- 如果需要迁移 State 数据,怎么完成的(作业的并发进行调整)
- 哪些节点/角色会有 StateBackend,都用来做什么
- 所有需要保存 State 的节点(?)JobManager、Task 等
- Keyed Stream 中不同的 key 会被发送到同一个 task 吗?key 往下发送的逻辑是什么
- 根据 hash(key) 往下游发送
- keyGroup 和什么有关,生成规则是什么,在查问题的时候会用到吗(某个 value 属于哪个 keyGroup)
- keyGroup 的存在主要用于 rescale task,能够避免 random io 等
- 现在 keyGroup 在 Job 启动的时候确定,后面不会进行变更
- key 属于哪个 keyGroup 由 hash 函数确定
参考文献
[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
附
[1] 本文使用代码
[2] keyGroup 的好处