Tables in ClickHouse are basically ReplicatedMergeTree + Distributed, while canal only supports single-table sync (as its source code shows). So I did some secondary development on canal to implement insert, update, delete, and truncate, supporting both single-table and whole-database sync.
This article only presents the ideas behind the canal modifications; the code is not meant to be copied verbatim.
1. Download canal
git clone https://github.com/alibaba/canal.git
git checkout canal-1.1.4
git checkout -b canal-1.1.4 canal-1.1.4
Why develop against the canal-1.1.4 branch? With 1.1.5 the dependency cannot be resolved ("Cannot resolve com.alibaba.otter:connector.tcp"), so the project will not run.
2. Import the project
Import it as a Maven project, and import only the client-adapter module. The server side is not modified, so don't import it, or you will have to download a lot of extra dependencies.
3. Modify pom.xml in the launcher module
<!-- clickhouse start -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4.13</version>
</dependency>
<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.1.2</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1</version>
</dependency>
<!-- clickhouse end -->
These are the dependencies required to connect to ClickHouse.
4. Modify resources/application.yml in the launcher module:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp kafka rocketMQ rabbitMQ
  canalServerHost: 10.0.54.5:11111 # canal server IP
  syncBatchSize: 1000
  batchSize: 500
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://10.0.40.200:3306/tanktest?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
      username: canal
      password: Canal-123
  canalAdapters:
  - instance: tanktest
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
          jdbc.url: jdbc:clickhouse://10.0.54.5:8123/tanktest
          jdbc.username: default
          jdbc.password: test1234
5. Modify resources/rdb/mytest_user.yml in the rdb module (the file name can be anything):
dataSourceKey: defaultDS
destination: tanktest
groupId: g1
outerAdapterKey: mysql1
onCluster: true # custom key I added
clusterName: clickhouse_test_netjoy # custom key I added
concurrent: true
dbMapping:
  mirrorDb: true
  database: tanktest
6. Modify RdbSyncService.java
/**
 * Update operation
 *
 * @param config mapping config
 * @param dml DML data
 */
private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
Map<String, Object> data = dml.getData();
if (data == null || data.isEmpty()) {
return;
}
Map<String, Object> old = dml.getOld();
if (old == null || old.isEmpty()) {
return;
}
DbMapping dbMapping = config.getDbMapping();
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
StringBuilder updateSql = new StringBuilder();
if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
updateSql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName()).append(" UPDATE ");
} else {
updateSql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" UPDATE ");
}
List<Map<String, ?>> values = new ArrayList<>();
boolean hasMatched = false;
for (String srcColumnName : old.keySet()) {
List<String> targetColumnNames = new ArrayList<>();
columnsMap.forEach((targetColumn, srcColumn) -> {
if (srcColumnName.equalsIgnoreCase(srcColumn)) {
targetColumnNames.add(targetColumn);
}
});
if (!targetColumnNames.isEmpty()) {
hasMatched = true;
for (String targetColumnName : targetColumnNames) {
updateSql.append(targetColumnName).append("=?, ");
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
}
BatchExecutor.setValue(values, type, data.get(srcColumnName));
}
}
}
if (!hasMatched) {
logger.warn("Did not match any columns to update");
return;
}
int len = updateSql.length();
updateSql.delete(len - 2, len).append(" WHERE ");
// Append the primary-key condition
if (!dbMapping.getMirrorDb()) {
appendCondition(dbMapping, updateSql, ctype, values, data, old);
} else {
// Mirror-db mode: rows are keyed on the id column
String id = data.get(Util.cleanColumn("id")).toString();
updateSql.append(" id = '").append(id).append("'");
}
batchExecutor.execute(updateSql.toString(), values);
if (logger.isTraceEnabled()) {
logger.trace("Update target table, sql: {}", updateSql);
}
}
/**
 * Delete operation
 *
 * @param config mapping config
 * @param dml DML data
 */
private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
Map<String, Object> data = dml.getData();
if (data == null || data.isEmpty()) {
return;
}
DbMapping dbMapping = config.getDbMapping();
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
StringBuilder sql = new StringBuilder();
if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName()).append(" DELETE WHERE ");
} else {
sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" DELETE WHERE ");
}
List<Map<String, ?>> values = new ArrayList<>();
// Append the primary-key condition
if (!dbMapping.getMirrorDb()) {
appendCondition(dbMapping, sql, ctype, values, data);
} else {
// Mirror-db mode: rows are keyed on the id column
String id = data.get(Util.cleanColumn("id")).toString();
sql.append(" id = '").append(id).append("'");
}
batchExecutor.execute(sql.toString(), values);
if (logger.isTraceEnabled()) {
logger.trace("Delete from target table, sql: {}", sql);
}
}
/**
 * Truncate operation
 *
 * @param config mapping config
 */
private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
DbMapping dbMapping = config.getDbMapping();
StringBuilder sql = new StringBuilder();
if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName());
} else {
sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
}
batchExecutor.execute(sql.toString(), new ArrayList<>());
if (logger.isTraceEnabled()) {
logger.trace("Truncate target table, sql: {}", sql);
}
}
Notice the `append(" id = '").append(id).append("'")` calls: deletes and updates are keyed on id, which means both the MySQL table and the ClickHouse table must have an id column, ideally the primary key. A useful property of the MySQL binlog (in row format) is that whenever a row changes, all of the row's columns are recorded.
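To make the generated mutations concrete, here is a minimal, self-contained sketch (my own illustration, not the adapter's real code; the table, cluster, and column names are examples) of the SQL shape the modified update path produces. ClickHouse has no plain UPDATE statement, so the adapter emits ALTER TABLE ... UPDATE, optionally with ON CLUSTER, and locates the row by id:

```java
// Illustrative sketch only: the shape of the ClickHouse mutation SQL that the
// modified update path in RdbSyncService generates. All names are examples.
public class MutationSqlSketch {

    // Build an update mutation; when onCluster is set, the ON CLUSTER clause
    // is inserted so the mutation runs on every replica of the cluster.
    static String updateSql(String table, String clusterName, boolean onCluster,
                            String setClause, String id) {
        StringBuilder sql = new StringBuilder("ALTER TABLE ").append(table);
        if (onCluster && !clusterName.isEmpty()) {
            sql.append(" ON CLUSTER ").append(clusterName);
        }
        // Rows are keyed on the id column, as in the modified adapter code
        return sql.append(" UPDATE ").append(setClause)
                  .append(" WHERE id = '").append(id).append("'")
                  .toString();
    }

    public static void main(String[] args) {
        // Example values borrowed from the configs above; the column is made up
        System.out.println(updateSql("tanktest.user", "clickhouse_test_netjoy",
                                     true, "name=?", "42"));
        // -> ALTER TABLE tanktest.user ON CLUSTER clickhouse_test_netjoy UPDATE name=? WHERE id = '42'
    }
}
```

The delete and truncate paths follow the same pattern, with ALTER TABLE ... DELETE WHERE and TRUNCATE TABLE ... ON CLUSTER respectively.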
7. Modify RdbMirrorDbSyncService.java
/**
 * Initialize the table config
 *
 * @param key config key: destination.database.table
 * @param baseConfigMap db sync config
 * @param dml DML
 */
private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {
MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);
if (mappingConfig == null) {
// Build the table config
mappingConfig = new MappingConfig();
mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
mappingConfig.setDestination(baseConfigMap.getDestination());
mappingConfig.setGroupId(baseConfigMap.getGroupId());
mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
mappingConfig.setClusterName(baseConfigMap.getClusterName()); // pass through the custom config
mappingConfig.setOnCluster(baseConfigMap.getOnCluster()); // pass through the custom config
MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
mappingConfig.setDbMapping(dbMapping);
dbMapping.setDatabase(dml.getDatabase());
dbMapping.setTable(dml.getTable());
dbMapping.setTargetDb(dml.getDatabase());
dbMapping.setTargetTable(dml.getTable());
dbMapping.setMirrorDb(baseConfigMap.getDbMapping().getMirrorDb()); // pass through the custom mirrorDb config
dbMapping.setMapAll(true);
List<String> pkNames = dml.getPkNames();
Map<String, String> pkMapping = new LinkedHashMap<>();
pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
dbMapping.setTargetPk(pkMapping);
mirrorDbConfig.getTableConfig().put(key, mappingConfig);
}
}
The lines with comments are the additions. As the code shows, the mirrorDb setting in the canal 1.1.4 adapter had no effect, because it was simply never read.
/**
 * Batch sync method
 *
 * @param dmls batch of DML
 */
public void sync(List<Dml> dmls) {
List<Dml> dmlList = new ArrayList<>();
for (Dml dml : dmls) {
String destination = StringUtils.trimToEmpty(dml.getDestination());
String database = dml.getDatabase();
MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
if (mirrorDbConfig == null) {
continue;
}
if (mirrorDbConfig.getMappingConfig() == null) {
continue;
}
if (dml.getGroupId() != null && StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {
if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {
continue; // filter out if the groupId does not match
}
}
// if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
// // DDL
// if (logger.isDebugEnabled()) {
// logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
// }
// executeDdl(mirrorDbConfig, dml);
// rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
// mirrorDbConfig.getTableConfig().remove(dml.getTable()); // remove the corresponding table config
// } else {
// DML
initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
dmlList.add(dml);
// }
}
I commented out part of the code in this method: truncate is a DML operation, yet it was being treated as DDL, which I consider a bug.
8. Add the following to MappingConfig.java in the rdb module
private String clusterName; // cluster name
private boolean onCluster = false; // whether to sync with ON CLUSTER
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public boolean getOnCluster() {
return onCluster;
}
public void setOnCluster(boolean onCluster) {
this.onCluster = onCluster;
}
That's all for the code changes.
9. Debug the client-adapter
First package the rdb module, either from the command line or by right-clicking the rdb project and choosing run maven > install.
The debugging entry point is CanalAdapterApplication.java in the launcher project.
10. Package the client-adapter
Once debugging passes, run:
# mvn package -DskipTests   (run in the root of the client-adapter project)
================================= (omitted) =================================
[INFO] Copying files to /Users/zhangying/Documents/www/canal/client-adapter/launcher/target/canal-adapter
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for canal client adapter module for otter 1.1.4 1.1.4:
[INFO]
[INFO] canal client adapter module for otter 1.1.4 ........ SUCCESS [  0.009 s]
[INFO] canal client adapter common module for otter 1.1.4 . SUCCESS [  3.874 s]
[INFO] canal client adapter logger example module for otter 1.1.4 SUCCESS [ 1.255 s]
[INFO] canal client adapter hbase module for otter 1.1.4 .. SUCCESS [ 13.855 s]
[INFO] canal client adapter elasticsearch module for otter 1.1.4 SUCCESS [ 11.110 s]
[INFO] canal client adapter rdb module for otter 1.1.4 .... SUCCESS [  0.178 s]
[INFO] canal client adapter launcher module for otter 1.1.4 SUCCESS [  7.947 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  38.554 s
[INFO] Finished at: 2021-09-18T16:27:55+08:00
[INFO] ------------------------------------------------------------------------
After uploading the packaged build to the server, be sure to adjust the directory permissions, otherwise it won't start:
chmod 777 -R plugin
chmod 777 -R logs
chmod 777 -R conf
11. The original canal 1.1.4 does not support update and delete in whole-database sync
RdbSyncService.java contains the following code:
/**
 * Append the primary-key WHERE condition
 */
private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
List<Map<String, ?>> values, Map<String, Object> d) {
appendCondition(dbMapping, sql, ctype, values, d, null);
}
private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
// Append the primary key
for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
String targetColumnName = entry.getKey();
String srcColumnName = entry.getValue();
if (srcColumnName == null) {
srcColumnName = Util.cleanColumn(targetColumnName);
}
sql.append(targetColumnName).append("=? AND ");
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
}
During whole-database sync there is no targetPk at all, which means this loop:
for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
is never entered during whole-database sync, leaving the WHERE clause empty, so the generated SQL fails with an error.
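One way to guard against this is shown in the sketch below (my own illustration, not canal's code; the method signature is a hypothetical simplification of the real appendCondition): when no primary-key mapping exists, fall back to the id column instead of emitting a broken WHERE clause.

```java
import java.util.Map;

// Sketch of a defensive appendCondition: with an empty targetPk mapping
// (the whole-database sync case), key the condition on the id column.
public class ConditionSketch {

    static String appendCondition(Map<String, String> targetPk, Object idValue) {
        StringBuilder sql = new StringBuilder(" WHERE ");
        if (targetPk == null || targetPk.isEmpty()) {
            // No primary-key config: fall back to the id column,
            // matching the mirror-db branches added in RdbSyncService
            return sql.append("id = '").append(idValue).append("'").toString();
        }
        for (String targetColumnName : targetPk.keySet()) {
            sql.append(targetColumnName).append("=? AND ");
        }
        int len = sql.length();
        return sql.delete(len - 5, len).toString(); // strip the trailing " AND "
    }

    public static void main(String[] args) {
        System.out.println(appendCondition(java.util.Collections.emptyMap(), 7));
        // -> " WHERE id = '7'"
        System.out.println(appendCondition(Map.of("uid", "uid"), 7));
        // -> " WHERE uid=?"
    }
}
```

This keeps the single-table path (with a configured targetPk) unchanged while making the whole-database path produce a valid condition.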
Please credit when reposting:
Author: 海底苍鹰
Link: http://blog.51yip.com/clickhouse/2537.html/comment-page-1

Thank you so much for this article! It is very detailed and solved many of the problems I ran into with canal.
One small suggestion: truncate should be DDL rather than DML, so that part probably doesn't need changing.
Also, regarding line 110 of RdbEtlService:
"DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
should that also be changed to ClickHouse's alter table ... delete where syntax?
Thanks again for the article!