Tables in ClickHouse are basically all ReplicatedMergeTree + Distributed, while canal only supports single-table sync (which its source code confirms). So I did some secondary development on canal to implement insert, update, delete and truncate, supporting both single-table and whole-database sync.
The canal modifications described in this article only illustrate the approach; the code is not meant to be copied verbatim.
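For reference, such a table pair typically looks like the sketch below. The table, cluster, and ZooKeeper path names here are illustrative assumptions, not taken from the article:

```sql
-- Local replicated table, created on every node of the cluster
CREATE TABLE tanktest.user_local ON CLUSTER clickhouse_test_netjoy
(
    id   UInt64,
    name String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/user_local', '{replica}')
ORDER BY id;

-- Distributed table that fans reads/writes out over the local tables
CREATE TABLE tanktest.user ON CLUSTER clickhouse_test_netjoy
AS tanktest.user_local
ENGINE = Distributed(clickhouse_test_netjoy, tanktest, user_local, rand());
```

Writes from the sync process target these tables, which is why the modified adapter needs the ON CLUSTER clause discussed later.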
1. Download canal
- git clone https://github.com/alibaba/canal.git
- git checkout canal-1.1.4
- git checkout -b canal-1.1.4 canal-1.1.4
Why pick the canal 1.1.4 branch for this work?
With 1.1.5, the build fails with "Cannot resolve com.alibaba.otter:connector.tcp", so the project cannot even run.
2. Import the project
When importing, choose Maven project and import only client-adapter. The server side is not modified, so don't import it; otherwise a lot of extra dependencies will be downloaded.
3. Modify pom.xml in the launcher module
- <!--clickhouse start-->
- <dependency>
-     <groupId>org.apache.httpcomponents</groupId>
-     <artifactId>httpclient</artifactId>
-     <version>4.5.13</version>
- </dependency>
- <dependency>
-     <groupId>org.apache.httpcomponents</groupId>
-     <artifactId>httpcore</artifactId>
-     <version>4.4.13</version>
- </dependency>
- <dependency>
-     <groupId>org.lz4</groupId>
-     <artifactId>lz4-java</artifactId>
-     <version>1.8.0</version>
- </dependency>
- <dependency>
-     <groupId>net.jpountz.lz4</groupId>
-     <artifactId>lz4</artifactId>
-     <version>1.1.2</version>
- </dependency>
- <dependency>
-     <groupId>ru.yandex.clickhouse</groupId>
-     <artifactId>clickhouse-jdbc</artifactId>
-     <version>0.3.1</version>
- </dependency>
- <!--clickhouse end-->
These are the dependencies needed to connect to ClickHouse.
4. Modify resources/application.yml in the launcher module
- server:
-   port: 8081
- spring:
-   jackson:
-     date-format: yyyy-MM-dd HH:mm:ss
-     time-zone: GMT+8
-     default-property-inclusion: non_null
- canal.conf:
-   mode: tcp # tcp kafka rocketMQ rabbitMQ
-   canalServerHost: 10.0.54.5:11111 # canal server IP
-   syncBatchSize: 1000
-   batchSize: 500
-   retries: 0
-   timeout:
-   accessKey:
-   secretKey:
-   srcDataSources:
-     defaultDS:
-       url: jdbc:mysql://10.0.40.200:3306/tanktest?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
-       username: canal
-       password: Canal-123
-   canalAdapters:
-   - instance: tanktest
-     groups:
-     - groupId: g1
-       outerAdapters:
-       - name: logger
-       - name: rdb
-         key: mysql1
-         properties:
-           jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
-           jdbc.url: jdbc:clickhouse://10.0.54.5:8123/tanktest
-           jdbc.username: default
-           jdbc.password: test1234
5. Modify resources/rdb/mytest_user.yml in the rdb module (the file name is up to you)
- dataSourceKey: defaultDS
- destination: tanktest
- groupId: g1
- outerAdapterKey: mysql1
- onCluster: true # custom option I added
- clusterName: clickhouse_test_netjoy # custom option I added
- concurrent: true
- dbMapping:
-   mirrorDb: true
-   database: tanktest
6. Modify RdbSyncService.java
- /**
-  * Update operation
-  *
-  * @param config mapping configuration
-  * @param dml    DML data
-  */
- private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
-     Map<String, Object> data = dml.getData();
-     if (data == null || data.isEmpty()) {
-         return;
-     }
-     Map<String, Object> old = dml.getOld();
-     if (old == null || old.isEmpty()) {
-         return;
-     }
-     DbMapping dbMapping = config.getDbMapping();
-     Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
-     Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-     StringBuilder updateSql = new StringBuilder();
-     if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
-         updateSql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName()).append(" UPDATE ");
-     } else {
-         updateSql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" UPDATE ");
-     }
-     List<Map<String, ?>> values = new ArrayList<>();
-     boolean hasMatched = false;
-     for (String srcColumnName : old.keySet()) {
-         List<String> targetColumnNames = new ArrayList<>();
-         columnsMap.forEach((targetColumn, srcColumn) -> {
-             if (srcColumnName.equalsIgnoreCase(srcColumn)) {
-                 targetColumnNames.add(targetColumn);
-             }
-         });
-         if (!targetColumnNames.isEmpty()) {
-             hasMatched = true;
-             for (String targetColumnName : targetColumnNames) {
-                 updateSql.append(targetColumnName).append("=?, ");
-                 Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
-                 if (type == null) {
-                     throw new RuntimeException("Target column: " + targetColumnName + " not matched");
-                 }
-                 BatchExecutor.setValue(values, type, data.get(srcColumnName));
-             }
-         }
-     }
-     if (!hasMatched) {
-         logger.warn("Did not matched any columns to update ");
-         return;
-     }
-     int len = updateSql.length();
-     updateSql.delete(len - 2, len).append(" WHERE ");
-     // append the primary-key condition
-     if (!dbMapping.getMirrorDb()) {
-         appendCondition(dbMapping, updateSql, ctype, values, data, old);
-     } else {
-         String id = data.get(Util.cleanColumn("id")).toString();
-         updateSql.append(" id = '").append(id).append("'");
-     }
-     batchExecutor.execute(updateSql.toString(), values);
-     if (logger.isTraceEnabled()) {
-         logger.trace("Update target table, sql: {}", updateSql);
-     }
- }
- /**
-  * Delete operation
-  *
-  * @param config mapping configuration
-  * @param dml    DML data
-  */
- private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
-     Map<String, Object> data = dml.getData();
-     if (data == null || data.isEmpty()) {
-         return;
-     }
-     DbMapping dbMapping = config.getDbMapping();
-     Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-     StringBuilder sql = new StringBuilder();
-     if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
-         sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName()).append(" DELETE WHERE ");
-     } else {
-         sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" DELETE WHERE ");
-     }
-     List<Map<String, ?>> values = new ArrayList<>();
-     // append the primary-key condition
-     if (!dbMapping.getMirrorDb()) {
-         appendCondition(dbMapping, sql, ctype, values, data);
-     } else {
-         String id = data.get(Util.cleanColumn("id")).toString();
-         sql.append(" id = '").append(id).append("'");
-     }
-     batchExecutor.execute(sql.toString(), values);
-     if (logger.isTraceEnabled()) {
-         logger.trace("Delete from target table, sql: {}", sql);
-     }
- }
- /**
-  * Truncate operation
-  *
-  * @param config mapping configuration
-  */
- private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
-     DbMapping dbMapping = config.getDbMapping();
-     StringBuilder sql = new StringBuilder();
-     if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
-         sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName());
-     } else {
-         sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
-     }
-     batchExecutor.execute(sql.toString(), new ArrayList<>());
-     if (logger.isTraceEnabled()) {
-         logger.trace("Truncate target table, sql: {}", sql);
-     }
- }
Notice the `append(" id = '").append(id).append("'")` calls: deletes and updates are keyed on id, so both the MySQL table and the ClickHouse table must have an id column, ideally the primary key. Conveniently, MySQL's row-based binlog records every column of a row whenever that row changes.
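Since ClickHouse has no ordinary UPDATE/DELETE statements, the modified code rewrites them as ALTER TABLE mutations. The self-contained sketch below shows the SQL shapes produced; the class and method names are mine, not canal's, and parameter binding is left out:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the ALTER TABLE mutation SQL the patched adapter emits.
public class MutationSqlSketch {

    // Build "ALTER TABLE <t> [ON CLUSTER <c>] UPDATE col=?, ... WHERE id = '<id>'"
    static String updateSql(String table, String cluster, boolean onCluster,
                            Map<String, Object> changed, Object id) {
        StringBuilder sql = new StringBuilder("ALTER TABLE ").append(table);
        if (onCluster && cluster != null && !cluster.isEmpty()) {
            sql.append(" ON CLUSTER ").append(cluster);
        }
        sql.append(" UPDATE ");
        for (String col : changed.keySet()) {
            sql.append(col).append("=?, ");
        }
        sql.delete(sql.length() - 2, sql.length()); // drop trailing ", "
        return sql.append(" WHERE id = '").append(id).append("'").toString();
    }

    // Build "ALTER TABLE <t> [ON CLUSTER <c>] DELETE WHERE id = '<id>'"
    static String deleteSql(String table, String cluster, boolean onCluster, Object id) {
        StringBuilder sql = new StringBuilder("ALTER TABLE ").append(table);
        if (onCluster && cluster != null && !cluster.isEmpty()) {
            sql.append(" ON CLUSTER ").append(cluster);
        }
        return sql.append(" DELETE WHERE id = '").append(id).append("'").toString();
    }

    public static void main(String[] args) {
        Map<String, Object> changed = new LinkedHashMap<>();
        changed.put("name", "tank");
        // ALTER TABLE tanktest.user ON CLUSTER clickhouse_test_netjoy UPDATE name=? WHERE id = '1'
        System.out.println(updateSql("tanktest.user", "clickhouse_test_netjoy", true, changed, 1));
        // ALTER TABLE tanktest.user ON CLUSTER clickhouse_test_netjoy DELETE WHERE id = '1'
        System.out.println(deleteSql("tanktest.user", "clickhouse_test_netjoy", true, 1));
    }
}
```

Keep in mind that ClickHouse applies these mutations asynchronously in the background, so a delete or update is not necessarily visible immediately after the statement returns.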
7. Modify RdbMirrorDbSyncService.java
- /**
-  * Initialize the table mapping config
-  *
-  * @param key config key: destination.database.table
-  * @param baseConfigMap db sync config
-  * @param dml DML
-  */
- private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {
-     MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);
-     if (mappingConfig == null) {
-         // build the table config
-         mappingConfig = new MappingConfig();
-         mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
-         mappingConfig.setDestination(baseConfigMap.getDestination());
-         mappingConfig.setGroupId(baseConfigMap.getGroupId());
-         mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
-         mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
-         mappingConfig.setClusterName(baseConfigMap.getClusterName()); // propagate the custom setting
-         mappingConfig.setOnCluster(baseConfigMap.getOnCluster()); // propagate the custom setting
-         MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
-         mappingConfig.setDbMapping(dbMapping);
-         dbMapping.setDatabase(dml.getDatabase());
-         dbMapping.setTable(dml.getTable());
-         dbMapping.setTargetDb(dml.getDatabase());
-         dbMapping.setTargetTable(dml.getTable());
-         dbMapping.setMirrorDb(baseConfigMap.getDbMapping().getMirrorDb()); // propagate the mirrorDb setting
-         dbMapping.setMapAll(true);
-         List<String> pkNames = dml.getPkNames();
-         Map<String, String> pkMapping = new LinkedHashMap<>();
-         pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
-         dbMapping.setTargetPk(pkMapping);
-         mirrorDbConfig.getTableConfig().put(key, mappingConfig);
-     }
- }
The lines marked with comments are my additions. As the code shows, the mirrorDb setting in the stock canal 1.1.4 adapter was ineffective: it was simply never read.
- /**
-  * Batch sync method
-  *
-  * @param dmls batch of DMLs
-  */
- public void sync(List<Dml> dmls) {
-     List<Dml> dmlList = new ArrayList<>();
-     for (Dml dml : dmls) {
-         String destination = StringUtils.trimToEmpty(dml.getDestination());
-         String database = dml.getDatabase();
-         MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
-         if (mirrorDbConfig == null) {
-             continue;
-         }
-         if (mirrorDbConfig.getMappingConfig() == null) {
-             continue;
-         }
-         if (dml.getGroupId() != null && StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {
-             if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {
-                 continue; // skip if the groupId does not match
-             }
-         }
-         // if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
-         //     // DDL
-         //     if (logger.isDebugEnabled()) {
-         //         logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-         //     }
-         //     executeDdl(mirrorDbConfig, dml);
-         //     rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
-         //     mirrorDbConfig.getTableConfig().remove(dml.getTable()); // drop the cached table config
-         // } else {
-         // DML
-         initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
-         dmlList.add(dml);
-         // }
-     }
-     // ... (rest of the method unchanged)
Part of this method was commented out by me: truncate is a DML operation here, yet it was being treated as a DDL operation, which I consider a bug.
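With the DDL branch commented out, truncate events now flow through the DML path like the other statement types. The dispatch can be pictured with this simplified, self-contained sketch; the class and method names are mine, not canal's:

```java
// Simplified sketch of type-based routing after the change: TRUNCATE is
// dispatched like the other DML types instead of via the (commented-out) DDL branch.
public class DmlRouter {

    static String route(String type) {
        switch (type == null ? "" : type.toUpperCase()) {
            case "INSERT":   return "insert";
            case "UPDATE":   return "update";
            case "DELETE":   return "delete";
            case "TRUNCATE": return "truncate"; // previously mis-handled as DDL
            default:         return "skip";
        }
    }

    public static void main(String[] args) {
        System.out.println(route("TRUNCATE")); // truncate
        System.out.println(route("ALTER"));    // skip
    }
}
```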
8. Add the following to MappingConfig.java in the rdb module
- private String clusterName; // cluster name
- private boolean onCluster = false; // whether to issue statements with ON CLUSTER
- public String getClusterName() {
-     return clusterName;
- }
- public void setClusterName(String clusterName) {
-     this.clusterName = clusterName;
- }
- public boolean getOnCluster() {
-     return onCluster;
- }
- public void setOnCluster(boolean onCluster) {
-     this.onCluster = onCluster;
- }
That's it for the code-level changes.
9. Debug client-adapter
First build the rdb module, either from the command line or by right-clicking the rdb project and running maven > install.
The debugging entry point is CanalAdapterApplication.java in the launcher project.
10. Package client-adapter
Once debugging passes, run:
- # mvn package -DskipTests   # run from the client-adapter project root
- ======================================= omitted ====================================
- [INFO] Copying files to /Users/zhangying/Documents/www/canal/client-adapter/launcher/target/canal-adapter
- [INFO] ------------------------------------------------------------------------
- [INFO] Reactor Summary for canal client adapter module for otter 1.1.4 1.1.4:
- [INFO]
- [INFO] canal client adapter module for otter 1.1.4 ........ SUCCESS [ 0.009 s]
- [INFO] canal client adapter common module for otter 1.1.4 . SUCCESS [ 3.874 s]
- [INFO] canal client adapter logger example module for otter 1.1.4 SUCCESS [ 1.255 s]
- [INFO] canal client adapter hbase module for otter 1.1.4 .. SUCCESS [ 13.855 s]
- [INFO] canal client adapter elasticsearch module for otter 1.1.4 SUCCESS [ 11.110 s]
- [INFO] canal client adapter rdb module for otter 1.1.4 .... SUCCESS [ 0.178 s]
- [INFO] canal client adapter launcher module for otter 1.1.4 SUCCESS [ 7.947 s]
- [INFO] ------------------------------------------------------------------------
- [INFO] BUILD SUCCESS
- [INFO] ------------------------------------------------------------------------
- [INFO] Total time: 38.554 s
- [INFO] Finished at: 2021-09-18T16:27:55+08:00
- [INFO] ------------------------------------------------------------------------
After uploading the packaged build to the server, be sure to fix the directory permissions, otherwise it won't start:
- chmod 777 -R plugin
- chmod 777 -R logs
- chmod 777 -R conf
11. Stock canal 1.1.4 does not support update and delete for whole-database sync
RdbSyncService.java contains the following code:
- /**
-  * Append the primary-key WHERE condition
-  */
- private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
-                              List<Map<String, ?>> values, Map<String, Object> d) {
-     appendCondition(dbMapping, sql, ctype, values, d, null);
- }
- private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
-                              List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
-     // append the primary key
-     for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
-         String targetColumnName = entry.getKey();
-         String srcColumnName = entry.getValue();
-         if (srcColumnName == null) {
-             srcColumnName = Util.cleanColumn(targetColumnName);
-         }
-         sql.append(targetColumnName).append("=? AND ");
-         Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
-         if (type == null) {
-             throw new RuntimeException("Target column: " + targetColumnName + " not matched");
-         }
-         // ... (rest of the method unchanged)
In whole-database sync there is no targetPk at all, which means the loop `for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet())` is never entered, and the sync fails with an error.
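One way to make the whole-database path safe is to fall back to the id column when no targetPk mapping exists, matching the mirror-db handling shown earlier. The helper below is my own self-contained sketch of that idea, not canal's code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: build the WHERE fragment from the targetPk mapping, falling back
// to the id column when the mapping is absent (whole-database sync).
public class ConditionSketch {

    static String whereClause(Map<String, String> targetPk, Object id) {
        if (targetPk == null || targetPk.isEmpty()) {
            // Whole-database sync: no PK mapping, rely on the id column
            return "id = '" + id + "'";
        }
        StringBuilder where = new StringBuilder();
        for (String targetColumnName : targetPk.keySet()) {
            where.append(targetColumnName).append("=? AND ");
        }
        where.delete(where.length() - 5, where.length()); // drop trailing " AND "
        return where.toString();
    }

    public static void main(String[] args) {
        Map<String, String> pk = new LinkedHashMap<>();
        pk.put("id", "id");
        System.out.println(whereClause(pk, null));                 // id=?
        System.out.println(whereClause(new LinkedHashMap<>(), 7)); // id = '7'
    }
}
```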
Please credit the source when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/clickhouse/2537.html
Thank you very much for this article! It's very detailed and solved many of the problems I ran into with canal.
One small note: truncate should be DDL rather than DML, so that part probably doesn't need changing.
Also, at line 110 of RdbEtlService:
- "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
Shouldn't this also be changed to ClickHouse's ALTER TABLE ... DELETE WHERE syntax?
Thanks again for the article!