精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

從來沒有一個人能把Flink講的這么透徹

開發 前端 開發工具
Flink使用java語言開發,提供了scala編程的接口。使用java或者scala開發Flink是需要使用jdk8版本,如果使用Maven,maven版本需要使用3.0.4及以上。

一、 Filnk簡介和編程模型

Flink使用java語言開發,提供了scala編程的接口。使用java或者scala開發Flink是需要使用jdk8版本,如果使用Maven,maven版本需要使用3.0.4及以上。

[[286813]]

Dataflows:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

parallel Dataflows:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

Task和算子鏈:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

JobManager、TaskManager和clients:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

Flink運行時包含兩種類型的進程:

  • JobManger:也叫作masters,協調分布式執行,調度task,協調checkpoint,協調故障恢復。在Flink程序中至少有一個JobManager,高可用可以設置多個JobManager,其中一個是Leader,其他都是standby狀態。
  • TaskManager:也叫workers,執行dataflow生成的task,負責緩沖數據,及TaskManager之間的交換數據。Flink程序中必須有一個TaskManager.

Flink程序可以運行在standalone集群,Yarn或者Mesos資源調度框架中。

clients不是Flink程序運行時的一部分,作用是向JobManager準備和發送dataflow,之后,客戶端可以斷開連接或者保持連接。

TaskSlots 任務槽:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

每個Worker(TaskManager)是一個JVM進程,可以執行一個或者多個task,這些task可以運行在任務槽上,每個worker上至少有一個任務槽。每個任務槽都有固定的資源,例如:TaskManager有三個TaskSlots,那么每個TaskSlot會將TaskMananger中的內存均分,即每個任務槽的內存是總內存的1/3。任務槽的作用就是分離任務的托管內存,不會發生cpu隔離。

通過調整任務槽的數據量,用戶可以指定每個TaskManager有多少任務槽,更多的任務槽意味著更多的task可以共享同一個JVM,同一個JVM中的task共享TCP連接和心跳信息,共享數據集和數據結構,從而減少TaskManager中的task開銷。

總結:task slot的個數代表TaskManager可以并行執行的task數。

二、 Flink 批處理

批處理WordCount:

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
  2. DataSource<String> ds = env.readTextFile("./data/words"); 
  3. FlatMapOperator<String, String> flatMap = ds.flatMap(new FlatMapFunction<String, String>() { 
  4. @Override 
  5. public void flatMap(String s, Collector<String> collector) throws Exception { 
  6. String[] ssplit = s.split(" "); 
  7. for (String cs : split) { 
  8. collector.collect(cs); 
  9. }); 
  10. MapOperator<String, Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() { 
  11. @Override 
  12. public Tuple2<String, Integer> map(String s) throws Exception { 
  13. return new Tuple2<String, Integer>(s, 1); 
  14. }); 
  15. UnsortedGrouping<Tuple2<String, Integer>> groupBy = map.groupBy(0); 
  16. AggregateOperator<Tuple2<String, Integer>> sum = groupBy.sum(1); 
  17. // sum.print();//可以觸發算子執行 
  18. //排序,目前不支持全局排序 
  19. SortPartitionOperator<Tuple2<String, Integer>> sort = sum.sortPartition(1, Order.DESCENDING).setParallelism(1); 
  20. sort.writeAsText("./TempResult/result").setParallelism(1); 
  21. env.execute("my-wordcount"); 

三、 Flink 執行流程

數據源分為有界和無界之分,有界數據源可以編寫批處理程序,無界數據源可以編寫流式程序。DataSet API用于批處理,DataStream API用于流式處理。

批處理使用ExecutionEnvironment和DataSet,流式處理使用StreamingExecutionEnvironment和DataStream。

DataSet和DataStream是Flink中表示數據的特殊類,DataSet處理的數據是有界的,DataStream處理的數據是無界的,這兩個類都是不可變的,一旦創建出來就無法添加或者刪除數據元。

Flink程序的執行過程:

  • 獲取flink的執行環境(execution environment)
  • 加載數據-- soure
  • 對加載的數據進行轉換 -- transformation
  • 對結果進行保存或者打印 --sink
  • 觸發flink程序的執行(execute(),count(),collect(),print()),例如:調用ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。

四、 Flink standalone集群搭建

Flink可以在Linux和window中運行,Flink集群需要有一個Master節點和一個或者多個Worker節點組成。

安裝Flink集群之前需要準備:1.每臺幾點需要配置jdk8環境變量。2.需要每臺節點有ssh服務,且有免密通信。

步驟:

1. 進入https://flink.apache.org/downloads.html 下載flink.

下載Flink版本,這里選擇了基于Scala2.11和Hadoop2.6的1.7.1版本.

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

2. 下載好Flink之后上傳到Master(node1)節點上解壓:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

3. 進入../conf/flink-conf.yaml中配置:

  • jobmanager.rpc.address: node1 設置Master節點地址
  • jobmanager.heap.size: 1024m 設置Master使用的最大內存,單位是MB
  • taskmanager.heap.size: 1024m 設置Worker使用的最大內存,單位是MB

4. 配置../conf/slaves ,配置Worker節點列表

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

5. 將配置好的Flink發送到其他worker節點(node2,node3)上。

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

6. 啟動Flink集群,訪問webui

在Master節點上,../bin/start-cluster.sh 啟動集群。訪問webui:http:node1:8081

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

7. 停止集群:在Master節點中../bin/stop-cluster.sh

五、 將Flink任務提交到standalone集群運行

將以上FlinkSocketWordCount 案例打包提交到集群中運行,無論在Master節點還是在Worker節點提交都可以。

首先需要在node5節點中啟動socket 9999端口:

  1. nc –lk 9999 

提交命令如下:

  1. ./flink run /root/test/MyFlink-1.0-SNAPSHOT-jar-with-dependencies.jar --port 9999 

在node5節點上輸入數據后在webUI中查看日志:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

六、 Flink流處理

1. 讀取Socket數據統計WordCount

  1. public class SocketWindowWordCount { 
  2. public static void main(String[] args) throws Exception { 
  3. StreamExecutionEnvironment env = 
  4. StreamExecutionEnvironment.getExecutionEnvironment(); 
  5. DataStreamSource<String> socketStream = env.socketTextStream("node5", 9999); 
  6. SingleOutputStreamOperator<Tuple2<String, Integer>> pairWords = 
  7. socketStream.flatMap(new Splitter()); 
  8. KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = pairWords.keyBy(0); 
  9. WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowStream = 
  10. keyBy.timeWindow(Time.seconds(5)); 
  11. DataStream<Tuple2<String, Integer>> dataStream = windowStream.sum(1); 
  12. dataStream.print(); 
  13. env.execute("socket wordcount"); 
  14. //Splitter 實現了 FlatMapFunction ,將輸入的一行數據按照空格進行切分,返回tuple<word,1> 
  15. public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 
  16. @Override 
  17. public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { 
  18. for (String word: sentence.split(" ")) { 
  19. out.collect(new Tuple2<String, Integer>(word, 1)); 

2. 數據源Source

Source 是Flink獲取數據的地方。以下source中和批處理的source類似,但是以下源作為dataStream流處理時,是一條條處理,最終得到的不是一個總結果,而是每次處理后都會得到一個結果。

  • socketTextStream – 讀取Socket數據流
  • readTextFile() -- 逐行讀取文本文件獲取數據流,每行都返回字符串。
  • fromCollection() – 從集合中創建數據流。
  • fromElements – 從給定的數據對象創建數據流,所有數據類型要一致。
  • addSource – 添加新的源函數,例如從kafka中讀取數據,參見讀取kafka數據案例。

3. 數據寫出 Sink

  • writeAsText() – 以字符串的形式逐行寫入文件,調用每個元素的toString()得到寫入的字符串。
  • writeAsCsv() – 將元組寫出以逗號分隔的csv文件。注意:只能作用到元組數據上。
  • print() – 控制臺直接輸出結果,調用對象的toString()方法得到輸出結果。
  • addSink() – 自定義接收函數。例如將結果保存到kafka中,參見kafka案例。

七、 Flink讀取Socket數據WordCount案例

1. 創建maven項目

2. 導入maven依賴

flink1.7.1 使用jdk1.8,scala2.11或者2.12.這里使用的scala2.11.如果只是使用java開發flink,Scala的版本選擇多少都可以。如果使用Scala開發那么就必須使用Scala對應的版本。

  1. <properties> 
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
  3. <maven.compiler.source>1.8</maven.compiler.source> 
  4. <maven.compiler.target>1.8</maven.compiler.target> 
  5. <flink.version>1.7.1</flink.version> 
  6. </properties> 
  7.  
  8. <dependency> 
  9. <groupId>org.apache.flink</groupId> 
  10. <artifactId>flink-java</artifactId> 
  11. <version>${flink.version}</version> 
  12. </dependency> 
  13. <dependency> 
  14. <groupId>org.apache.flink</groupId> 
  15. <artifactId>flink-streaming-java_2.11</artifactId> 
  16. <version>${flink.version}</version> 
  17. </dependency> 
  18. <dependency> 
  19. <groupId>org.apache.flink</groupId> 
  20. <artifactId>flink-clients_2.11</artifactId> 
  21. <version>${flink.version}</version> 
  22. </dependency> 
  23. <dependency> 
  24. <groupId>org.apache.flink</groupId> 
  25. <artifactId>flink-connector-wikiedits_2.11</artifactId> 
  26. <version>${flink.version}</version> 
  27. </dependency> 

 

3. 創建StreamExecutionEnvironment 或者ExecutionEnvironment(批處理作業)。用于設置執行參數并創建從外部系統讀取的源。

代碼如下:

  1. public class FlinkSocketWordCount { 
  2. public static void main(String[] args) throws Exception { 
  3. final int port ; 
  4. try{ 
  5. final ParameterTool params = ParameterTool.fromArgs(args); 
  6. port = params.getInt("port"); 
  7. }catch (Exception e){ 
  8. System.err.println("No port specified. Please run 'FlinkSocketWordCount --port <port>'"); 
  9. return; 
  10. //獲取執行環境 
  11. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  12. //從socket中獲取數據。 
  13. DataStreamSource<String> text = env.socketTextStream("node5", port); 
  14. SingleOutputStreamOperator<WordWithCount> wordWithCountInfos = text.flatMap(new FlatMapFunction<String, WordWithCount>() { 
  15. @Override 
  16. public void flatMap(String line, Collector<WordWithCount> collector) throws Exception { 
  17. for (String word : line.split(" ")) { 
  18. collector.collect(new WordWithCount(word, 1L)); 
  19. }); 
  20. //keyBy中所寫的字段必須是類WordWithCount中的字段,WordWithCount中如果重寫構造必須寫上無參構造 
  21. KeyedStream<WordWithCount, Tuple> keyedInfos = wordWithCountInfos.keyBy("word"); 
  22. WindowedStream<WordWithCount, Tuple, TimeWindow> windowedInfo = keyedInfos.timeWindow(Time.seconds(5), Time.seconds(1)); 
  23. SingleOutputStreamOperator<WordWithCount> windowCounts = windowedInfo.reduce(new ReduceFunction<WordWithCount>() { 
  24. @Override 
  25. public WordWithCount reduce(WordWithCount w1, WordWithCount w2) throws Exception { 
  26. return new WordWithCount(w1.getWord(), w1.getCount() + w2.getCount()); 
  27. }); 
  28. windowCounts.print(); 
  29. env.execute("Socket Window WordCount"); 
  30. public static class WordWithCount { 
  31. public String word; 
  32. public Long count; 
  33. public WordWithCount() { } 
  34. public WordWithCount(String word, Long count) { 
  35. this.word = word; 
  36. this.count = count; 
  37. public String getWord() { 
  38. return word; 
  39. public void setWord(String word) { 
  40. this.word = word; 
  41. public Long getCount() { 
  42. return count; 
  43. public void setCount(Long count) { 
  44. this.count = count; 
  45. @Override 
  46. public String toString() { 
  47. return word + " : " + count; 

八、 如何指定keys

比如某些算子(join,coGroup,keyBy,groupB y)要求在數據元上定義key。另外有些算子操作(reduce,groupReduce,Aggregate,Windows)允許數據在處理之前根據key進行分組。在Flink中數據模型不是基于Key,Value格式處理的,因此不需將數據處理成鍵值對的格式,key是“虛擬的”,可以人為的來指定,實際數據處理過程中根據指定的key來對數據進行分組,DataSet中使用groupBy來指定key,DataStream中使用keyBy來指定key。如何指定keys?

1. 使用Tuples來指定key

定義元組來指定key可以指定tuple中的第幾個元素當做key,或者指定tuple中的聯合元素當做key。需要使用org.apache.flink.api.java.tuple.TupleXX包下的tuple,最多支持25個元素且Tuple必須new創建。如果Tuple是嵌套的格式,例如:DataStream

2. 使用Field Expression來指定key

可以使用Field Expression來指定key,一般作用的對象可以是類對象,或者嵌套的Tuple格式的數據。

使用注意點:

(1) 對于類對象可以使用類中的字段來指定key。

類對象定義需要注意:

  • 類的訪問級別必須是public
  • 必須寫出默認的空的構造函數
  • 類中所有的字段必須是public的或者必須有getter,setter方法。例如類中有個字段是foo,那么這個字段的getter,setter方法為:getFoo() 和 setFoo().
  • Flink必須支持字段的類型。一般類型都支持

(2) 對于嵌套的Tuple類型的Tuple數據可以使用”xx.f0”表示嵌套tuple中第一個元素,也可以直接使用”xx.0”來表示第一個元素,參照案例GroupByUseFieldExpressions。

3. 使用Key Selector Functions來指定key

使用key Selector這種方式選擇key,非常方便,可以從數據類型中指定想要的key.

九、 累加器(Accumulator)和計數器(Counter)

Accumulator即累加器,可以在分布式統計數據,只有在任務結束之后才能獲取累加器的最終結果。計數器是累加器的具體實現,有:IntCounter,LongCounter和DoubleCounter。

累加器注意事項:

  • 需要在算子內部創建累加器對象
  • 通常在Rich函數中的open方法中注冊累加器,指定累加器的名稱
  • 在當前算子內任意位置可以使用累加器
  • 必須當任務執行結束后,通過env.execute(xxx)執行后的JobExecutionResult對象獲取累加器的值。

IntCounter舉例:

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
  2. DataSource<String> dataSource = env.fromElements("a", "b", "c", "d", "e", "f"); 
  3. MapOperator<String, String> map = dataSource.map(new RichMapFunction<String, String>() { 
  4. //1.創建累加器,在算子中創建累加器對象 
  5. private IntCounter numLines = new IntCounter(); 
  6. //2.注冊累加器對象,通常在Rich函數的open方法中使用 
  7. // getRuntimeContext().addAccumulator("num-lines", this.numLines);注冊累加器 
  8. public void open(Configuration parameters) throws Exception { 
  9. getRuntimeContext().addAccumulator("num-lines", this.numLines); 
  10. @Override 
  11. public String map(String s) throws Exception { 
  12. //3.使用累加器 ,可以在任意操作中使用,包括在open或者close方法中 
  13. this.numLines.add(1); 
  14. return s; 
  15. }).setParallelism(8); 
  16. map.writeAsText("./TempResult/result",FileSystem.WriteMode.OVERWRITE); 
  17. JobExecutionResult myJobExecutionResult = env.execute("IntCounterTest"); 
  18. //4.當作業執行完成之后,在JobExecutionResult對象中獲取累加器的值。 
  19. int accumulatorResult = myJobExecutionResult.getAccumulatorResult("num-lines"); 
  20. System.out.println("accumulator value = "+accumulatorResult); 

十、 Flink + kafka 整合使用

1. 在pom.xml中添加Flink Kafka連接器的依賴,如果添加了不要重復添加

  1. <!-- Flink Kafka連接器的依賴--> 
  2. <dependency> 
  3. <groupId>org.apache.flink</groupId> 
  4. <artifactId>flink-connector-kafka-0.11_2.11</artifactId> 
  5. <version>1.7.1</version> 
  6. </dependency> 

2. 從kafka中讀取數據處理,并將結果打印到控制臺

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  2. Properties props = new Properties(); 
  3. props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); 
  4. props.setProperty("group.id", "flink-group"); 
  5. /** 
  6. * 第一個參數是topic 
  7. * 第二個參數是value的反序列化格式 
  8. * 第三個參數是kafka配置 
  9. */ 
  10. FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>("FlinkTopic", new SimpleStringSchema(), props); 
  11. DataStreamSource<String> stringDataStreamSource = env.addSource(consumer011); 
  12. SingleOutputStreamOperator<String> flatMap = stringDataStreamSource.flatMap(new FlatMapFunction<String, String>() { 
  13. @Override 
  14. public void flatMap(String s, Collector<String> outCollector) throws Exception { 
  15. String[] ssplit = s.split(" "); 
  16. for (String currentOne : split) { 
  17. outCollector.collect(currentOne); 
  18. }); 
  19. //注意這里的tuple2需要使用org.apache.flink.api.java.tuple.Tuple2 這個包下的tuple2 
  20. SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() { 
  21. @Override 
  22. public Tuple2<String, Integer> map(String word) throws Exception { 
  23. return new Tuple2<>(word, 1); 
  24. }); 
  25. //keyby 將數據根據key 進行分區,保證相同的key分到一起,默認是按照hash 分區 
  26. KeyedStream<Tuple2<String, Integer>, Tuple> keyByResult = map.keyBy(0); 
  27. WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowResult = keyByResult.timeWindow(Time.seconds(5)); 
  28. SingleOutputStreamOperator<Tuple2<String, Integer>> endResult = windowResult.sum(1); 
  29. //sink 直接控制臺打印 
  30. //執行flink程序,設置任務名稱。console 控制臺每行前面的數字代表當前數據是哪個并行線程計算得到的結果 
  31. endResult.print(); 
  32. //最后要調用execute方法啟動flink程序 
  33. env.execute("kafka word count"); 

3. 將結果寫入kafka

  1. //sink 將結果存入kafka topic中,存入kafka中的是String類型,所有endResult需要做進一步的轉換 
  2. FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>("node1:9092,node2:9092,node3:9092","FlinkResult",new SimpleStringSchema()); 
  3. //將tuple2格式數據轉換成String格式 
  4. endResult.map(new MapFunction<Tuple2<String,Integer>, String>() { 
  5. @Override 
  6. public String map(Tuple2<String, Integer> tp2) throws Exception { 
  7. return tp2.f0+"-"+tp2.f1; 
  8. }).addSink(producer); 

4. 將結果寫入文件

  1. //sink 將結果存入文件,FileSystem.WriteMode.OVERWRITE 文件目錄存在就覆蓋 
  2. endResult.writeAsText("./result/kafkaresult",FileSystem.WriteMode.OVERWRITE); 
  3. // endResult.writeAsText("./result/kafkaresult",FileSystem.WriteMode.NO_OVERWRITE); 

十一、 Flink + Kafka 整合數據一致性保證

1. Flink消費kafka數據起始offset配置

Flink讀取Kafka數據確定開始位置有以下幾種設置方式:

(1) flinkKafkaConsumer.setStartFromEarliest()

從topic的最早offset位置開始處理數據,如果kafka中保存有消費者組的消費位置將被忽略。

(2) flinkKafkaConsumer.setStartFromLatest()

從topic的最新offset位置開始處理數據,如果kafka中保存有消費者組的消費位置將被忽略。

(3) flinkKafkaConsumer.setStartFromTimestamp(…)

從指定的時間戳(毫秒)開始消費數據,Kafka中每個分區中數據大于等于設置的時間戳的數據位置將被當做開始消費的位置。如果kafka中保存有消費者組的消費位置將被忽略。

(4) flinkKafkaConsumer.setStartFromGroupOffsets()

默認的設置。根據代碼中設置的group.id設置的消費者組,去kafka中或者zookeeper中找到對應的消費者offset位置消費數據。如果沒有找到對應的消費者組的位置,那么將按照auto.offset.reset設置的策略讀取offset。

  1. FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>("FlinkTopic", new SimpleStringSchema(), props); 
  2. // consumer011.setStartFromEarliest(); 
  3. // consumer011.setStartFromLatest(); 
  4. // consumer011.setStartFromGroupOffsets(); 
  5. // consumer011.setStartFromTimestamp(111111);  
  6. DataStreamSource<String> dateSource = env.addSource(consumer011); 
  7. dateSource… … 

2. Flink消費kafka數據,消費者offset提交配置

Flink提供了消費kafka數據的offset如何提交給Kafka或者zookeeper(kafka0.8之前)的配置。注意,Flink并不依賴提交給Kafka或者zookeeper中的offset來保證容錯。提交的offset只是為了外部來查詢監視kafka數據消費的情況。

配置offset的提交方式取決于是否為job設置開啟checkpoint。可以使用env.enableCheckpointing(5000)來設置開啟checkpoint。

(1) 關閉checkpoint:

如何禁用了checkpoint,那么offset位置的提交取決于Flink讀取kafka客戶端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否開啟自動提交offset, auto.commit.interval.ms決定自動提交offset的周期。

(2) 開啟checkpoint:

如果開啟了checkpoint,那么當checkpoint保存狀態完成后,將checkpoint中保存的offset位置提交到kafka。這樣保證了Kafka中保存的offset和checkpoint中保存的offset一致,可以通過配置setCommitOffsetsOnCheckpoints(boolean)來配置是否將checkpoint中的offset提交到kafka中(默認是true)。如果使用這種方式,那么properties中配置的kafka offset自動提交參數enable.auto.commit和周期提交參數auto.commit.interval.ms參數將被忽略。

3. 使用checkpoint + 兩階段提交來保證僅一次消費kafka中的數據

當談及“exactly-once semantics”僅一次處理數據時,指的是每條數據只會影響最終結果一次。Flink可以保證當機器出現故障或者程序出現錯誤時,也沒有重復的數據或者未被處理的數據出現,實現僅一次處理的語義。Flink開發出了checkpointing機制,這種機制是在Flink應用內部實現僅一次處理數據的基礎。

checkpoint中包含:

  • 當前應用的狀態
  • 當前消費流數據的位置

在Flink1.4版本之前,Flink僅一次處理數據只限于Flink應用內部(可以使用checkpoint機制實現僅一次數據數據語義),當Flink處理完的數據需要寫入外部系統時,不保證僅一次處理數據。為了提供端到端的僅一次處理數據,在將數據寫入外部系統時也要保證僅一次處理數據,這些外部系統必須提供一種手段來允許程序提交或者回滾寫入操作,同時還要保證與Flink的checkpoint機制協調使用。

在分布式系統中協調提交和回滾的常見方法就是兩階段提交協議。下面給出一個實例了解Flink如何使用兩階段提交協議來實現數據僅一次處理語義。

該實例是從kafka中讀取數據,經過處理數據之后將結果再寫回kafka。kafka0.11版本之后支持事務,這也是Flink與kafka交互時僅一次處理的必要條件。【注意:當Flink處理完的數據寫入kafka時,即當sink為kafka時,自動封裝了兩階段提交協議】。Flink支持僅一次處理數據不僅僅限于和Kafka的結合,只要sink提供了必要的兩階段協調實現,可以對任何sink都能實現僅一次處理數據語義。

其原理如下:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

上圖Flink程序包含以下組件:

  • 一個從kafka中讀取數據的source
  • 一個窗口聚合操作
  • 一個將結果寫往kafka的sink。

要使sink支持僅一次處理數據語義,必須以事務的方式將數據寫往kafka,將兩次checkpoint之間的操作當做一個事務提交,確保出現故障時操作能夠被回滾。假設出現故障,在分布式多并發執行sink的應用程序中,僅僅執行單次提交或回滾事務是不夠的,因為分布式中的各個sink程序都必須對這些提交或者回滾達成共識,這樣才能保證兩次checkpoint之間的數據得到一個一致性的結果。Flink使用兩階段提交協議(pre-commit+commit)來實現這個問題。

Filnk checkpointing開始時就進入到pre-commit階段,具體來說,一旦checkpoint開始,Flink的JobManager向輸入流中寫入一個checkpoint barrier將流中所有消息分隔成屬于本次checkpoint的消息以及屬于下次checkpoint的消息,barrier也會在操作算子間流轉,對于每個operator來說,該barrier會觸發operator的State Backend來為當前的operator來打快照。如下圖示:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

Flink DataSource中存儲著Kafka消費的offset,當完成快照保存后,將chechkpoint barrier傳遞給下一個operator。這種方式只有在Flink內部狀態的場景是可行的,內部狀態指的是由Flink的State Backend管理狀態,例如上面的window的狀態就是內部狀態管理。只有當內部狀態時,pre-commit階段無需執行額外的操作,僅僅是寫入一些定義好的狀態變量即可,checkpoint成功時Flink負責提交這些狀態寫入,否則就不寫入當前狀態。

但是,一旦operator操作包含外部狀態,事情就不一樣了。我們不能像處理內部狀態一樣處理外部狀態,因為外部狀態涉及到與外部系統的交互。這種情況下,外部系統必須要支持可以與兩階段提交協議綁定的事務才能保證僅一次處理數據。

本例中的data sink是將數據寫往kafka,因為寫往kafka是有外部狀態的,這種情況下,pre-commit階段下data sink 在保存狀態到State Backend的同時,還必須pre-commit外部的事務。如下圖:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

當checkpoint barrier在所有的operator都傳遞一遍切對應的快照都成功完成之后,pre-commit階段才算完成。這個過程中所有創建的快照都被視為checkpoint的一部分,checkpoint中保存著整個應用的全局狀態,當然也包含pre-commit階段提交的外部狀態。當程序出現崩潰時,我們可以回滾狀態到最新已經完成快照的時間點。

下一步就是通知所有的operator,告訴它們checkpoint已經完成,這便是兩階段提交的第二個階段:commit階段。這個階段中JobManager會為應用中的每個operator發起checkpoint已經完成的回調邏輯。本例中,DataSource和Winow操作都沒有外部狀態,因此在該階段,這兩個operator無需執行任何邏輯,但是Data Sink是有外部狀態的,因此此時我們需要提交外部事務。如下圖示:

從來沒有一個人能把Flink講的這么透徹,小編的出現算是一個意外

匯總以上信息,總結得出:

(1) 一旦所有的operator完成各自的pre-commit,他們會發起一個commit操作。

(2) 如果一個operator的pre-commit失敗,所有其他的operator 的pre-commit必須被終止,并且Flink會回滾到最近成功完成的checkpoint位置。

(3) 一旦pre-commit完成,必須要確保commit也要成功,內部的operator和外部的系統都要對此進行保證。假設commit失敗【網絡故障原因】,Flink程序就會崩潰,然后根據用戶重啟策略執行重啟邏輯,重啟之后會再次commit。

因此,所有的operator必須對checkpoint最終結果達成共識,即所有的operator都必須認定數據提交要么成功執行,要么被終止然后回滾。

(4) Flink中外部狀態實現兩階段提交

Flink外部狀態實現兩階段提交將邏輯封裝到TwoPhaseComitSinkFunction類中,下面擴展TwoPhaseCommitSinkFunction來實現就文件的sink。若要實現支持exactly-once語義的文件sink,需要實現以下4個方法:

  • beginTransaction:開啟一個事務,創建一個臨時文件,將數據寫入到臨時文件中
  • preCommit:在pre-commit階段,flush緩存數據到磁盤,然后關閉這個文件,確保不會有新的數據寫入到這個文件,同時開啟一個新事務執行屬于下一個checkpoint的寫入操作
  • commit:在commit階段,我們以原子性的方式將上一階段的文件寫入真正的文件目錄下。【注意:數據有延時,不是實時的】
  • abort:一旦異常終止事務,程序如何處理。這里要清除臨時文件。

 

責任編輯:趙寧寧 來源: 架構師之巔
相關推薦

2025-03-13 10:31:20

DeepSeek開源EPLB

2015-06-12 15:29:06

一個人的爆品

2024-11-14 14:30:00

模型結構AI

2013-08-14 10:23:22

創業個人創業互聯網創業

2011-07-06 14:29:49

中國移動王建宙4G

2013-03-08 02:52:03

個人開發項目糾錯

2011-06-16 14:21:43

習慣管理

2025-03-07 09:18:10

2013-06-07 10:42:53

2009-02-26 10:19:56

2014-08-08 15:34:53

安全漏洞漏洞防護安全防守

2017-07-13 12:33:15

戴爾

2014-05-29 10:43:29

斯諾登棱鏡監聽

2022-11-10 09:28:40

框架開發

2009-09-27 16:04:49

CCIE資格

2012-05-29 09:22:50

游戲設計開發

2023-04-12 08:16:35

2025-02-07 09:34:12

2009-03-20 09:12:56

阿里巴巴衛哲馬云

2019-12-17 18:25:35

物聯網電腦互聯網
點贊
收藏

51CTO技術棧公眾號

国产成人精品a视频一区| 91丨国产丨九色丨pron| 中文字幕一区二区三中文字幕| 久久在精品线影院精品国产| 精品亚洲一区二区三区四区五区高| 泷泽萝拉在线播放| 最新国产露脸在线观看| 无码人妻aⅴ一区二区三区有奶水 无码免费一区二区三区 | 国产在线观看a视频| 在线欧美一区| 欧美精选一区二区| 日韩wuma| 国产男女在线观看| 国产精品人人人人| youjizz亚洲| 亚洲你懂的在线视频| 国产精品中文久久久久久久| 亚洲自拍偷拍一区二区| 欧美黄页免费| 亚洲欧美综合色| 国产在线999| 懂色av蜜臀av粉嫩av永久| 成人福利一区二区| 欧美国产一区视频在线观看| 日韩av色在线| 蜜桃传媒一区二区亚洲| 粉嫩一区二区三区| 久久久国产精品不卡| 欧美自拍视频在线| 久久国产精品影院| 日韩精品一区二区三区中文字幕 | 欧美在线视频二区| 亚洲欧美精品一区二区三区| 亚洲国产一区二区三区在线播放 | 国产又黄又猛又粗| 成年人在线观看视频| 日本不卡123| 日日狠狠久久偷偷四色综合免费 | 野花视频免费在线观看| 久久爱.com| 午夜综合激情| 亚洲国产精品yw在线观看| 天天想你在线观看完整版电影免费 | 国产欧美高清视频在线| 色综合久久久网| 久久亚洲精品国产精品紫薇| 国产人妖一区| 久久久精品黄色| 国产在线一区二区三区播放| 久久精品人人做人人爽97| 国产一区二区三区免费播放| 久久久久北条麻妃免费看| 亚洲一区二区三区观看| 亚洲91av| av不卡免费在线观看| 日韩美女在线播放| 日韩视频在线观看视频| 黄色一级视频免费观看| 老司机成人在线| 欧美中文字幕一区| 中国一级大黄大黄大色毛片| 欧美一级片免费| 丝袜美腿成人在线| 欧美xxxx14xxxxx性爽| 欧美性猛交xxxx免费看| 国产欧美日韩视频在线观看| 99www免费人成精品| 日本一区二区三区四区五区| 中文字幕中文字幕精品| 欧美日韩精品欧美日韩精品| 欧美黄网在线观看| 久久99999| √新版天堂资源在线资源| 久久 天天综合| 国内精品久久久久久久| 少妇av片在线观看| 91精品导航| 欧美日韩久久久久久| 日韩不卡一二三| 热色播在线视频| 亚洲欧美日韩综合aⅴ视频| 久久青青草原一区二区| 99久久久国产精品无码免费| 久久激情久久| 欧美激情中文字幕乱码免费| 色撸撸在线视频| 天天躁日日躁狠狠躁欧美巨大小说| 欧美精品在线观看一区二区| www.久久久久久久久久久| 中文在线免费视频| 亚洲一区二区中文在线| 亚洲欧洲三级| 九色在线播放| 99re这里都是精品| 操一操视频一区| 五月婷婷丁香六月| 福利一区二区在线观看| 国产女精品视频网站免费| 中文字幕无线码一区| 久久婷婷av| 成人一区二区电影| 中文字幕免费播放| 国产精品小仙女| 成人久久久久久久| 欧美一级特黄aaaaaa大片在线观看| 久久色中文字幕| 97精品国产97久久久久久粉红| 华人av在线| a一区二区三区亚洲| 视频福利在线| 女人香蕉久久**毛片精品| 亚洲欧美日韩网| 99久久人妻无码中文字幕系列| 日本亚洲视频| 亚洲欧洲一区二区三区在线观看 | 欧美三级三级三级爽爽爽| 成人午夜精品久久久久久久蜜臀| 国产原创精品视频| 黄色精品一区二区| 国产欧美日韩网站| 国产成人精品一区二区三区在线| 精品久久久久久综合日本欧美 | 国产精品一区二区在线观看不卡| 免费看国产精品一二区视频| 亚洲色图另类小说| 亚洲日本一区二区| 久久黄色免费看| 九九热最新地址| 精品无码国产一区二区三区av| 国产尤物视频在线观看| 日本不卡一区二区三区高清视频| 春色成人在线视频| 色婷婷av一区二区三区之红樱桃| 成人动漫中文字幕| 国产一区国产精品| 国产伦理在线观看| 国产精品亚洲激情| 欧美影院在线播放| 午夜一级免费视频| 天天躁日日躁狠狠躁欧美巨大小说| xxx欧美精品| 国产精品xxxxxx| 老司机一区二区| 91亚洲精品久久久| 粉嫩小泬无遮挡久久久久久| 成人av免费在线| 国产精品无码电影在线观看| www.91精品| 最近2019中文字幕第三页视频 | 日韩伦理在线免费观看| 99er精品视频| 久久人体大胆视频| 91久久精品国产91性色69| 亚洲国产精品成人综合色在线婷婷| 亚洲a∨一区二区三区| 日本高清中文字幕在线| 亚洲综合久久久久| 日本在线视频www| 羞羞视频在线观看一区二区| 欧美一区 二区| 欧美黑人一区| 亚洲精品视频在线看| 成人免费在线观看视频网站| 91精彩在线视频| 国产精品网曝门| 最近中文字幕一区二区| 久久精品高清| 欧美激情一区二区三级高清视频 | 亚洲综合电影一区二区三区| 久久99精品久久久久久久久久| 成年男女免费视频网站不卡| 日韩成人中文字幕| 中文字幕黄色网址| 欧美激情麻豆| 国产亚洲成av人在线观看导航 | 97人人模人人爽人人澡| www.精品国产| 日韩性生活视频| 国产成人精品免费看视频| 国产亚洲一区二区三区四区 | 国产在线a视频| 成人免费无遮挡| 欧美久久一区二区| 国产97免费视频| av不卡一区二区三区| 国产精品igao| 日本欧美高清| 国产精品国产福利国产秒拍| 香蕉视频911| 一本高清dvd不卡在线观看| 大吊一区二区三区| 国产成人在线观看| 伊人天天久久大香线蕉av色| 日韩电影av| 中文字幕在线成人| 4438国产精品一区二区| 成人精品鲁一区一区二区| 日本a级片免费观看| 四季av一区二区凹凸精品| 豆国产97在线| 成人看片毛片免费播放器| 欧美俄罗斯乱妇| 国产毛片一区二区三区va在线| 亚洲一区二区精品3399| av黄色在线免费观看| 国产精品一区不卡| 日韩激情免费视频| 88国产精品视频一区二区三区| 好吊色欧美一区二区三区四区| 美女视频一区| 欧美伊久线香蕉线新在线| 成人黄色在线电影| 91精品国产91热久久久做人人| 国产黄色片在线| 波多野洁衣一区| 日本美女视频一区| 久久人人97超碰国产公开结果| 伊人网在线免费| 视频精品在线观看| 国产精品999999| 粉嫩一区二区三区国产精品| 在线观看亚洲a| 国产污视频在线观看| av亚洲精华国产精华精华| 182午夜在线观看| 久久精品三级| 国产精品一区二区免费在线观看| 欧美精品密入口播放| 91色中文字幕| 日本一区免费网站| 日韩视频免费在线| 日韩二区三区| 欧美在线看片a免费观看| 一本色道a无线码一区v| 国产精品久久久久久av| 日本一级理论片在线大全| 中文字幕欧美日韩精品 | 精品99一区二区三区| 中文字幕第20页| www.亚洲精品| 成人做爰69片免费| 国内成人免费视频| 国产麻豆91精品| 久久久久久久久久久网站| 午夜视频在线观看免费视频| 日韩激情av在线播放| 人妻va精品va欧美va| 日韩写真欧美这视频| 2014亚洲天堂| 国产91精品一区二区麻豆亚洲| 日本高清久久久| 蜜桃av一区二区| 亚洲激情在线视频| 国产精品白嫩白嫩大学美女| 成人午夜av影视| 国产成人无码一二三区视频| 91偷拍一区二区三区精品| 日本成人黄色免费看| 伊人成综合网伊人222| 美脚丝袜一区二区三区在线观看| 欧美影院天天5g天天爽| 久久久综合香蕉尹人综合网| 日本国产精品| 久久久午夜精品理论片中文字幕| 国产剧情日韩欧美| 韩国精品主播一区二区在线观看| 国产成人精品在线播放| 成人在线爆射| 国产精品偷伦一区二区 | 欧美xxxxx视频| 在线综合视频网站| 中文字幕一区二区三区在线视频| 成人a视频在线观看| 亚洲国产天堂| 2019国产精品视频| 最新日韩三级| 热re99久久精品国产66热| 男人揉女人奶房视频60分| 欧美黄色aaa| 福利在线午夜| 国产99久久久国产精品免费看 | 麻豆国产在线播放| 一区二区不卡在线视频 午夜欧美不卡在| 在线免费看av网站| 国产人妖乱国产精品人妖| 亚洲一二三四视频| 亚洲精品日产精品乱码不卡| 自拍偷拍欧美亚洲| 欧美色中文字幕| 国产91免费看| 最近2019好看的中文字幕免费| 呦呦在线视频| 日韩美女中文字幕| 欧美影院在线| 欧洲av一区| 欧美午夜在线| 大陆极品少妇内射aaaaaa| 一本色道久久精品| 日韩极品视频在线观看| 老牛影视一区二区三区| 日本成人xxx| 久久伊人蜜桃av一区二区| 免费成人深夜夜行网站| 欧美午夜xxx| hs视频在线观看| 一区二区三区 在线观看视| 日韩porn| 精品中文字幕视频| 香蕉成人app免费看片| 欧洲s码亚洲m码精品一区| 久久av偷拍| av成人观看| 精品国产一级毛片| 色姑娘综合网| 亚洲黄网站黄| 男人揉女人奶房视频60分| 国产一二三精品| 一区二区三区在线观看免费视频| 一区二区在线观看视频| 亚洲视频精品在线| 另类欧美小说| 国产激情一区| 全亚洲最色的网站在线观看| 2020国产精品小视频| 欧美日韩在线观看一区二区三区| 亚洲第一二三区| 国产精品三级一区二区| 久久99精品久久久久久| 国产真实乱人偷精品人妻| 亚洲成a人片综合在线| 色欲人妻综合网| 在线视频一区二区免费| 天天舔天天干天天操| 欧美日韩成人在线视频| 色综合视频一区二区三区日韩| 欧美日韩在线一二三| 亚洲三级视频| 国模私拍在线观看| 国产亚洲欧美日韩俺去了| 国产性xxxx高清| 精品国产髙清在线看国产毛片| 麻豆网站在线| 久久久久久久久91| 看亚洲a级一级毛片| 一区二区三区视频| 在线不卡亚洲| 午夜xxxxx| 99re热视频精品| 日本熟妇一区二区| 亚洲精品一区二区三区福利| 污视频网站免费在线观看| 亚洲bt天天射| 免费成人结看片| 国产精品视频一区二区三区四区五区| www.激情成人| 免费观看成人毛片| 欧美二区三区91| 精精国产xxxx视频在线| 91亚洲午夜在线| 欧美精品三级| 亚洲天堂av网站| 国产精品国产自产拍高清av王其| 午夜一级黄色片| 日韩欧美亚洲国产精品字幕久久久| 日韩在线免费看| 日韩美女在线看| 久久国产电影| 国产网站免费在线观看| 久久久蜜桃精品| 一区二区视频网站| 久久亚洲综合国产精品99麻豆精品福利 | 色天使在线观看| 亚洲精品免费在线观看| 免费成人在线看| 精品成a人在线观看| 国产免费视频传媒| 欧美a一级片| 久久久久久九九| 久久精品导航| 少妇高潮一区二区三区喷水| 欧美一区二区免费视频| 国产美女一区视频| 91极品女神在线| 伊人久久大香| 成年在线观看视频| av福利精品导航| 中文字幕在线观看1| 91精品国产全国免费观看| 依依成人在线视频| 欧美大肥婆大肥bbbbb| 精品少妇3p| 午夜免费福利视频在线观看| 一区二区三区高清| 久青青在线观看视频国产| 91精品在线观| 性一交一乱一区二区洋洋av| 欧美日韩色视频| 亚洲美女在线观看| 精品一区二区三区中文字幕视频 | 久久午夜羞羞影院免费观看|