日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁(yè) > 科技  > 軟件

SpringBoot整合Flink CDC,實(shí)時(shí)追蹤數(shù)據(jù)變動(dòng),無(wú)縫同步至Redis

來(lái)源: 責(zé)編: 時(shí)間:2024-04-09 17:23:02 259觀看
導(dǎo)讀環(huán)境:SpringBoot2.7.16 + Flink 1.19.0 + JDK211. 簡(jiǎn)介Flink CDC(Flink Change Data Capture)是基于數(shù)據(jù)庫(kù)的日志CDC技術(shù),實(shí)現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。它搭配Flink計(jì)算框架,能夠高效實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時(shí)集成。Fl

環(huán)境:SpringBoot2.7.16 + Flink 1.19.0 + JDK21sSH28資訊網(wǎng)——每日最新資訊28at.com

1. 簡(jiǎn)介

Flink CDC(Flink Change Data Capture)是基于數(shù)據(jù)庫(kù)的日志CDC技術(shù),實(shí)現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。它搭配Flink計(jì)算框架,能夠高效實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時(shí)集成。Flink CDC的核心功能在于實(shí)時(shí)地監(jiān)視數(shù)據(jù)庫(kù)或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動(dòng),并將這些變動(dòng)抽取出來(lái),以便進(jìn)一步的處理和分析。通過(guò)使用Flink CDC,用戶可以輕松地構(gòu)建實(shí)時(shí)數(shù)據(jù)管道,對(duì)數(shù)據(jù)變動(dòng)進(jìn)行實(shí)時(shí)響應(yīng)和處理,為實(shí)時(shí)分析、實(shí)時(shí)報(bào)表和實(shí)時(shí)決策等場(chǎng)景提供強(qiáng)大的支持。sSH28資訊網(wǎng)——每日最新資訊28at.com

具體來(lái)說(shuō),F(xiàn)link CDC的應(yīng)用場(chǎng)景包括但不限于實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)更新、實(shí)時(shí)數(shù)據(jù)同步和遷移、實(shí)時(shí)數(shù)據(jù)處理等。它還可以確保數(shù)據(jù)一致性,并在數(shù)據(jù)發(fā)生變更時(shí)能夠準(zhǔn)確地捕獲和處理。此外,F(xiàn)link CDC支持與多種數(shù)據(jù)源進(jìn)行集成,如MySQL、PostgreSQL、Oracle等,并提供了相應(yīng)的連接器,方便數(shù)據(jù)的捕獲和處理。sSH28資訊網(wǎng)——每日最新資訊28at.com

接下來(lái)將詳細(xì)的介紹關(guān)于MySQL CDC的使用。MySQL CDC 連接器允許從 MySQL 數(shù)據(jù)庫(kù)讀取快照數(shù)據(jù)和增量數(shù)據(jù)。sSH28資訊網(wǎng)——每日最新資訊28at.com

支持的數(shù)據(jù)庫(kù)sSH28資訊網(wǎng)——每日最新資訊28at.com

ConnectorsSH28資訊網(wǎng)——每日最新資訊28at.com

DatabasesSH28資訊網(wǎng)——每日最新資訊28at.com

DriversSH28資訊網(wǎng)——每日最新資訊28at.com

mysql-cdcsSH28資訊網(wǎng)——每日最新資訊28at.com

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.27sSH28資訊網(wǎng)——每日最新資訊28at.com

2. 實(shí)戰(zhàn)案例

2.1 MySQL開(kāi)啟Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的/my.ini),需要在[mysqld]部分設(shè)置相關(guān)參數(shù)以開(kāi)啟binlog功能,如下:sSH28資訊網(wǎng)——每日最新資訊28at.com

[mysqld]server-id=1# 格式,行級(jí)格式binlog-format=Row# binlog 日志文件的前綴log-bin=mysql-bin# 指定哪些數(shù)據(jù)庫(kù)需要記錄二進(jìn)制日志binlog_do_db=testjpa

除了開(kāi)啟binlog功能外,F(xiàn)link CDC還需要其他配置和權(quán)限來(lái)確保能夠正常連接到MySQL并讀取數(shù)據(jù)。例如,需要授予Flink CDC連接MySQL的用戶必要的權(quán)限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。這些權(quán)限是Flink CDC讀取數(shù)據(jù)和元數(shù)據(jù)所必需的。sSH28資訊網(wǎng)——每日最新資訊28at.com

查看是否開(kāi)啟了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin       | ON    |+---------------+-------+

以上就對(duì)mysql相關(guān)的配置完成了。sSH28資訊網(wǎng)——每日最新資訊28at.com

2.2 依賴管理

<properties>  <flink.version>1.19.0</flink.version></properties><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-base</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>com.ververica</groupId>  <artifactId>flink-sql-connector-mysql-cdc</artifactId>  <version>3.0.1</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-clients</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-table-runtime</artifactId>  <version>${flink.version}</version></dependency>

2.3 代碼實(shí)現(xiàn)

@Componentpublic class MonitorMySQLCDC implements InitializingBean {  // 該隊(duì)列專門用來(lái)臨時(shí)保存變化的數(shù)據(jù)(實(shí)際生產(chǎn)環(huán)境,你應(yīng)該使用MQ相關(guān)的產(chǎn)品)  public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;    private final StringRedisTemplate stringRedisTemplate ;  // 保存到redis中key的前綴  private final String PREFIX = "users:" ;  // 數(shù)據(jù)發(fā)生變化后的sink處理  private final CustomSink customSink ;  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {    this.customSink = customSink ;    this.stringRedisTemplate = stringRedisTemplate ;  }    @Override  public void afterPropertiesSet() throws Exception {    // 啟動(dòng)異步線程,實(shí)時(shí)處理隊(duì)列中的數(shù)據(jù)    new Thread(() -> {      while(true) {        try {          Map<String, Object> result = queue.take();          this.doAction(result) ;        } catch (Exception e) {          e.printStackTrace();        }      }    }).start() ;    Properties jdbcProperties = new Properties() ;    jdbcProperties.setProperty("useSSL", "false") ;    MySqlSource<String> source = MySqlSource.<String>builder()        .hostname("127.0.0.1")        .port(3306)        // 可配置多個(gè)數(shù)據(jù)庫(kù)        .databaseList("testjpa")        // 可配置多個(gè)表        .tableList("testjpa.users")        .username("root")        .password("123123")        .jdbcProperties(jdbcProperties)        // 包括schema的改變        .includeSchemaChanges(true)        // 反序列化設(shè)置        // .deserializer(new StringDebeziumDeserializationSchema())        .deserializer(new JsonDebeziumDeserializationSchema(true))        // 啟動(dòng)模式;關(guān)于啟動(dòng)模式下面詳細(xì)介紹        .startupOptions(StartupOptions.initial())        .build() ;    // 環(huán)境配置    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;    // 設(shè)置 6s 的 checkpoint 間隔    env.enableCheckpointing(6000) ;    // 設(shè)置 source 節(jié)點(diǎn)的并行度為 4    env.setParallelism(4) ;    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")        // 添加Sink        .addSink(this.customSink) ;    env.execute() ;  }    @SuppressWarnings("unchecked")  private void doAction(Map<String, Object> result) throws Exception {    Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;    String op = (String) payload.get("op") ;    switch (op) {      // 更新和插入操作      case "u", "c" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("after") ;        String id = after.get("id").toString();        System.out.printf("操作:%s, ID: %s%n", op, id) ;        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;      }      // 刪除操作      case "d" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("before") ;        String id = after.get("id").toString();        stringRedisTemplate.delete(PREFIX + id) ;      }     }  }  }

啟動(dòng)模式:sSH28資訊網(wǎng)——每日最新資訊28at.com

  • initial (默認(rèn)):在第一次啟動(dòng)時(shí)對(duì)受監(jiān)視的數(shù)據(jù)庫(kù)表執(zhí)行初始快照,并繼續(xù)讀取最新的 binlog。
  • earliest-offset:跳過(guò)快照階段,從可讀取的最早 binlog 位點(diǎn)開(kāi)始讀取
  • latest-offset:首次啟動(dòng)時(shí),從不對(duì)受監(jiān)視的數(shù)據(jù)庫(kù)表執(zhí)行快照, 連接器僅從 binlog 的結(jié)尾處開(kāi)始讀取,這意味著連接器只能讀取在連接器啟動(dòng)之后的數(shù)據(jù)更改。
  • specific-offset:跳過(guò)快照階段,從指定的 binlog 位點(diǎn)開(kāi)始讀取。位點(diǎn)可通過(guò) binlog 文件名和位置指定,或者在 GTID 在集群上啟用時(shí)通過(guò) GTID 集合指定。
  • timestamp:跳過(guò)快照階段,從指定的時(shí)間戳開(kāi)始讀取 binlog 事件。

數(shù)據(jù)處理Sink

@Componentpublic class CustomSink extends RichSinkFunction<String> {  private ObjectMapper mapper = new ObjectMapper();  @Override  public void invoke(String value, Context context) throws Exception {    System.out.printf("數(shù)據(jù)發(fā)生變化: %s%n", value);    TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {    };    Map<String, Object> result = mapper.readValue(value, valueType);    Map<String, Object> payload = (Map<String, Object>) result.get("payload");    String op = (String) payload.get("op") ;    // 不對(duì)讀操作處理    if (!"r".equals(op)) {      MonitorMySQLCDC.queue.put(result);    }  }}

以上就是實(shí)現(xiàn)通過(guò)FlinkCDC實(shí)時(shí)通過(guò)數(shù)據(jù)到Redis的所有代碼。sSH28資訊網(wǎng)——每日最新資訊28at.com

2.4 Web監(jiān)控頁(yè)面

引入flink web依賴sSH28資訊網(wǎng)——每日最新資訊28at.com

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-runtime-web</artifactId>  <version>${flink.version}</version></dependency>

環(huán)境配置sSH28資訊網(wǎng)——每日最新資訊28at.com

Configuration config = new Configuration() ;config.set(RestOptions.PORT, 9090) ;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web監(jiān)聽(tīng)9090端口。sSH28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片sSH28資訊網(wǎng)——每日最新資訊28at.com

通過(guò)web控制臺(tái)你可以管理查看到更多的信息。sSH28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://m.www897cc.com/showinfo-26-82367-0.htmlSpringBoot整合Flink CDC,實(shí)時(shí)追蹤數(shù)據(jù)變動(dòng),無(wú)縫同步至Redis

聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com

上一篇: 圖解 CSS Grid 布局,一起來(lái)看看 CSS Grid 布局是如何使用的

下一篇: 架構(gòu)見(jiàn)解:使用Instagram示例設(shè)計(jì)高效的多層緩存

標(biāo)簽:
  • 熱門焦點(diǎn)
Top 日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不
亚洲伦理网站| 国产精品久久国产愉拍 | 国产丝袜美腿一区二区三区| 国产女优一区| 尤物视频一区二区| 99视频精品在线| 欧美在线免费播放| 欧美成人精品在线视频| 欧美视频在线观看 亚洲欧| 国产色产综合色产在线视频| 亚洲国产日本| 午夜精品视频| 欧美福利一区| 国产欧美日韩综合一区在线播放 | 亚洲国产精品视频| 亚洲宅男天堂在线观看无病毒| 久久精品五月婷婷| 欧美日韩高清在线播放| 国产伦精品一区二区三区照片91 | 亚洲视频中文字幕| 久久久精品国产一区二区三区 | 国产午夜亚洲精品不卡| 亚洲欧洲一区二区三区久久| 亚洲欧美日韩成人| 欧美成人网在线| 国产日韩欧美综合| 日韩午夜中文字幕| 久久久欧美一区二区| 国产精品高潮粉嫩av| 亚洲欧洲精品一区二区精品久久久| 午夜亚洲福利在线老司机| 欧美第一黄网免费网站| 国产午夜精品美女视频明星a级 | 性欧美暴力猛交69hd| 欧美日韩伦理在线| 在线电影一区| 欧美一级二级三级蜜桃| 欧美色区777第一页| 亚洲国产女人aaa毛片在线| 欧美一区二区三区视频免费播放 | 午夜免费在线观看精品视频| 欧美精品一区二区三| 在线观看视频日韩| 欧美一区二区三区四区视频| 欧美视频四区| 亚洲精品三级| 另类酷文…触手系列精品集v1小说| 国产裸体写真av一区二区| 一区二区三区四区五区在线| 欧美国产先锋| 亚洲第一精品夜夜躁人人躁| 欧美中文字幕在线| 国产精品麻豆va在线播放| 日韩一级视频免费观看在线| 欧美激情欧美狂野欧美精品| 在线欧美视频| 久久久久国产一区二区三区四区| 国产精品一区二区你懂的| 亚洲一区综合| 欧美性天天影院| 一本久道久久久| 欧美日韩成人综合| 亚洲精品视频在线观看免费| 欧美+亚洲+精品+三区| 在线成人中文字幕| 米奇777在线欧美播放| 在线免费观看视频一区| 久久综合色播五月| 永久91嫩草亚洲精品人人| 久久久蜜桃一区二区人| 狠狠色狠色综合曰曰| 久久久av毛片精品| 国产综合久久久久久鬼色| 久久精品国产综合精品| 国内精品视频666| 久久久噜噜噜久久狠狠50岁| 在线电影欧美日韩一区二区私密| 久久理论片午夜琪琪电影网| 黄色成人91| 鲁大师影院一区二区三区| 亚洲国产成人在线| 欧美多人爱爱视频网站| 亚洲精品少妇30p| 欧美日韩黄色大片| 亚洲午夜成aⅴ人片| 国产精品久久二区| 欧美影院视频| 今天的高清视频免费播放成人 | 国产伦精品一区二区三区四区免费| 亚洲欧美综合v| 欧美一区二区三区视频在线观看 | 亚洲作爱视频| 国产精品成人一区二区三区夜夜夜| 亚洲小说春色综合另类电影| 国产精品美女www爽爽爽| 午夜一区二区三区不卡视频| 国产日韩欧美一二三区| 久久久999精品免费| 在线视频观看日韩| 欧美精品一区二区在线观看| 亚洲午夜一级| 国产一区二区三区精品久久久| 巨乳诱惑日韩免费av| 亚洲精品在线观| 国产精品成人午夜| 久久国产精品亚洲va麻豆| 在线精品视频免费观看| 欧美日韩成人激情| 午夜精品短视频| 激情五月综合色婷婷一区二区| 欧美成年人视频| 亚洲视频一区二区在线观看| 国产婷婷色一区二区三区| 麻豆免费精品视频| 在线视频你懂得一区| 国产欧美日韩综合| 欧美成人一区二区三区在线观看| 一区二区三欧美| 国产日韩欧美一区二区三区在线观看 | 久久夜色精品亚洲噜噜国产mv| 亚洲精品中文字幕女同| 国产精品青草久久| 久久视频在线视频| 国产精品99久久久久久www| 国产一区二区福利| 欧美激情亚洲一区| 午夜在线电影亚洲一区| 亚洲国产99| 国产精品乱码一区二三区小蝌蚪| 久久视频在线免费观看| 9l国产精品久久久久麻豆| 国内外成人免费激情在线视频网站| 欧美精品一区二区三区久久久竹菊 | 国产欧美在线看| 欧美国产成人精品| 亚洲免费视频中文字幕| 亚洲国产精品成人| 国产精品视频观看| 欧美第一黄色网| 羞羞答答国产精品www一本| 亚洲精品免费电影| 国产视频在线一区二区| 欧美日韩久久| 麻豆精品一区二区综合av| 亚洲欧美国产精品va在线观看| 1769国产精品| 国产精品影视天天线| 欧美精品一区三区在线观看| 久久国产主播精品| 亚洲网站视频| 亚洲人成在线观看| 国模私拍一区二区三区| 国产精品久久久久久久久久妞妞 | 久久国产66| 亚洲视频观看| 亚洲激情视频网站| 国产一区在线播放| 国产精品第2页| 欧美高清影院| 久久性天堂网| 欧美伊久线香蕉线新在线| 日韩一区二区免费看| 一区二区三区无毛| 国产欧美日本一区二区三区| 欧美日韩成人在线| 欧美11—12娇小xxxx| 久久久久久国产精品一区| 性欧美大战久久久久久久免费观看 | 亚洲最新在线视频| 亚洲第一级黄色片| 国产一区二区三区在线观看视频| 国产精品国产三级国产aⅴ浪潮| 欧美护士18xxxxhd| 牛牛影视久久网| 久久天天躁夜夜躁狠狠躁2022| 欧美一区二区三区电影在线观看| 亚洲天天影视| 一区二区免费在线视频| 日韩视频免费在线| 亚洲人成绝费网站色www| 亚洲成色www8888| 伊人色综合久久天天| 国产综合久久久久久鬼色| 国产精品欧美久久久久无广告| 欧美色视频在线| 欧美日本在线一区| 欧美激情中文不卡| 欧美激情视频网站| 欧美精品在线一区二区| 欧美华人在线视频| 欧美精品成人91久久久久久久| 欧美成年人视频网站| 免费日本视频一区| 蜜臀av国产精品久久久久| 噜噜噜91成人网| 欧美大片在线观看一区| 欧美高清不卡| 欧美激情一区二区三级高清视频| 欧美大片第1页| 欧美精品国产一区| 欧美日韩国产免费观看| 欧美日韩一区精品| 国产精品高潮呻吟| 国产精品一卡二|