State是指流計(jì)算過程中計(jì)算節(jié)點(diǎn)的中間計(jì)算結(jié)果或元數(shù)據(jù)屬性,比如 在aggregation過程中要在state中記錄中間聚合結(jié)果,比如 Apache Kafka 作為數(shù)據(jù)源時候,我們也要記錄已經(jīng)讀取記錄的offset,這些State數(shù)據(jù)在計(jì)算過程中會進(jìn)行持久化(插入或更新)。本文將詳細(xì)介紹一下Flink State,通過本文,你可以了解到:
感謝關(guān)注,希望本文對你有所幫助。
Flink 中的狀態(tài)分為兩種主要類型:Keyed State 和 Operator State。
值得注意的是:
在 Flink 的 Table API 或 SQL API 中,對于內(nèi)部的 GroupBy/PartitionBy 操作,F(xiàn)link 會自動管理 Keyed State。而對于 Source Connector 記錄 offset 這樣的操作,通常是在底層的 DataStream API 中實(shí)現(xiàn)的,可能直接使用 Operator State 來管理。例如,F(xiàn)link Kafka Consumer 會使用 Operator State 來存儲 Kafka 主題的分區(qū) offset,以便在發(fā)生故障時能夠從上次成功的檢查點(diǎn)恢復(fù)。
State的具體存儲、訪問和維護(hù)是由**狀態(tài)后端(state backend)**決定的。狀態(tài)后端主要負(fù)責(zé)兩件事情:
Flink提供了三種狀態(tài)后端:
RocksDB是一個嵌入式鍵值存儲(key-value store),它可以將數(shù)據(jù)保存到本地磁盤上,為了從RocksDB中讀寫數(shù)據(jù),系統(tǒng)需要對數(shù)據(jù)進(jìn)行序列化和反序列化。
在選擇狀態(tài)后端時,需要考慮應(yīng)用的狀態(tài)大小、恢復(fù)速度、持久性和部署環(huán)境。對于生產(chǎn)環(huán)境,通常推薦使用 RocksDBStateBackend,因?yàn)樗軌蛱峁┝己玫臄U(kuò)展性和容錯性。
在 Apache Flink 中,對于有狀態(tài)的流處理作業(yè),當(dāng)作業(yè)進(jìn)行擴(kuò)容(scaling out)或縮容(scaling in)時,即增加或減少并行子任務(wù)的數(shù)量時,F(xiàn)link 需要重新分配 OperatorState。這個過程稱為狀態(tài)重分配(state redistribution)。
對于 Operator State 的擴(kuò)容處理,F(xiàn)link 提供了不同的重分配模式來處理狀態(tài):
對于 ListState 類型的 Operator State,如果流任務(wù)的并行度從 N 增加到 M,F(xiàn)link 會將每個并行實(shí)例的狀態(tài)分成 M 份,然后將這些分片分配給新的并行實(shí)例。如果并行度減少,則相反,狀態(tài)將會聚合起來。
圖片
擴(kuò)容時:
縮容時:
BroadcastState 的數(shù)據(jù)在擴(kuò)容或縮容時會被復(fù)制到所有的并行實(shí)例中。由于 BroadcastState 是以廣播的方式存儲數(shù)據(jù),所有并行實(shí)例的狀態(tài)都是相同的。
圖片
對于 UnionListState 類型的 Operator State,在擴(kuò)容或縮容時,狀態(tài)的每個元素將保持不變,原始狀態(tài)的所有元素將被統(tǒng)一地分發(fā)到新的并行實(shí)例中。這意味著每個元素僅分配給一個并行實(shí)例,但所有并行實(shí)例的狀態(tài)的并集會包括所有原始狀態(tài)的元素。隨后由任務(wù)自己決定哪些條目該保留,哪些該丟棄。
圖片
思考:Source的擴(kuò)容(并發(fā)數(shù))是否可以超過Source物理存儲的partition數(shù)量呢?
在使用像 Apache Kafka 這樣的消息隊(duì)列作為數(shù)據(jù)源(Source)時,消息隊(duì)列中的數(shù)據(jù)被劃分為多個分區(qū)(partitions)。這種設(shè)計(jì)主要是為了支持?jǐn)?shù)據(jù)的并行處理以及提高吞吐量。在使用 Flink 或類似的流處理框架時,一個常見的做法是將每個分區(qū)分配給一個并行的 Source 實(shí)例(也稱為 Source Task 或 Source Operator)進(jìn)行處理。
如果嘗試將 Source 的并行度(并發(fā)數(shù))設(shè)置得比物理存儲(比如 Kafka 主題)的分區(qū)數(shù)量還要高,那么將會有一些并行實(shí)例分配不到任何分區(qū),因?yàn)榉謪^(qū)的數(shù)量是固定的,且每個分區(qū)只能被一個并行實(shí)例消費(fèi)(至少在 Flink 的默認(rèn)設(shè)置下是這樣)。這會導(dǎo)致資源浪費(fèi),因?yàn)槌龇謪^(qū)數(shù)量的那部分并行實(shí)例不會做任何實(shí)際的數(shù)據(jù)處理工作,但仍然占用系統(tǒng)資源。
因此,在設(shè)置 Source 的并行度時,通常的最佳實(shí)踐是:
如果需要增加并行度以提高處理能力,相應(yīng)地也需要增加物理存儲的分區(qū)數(shù)量。對于 Kafka 來說,可以通過修改主題的分區(qū)配置來實(shí)現(xiàn)。
對于 Apache Flink,如果使用的是 Flink Kafka Connector,并且嘗試將并行度設(shè)置得比 Kafka 主題的分區(qū)數(shù)量還要高,F(xiàn)link 會在作業(yè)啟動時進(jìn)行檢查。如果發(fā)現(xiàn)這種配置不匹配的情況,F(xiàn)link 會拋出異常并終止作業(yè)啟動,以避免資源浪費(fèi)和潛在的配置錯誤。這種設(shè)計(jì)選擇確保了資源的有效利用和處理能力的合理分配,同時也避免了由于配置錯誤而導(dǎo)致的潛在問題。
KeyedState的算子在擴(kuò)容時會根據(jù)新的任務(wù)數(shù)量對key進(jìn)行重分區(qū),為了降低狀態(tài)在不同任務(wù)之間遷移的成本,F(xiàn)link不會單獨(dú)對key進(jìn)行在分配,而是會把所有的鍵值分別存到不同的key-group中,每個key-group都包含了部分鍵值對。一個key-group是State分配的原子單位。
key-group的數(shù)量在job啟動前必須是確定的且運(yùn)行中不能改變。由于key-group是state分配的原子單位,而每個operator并行實(shí)例至少包含一個key-group,因此operator的最大并行度不能超過設(shè)定的key-group的個數(shù),那么在Flink的內(nèi)部實(shí)現(xiàn)上key-group的數(shù)量就是最大并行度的值。
為了決定一個key屬于哪個Key-Group,通常會采用一種叫做一致性哈希(Consistent Hashing)的算法。一致性哈希算法的基本思想是將所有的Key和所有的Key-Group都映射到同一個哈希環(huán)上。對每個Key進(jìn)行哈希運(yùn)算得到一個哈希值,然后在哈希環(huán)上找到一個順時針方向最近的Key-Group,這個Key就屬于這個Key-Group。即:Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism取余操作的來分配的。
如下圖當(dāng)parallelism=2,maxParallelism=10的情況下流上key與key-group的對應(yīng)關(guān)系如下圖所示:
圖片
如上圖key(a)的hashCode是97,與最大并發(fā)10取余后是7,被分配到了KG-7中,流上每個event都會分配到KG-0至KG-9其中一個Key-Group中。
上面的Stateful Operation節(jié)點(diǎn)的最大并行度maxParallelism的值是10,也就是我們一共有10個Key-Group,當(dāng)我們并發(fā)是2的時候和并發(fā)是3的時候分配的情況如下圖:
圖片
先計(jì)算每個Operator實(shí)例至少分配的Key-Group個數(shù),將不能整除的部分N個,平均分給前N個實(shí)例。最終每個Operator實(shí)例管理的Key-Groups會在GroupRange中表示,本質(zhì)是一個區(qū)間值。比如上圖是2->3擴(kuò)容,那每個task的key-group的數(shù)量是:10/3≈3,也即是每個task先分3個key-group,然后把剩余的1個key-group分配給第一task。
值得注意的是:
Key-Group機(jī)制的特點(diǎn)就是每個具體的key(event)不關(guān)心落到具體的哪個task來處理,只關(guān)心會落到哪個Key-Group中:
State是Flink流計(jì)算的關(guān)鍵部分。Flink 中的狀態(tài)分為兩種主要類型:Keyed State 和 Operator State。Flink提供了三種狀態(tài)后端:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。對于Keyed State 和 Operator State應(yīng)對擴(kuò)縮容時有不同的分配方式。
本文鏈接:http://m.www897cc.com/showinfo-26-79982-0.html我們一起深入理解Flink State
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com