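Tables in ClickHouse are almost always ReplicatedMergeTree + Distributed, but canal only supports single tables (the source code shows this as well). I therefore did some custom development on canal to implement insert, update, delete and truncate. Both single-table and whole-database sync are supported.
The custom canal development described in this article only presents the approach; the code cannot be copied verbatim.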
1, Download canal
git clone https://github.com/alibaba/canal.git
cd canal
git checkout canal-1.1.4
git checkout -b canal-1.1.4 canal-1.1.4
Why develop on the canal 1.1.4 branch?
With 1.1.5 a package cannot be found (Cannot resolve com.alibaba.otter:connector.tcp), so the project would not run.
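2, Import the project
Import it as a Maven project. Only client-adapter needs to be imported; the server side is not modified, so leave it out, otherwise many dependencies have to be downloaded.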
3, Modify pom.xml in the launcher project
<!--clickhouse start-->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4.13</version>
</dependency>
<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.1.2</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1</version>
</dependency>
<!--clickhouse end-->
These are the dependencies needed to connect to ClickHouse.
4, Modify resources/application.yml in the launcher project
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp kafka rocketMQ rabbitMQ
  canalServerHost: 10.0.54.5:11111 # canal server IP
  syncBatchSize: 1000
  batchSize: 500
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://10.0.40.200:3306/tanktest?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
      username: canal
      password: Canal-123
  canalAdapters:
  - instance: tanktest
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
          jdbc.url: jdbc:clickhouse://10.0.54.5:8123/tanktest
          jdbc.username: default
          jdbc.password: test1234
5, Modify resources/rdb/mytest_user.yml in the rdb project (the file name can be anything)
dataSourceKey: defaultDS
destination: tanktest
groupId: g1
outerAdapterKey: mysql1
onCluster: true # custom option that I added
clusterName: clickhouse_test_netjoy # custom option that I added
concurrent: true
dbMapping:
  mirrorDb: true
  database: tanktest
6, Modify RdbSyncService.java
/**
 * Update operation
 *
 * @param config mapping configuration
 * @param dml DML data
 */
private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
    Map<String, Object> data = dml.getData();
    if (data == null || data.isEmpty()) {
        return;
    }

    Map<String, Object> old = dml.getOld();
    if (old == null || old.isEmpty()) {
        return;
    }

    DbMapping dbMapping = config.getDbMapping();
    Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
    Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

    // ClickHouse has no plain UPDATE; use an ALTER TABLE ... UPDATE mutation
    StringBuilder updateSql = new StringBuilder();
    if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
        updateSql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping))
            .append(" ON CLUSTER ").append(config.getClusterName()).append(" UPDATE ");
    } else {
        updateSql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" UPDATE ");
    }

    List<Map<String, ?>> values = new ArrayList<>();
    boolean hasMatched = false;
    for (String srcColumnName : old.keySet()) {
        List<String> targetColumnNames = new ArrayList<>();
        columnsMap.forEach((targetColumn, srcColumn) -> {
            if (srcColumnName.equalsIgnoreCase(srcColumn)) {
                targetColumnNames.add(targetColumn);
            }
        });
        if (!targetColumnNames.isEmpty()) {
            hasMatched = true;
            for (String targetColumnName : targetColumnNames) {
                updateSql.append(targetColumnName).append("=?, ");
                Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
                if (type == null) {
                    throw new RuntimeException("Target column: " + targetColumnName + " not matched");
                }
                BatchExecutor.setValue(values, type, data.get(srcColumnName));
            }
        }
    }
    if (!hasMatched) {
        logger.warn("Did not matched any columns to update ");
        return;
    }
    int len = updateSql.length();
    updateSql.delete(len - 2, len).append(" WHERE ");

    // Append the primary-key condition
    if (!dbMapping.getMirrorDb()) {
        appendCondition(dbMapping, updateSql, ctype, values, data, old);
    } else {
        // Whole-database sync: locate the row by its id column
        String id = data.get(Util.cleanColumn("id")).toString();
        updateSql.append(" id = '").append(id).append("'");
    }
    batchExecutor.execute(updateSql.toString(), values);
    if (logger.isTraceEnabled()) {
        logger.trace("Update target table, sql: {}", updateSql);
    }
}

/**
 * Delete operation
 *
 * @param config
 * @param dml
 */
private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
    Map<String, Object> data = dml.getData();
    if (data == null || data.isEmpty()) {
        return;
    }

    DbMapping dbMapping = config.getDbMapping();
    Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

    // ClickHouse has no plain DELETE; use an ALTER TABLE ... DELETE mutation
    StringBuilder sql = new StringBuilder();
    if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
        sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping))
            .append(" ON CLUSTER ").append(config.getClusterName()).append(" DELETE WHERE ");
    } else {
        sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" DELETE WHERE ");
    }

    List<Map<String, ?>> values = new ArrayList<>();
    // Append the primary-key condition
    if (!dbMapping.getMirrorDb()) {
        appendCondition(dbMapping, sql, ctype, values, data);
    } else {
        // Whole-database sync: locate the row by its id column
        String id = data.get(Util.cleanColumn("id")).toString();
        sql.append(" id = '").append(id).append("'");
    }
    batchExecutor.execute(sql.toString(), values);
    if (logger.isTraceEnabled()) {
        logger.trace("Delete from target table, sql: {}", sql);
    }
}

/**
 * Truncate operation
 *
 * @param config
 */
private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
    DbMapping dbMapping = config.getDbMapping();
    StringBuilder sql = new StringBuilder();
    if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping))
            .append(" ON CLUSTER ").append(config.getClusterName());
    } else {
        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
    }
    batchExecutor.execute(sql.toString(), new ArrayList<>());
    if (logger.isTraceEnabled()) {
        logger.trace("Truncate target table, sql: {}", sql);
    }
}
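Did you notice append(" id = '").append(id).append("'"); in the code? Deletes and updates are keyed on id, which means both the MySQL table and the ClickHouse table must have an id column, preferably as the primary key. This works because of one property of the MySQL binlog: in ROW format with the default binlog_row_image=FULL, whenever a row changes, all of its columns are recorded.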
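The three statement shapes built above can be sketched in isolation. This is only an illustration: ClickHouseMutationSql and its method names are hypothetical and not part of canal, and unlike the code above it binds id through a ? placeholder instead of concatenating the value into the SQL, on the assumption that the JDBC driver accepts a placeholder there just as it does for the SET columns.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a canal class): builds ClickHouse mutation SQL keyed on an id column. */
final class ClickHouseMutationSql {

    private ClickHouseMutationSql() {
    }

    /** Returns " ON CLUSTER name" when clustering is enabled and a name is set, otherwise "". */
    static String onCluster(boolean onCluster, String clusterName) {
        boolean enabled = onCluster && clusterName != null && !clusterName.isEmpty();
        return enabled ? " ON CLUSTER " + clusterName : "";
    }

    /** ALTER TABLE t [ON CLUSTER c] UPDATE col=?, ... WHERE id = ? */
    static String update(String table, boolean onCluster, String clusterName, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("no columns to update");
        }
        StringBuilder sql = new StringBuilder("ALTER TABLE ").append(table)
            .append(onCluster(onCluster, clusterName)).append(" UPDATE ");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(columns.get(i)).append("=?");
        }
        return sql.append(" WHERE id = ?").toString();
    }

    /** ALTER TABLE t [ON CLUSTER c] DELETE WHERE id = ? */
    static String delete(String table, boolean onCluster, String clusterName) {
        return "ALTER TABLE " + table + onCluster(onCluster, clusterName) + " DELETE WHERE id = ?";
    }

    /** TRUNCATE TABLE t [ON CLUSTER c] */
    static String truncate(String table, boolean onCluster, String clusterName) {
        return "TRUNCATE TABLE " + table + onCluster(onCluster, clusterName);
    }

    public static void main(String[] args) {
        String cluster = "clickhouse_test_netjoy";
        System.out.println(update("tanktest.user", true, cluster, Arrays.asList("name", "age")));
        System.out.println(delete("tanktest.user", true, cluster));
        System.out.println(truncate("tanktest.user", false, null));
    }
}
```

Keeping the SQL construction in pure functions like this makes the ON CLUSTER switch easy to check without a running ClickHouse instance.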
7, Modify RdbMirrorDbSyncService.java
/**
 * Initialize the table configuration
 *
 * @param key config key: destination.database.table
 * @param baseConfigMap db sync config
 * @param dml DML
 */
private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {
    MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);
    if (mappingConfig == null) {
        // Build the table configuration
        mappingConfig = new MappingConfig();
        mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
        mappingConfig.setDestination(baseConfigMap.getDestination());
        mappingConfig.setGroupId(baseConfigMap.getGroupId());
        mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
        mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
        mappingConfig.setClusterName(baseConfigMap.getClusterName()); // copy the custom setting
        mappingConfig.setOnCluster(baseConfigMap.getOnCluster()); // copy the custom setting
        MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
        mappingConfig.setDbMapping(dbMapping);
        dbMapping.setDatabase(dml.getDatabase());
        dbMapping.setTable(dml.getTable());
        dbMapping.setTargetDb(dml.getDatabase());
        dbMapping.setTargetTable(dml.getTable());
        dbMapping.setMirrorDb(baseConfigMap.getDbMapping().getMirrorDb()); // copy the MirrorDb setting
        dbMapping.setMapAll(true);
        List<String> pkNames = dml.getPkNames();
        Map<String, String> pkMapping = new LinkedHashMap<>();
        pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
        dbMapping.setTargetPk(pkMapping);

        mirrorDbConfig.getTableConfig().put(key, mappingConfig);
    }
}
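The lines with trailing comments are the ones I added. As the code shows, the MirrorDb setting in the canal 1.1.4 adapter has no effect on the per-table configuration, because it is never read.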
/**
 * Batch sync method
 *
 * @param dmls batch of DML
 */
public void sync(List<Dml> dmls) {
    List<Dml> dmlList = new ArrayList<>();
    for (Dml dml : dmls) {
        String destination = StringUtils.trimToEmpty(dml.getDestination());
        String database = dml.getDatabase();
        MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
        if (mirrorDbConfig == null) {
            continue;
        }
        if (mirrorDbConfig.getMappingConfig() == null) {
            continue;
        }
        if (dml.getGroupId() != null && StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {
            if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {
                continue; // skip when the groupId does not match
            }
        }

        // if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
        //     // DDL
        //     if (logger.isDebugEnabled()) {
        //         logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
        //     }
        //     executeDdl(mirrorDbConfig, dml);
        //     rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
        //     mirrorDbConfig.getTableConfig().remove(dml.getTable()); // remove the config for this table
        // } else {
        // DML
        initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
        dmlList.add(dml);
        // }
    }
    // ... (the rest of the method is not shown)
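I commented out part of this method: truncate is a DML operation, yet it is treated as a DDL operation here, which is a bug. Note that with this branch commented out, executeDdl is no longer called, so DDL statements are not replicated to the target.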
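An alternative to commenting out the whole DDL branch would be to check for truncate first and only then fall through to the DDL check. The following is a minimal sketch of that routing decision, not canal code: DmlRouter and Route are hypothetical names, and it assumes the event type arrives as a string such as "TRUNCATE" alongside the isDdl flag and the SQL text.

```java
/** Hypothetical routing helper (not a canal class): decides how a binlog event should be handled. */
final class DmlRouter {

    /** The handler an event should go to. */
    enum Route { TRUNCATE, DDL, DML }

    private DmlRouter() {
    }

    /**
     * Truncate is checked before the DDL flag, so a truncate event that is
     * flagged as DDL still reaches the truncate handler.
     */
    static Route route(Boolean isDdl, String type, String sql) {
        if ("TRUNCATE".equalsIgnoreCase(type)) {
            return Route.TRUNCATE;
        }
        if (Boolean.TRUE.equals(isDdl) && sql != null && !sql.trim().isEmpty()) {
            return Route.DDL;
        }
        return Route.DML;
    }

    public static void main(String[] args) {
        System.out.println(route(true, "TRUNCATE", "truncate table user"));
        System.out.println(route(true, "ALTER", "alter table user add column age int"));
        System.out.println(route(false, "INSERT", null));
    }
}
```

With routing like this, other DDL could still be forwarded to executeDdl while truncate goes through the ON CLUSTER aware truncate() method.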
8, Add the following to MappingConfig.java in the rdb project
private String clusterName; // cluster name
private boolean onCluster = false; // whether to use ON CLUSTER

public String getClusterName() {
    return clusterName;
}

public void setClusterName(String clusterName) {
    this.clusterName = clusterName;
}

public boolean getOnCluster() {
    return onCluster;
}

public void setOnCluster(boolean onCluster) {
    this.onCluster = onCluster;
}
That is all for the code changes.
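9, Debug client-adapter
Build the rdb module first, either from the command line or by right-clicking the rdb project and choosing Run Maven > install.
The entry point for debugging is CanalAdapterApplication.java in the launcher project.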
10, Package client-adapter
Once debugging passes, run the following from the root directory of the client-adapter project:
# mvn package -DskipTests
======================================= (output omitted) ====================================
[INFO] Copying files to /Users/zhangying/Documents/www/canal/client-adapter/launcher/target/canal-adapter
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for canal client adapter module for otter 1.1.4 1.1.4:
[INFO]
[INFO] canal client adapter module for otter 1.1.4 ........ SUCCESS [ 0.009 s]
[INFO] canal client adapter common module for otter 1.1.4 . SUCCESS [ 3.874 s]
[INFO] canal client adapter logger example module for otter 1.1.4 SUCCESS [ 1.255 s]
[INFO] canal client adapter hbase module for otter 1.1.4 .. SUCCESS [ 13.855 s]
[INFO] canal client adapter elasticsearch module for otter 1.1.4 SUCCESS [ 11.110 s]
[INFO] canal client adapter rdb module for otter 1.1.4 .... SUCCESS [ 0.178 s]
[INFO] canal client adapter launcher module for otter 1.1.4 SUCCESS [ 7.947 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 38.554 s
[INFO] Finished at: 2021-09-18T16:27:55+08:00
[INFO] ------------------------------------------------------------------------
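After uploading the packaged build to the server, be sure to change the directory permissions, otherwise the adapter will not start.
chmod 777 -R plugin
chmod 777 -R logs
chmod 777 -R conf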
11, Stock canal 1.1.4 does not support update and delete in whole-database sync
RdbSyncService.java contains the following code:
/**
 * Append the primary-key WHERE condition
 */
private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
                             List<Map<String, ?>> values, Map<String, Object> d) {
    appendCondition(dbMapping, sql, ctype, values, d, null);
}

private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
                             List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
    // Append the primary-key condition
    for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
        String targetColumnName = entry.getKey();
        String srcColumnName = entry.getValue();
        if (srcColumnName == null) {
            srcColumnName = Util.cleanColumn(targetColumnName);
        }
        sql.append(targetColumnName).append("=? AND ");
        Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
        if (type == null) {
            throw new RuntimeException("Target column: " + targetColumnName + " not matched");
        }
        // ... (the rest of the method is not shown)
In whole-database sync there is no targetPk at all, which means that
for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
is never entered during whole-database sync, and that causes the error.
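One possible guard against an empty targetPk is to fall back to a default key column before the WHERE clause is built. The sketch below assumes, as this article does, that every mirrored table has an id column; PkFallback and its methods are hypothetical and do not exist in canal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not a canal class): supplies a default key when no targetPk is configured. */
final class PkFallback {

    private PkFallback() {
    }

    /** Returns targetPk unchanged when it has entries, otherwise a one-entry map for the fallback column. */
    static Map<String, String> resolve(Map<String, String> targetPk, String fallbackColumn) {
        if (targetPk != null && !targetPk.isEmpty()) {
            return targetPk;
        }
        Map<String, String> pk = new LinkedHashMap<>();
        pk.put(fallbackColumn, fallbackColumn);
        return pk;
    }

    /** Joins the target key columns into "col1=? AND col2=?" with no trailing AND. */
    static String whereClause(Map<String, String> pk) {
        StringJoiner where = new StringJoiner(" AND ");
        for (String targetColumn : pk.keySet()) {
            where.add(targetColumn + "=?");
        }
        return where.toString();
    }

    public static void main(String[] args) {
        // No primary key configured: the id column is used instead
        System.out.println(whereClause(resolve(null, "id")));
    }
}
```

Because the fallback guarantees at least one key column, the generated statement can never end in a bare WHERE.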
Please credit the source when reposting
Author: 海底苍鹰
URL: http://blog.51yip.com/clickhouse/2537.html/comment-page-1
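Thank you very much for this article! It is very detailed and solved many of the problems I ran into while using canal.
One small remark: truncate should be DDL rather than DML, so that part probably does not need to be changed.
Also, line 110 of RdbEtlService: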
"DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
Shouldn't this also be changed to ClickHouse's alter table ... delete where syntax?
Thanks again for the article!