Environment: Spring Boot 2.7.16 + Flink 1.19.0 + JDK 21
Flink CDC (Flink Change Data Capture) is a data integration framework built on log-based database CDC that reads full snapshots and incremental changes in one unified flow. Paired with the Flink compute engine, it can integrate very large data volumes in real time. Its core function is to watch a database or data stream for changes as they happen and extract them for further processing and analysis. With Flink CDC, users can build real-time data pipelines that react to changes immediately, which supports scenarios such as real-time analytics, real-time reporting, and real-time decision-making.
Typical use cases include, but are not limited to, real-time data warehouse updates, real-time data synchronization and migration, and real-time data processing. It also helps keep data consistent by capturing and processing each change accurately when it occurs. In addition, Flink CDC integrates with multiple data sources, such as MySQL, PostgreSQL, and Oracle, and provides a connector for each.
The rest of this article walks through MySQL CDC in detail. The MySQL CDC connector reads both snapshot data and incremental data from a MySQL database.
Supported databases
Connector | Database | Driver |
mysql-cdc | | JDBC Driver 8.0.27 |
In the MySQL configuration file (for example /etc/my.cnf on Linux or /my.ini on Windows), set the following parameters in the [mysqld] section to enable binlog:

    [mysqld]
    server-id=1
    # binlog format: row-level
    binlog-format=Row
    # prefix of the binlog file names
    log-bin=mysql-bin
    # databases whose changes are written to the binary log
    binlog_do_db=testjpa

Besides enabling binlog, Flink CDC needs further configuration and privileges to connect to MySQL and read data. For example, the MySQL user that Flink CDC connects as must be granted the necessary privileges, including SELECT, REPLICATION SLAVE, REPLICATION CLIENT, and SHOW VIEW. Flink CDC needs these to read both data and metadata.
After restarting MySQL, confirm that binlog is enabled:

    mysql> SHOW VARIABLES LIKE 'log_bin';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+

That completes the MySQL configuration.
Dependencies:

    <properties>
      <flink.version>1.19.0</flink.version>
    </properties>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-sql-connector-mysql-cdc</artifactId>
      <version>3.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
    </dependency>

The monitoring component:

    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    @Component
    public class MonitorMySQLCDC implements InitializingBean {

        // Queue that temporarily holds change events (in production, use an MQ product instead)
        public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>();

        // Prefix of the keys written to Redis
        private static final String PREFIX = "users:";

        // Reused for every event instead of creating a new mapper per call
        private final ObjectMapper mapper = new ObjectMapper();
        private final StringRedisTemplate stringRedisTemplate;
        // Sink that handles each change event
        private final CustomSink customSink;

        public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {
            this.customSink = customSink;
            this.stringRedisTemplate = stringRedisTemplate;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            // Start a background thread that processes queued events as they arrive
            new Thread(() -> {
                while (true) {
                    try {
                        Map<String, Object> result = queue.take();
                        this.doAction(result);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            Properties jdbcProperties = new Properties();
            jdbcProperties.setProperty("useSSL", "false");
            MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                // Multiple databases may be listed
                .databaseList("testjpa")
                // Multiple tables may be listed
                .tableList("testjpa.users")
                .username("root")
                .password("123123")
                .jdbcProperties(jdbcProperties)
                // Also emit schema change events
                .includeSchemaChanges(true)
                // Deserializer; "true" wraps each event as {schema, payload}
                // .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new JsonDebeziumDeserializationSchema(true))
                // Startup mode; startup modes are covered below
                .startupOptions(StartupOptions.initial())
                .build();

            // Execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 6 s
            env.enableCheckpointing(6000);
            // Default parallelism of the job (source and sink) is 4
            env.setParallelism(4);
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
                // Attach the sink
                .addSink(this.customSink);
            // Note: execute() blocks the calling thread until the job terminates
            env.execute();
        }

        @SuppressWarnings("unchecked")
        private void doAction(Map<String, Object> result) throws Exception {
            Map<String, Object> payload = (Map<String, Object>) result.get("payload");
            String op = (String) payload.get("op");
            if (op == null) {
                // Schema change events carry no "op"; switching on null would throw
                return;
            }
            switch (op) {
                // Update and insert
                case "u", "c" -> {
                    Map<String, Object> after = (Map<String, Object>) payload.get("after");
                    String id = after.get("id").toString();
                    System.out.printf("Operation: %s, ID: %s%n", op, id);
                    stringRedisTemplate.opsForValue().set(PREFIX + id, mapper.writeValueAsString(after));
                }
                // Delete: the row image is in "before"
                case "d" -> {
                    Map<String, Object> before = (Map<String, Object>) payload.get("before");
                    String id = before.get("id").toString();
                    stringRedisTemplate.delete(PREFIX + id);
                }
            }
        }
    }

Startup modes:
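As a rough sketch of the choices available to `startupOptions(...)`, the fragment below lists the factory methods of `StartupOptions` as I understand them for the 3.0.x MySQL CDC connector. The binlog file name, position, and timestamp are placeholder values, not taken from this article, and the method set should be checked against the connector version actually in use.

```java
import com.ververica.cdc.connectors.mysql.table.StartupOptions;

public class StartupModesSketch {
    public static void main(String[] args) {
        // Snapshot the monitored tables first, then continue from the binlog
        // (this is the mode the MonitorMySQLCDC class uses)
        StartupOptions initial = StartupOptions.initial();

        // Skip the snapshot and read from the earliest binlog offset still available
        StartupOptions earliest = StartupOptions.earliest();

        // Skip the snapshot and read only changes made after the connector starts
        StartupOptions latest = StartupOptions.latest();

        // Skip the snapshot and start at a given binlog file and position (placeholder values)
        StartupOptions byOffset = StartupOptions.specificOffset("mysql-bin.000003", 4L);

        // Skip the snapshot and start at a given timestamp in milliseconds (placeholder value)
        StartupOptions byTimestamp = StartupOptions.timestamp(1_700_000_000_000L);
    }
}
```

With `initial()`, existing rows arrive as read events (op "r") before any inserts, updates, or deletes, which is why the sink in this article filters "r" out.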
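The CustomSink class used by MonitorMySQLCDC:

    import java.util.Map;

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.springframework.stereotype.Component;

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    @Component
    public class CustomSink extends RichSinkFunction<String> {

        private final ObjectMapper mapper = new ObjectMapper();

        @Override
        @SuppressWarnings("unchecked")
        public void invoke(String value, Context context) throws Exception {
            System.out.printf("Data changed: %s%n", value);
            TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {
            };
            Map<String, Object> result = mapper.readValue(value, valueType);
            Map<String, Object> payload = (Map<String, Object>) result.get("payload");
            String op = (String) payload.get("op");
            // Skip snapshot reads ("r") and events without an op (schema changes)
            if (op != null && !"r".equals(op)) {
                MonitorMySQLCDC.queue.put(result);
            }
        }
    }

That is all the code needed to sync data to Redis in real time through Flink CDC.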
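To make the op handling concrete without a running MySQL, Flink, or Redis, here is a minimal, dependency-free sketch of the same dispatch logic. The class name `ChangeEventDispatchSketch`, the `payload` helper, and the in-memory `store` map standing in for Redis are all hypothetical, introduced only for illustration. The maps imitate the `payload` part of a Debezium-style change event, with `op`, `before`, and `after` fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ChangeEventDispatchSketch {

    // Hypothetical in-memory stand-in for Redis
    public static final Map<String, String> store = new ConcurrentHashMap<>();
    public static final String PREFIX = "users:";

    // Builds a map shaped like the "payload" part of a change event
    public static Map<String, Object> payload(String op, Map<String, Object> before,
                                              Map<String, Object> after) {
        Map<String, Object> p = new HashMap<>(); // HashMap permits null values
        p.put("op", op);
        p.put("before", before);
        p.put("after", after);
        return p;
    }

    // Applies one change event to the store: c/u write, d removes, anything else is ignored
    @SuppressWarnings("unchecked")
    public static void apply(Map<String, Object> payload) {
        String op = (String) payload.get("op");
        if (op == null) {
            return; // e.g. a schema change event, which has no op
        }
        switch (op) {
            case "c", "u" -> {
                Map<String, Object> after = (Map<String, Object>) payload.get("after");
                store.put(PREFIX + after.get("id"), after.toString());
            }
            case "d" -> {
                Map<String, Object> before = (Map<String, Object>) payload.get("before");
                store.remove(PREFIX + before.get("id"));
            }
            default -> {
                // "r" (snapshot read) and unknown ops are ignored
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 1, "name", "alice");
        apply(payload("c", null, row));
        System.out.println(store.containsKey("users:1")); // prints true
        apply(payload("d", row, null));
        System.out.println(store.isEmpty()); // prints true
    }
}
```

The point of the sketch is that inserts and updates read the row from `after`, while deletes must read it from `before`, because `after` is null once the row is gone.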
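Add the Flink web dependency

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>

Environment configuration

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration config = new Configuration();
    // Port for the local REST endpoint and web UI
    config.set(RestOptions.PORT, 9090);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

The web UI listens on port 9090.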
Through the web console you can view and manage more information about the running job.