canal 同步mysql数据到clickhouse 支持update delete truncate

张映 发表于 2021-09-18

分类目录: clickhouse

标签:, , , ,

clickhouse中的表基本上都ReplicatedMergeTree+Distributed。而canal只支持单表(从源码中也可以看出),所以做了canal二次开发以实现insert update delete truncate功能。单表和整库都支持。

本文所介绍的canal二次开发,只提供思路,并不能照抄代码。

1,下载canal

git clone https://github.com/alibaba/canal.git
git checkout canal-1.1.4
git checkout -b canal-1.1.4 canal-1.1.4

这里为什么要选择canal1.1.4分支来做开发呢。

1.1.5这个包找不到Cannot resolve com.alibaba.otter:connector.tcp:无法跑起来

2,导入项目

导入时选择maven项目,只需要导入client-adapter就行了,服务端并不改动就别导入了,不然要下载很多依赖

3,修改launcher项目下的pom.xml

<!--clickhouse start-->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4.13</version>
</dependency>
<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.1.2</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1</version>
</dependency>
<!--clickhouse end-->

连接clickhouse所需要的依赖

4,修改launcher项目下resources/application.yml,

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  canalServerHost: 10.0.54.5:11111   //canal 服务端IP
  syncBatchSize: 1000
  batchSize: 500
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://10.0.40.200:3306/tanktest?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
      username: canal
      password: Canal-123
  canalAdapters:
    - instance: tanktest
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: rdb
              key: mysql1
              properties:
                jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
                jdbc.url: jdbc:clickhouse://10.0.54.5:8123/tanktest
                jdbc.username: default
                jdbc.password: test1234

5,修改rdb项目下resources/rdb/mytest_user.yml,文件名可以自定义

dataSourceKey: defaultDS
destination: tanktest
groupId: g1
outerAdapterKey: mysql1
onCluster: true     //这是我自定义的
clusterName: clickhouse_test_netjoy  //这是我自定义的
concurrent: true
dbMapping:
  mirrorDb: true
  database: tanktest

6,修改RdbSyncService.java

/**
 * 更新操作
 *
 * @param config 配置项
 * @param dml DML数据
 */
private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
    Map<String, Object> data = dml.getData();
    if (data == null || data.isEmpty()) {
        return;
    }

    Map<String, Object> old = dml.getOld();
    if (old == null || old.isEmpty()) {
        return;
    }

    DbMapping dbMapping = config.getDbMapping();

    Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

    Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

    StringBuilder updateSql = new StringBuilder();
    if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
        updateSql.append("ALTER TABLE  ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName()).append(" UPDATE ");
    }else{
        updateSql.append("ALTER TABLE  ").append(SyncUtil.getDbTableName(dbMapping)).append(" UPDATE ");
    }

    List<Map<String, ?>> values = new ArrayList<>();
    boolean hasMatched = false;
    for (String srcColumnName : old.keySet()) {
        List targetColumnNames = new ArrayList<>();
        columnsMap.forEach((targetColumn, srcColumn) -> {
            if (srcColumnName.equalsIgnoreCase(srcColumn)) {
                targetColumnNames.add(targetColumn);
            }
        });
        if (!targetColumnNames.isEmpty()) {
            hasMatched = true;
            for (String targetColumnName : targetColumnNames) {
                updateSql.append(targetColumnName).append("=?, ");
                Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
                if (type == null) {
                    throw new RuntimeException("Target column: " + targetColumnName + " not matched");
                }
                BatchExecutor.setValue(values, type, data.get(srcColumnName));
            }
        }
    }
    if (!hasMatched) {
        logger.warn("Did not matched any columns to update ");
        return;
    }
    int len = updateSql.length();
    updateSql.delete(len - 2, len).append(" WHERE ");

    // 拼接主键
    if(dbMapping.getMirrorDb()==false){
        appendCondition(dbMapping, updateSql, ctype, values, data, old);
    }else{
        String id = data.get(Util.cleanColumn("id")).toString();
        updateSql.append(" id = '").append(id).append("'");
    }

    batchExecutor.execute(updateSql.toString(), values);
    if (logger.isTraceEnabled()) {
        logger.trace("Update target table, sql: {}", updateSql);
    }
}

/**
 * 删除操作
 *
 * @param config
 * @param dml
 */
private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
    Map<String, Object> data = dml.getData();
    if (data == null || data.isEmpty()) {
        return;
    }

    DbMapping dbMapping = config.getDbMapping();

    Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

    StringBuilder sql = new StringBuilder();
    if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
        sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName()).append(" DELETE WHERE ");
    }else{
        sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" DELETE WHERE ");
    }

    List<Map<String, ?>> values = new ArrayList<>();

    // 拼接主键
    if(dbMapping.getMirrorDb()==false){
        appendCondition(dbMapping, sql, ctype, values, data);
    }else{
        String id = data.get(Util.cleanColumn("id")).toString();
        sql.append(" id = '").append(id).append("'");
    }

    batchExecutor.execute(sql.toString(), values);
    if (logger.isTraceEnabled()) {
        logger.trace("Delete from target table, sql: {}", sql);
    }
}

/**
 * truncate操作
 *
 * @param config
 */
private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
    DbMapping dbMapping = config.getDbMapping();
    StringBuilder sql = new StringBuilder();
    if (config.getOnCluster() && !config.getClusterName().isEmpty()) {
        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping)).append(" ON CLUSTER ").append(config.getClusterName());
    }else{
        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
    }

    batchExecutor.execute(sql.toString(), new ArrayList<>());
    if (logger.isTraceEnabled()) {
        logger.trace("Truncate target table, sql: {}", sql);
    }
}

大家注意到没有append(" id = '").append(id).append("'");,删除和更新就是根id来的,也就是mysql表和clickhouse表中必须有id字段,最好是主键。mysqlbinlog有个特点,只要有数据变动,表中所有字段都会被记录下来。

7,修改RdbMirrorDbSyncService.java

/**
 * 初始化表配置
 *
 * @param key 配置key: destination.database.table
 * @param baseConfigMap db sync config
 * @param dml DML
 */
private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {
    MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);
    if (mappingConfig == null) {
        // 构造表配置
        mappingConfig = new MappingConfig();
        mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
        mappingConfig.setDestination(baseConfigMap.getDestination());
        mappingConfig.setGroupId(baseConfigMap.getGroupId());
        mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
        mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
        mappingConfig.setClusterName(baseConfigMap.getClusterName());    //设置自定义配置
        mappingConfig.setOnCluster(baseConfigMap.getOnCluster());        //设置自定义配置
        MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
        mappingConfig.setDbMapping(dbMapping);
        dbMapping.setDatabase(dml.getDatabase());
        dbMapping.setTable(dml.getTable());
        dbMapping.setTargetDb(dml.getDatabase());
        dbMapping.setTargetTable(dml.getTable());
        dbMapping.setMirrorDb(baseConfigMap.getDbMapping().getMirrorDb());  //设置MirrorDb自定义配置
        dbMapping.setMapAll(true);
        List pkNames = dml.getPkNames();
        Map<String, String> pkMapping = new LinkedHashMap<>();
        pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
        dbMapping.setTargetPk(pkMapping);

        mirrorDbConfig.getTableConfig().put(key, mappingConfig);
    }
}

注释的地方都是新增的,从代码可以看出,canal1.1.4 adapter中的MirrorDb配置是无效的,因为根本没有读取。

    /**
     * 批量同步方法
     *
     * @param dmls 批量 DML
     */
    public void sync(List dmls) {
        List dmlList = new ArrayList<>();
        for (Dml dml : dmls) {
            String destination = StringUtils.trimToEmpty(dml.getDestination());
            String database = dml.getDatabase();
            MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
            if (mirrorDbConfig == null) {
                continue;
            }
            if (mirrorDbConfig.getMappingConfig() == null) {
                continue;
            }
            if (dml.getGroupId() != null && StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {
                if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {
                    continue; // 如果groupId不匹配则过滤
                }
            }

//            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
//                // DDL
//                if (logger.isDebugEnabled()) {
//                    logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
//                }
//                executeDdl(mirrorDbConfig, dml);
//                rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
//                mirrorDbConfig.getTableConfig().remove(dml.getTable()); // 删除对应库表配置
//            } else {
                // DML
                initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
                dmlList.add(dml);
//            }
        }

这个方法里面部分代码被我注释了,truncate是dml操作,却被认为成了ddl操作,这是个bug。

8,rdb MappingConfig.java添加以下内容

    private String    clusterName;    // 集群名称

    private boolean   onCluster = false; // 是否使用集群

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public boolean getOnCluster() {
        return onCluster;
    }

    public void setOnCluster(boolean onCluster) {
        this.onCluster = onCluster;
    }

到这儿代码层面的都结束了

9,debug client-adapter

先打包rdb,命令行也行。右键rdb项目,run maven》install也行

调试的入口,launcher项目的CanalAdapterApplication.java

canal debug

canal debug

10,client-adapter打包

调试通过后执行

# mvn package -DskipTests //client-adapter项目的根目录打包

=======================================省略====================================
[INFO] Copying files to /Users/zhangying/Documents/www/canal/client-adapter/launcher/target/canal-adapter
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for canal client adapter module for otter 1.1.4 1.1.4:
[INFO]
[INFO] canal client adapter module for otter 1.1.4 ........ SUCCESS [ 0.009 s]
[INFO] canal client adapter common module for otter 1.1.4 . SUCCESS [ 3.874 s]
[INFO] canal client adapter logger example module for otter 1.1.4 SUCCESS [ 1.255 s]
[INFO] canal client adapter hbase module for otter 1.1.4 .. SUCCESS [ 13.855 s]
[INFO] canal client adapter elasticsearch module for otter 1.1.4 SUCCESS [ 11.110 s]
[INFO] canal client adapter rdb module for otter 1.1.4 .... SUCCESS [ 0.178 s]
[INFO] canal client adapter launcher module for otter 1.1.4 SUCCESS [ 7.947 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 38.554 s
[INFO] Finished at: 2021-09-18T16:27:55+08:00
[INFO] ------------------------------------------------------------------------

将打包好的代码传到服务端以后,一定要修改一下目录权限,不然起不来

chmod 777 -R plugin
chmod 777 -R logs
chmod 777 -R conf

11,原来canal1.1.4是不支持,全库同步的update,delete

RdbSyncService.java文件中,有以下代码

/**
 * 拼接主键 where条件
 */
private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
                             List<Map<String, ?>> values, Map<String, Object> d) {
    appendCondition(dbMapping, sql, ctype, values, d, null);
}

private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
                             List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
    // 拼接主键
    for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
        String targetColumnName = entry.getKey();
        String srcColumnName = entry.getValue();
        if (srcColumnName == null) {
            srcColumnName = Util.cleanColumn(targetColumnName);
        }
        sql.append(targetColumnName).append("=? AND ");
        Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
        if (type == null) {
            throw new RuntimeException("Target column: " + targetColumnName + " not matched");
        }

全库同步时根本没有targePk,也就是说

for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {

全库同步时,根本走不到这个for循环里面去,导致报错。



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/clickhouse/2537.html/comment-page-1

1 条评论

  1. Michael 留言

    非常感谢您的这篇文章!写的非常详细,解决了我在使用 canal 中遇到的很多问题。
    还有一点拙见:truncate 应该是 DDL 而非 DML,这个应该不用改动。
    以及 RdbtEtlService 中的 110 行:
    "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
    是否也应改成 clickhouse 的 alter table ... delete where 语法为好?
    再次感谢您的文章!