精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SpringBoot整合Flink CDC,實時追蹤數據變動,無縫同步至Redis

開發 前端
具體來說,Flink CDC的應用場景包括但不限于實時數據倉庫更新、實時數據同步和遷移、實時數據處理等。它還可以確保數據一致性,并在數據發生變更時能夠準確地捕獲和處理。

環境:SpringBoot2.7.16 + Flink 1.19.0 + JDK21

1. 簡介

Flink CDC(Flink Change Data Capture)是基于數據庫的日志CDC技術,實現了全增量一體化讀取的數據集成框架。它搭配Flink計算框架,能夠高效實現海量數據的實時集成。Flink CDC的核心功能在于實時地監視數據庫或數據流中發生的數據變動,并將這些變動抽取出來,以便進一步的處理和分析。通過使用Flink CDC,用戶可以輕松地構建實時數據管道,對數據變動進行實時響應和處理,為實時分析、實時報表和實時決策等場景提供強大的支持。

具體來說,Flink CDC的應用場景包括但不限于實時數據倉庫更新、實時數據同步和遷移、實時數據處理等。它還可以確保數據一致性,并在數據發生變更時能夠準確地捕獲和處理。此外,Flink CDC支持與多種數據源進行集成,如MySQL、PostgreSQL、Oracle等,并提供了相應的連接器,方便數據的捕獲和處理。

接下來將詳細的介紹關于MySQL CDC的使用。MySQL CDC 連接器允許從 MySQL 數據庫讀取快照數據和增量數據。

支持的數據庫

Connector

Database

Driver

mysql-cdc

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.27

2. 實戰案例

2.1 MySQL開啟Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini),需要在[mysqld]部分設置相關參數以開啟binlog功能,如下:

[mysqld]
server-id=1
# 格式,行級格式
binlog-format=Row
# binlog 日志文件的前綴
log-bin=mysql-bin
# 指定哪些數據庫需要記錄二進制日志
binlog_do_db=testjpa

除了開啟binlog功能外,Flink CDC還需要其他配置和權限來確保能夠正常連接到MySQL并讀取數據。例如,需要授予Flink CDC連接MySQL的用戶必要的權限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。這些權限是Flink CDC讀取數據和元數據所必需的。

查看是否開啟了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

以上就對mysql相關的配置完成了。

2.2 依賴管理

<properties>
  <flink.version>1.19.0</flink.version>
</properties>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-base</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  <version>3.0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-runtime</artifactId>
  <version>${flink.version}</version>
</dependency>

2.3 代碼實現

@Component
public class MonitorMySQLCDC implements InitializingBean {


  // 該隊列專門用來臨時保存變化的數據(實際生產環境,你應該使用MQ相關的產品)
  public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;
  
  private final StringRedisTemplate stringRedisTemplate ;
  // 保存到redis中key的前綴
  private final String PREFIX = "users:" ;
  // 數據發生變化后的sink處理
  private final CustomSink customSink ;
  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {
    this.customSink = customSink ;
    this.stringRedisTemplate = stringRedisTemplate ;
  }
  
  @Override
  public void afterPropertiesSet() throws Exception {
    // 啟動異步線程,實時處理隊列中的數據
    new Thread(() -> {
      while(true) {
        try {
          Map<String, Object> result = queue.take();
          this.doAction(result) ;
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start() ;
    Properties jdbcProperties = new Properties() ;
    jdbcProperties.setProperty("useSSL", "false") ;
    MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("127.0.0.1")
        .port(3306)
        // 可配置多個數據庫
        .databaseList("testjpa")
        // 可配置多個表
        .tableList("testjpa.users")
        .username("root")
        .password("123123")
        .jdbcProperties(jdbcProperties)
        // 包括schema的改變
        .includeSchemaChanges(true)
        // 反序列化設置
        // .deserializer(new StringDebeziumDeserializationSchema())
        .deserializer(new JsonDebeziumDeserializationSchema(true))
        // 啟動模式;關于啟動模式下面詳細介紹
        .startupOptions(StartupOptions.initial())
        .build() ;
    // 環境配置
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
    // 設置 6s 的 checkpoint 間隔
    env.enableCheckpointing(6000) ;
    // 設置 source 節點的并行度為 4
    env.setParallelism(4) ;
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
        // 添加Sink
        .addSink(this.customSink) ;
    env.execute() ;
  }
  
  @SuppressWarnings("unchecked")
  private void doAction(Map<String, Object> result) throws Exception {
    Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;
    String op = (String) payload.get("op") ;
    switch (op) {
      // 更新和插入操作
      case "u", "c" -> {
        Map<String, Object> after = (Map<String, Object>) payload.get("after") ;
        String id = after.get("id").toString();
        System.out.printf("操作:%s, ID: %s%n", op, id) ;
        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;
      }
      // 刪除操作
      case "d" -> {
        Map<String, Object> after = (Map<String, Object>) payload.get("before") ;
        String id = after.get("id").toString();
        stringRedisTemplate.delete(PREFIX + id) ;
      } 
    }
  }
  
}

啟動模式:

  • initial (默認):在第一次啟動時對受監視的數據庫表執行初始快照,并繼續讀取最新的 binlog。
  • earliest-offset:跳過快照階段,從可讀取的最早 binlog 位點開始讀取
  • latest-offset:首次啟動時,從不對受監視的數據庫表執行快照, 連接器僅從 binlog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之后的數據更改。
  • specific-offset:跳過快照階段,從指定的 binlog 位點開始讀取。位點可通過 binlog 文件名和位置指定,或者在 GTID 在集群上啟用時通過 GTID 集合指定。
  • timestamp:跳過快照階段,從指定的時間戳開始讀取 binlog 事件。

數據處理Sink

@Component
public class CustomSink extends RichSinkFunction<String> {


  private ObjectMapper mapper = new ObjectMapper();


  @Override
  public void invoke(String value, Context context) throws Exception {
    System.out.printf("數據發生變化: %s%n", value);
    TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {
    };
    Map<String, Object> result = mapper.readValue(value, valueType);
    Map<String, Object> payload = (Map<String, Object>) result.get("payload");
    String op = (String) payload.get("op") ;
    // 不對讀操作處理
    if (!"r".equals(op)) {
      MonitorMySQLCDC.queue.put(result);
    }
  }
}

以上就是實現通過FlinkCDC實時通過數據到Redis的所有代碼。

2.4 Web監控頁面

引入flink web依賴

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web</artifactId>
  <version>${flink.version}</version>
</dependency>

環境配置

Configuration config = new Configuration() ;
config.set(RestOptions.PORT, 9090) ;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web監聽9090端口。

圖片圖片

通過web控制臺你可以管理查看到更多的信息。

責任編輯:武曉燕 來源: Spring全家桶實戰案例源碼
相關推薦

2022-07-20 23:15:11

Flink數據集CDC

2025-07-10 08:46:21

ConnectSpringBoot數據

2024-10-18 11:39:55

MySQL數據檢索

2023-05-03 08:58:46

數據庫開源

2021-06-04 07:24:14

Flink CDC數據

2025-04-01 08:38:41

2013-05-13 13:49:43

大數據

2021-08-17 06:48:43

SpringbootKafkaStream

2021-07-07 23:25:18

RedisFlinkSQL

2025-04-29 08:36:28

SpringCanal數據庫

2022-01-05 18:18:01

Flink 數倉連接器

2023-09-08 10:13:30

開發技術

2024-02-01 12:32:35

MySQL數據鎖數據庫

2020-01-10 15:42:13

SpringBootRedis數據庫

2015-06-09 22:25:06

SAP大道至簡

2023-05-31 08:56:24

2025-04-25 08:34:52

2022-05-23 08:23:24

鏈路追蹤SleuthSpring

2020-06-29 07:43:12

緩存RedisSpringBoot

2025-09-26 08:46:30

點贊
收藏

51CTO技術棧公眾號

久久精品成人av| 欧美一区二区中文字幕| www三级免费| 国产欧美日韩综合一区在线播放 | 欧美激情精品久久久久久蜜臀 | 国产欧美三级电影| 精品久久香蕉国产线看观看亚洲 | 欧美精品videos另类| 国产激情视频一区二区在线观看 | 91精品国产乱码久久| 在线成人av| 日韩在线观看免费网站 | www.五月天激情| 久久综合狠狠| 久久久久久久久久久久av| 久久美女免费视频| 国产 日韩 欧美 综合 一区| 欧美曰成人黄网| 九九爱精品视频| 久久bbxx| 久久精品一区四区| 国产在线资源一区| av免费观看在线| 老汉av免费一区二区三区| 91高清免费视频| 青娱乐国产在线| 国产精品传媒精东影业在线| 国产亚洲精品久久久久动| 一起草在线视频| 网站一区二区| 制服丝袜激情欧洲亚洲| 五月婷婷激情久久| 欧美日韩视频免费观看| 欧美日韩国产一区二区| 无码人妻精品一区二区蜜桃网站| 久cao在线| 中文字幕在线一区| 色之综合天天综合色天天棕色 | 91麻豆精品激情在线观看最新| 欧美日韩精品欧美日韩精品一 | 欧美国产欧美亚州国产日韩mv天天看完整 | 午夜精品久久久内射近拍高清| caoprom在线| 亚洲午夜影视影院在线观看| 欧美交换配乱吟粗大25p| 麻豆av在线导航| 国产精品毛片久久久久久| 色综合久久av| 国产69精品久久app免费版| 久久久亚洲欧洲日产国码αv| 精品无人区一区二区三区 | 91色九色蝌蚪| 免费毛片一区二区三区久久久| 亚洲三区在线播放| 2014亚洲片线观看视频免费| 蜜桃视频在线观看成人| 精品无人乱码| 亚洲国产精品成人综合色在线婷婷 | 午夜免费久久看| 老太脱裤让老头玩ⅹxxxx| 白浆视频在线观看| 欧美性xxxxxxxxx| 日本女优爱爱视频| 亚洲一区av| 精品99久久久久久| 狠狠人妻久久久久久综合蜜桃| 最新国产一区| 日韩中文在线视频| 欧美成人国产精品高潮| 亚洲巨乳在线| 国产精品久久久久久久久免费| 911美女片黄在线观看游戏| 精品影院一区二区久久久| 91在线观看免费网站| 亚洲精品综合网| 91婷婷韩国欧美一区二区| 五月天久久狠狠| 污污的网站在线免费观看| 午夜精品福利一区二区蜜股av| 国产午夜伦鲁鲁| 国产精品成人国产| 精品欧美一区二区三区精品久久| 亚洲天堂资源在线| 日产精品一区二区| 欧美激情区在线播放| 国产伦精品一区二区三区视频网站| 免费久久99精品国产| 亚洲综合成人婷婷小说| 视频在线不卡| 国产精品不卡在线观看| r级无码视频在线观看| 福利一区二区| 精品欧美乱码久久久久久1区2区| 亚洲一区二区三区日韩| 欧美日韩理论| 浅井舞香一区二区| 国产超碰人人模人人爽人人添| 91免费看`日韩一区二区| 在线观看成人av电影| 国产中文在线播放| 欧美高清视频www夜色资源网| 中文字幕a在线观看| 日韩黄色大片| 国产91精品久久久久| 国产三级在线观看视频| 国产三级一区二区| 国产精品无码免费专区午夜| 欧美日韩女优| 日韩精品视频在线播放| 日本中文字幕免费在线观看| 日韩电影在线看| 极品尤物一区二区三区| 性网站在线观看| 欧美日韩一区二区三区四区五区| 老司机免费视频| 欧美激情成人在线| 国产日韩av高清| 可以在线观看的av网站| 亚洲va在线va天堂| 香蕉视频色在线观看| 成人影院天天5g天天爽无毒影院| 7777精品视频| 蜜臀av午夜精品| 亚洲老妇xxxxxx| 最新天堂中文在线| av一区二区在线播放| 韩日欧美一区二区| 丰满人妻一区二区三区无码av| 国产精品久久久久影视| 密臀av一区二区三区| 亚洲69av| 奇米成人av国产一区二区三区| 日韩在线视频免费| 一级精品视频在线观看宜春院 | 国产精品久久久久久亚洲伦| 别急慢慢来1978如如2| 竹菊久久久久久久| 欧美最顶级的aⅴ艳星| 五月婷婷开心中文字幕| 精品动漫一区二区三区| 久久久久久久无码| 国产农村妇女毛片精品久久莱园子| 国产精品中出一区二区三区| 成人三级小说| 亚洲第一福利网站| 日本中文字幕网| 波多野结衣在线一区| 久青草视频在线播放| 第一区第二区在线| 韩国日本不卡在线| 天堂av网在线| 在线一区二区三区四区| 国产又粗又猛又爽又黄的视频小说| 免费视频一区二区| 日本在线观看免费| 亚洲夜间福利| 亚洲字幕在线观看| 俄罗斯一级**毛片在线播放| 欧美成人一区二区三区| 麻豆国产尤物av尤物在线观看| 国产麻豆精品theporn| 欧美视频一区二区三区在线观看| 又大又长粗又爽又黄少妇视频| 欧美激情成人在线| 国产欧美在线一区二区| 成人免费观看在线观看| 国产视频欧美视频| 国产精品国产精品国产| 亚洲欧洲国产专区| 国产a级片视频| 国产美女诱惑一区二区| 午夜视频久久久| 精品中文字幕一区二区三区四区 | 久热这里有精品| 成人动漫一区二区在线| 白嫩少妇丰满一区二区| 天天久久综合| 国产精品免费区二区三区观看| 亚洲三级欧美| 久久精品国产久精国产思思| 日韩性xxxx| 精品视频一区三区九区| 久久精品波多野结衣| 久久久久久日产精品| 一级网站在线观看| 免费久久99精品国产自在现线| 杨幂一区欧美专区| 草草视频在线一区二区| 国产精品久久久久久av| 午夜成年人在线免费视频| 亚洲精品日韩在线| 国产成人精品av在线观| 一本大道久久a久久精品综合| 日日操免费视频| 91亚洲精品久久久蜜桃网站| 岛国av在线免费| 制服诱惑一区二区| 日本天堂免费a| 欧美日韩国产免费观看视频| 国产精品二区在线| 日韩午夜电影免费看| 2019亚洲男人天堂| 最新av在线播放| 色吧影院999| 男人天堂亚洲二区| 亚洲成**性毛茸茸| 一本色道久久综合熟妇| 色综合天天综合| 日操夜操天天操| 亚洲欧美激情一区二区| 中国女人特级毛片| 久久综合九色综合欧美就去吻| 一区二区在线免费观看视频| 蜜臀av性久久久久av蜜臀妖精| 国产黄页在线观看| 国产一区二区三区自拍| 免费观看黄色大片| 成人情趣视频网站| 国产一区二区三区奇米久涩| 欧美日韩午夜电影网| 国产乱人伦真实精品视频| 日本免费久久| 青青在线视频一区二区三区| 国产探花在线观看| 欧美激情视频在线观看| 国产在线观看a| 日韩在线视频观看正片免费网站| 都市激情一区| 国产一区二区三区在线看| 香蕉久久一区二区三区| 亚洲国产精品va在线看黑人| 亚洲精品福利网站| 日韩欧美久久一区| 国产情侣在线播放| 69久久夜色精品国产69蝌蚪网| 亚洲中文字幕在线观看| 欧美色图天堂网| 中国女人一级一次看片| 欧美在线不卡一区| 中文人妻熟女乱又乱精品| 一本色道久久综合狠狠躁的推荐| 9i看片成人免费看片| 欧美三级xxx| 中文字幕黄色片| 在线日韩一区二区| 中文字幕第31页| 3d成人动漫网站| 一级日韩一级欧美| 91精品久久久久久久久99蜜臂| 国产精品国产精品国产专区| 欧美精品电影在线播放| 99久久国产免费| 欧美r级电影在线观看| 成人小说亚洲一区二区三区| 亚洲第一视频网站| 头脑特工队2免费完整版在线观看| 日韩大陆欧美高清视频区| 深夜福利视频在线观看| 一本色道久久88精品综合| av电影在线观看| 欧美成人午夜激情视频| 交100部在线观看| 国产成人精品av| 日韩成人在线一区| 91在线在线观看| 天堂网av成人| 亚州欧美一区三区三区在线| 91精品国产成人观看| 免费不卡av在线| 日韩高清在线一区| 日本高清免费观看| 97久久超碰精品国产| 山东少妇露脸刺激对白在线| 亚洲免费在线播放| 久久久国产高清| 欧美三日本三级三级在线播放| 国产成人精品免费看视频| 亚洲国产成人在线播放| 国产一区二区三区福利| 欧美成年人网站| 伊人久久精品一区二区三区| 成人网在线视频| 丝袜av一区| 男女爱爱视频网站| 免费视频一区| 亚洲高清在线不卡| 久久这里只精品最新地址| 国产精品免费在线视频| 亚洲成av人片一区二区| 亚洲天堂手机在线| 亚洲精品久久视频| 成人影院在线看| 日本欧美国产在线| 综合伊人久久| 少妇精品久久久久久久久久| 亚洲五月婷婷| 第四色婷婷基地| www.亚洲人| 一区二区国产精品精华液| 色综合视频一区二区三区高清| 国产精品午夜福利| 亚洲人成电影网站色xx| 欧美女同一区| 91精品久久久久久久久青青| 美国成人xxx| 波多野结衣 作品| 麻豆专区一区二区三区四区五区| 熟妇高潮一区二区| 亚洲免费在线观看| 亚洲天堂999| 亚洲乱码国产乱码精品精天堂| 日本高清在线观看| 成人网欧美在线视频| 欧美日韩亚洲在线观看| 国产成人无码一二三区视频| 波多野结衣中文字幕一区二区三区| 久艹在线观看视频| 欧美日韩亚洲高清一区二区| 欧美香蕉爽爽人人爽| 7m第一福利500精品视频| 欧美精品三级在线| 亚洲国产精品女人| 久久99精品久久久久久国产越南| 深爱五月激情网| 精品美女国产在线| 日本高清视频免费观看| 欧美成人国产va精品日本一级| 成人高清一区| 日韩三级电影| 青娱乐精品在线视频| 国产毛片欧美毛片久久久| 色综合久久中文字幕| 日韩精品视频无播放器在线看| 午夜精品久久久99热福利| 91精品入口| 久久久久久人妻一区二区三区| 福利一区在线观看| 久久精品视频9| 精品精品国产高清一毛片一天堂| 视频在线观看入口黄最新永久免费国产| 91免费欧美精品| 在线中文字幕第一区| 天堂av.com| 一区二区三区中文字幕电影| 99久久久国产精品无码网爆| 久久6免费高清热精品| 51vv免费精品视频一区二区| 福利视频免费在线观看| 不卡免费追剧大全电视剧网站| 亚洲黄色一区二区| 精品在线观看国产| 全球最大av网站久久| 亚洲在线不卡| 国产成人一区二区精品非洲| 国产在线视频二区| 亚洲精品黄网在线观看| 国产精品一区二区av影院萌芽| 日韩久久不卡| 久久se这里有精品| 免费在线观看黄色av| 亚洲国产高清自拍| 欧美成人h版| 亚洲电影网站| 国产精品99久| 国产情侣自拍av| 一本一本久久a久久精品综合小说 一本一本久久a久久精品牛牛影视 | 欧美激情按摩在线| 日韩成人一级| 日韩爱爱小视频| 一区二区三区国产| 爽爽视频在线观看| 国产三级精品网站| 精品999网站| 五月激情四射婷婷| 欧美一级欧美三级| xx欧美视频| 四虎免费在线观看视频| av在线综合网| 中文字幕av片| 久久久之久亚州精品露出| 韩日一区二区三区| 日批视频在线看| 一本一道综合狠狠老| aaa大片在线观看| 欧美一区国产一区| 国产精品一二三四区| 国产寡妇亲子伦一区二区三区四区| 日韩视频在线免费观看| 欧美视频在线观看免费网址| 91porny九色| 欧美成人午夜剧场免费观看| 日韩欧美天堂| 色18美女社区| 欧美午夜美女看片| 91亚洲天堂| 日韩av免费电影| 成人小视频在线| 亚洲手机在线观看| 欧美一级片一区| 亚洲欧美一级二级三级|