canal 同步mysql数据到clickhouse 支持update delete truncate

张映 发表于 2021-09-18

分类目录: clickhouse

标签:, , , ,

clickhouse中的表基本上都ReplicatedMergeTree+Distributed。而canal只支持单表(从源码中也可以看出),所以做了canal二次开发以实现insert update delete truncate功能。单表和整库都支持。

本文所介绍的canal二次开发,只提供思路,并不能照抄代码。

1,下载canal

  1. git clone https://github.com/alibaba/canal.git  
  2. git checkout canal-1.1.4  
  3. git checkout -b canal-1.1.4 canal-1.1.4  

这里为什么要选择canal1.1.4分支来做开发呢。

1.1.5这个包找不到Cannot resolve com.alibaba.otter:connector.tcp:无法跑起来

2,导入项目

导入时选择maven项目,只需要导入client-adapter就行了,服务端并不改动就别导入了,不然要下载很多依赖

3,修改launcher项目下的pom.xml

  1. <!--clickhouse start-->  
  2. <dependency>  
  3.     <groupId>org.apache.httpcomponents</groupId>  
  4.     <artifactId>httpclient</artifactId>  
  5.     <version>4.5.13</version>  
  6. </dependency>  
  7. <dependency>  
  8.     <groupId>org.apache.httpcomponents</groupId>  
  9.     <artifactId>httpcore</artifactId>  
  10.     <version>4.4.13</version>  
  11. </dependency>  
  12. <dependency>  
  13.     <groupId>org.lz4</groupId>  
  14.     <artifactId>lz4-java</artifactId>  
  15.     <version>1.8.0</version>  
  16. </dependency>  
  17. <dependency>  
  18.     <groupId>net.jpountz.lz4</groupId>  
  19.     <artifactId>lz4</artifactId>  
  20.     <version>1.1.2</version>  
  21. </dependency>  
  22. <dependency>  
  23.     <groupId>ru.yandex.clickhouse</groupId>  
  24.     <artifactId>clickhouse-jdbc</artifactId>  
  25.     <version>0.3.1</version>  
  26. </dependency>  
  27. <!--clickhouse end-->  

连接clickhouse所需要的依赖

4,修改launcher项目下resources/application.yml,

  1. server:  
  2.   port: 8081  
  3. spring:  
  4.   jackson:  
  5.     date-format: yyyy-MM-dd HH:mm:ss  
  6.     time-zone: GMT+8  
  7.     default-property-inclusion: non_null  
  8. canal.conf:  
  9.   mode: tcp #tcp kafka rocketMQ rabbitMQ  
  10.   canalServerHost: 10.0.54.5:11111   //canal 服务端IP  
  11.   syncBatchSize: 1000  
  12.   batchSize: 500  
  13.   retries: 0  
  14.   timeout:  
  15.   accessKey:  
  16.   secretKey:  
  17.   srcDataSources:  
  18.     defaultDS:  
  19.       url: jdbc:mysql://10.0.40.200:3306/tanktest?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false  
  20.       username: canal  
  21.       password: Canal-123  
  22.   canalAdapters:  
  23.     - instance: tanktest  
  24.       groups:  
  25.         - groupId: g1  
  26.           outerAdapters:  
  27.             - name: logger  
  28.             - name: rdb  
  29.               key: mysql1  
  30.               properties:  
  31.                 jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver  
  32.                 jdbc.url: jdbc:clickhouse://10.0.54.5:8123/tanktest  
  33.                 jdbc.username: default  
  34.                 jdbc.password: test1234  

5,修改rdb项目下resources/rdb/mytest_user.yml,文件名可以自定义

  1. dataSourceKey: defaultDS  
  2. destination: tanktest  
  3. groupId: g1  
  4. outerAdapterKey: mysql1  
  5. onCluster: true     //这是我自定义的  
  6. clusterName: clickhouse_test_netjoy  //这是我自定义的  
  7. concurrent: true  
  8. dbMapping:  
  9.   mirrorDb: true  
  10.   database: tanktest  

6,修改RdbSyncService.java

  1. /** 
  2.  * 更新操作 
  3.  * 
  4.  * @param config 配置项 
  5.  * @param dml DML数据 
  6.  */  
  7. private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {  
  8.     Map<String, Object> data = dml.getData();  
  9.     if (data == null || data.isEmpty()) {  
  10.         return;  
  11.     }  
  12.   
  13.     Map<String, Object> old = dml.getOld();  
  14.     if (old == null || old.isEmpty()) {  
  15.         return;  
  16.     }  
  17.   
  18.     DbMapping dbMapping = config.getDbMapping();  
  19.   
  20.     Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);  
  21.   
  22.     Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);  
  23.   
  24.     StringBuilder updateSql = new StringBuilder();  
  25.     if (config.getOnCluster() && !config.getClusterName().isEmpty()) {  
  26.         updateSql.append("ALTER TABLE  ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName()).append(" UPDATE ");  
  27.     }else{  
  28.         updateSql.append("ALTER TABLE  ").append(SyncUtil.getDbTableName(dbMapping)).append(" UPDATE ");  
  29.     }  
  30.   
  31.     List<Map<String, ?>> values = new ArrayList<>();  
  32.     boolean hasMatched = false;  
  33.     for (String srcColumnName : old.keySet()) {  
  34.         List targetColumnNames = new ArrayList<>();  
  35.         columnsMap.forEach((targetColumn, srcColumn) -> {  
  36.             if (srcColumnName.equalsIgnoreCase(srcColumn)) {  
  37.                 targetColumnNames.add(targetColumn);  
  38.             }  
  39.         });  
  40.         if (!targetColumnNames.isEmpty()) {  
  41.             hasMatched = true;  
  42.             for (String targetColumnName : targetColumnNames) {  
  43.                 updateSql.append(targetColumnName).append("=?, ");  
  44.                 Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());  
  45.                 if (type == null) {  
  46.                     throw new RuntimeException("Target column: " + targetColumnName + " not matched");  
  47.                 }  
  48.                 BatchExecutor.setValue(values, type, data.get(srcColumnName));  
  49.             }  
  50.         }  
  51.     }  
  52.     if (!hasMatched) {  
  53.         logger.warn("Did not matched any columns to update ");  
  54.         return;  
  55.     }  
  56.     int len = updateSql.length();  
  57.     updateSql.delete(len - 2, len).append(" WHERE ");  
  58.   
  59.     // 拼接主键  
  60.     if(dbMapping.getMirrorDb()==false){  
  61.         appendCondition(dbMapping, updateSql, ctype, values, data, old);  
  62.     }else{  
  63.         String id = data.get(Util.cleanColumn("id")).toString();  
  64.         updateSql.append(" id = '").append(id).append("'");  
  65.     }  
  66.   
  67.     batchExecutor.execute(updateSql.toString(), values);  
  68.     if (logger.isTraceEnabled()) {  
  69.         logger.trace("Update target table, sql: {}", updateSql);  
  70.     }  
  71. }  
  72.   
  73. /** 
  74.  * 删除操作 
  75.  * 
  76.  * @param config 
  77.  * @param dml 
  78.  */  
  79. private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {  
  80.     Map<String, Object> data = dml.getData();  
  81.     if (data == null || data.isEmpty()) {  
  82.         return;  
  83.     }  
  84.   
  85.     DbMapping dbMapping = config.getDbMapping();  
  86.   
  87.     Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);  
  88.   
  89.     StringBuilder sql = new StringBuilder();  
  90.     if (config.getOnCluster() && !config.getClusterName().isEmpty()) {  
  91.         sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName()).append(" DELETE WHERE ");  
  92.     }else{  
  93.         sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" DELETE WHERE ");  
  94.     }  
  95.   
  96.     List<Map<String, ?>> values = new ArrayList<>();  
  97.   
  98.     // 拼接主键  
  99.     if(dbMapping.getMirrorDb()==false){  
  100.         appendCondition(dbMapping, sql, ctype, values, data);  
  101.     }else{  
  102.         String id = data.get(Util.cleanColumn("id")).toString();  
  103.         sql.append(" id = '").append(id).append("'");  
  104.     }  
  105.   
  106.     batchExecutor.execute(sql.toString(), values);  
  107.     if (logger.isTraceEnabled()) {  
  108.         logger.trace("Delete from target table, sql: {}", sql);  
  109.     }  
  110. }  
  111.   
  112. /** 
  113.  * truncate操作 
  114.  * 
  115.  * @param config 
  116.  */  
  117. private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {  
  118.     DbMapping dbMapping = config.getDbMapping();  
  119.     StringBuilder sql = new StringBuilder();  
  120.     if (config.getOnCluster() && !config.getClusterName().isEmpty()) {  
  121.         sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName());  
  122.     }else{  
  123.         sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));  
  124.     }  
  125.   
  126.     batchExecutor.execute(sql.toString(), new ArrayList<>());  
  127.     if (logger.isTraceEnabled()) {  
  128.         logger.trace("Truncate target table, sql: {}", sql);  
  129.     }  
  130. }  

大家注意到没有append(" id = '").append(id).append("'");,删除和更新就是根id来的,也就是mysql表和clickhouse表中必须有id字段,最好是主键。mysqlbinlog有个特点,只要有数据变动,表中所有字段都会被记录下来。

7,修改RdbMirrorDbSyncService.java

  1. /** 
  2.  * 初始化表配置 
  3.  * 
  4.  * @param key 配置key: destination.database.table 
  5.  * @param baseConfigMap db sync config 
  6.  * @param dml DML 
  7.  */  
  8. private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {  
  9.     MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);  
  10.     if (mappingConfig == null) {  
  11.         // 构造表配置  
  12.         mappingConfig = new MappingConfig();  
  13.         mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());  
  14.         mappingConfig.setDestination(baseConfigMap.getDestination());  
  15.         mappingConfig.setGroupId(baseConfigMap.getGroupId());  
  16.         mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());  
  17.         mappingConfig.setConcurrent(baseConfigMap.getConcurrent());  
  18.         mappingConfig.setClusterName(baseConfigMap.getClusterName());    //设置自定义配置  
  19.         mappingConfig.setOnCluster(baseConfigMap.getOnCluster());        //设置自定义配置  
  20.         MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();  
  21.         mappingConfig.setDbMapping(dbMapping);  
  22.         dbMapping.setDatabase(dml.getDatabase());  
  23.         dbMapping.setTable(dml.getTable());  
  24.         dbMapping.setTargetDb(dml.getDatabase());  
  25.         dbMapping.setTargetTable(dml.getTable());  
  26.         dbMapping.setMirrorDb(baseConfigMap.getDbMapping().getMirrorDb());  //设置MirrorDb自定义配置  
  27.         dbMapping.setMapAll(true);  
  28.         List pkNames = dml.getPkNames();  
  29.         Map<String, String> pkMapping = new LinkedHashMap<>();  
  30.         pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));  
  31.         dbMapping.setTargetPk(pkMapping);  
  32.   
  33.         mirrorDbConfig.getTableConfig().put(key, mappingConfig);  
  34.     }  
  35. }  

注释的地方都是新增的,从代码可以看出,canal1.1.4 adapter中的MirrorDb配置是无效的,因为根本没有读取。

  1.     /** 
  2.      * 批量同步方法 
  3.      * 
  4.      * @param dmls 批量 DML 
  5.      */  
  6.     public void sync(List dmls) {  
  7.         List dmlList = new ArrayList<>();  
  8.         for (Dml dml : dmls) {  
  9.             String destination = StringUtils.trimToEmpty(dml.getDestination());  
  10.             String database = dml.getDatabase();  
  11.             MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);  
  12.             if (mirrorDbConfig == null) {  
  13.                 continue;  
  14.             }  
  15.             if (mirrorDbConfig.getMappingConfig() == null) {  
  16.                 continue;  
  17.             }  
  18.             if (dml.getGroupId() != null && StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {  
  19.                 if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {  
  20.                     continue// 如果groupId不匹配则过滤  
  21.                 }  
  22.             }  
  23.   
  24. //            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {  
  25. //                // DDL  
  26. //                if (logger.isDebugEnabled()) {  
  27. //                    logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));  
  28. //                }  
  29. //                executeDdl(mirrorDbConfig, dml);  
  30. //                rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());  
  31. //                mirrorDbConfig.getTableConfig().remove(dml.getTable()); // 删除对应库表配置  
  32. //            } else {  
  33.                 // DML  
  34.                 initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);  
  35.                 dmlList.add(dml);  
  36. //            }  
  37.         }  

这个方法里面部分代码被我注释了,truncate是dml操作,却被认为成了ddl操作,这是个bug。

8,rdb MappingConfig.java添加以下内容

  1. private String    clusterName;    // 集群名称  
  2.   
  3. private boolean   onCluster = false; // 是否使用集群  
  4.   
  5. public String getClusterName() {  
  6.     return clusterName;  
  7. }  
  8.   
  9. public void setClusterName(String clusterName) {  
  10.     this.clusterName = clusterName;  
  11. }  
  12.   
  13. public boolean getOnCluster() {  
  14.     return onCluster;  
  15. }  
  16.   
  17. public void setOnCluster(boolean onCluster) {  
  18.     this.onCluster = onCluster;  
  19. }  

到这儿代码层面的都结束了

9,debug client-adapter

先打包rdb,命令行也行。右键rdb项目,run maven》install也行

调试的入口,launcher项目的CanalAdapterApplication.java

canal debug

canal debug

10,client-adapter打包

调试通过后执行

  1. # mvn package -DskipTests //client-adapter项目的根目录打包  
  2.   
  3. =======================================省略====================================  
  4. [INFO] Copying files to /Users/zhangying/Documents/www/canal/client-adapter/launcher/target/canal-adapter  
  5. [INFO] ------------------------------------------------------------------------  
  6. [INFO] Reactor Summary for canal client adapter module for otter 1.1.4 1.1.4:  
  7. [INFO]  
  8. [INFO] canal client adapter module for otter 1.1.4 ........ SUCCESS [ 0.009 s]  
  9. [INFO] canal client adapter common module for otter 1.1.4 . SUCCESS [ 3.874 s]  
  10. [INFO] canal client adapter logger example module for otter 1.1.4 SUCCESS [ 1.255 s]  
  11. [INFO] canal client adapter hbase module for otter 1.1.4 .. SUCCESS [ 13.855 s]  
  12. [INFO] canal client adapter elasticsearch module for otter 1.1.4 SUCCESS [ 11.110 s]  
  13. [INFO] canal client adapter rdb module for otter 1.1.4 .... SUCCESS [ 0.178 s]  
  14. [INFO] canal client adapter launcher module for otter 1.1.4 SUCCESS [ 7.947 s]  
  15. [INFO] ------------------------------------------------------------------------  
  16. [INFO] BUILD SUCCESS  
  17. [INFO] ------------------------------------------------------------------------  
  18. [INFO] Total time: 38.554 s  
  19. [INFO] Finished at: 2021-09-18T16:27:55+08:00  
  20. [INFO] ------------------------------------------------------------------------  

将打包好的代码传到服务端以后,一定要修改一下目录权限,不然起不来

  1. chmod 777 -R plugin  
  2. chmod 777 -R logs  
  3. chmod 777 -R conf  

11,原来canal1.1.4是不支持,全库同步的update,delete

RdbSyncService.java文件中,有以下代码

  1. /** 
  2.  * 拼接主键 where条件 
  3.  */  
  4. private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,  
  5.                              List<Map<String, ?>> values, Map<String, Object> d) {  
  6.     appendCondition(dbMapping, sql, ctype, values, d, null);  
  7. }  
  8.   
  9. private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,  
  10.                              List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {  
  11.     // 拼接主键  
  12.     for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {  
  13.         String targetColumnName = entry.getKey();  
  14.         String srcColumnName = entry.getValue();  
  15.         if (srcColumnName == null) {  
  16.             srcColumnName = Util.cleanColumn(targetColumnName);  
  17.         }  
  18.         sql.append(targetColumnName).append("=? AND ");  
  19.         Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());  
  20.         if (type == null) {  
  21.             throw new RuntimeException("Target column: " + targetColumnName + " not matched");  
  22.         }  

全库同步时根本没有targePk,也就是说

for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {

全库同步时,根本走不到这个for循环里面去,导致报错。



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/clickhouse/2537.html

1 条评论

  1. Michael 留言

    非常感谢您的这篇文章!写的非常详细,解决了我在使用 canal 中遇到的很多问题。
    还有一点拙见:truncate 应该是 DDL 而非 DML,这个应该不用改动。
    以及 RdbtEtlService 中的 110 行:
    "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
    是否也应改成 clickhouse 的 alter table ... delete where 语法为好?
    再次感谢您的文章!