CDH: Integrating Flink SQL with Hive

By 张映, published 2020-04-02


Before reading this article, please read these first:

1. Installing Flink on CDH 6
2. Two ways to submit Flink on YARN jobs
3. Upgrading CDH Hive from 2.1.1 to 2.3.4

Part 1: Add the dependency jars Flink needs to connect to Hive

1. Install mysql-connector-java

# yum install mysql-connector-java
# cp /usr/share/java/mysql-connector-java.jar /opt/cloudera/parcels/FLINK/lib/flink/lib

2. Copy the relevant jars from Hive 2.3.4 into the lib directory under the Flink home (or use the loop sketch after these commands)

# cp /path/to/hive-2.3.4/lib/hive-shims-common-2.3.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/hive-common-2.3.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/hive-metastore-2.3.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/hive-exec-2.3.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/libfb303-0.9.3.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/javax.jdo-3.2.0-m3.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/mysql-connector-java.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/antlr-runtime-3.5.2.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/datanucleus-api-jdo-4.2.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/datanucleus-core-4.1.17.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path/to/hive-2.3.4/lib/datanucleus-rdbms-4.1.19.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
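The same copies can be scripted as a loop. A minimal sketch, assuming HIVE_HOME points at your Hive 2.3.4 installation (a hypothetical variable, not from the original commands):

# HIVE_HOME=/path/to/hive-2.3.4
# FLINK_LIB=/opt/cloudera/parcels/FLINK/lib/flink/lib
# for jar in hive-shims-common-2.3.4.jar hive-common-2.3.4.jar \
>     hive-metastore-2.3.4.jar hive-exec-2.3.4.jar libfb303-0.9.3.jar \
>     javax.jdo-3.2.0-m3.jar mysql-connector-java.jar antlr-runtime-3.5.2.jar \
>     datanucleus-api-jdo-4.2.4.jar datanucleus-core-4.1.17.jar \
>     datanucleus-rdbms-4.1.19.jar; do
>     cp "$HIVE_HOME/lib/$jar" "$FLINK_LIB/"
> done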

Note: Hive 2.3.4 ships with commons-cli-1.2.jar, which Flink 1.9.1 no longer supports; if it ends up on the classpath you get the following error:

2020-03-31 18:01:15,003 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - Error while running the Flink Yarn session.
java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.<init>(FlinkYarnSessionCli.java:197)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.<init>(FlinkYarnSessionCli.java:173)

Fix: use commons-cli-1.4.jar instead.

# wget https://repo1.maven.org/maven2/commons-cli/commons-cli/1.4/commons-cli-1.4.jar
# cp commons-cli-1.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
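Since Flink builds its classpath from every jar in lib, it is worth checking that only the 1.4 jar remains (a quick sanity check, not from the original post):

# ls /opt/cloudera/parcels/FLINK/lib/flink/lib | grep commons-cli

Only commons-cli-1.4.jar should show up; remove any commons-cli-1.2.jar that slipped in.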

3. Download Flink's Hive connector and Hadoop compatibility jars

# wget https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_2.12/1.9.1/flink-hadoop-compatibility_2.12-1.9.1.jar
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.9.1/flink-connector-hive_2.12-1.9.1.jar
# cp flink-hadoop-compatibility_2.12-1.9.1.jar flink-connector-hive_2.12-1.9.1.jar /opt/cloudera/parcels/FLINK/lib/flink/lib

Note: the versions must line up. In flink-connector-hive_2.12-1.9.1.jar, 2.12 is the Scala version and 1.9.1 is the Flink version.

[Screenshot: the Flink Scala version, as shown in the jar file names]
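If you are not sure which Scala version your Flink build uses, the suffix of the flink-dist jar tells you (assuming the parcel layout used throughout this post):

# ls /opt/cloudera/parcels/FLINK/lib/flink/lib/flink-dist_*.jar

A name like flink-dist_2.12-1.9.1.jar means Scala 2.12 and Flink 1.9.1, so the connector jars above are the right ones.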

Part 2: Configure Flink to connect to Hive

1. Edit the sql-client-defaults.yaml configuration

# cd /opt/cloudera/parcels/FLINK/lib/flink/conf/
# cp sql-client-defaults.yaml sql-client-defaults_bak

# vim sql-client-defaults.yaml
............(omitted).............
#catalogs: [] # empty list
# A typical catalog definition looks like:
#   - name: myhive
#     type: hive
#     hive-conf-dir: /opt/hive_conf/
#     default-database: ...

catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf.cloudera.hive
    hive-version: 2.3.4
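Once the catalog is defined, you can switch to it inside the SQL client. A minimal session sketch (myhive matches the name configured above):

Flink SQL> show catalogs;
Flink SQL> use catalog myhive;
Flink SQL> show tables;

show catalogs should list default_catalog and myhive; after use catalog myhive, show tables lists the tables from the Hive metastore.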

2. Sync the files to the other nodes

# scp conf/sql-client-defaults.yaml bigserver4:/opt/cloudera/parcels/FLINK/lib/flink/conf
# scp conf/sql-client-defaults.yaml bigserver3:/opt/cloudera/parcels/FLINK/lib/flink/conf
# scp conf/sql-client-defaults.yaml bigserver2:/opt/cloudera/parcels/FLINK/lib/flink/conf
# scp conf/sql-client-defaults.yaml bigserver5:/opt/cloudera/parcels/FLINK/lib/flink/conf

# scp -r lib bigserver4:/opt/cloudera/parcels/FLINK/lib/flink/
# scp -r lib bigserver3:/opt/cloudera/parcels/FLINK/lib/flink/
# scp -r lib bigserver2:/opt/cloudera/parcels/FLINK/lib/flink/
# scp -r lib bigserver5:/opt/cloudera/parcels/FLINK/lib/flink/
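The per-host scp commands can also be written as a loop over the same hosts:

# for h in bigserver2 bigserver3 bigserver4 bigserver5; do
>     scp conf/sql-client-defaults.yaml $h:/opt/cloudera/parcels/FLINK/lib/flink/conf
>     scp -r lib $h:/opt/cloudera/parcels/FLINK/lib/flink/
> done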

3. Restart Flink in the background

Part 3: Test Flink SQL

First create the table in Hive: hive> CREATE TABLE mytable(name string, value double);

# ./sql-client.sh embedded    # start the Flink SQL client
[Screenshot: Flink SQL insert test]

[Screenshot: Flink SQL query test]
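For reference, the screenshots show a session along these lines (reconstructed, not copied from the images; values are illustrative):

Flink SQL> SET execution.type=batch;
[INFO] Session property has been set.
Flink SQL> use catalog myhive;
Flink SQL> insert into mytable values('hadoop', 2);
[INFO] Submitting SQL update statement to the cluster...
Flink SQL> select * from mytable;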

Part 4: Summary of problems encountered

1. Errors caused by a Hive version mismatch

The Flink 1.9.1 Hive connector only supports specific Hive versions (2.3.4 among them; CDH's stock 2.1.1 is not), and the hive-version in sql-client-defaults.yaml must match what the metastore actually runs. Error 1.1 below typically means the client shim is newer than the metastore it talks to; error 1.2 is raised when an unsupported version is configured.

1.1 Flink SQL> select * from mytable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.thrift.TApplicationException: Invalid method name: 'get_table_req'

1.2 Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.

at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create environment instance.
at org.apache.flink.table.client.gateway.local.ExecutionContext.createEnvironmentInstance(ExecutionContext.java:195)
at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
... 2 more
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Unsupported Hive version 2.1.1
at org.apache.flink.table.catalog.hive.client.HiveShimLoader.lambda$loadHiveShim$0(HiveShimLoader.java:53)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
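If you are unsure which version the metastore schema is actually on, it can be read from the metastore database. A sketch, assuming a MySQL-backed metastore whose database is named hive (adjust to your setup):

mysql> use hive;
mysql> select SCHEMA_VERSION, VERSION_COMMENT from VERSION;

The SCHEMA_VERSION value must be one the Flink Hive connector supports and must match the hive-version set in sql-client-defaults.yaml.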

2. SQL syntax errors

2.1 Flink SQL> CREATE TABLE mytable(name string, age int);
[ERROR] Unknown or invalid SQL statement.

CREATE TABLE is not supported in this Flink SQL client; create tables through Hive instead, as in Part 3.

2.2 Flink SQL> select * from tank_test3 limit 10;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

LIMIT is not supported.

2.3 This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features.

3. Execution mode error

Flink SQL> insert into mytable values('hadoop', 2);
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

Fix:

Flink SQL> SET execution.type=batch;
[INFO] Session property has been set.
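To avoid typing this in every session, the same property can be made the default in sql-client-defaults.yaml, under its execution section (a sketch based on the Flink 1.9 configuration layout):

execution:
  type: batch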

4. Cluster not started

Flink SQL> insert into mytable values('hadoop', 2);
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: 拒绝连接 (Connection refused)

Fix:

./bin/start-cluster.sh    # once it starts successfully, port 8081 comes up
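Before retrying the insert, you can confirm the cluster is actually up (assuming the JobManager runs on the local host):

# ss -lntp | grep 8081
# curl -s http://localhost:8081/overview

The first command should show a listener on 8081; the second hits Flink's REST API and returns a small JSON cluster overview.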



Please credit the source when reposting:
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2392.html