Before reading this article, make sure you have read:
1. Installing Flink on CDH 6
2. The two ways of submitting jobs with Flink on YARN
3. Upgrading CDH Hive 2.1.1 to 2.3.4
I. Add the jars Flink needs to connect to Hive
1. Install mysql-connector-java
# yum install mysql-connector-java
# cp /usr/share/java/mysql-connector-java.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
2. Copy the relevant jars from the Hive 2.3.4 lib directory into Flink's lib
# cp /path-to-hive-2.3.4/lib/hive-shims-common-2.3.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/hive-common-2.3.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/hive-metastore-2.3.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/hive-exec-2.3.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/libfb303-0.9.3.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/javax.jdo-3.2.0-m3.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/mysql-connector-java.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/antlr-runtime-3.5.2.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/datanucleus-api-jdo-4.2.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/datanucleus-core-4.1.17.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
# cp /path-to-hive-2.3.4/lib/datanucleus-rdbms-4.1.19.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
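The copy steps above can be wrapped in a small script. This is only a sketch: the function name and both directory parameters are mine, not part of the original setup, so point them at your actual Hive 2.3.4 lib and Flink lib paths.

```shell
# Copy the Hive 2.3.4 jars Flink needs into Flink's lib directory.
# $1 = Hive lib dir, $2 = Flink lib dir (both are caller-supplied).
copy_hive_jars() {
    hive_lib=$1; flink_lib=$2
    for jar in hive-shims-common-2.3.4.jar hive-common-2.3.4.jar \
               hive-metastore-2.3.4.jar hive-exec-2.3.4.jar \
               libfb303-0.9.3.jar javax.jdo-3.2.0-m3.jar \
               mysql-connector-java.jar antlr-runtime-3.5.2.jar \
               datanucleus-api-jdo-4.2.4.jar datanucleus-core-4.1.17.jar \
               datanucleus-rdbms-4.1.19.jar; do
        if [ -f "$hive_lib/$jar" ]; then
            cp "$hive_lib/$jar" "$flink_lib/"
        else
            # Warn instead of failing so one missing jar is easy to spot.
            echo "missing: $hive_lib/$jar" >&2
        fi
    done
}

# Example (paths are hypothetical -- adjust to your install):
# copy_hive_jars /path-to-hive-2.3.4/lib /opt/cloudera/parcels/FLINK/lib/flink/lib
```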
Note: Hive 2.3.4 ships with commons-cli-1.2.jar, which Flink 1.9.1 no longer supports; with it on the classpath you get the following error:
2020-03-31 18:01:15,003 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - Error while running the Flink Yarn session.
java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.<init>(FlinkYarnSessionCli.java:197)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.<init>(FlinkYarnSessionCli.java:173)
Fix: use commons-cli-1.4.jar instead:
# wget https://repo1.maven.org/maven2/commons-cli/commons-cli/1.4/commons-cli-1.4.jar
# cp commons-cli-1.4.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
3. Download Flink's Hive connector and Hadoop compatibility jars
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_2.12/1.9.1/flink-hadoop-compatibility_2.12-1.9.1.jar
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.9.1/flink-connector-hive_2.12-1.9.1.jar
# cp flink-hadoop-compatibility_2.12-1.9.1.jar flink-connector-hive_2.12-1.9.1.jar /opt/cloudera/parcels/FLINK/lib/flink/lib
Note: the versions must line up. In flink-connector-hive_2.12-1.9.1.jar, 2.12 is the Scala version and 1.9.1 is the Flink version.
II. Configure Flink to connect to Hive
1. Edit sql-client-defaults.yaml
# cd /opt/cloudera/parcels/FLINK/lib/flink/conf/
# cp sql-client-defaults.yaml sql-client-defaults_bak
# vim sql-client-defaults.yaml
............(omitted).............
#catalogs: [] # empty list
# A typical catalog definition looks like:
#   - name: myhive
#     type: hive
#     hive-conf-dir: /opt/hive_conf/
#     default-database: ...
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf.cloudera.hive
    hive-version: 2.3.4
2. Sync the files to the other nodes
# scp conf/sql-client-defaults.yaml bigserver4:/opt/cloudera/parcels/FLINK/lib/flink/conf
# scp conf/sql-client-defaults.yaml bigserver3:/opt/cloudera/parcels/FLINK/lib/flink/conf
# scp conf/sql-client-defaults.yaml bigserver2:/opt/cloudera/parcels/FLINK/lib/flink/conf
# scp conf/sql-client-defaults.yaml bigserver5:/opt/cloudera/parcels/FLINK/lib/flink/conf
# scp -r lib bigserver4:/opt/cloudera/parcels/FLINK/lib/flink/
# scp -r lib bigserver3:/opt/cloudera/parcels/FLINK/lib/flink/
# scp -r lib bigserver2:/opt/cloudera/parcels/FLINK/lib/flink/
# scp -r lib bigserver5:/opt/cloudera/parcels/FLINK/lib/flink/
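The per-node scp commands can be generated with a loop. A sketch: the function name is mine, and the node names below are this cluster's, so substitute your own. It only prints the commands; review the output, then pipe it to sh to actually run it.

```shell
# Print the scp commands that push the SQL client config and the jar
# directory to each worker node passed as an argument.
gen_sync_cmds() {
    flink_home=/opt/cloudera/parcels/FLINK/lib/flink
    for node in "$@"; do
        echo "scp $flink_home/conf/sql-client-defaults.yaml $node:$flink_home/conf"
        echo "scp -r $flink_home/lib $node:$flink_home/"
    done
}

# Print the commands for this cluster's four workers:
gen_sync_cmds bigserver2 bigserver3 bigserver4 bigserver5
# To execute:  gen_sync_cmds bigserver2 bigserver3 bigserver4 bigserver5 | sh
```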
3. Restart Flink in the background
III. Test Flink SQL
First create a table in Hive:
hive> CREATE TABLE mytable(name string, value double);
# ./sql-client.sh embedded    //start the Flink SQL client
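Once the client is up, the Hive catalog configured above can be selected and queried. A minimal session sketch: the catalog name myhive comes from sql-client-defaults.yaml, and mytable is the table created in Hive above.

```
Flink SQL> USE CATALOG myhive;
Flink SQL> SHOW TABLES;
Flink SQL> SELECT * FROM mytable;
```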
IV. Summary of problems encountered
1. Errors caused by Hive version incompatibility
1.1 Flink SQL> select * from mytable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.thrift.TApplicationException: Invalid method name: 'get_table_req'
1.2 Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create environment instance.
at org.apache.flink.table.client.gateway.local.ExecutionContext.createEnvironmentInstance(ExecutionContext.java:195)
at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
... 2 more
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Unsupported Hive version 2.1.1
at org.apache.flink.table.catalog.hive.client.HiveShimLoader.lambda$loadHiveShim$0(HiveShimLoader.java:53)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
This is why the Hive upgrade in the prerequisites matters: Flink 1.9.1's Hive catalog does not support Hive 2.1.1, so both the cluster and the hive-version setting must be on a supported version such as 2.3.4.
2. SQL syntax errors
2.1 Flink SQL> CREATE TABLE mytable(name string, age int);
[ERROR] Unknown or invalid SQL statement.
CREATE TABLE is not supported.
2.2 Flink SQL> select * from tank_test3 limit 10;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
LIMIT is not supported.
2.3,This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
3. Wrong execution mode
Flink SQL> insert into mytable values('hadoop', 2);
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
Fix: switch to batch mode:
Flink SQL> SET execution.type=batch;
[INFO] Session property has been set.
4. Cluster not started
Flink SQL> insert into mytable values('hadoop', 2);
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: Connection refused
Fix:
# ./bin/start-cluster.sh    //after a successful start, port 8081 (the Flink web UI) comes up
Please credit the source when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2392.html