您的位置 首页 java

Flink1.10集成Hive快速入门

H i v e 是大数据领域最早出现的 SQL 引擎,发展至今有着丰富的功能和广泛的用户基础。 之后出现的 SQL 引擎,如 Spark SQL、Impala 等,都在一定程度上提供了与 Hive 集成的功能,从而方便用户使用现有的数据仓库、进行作业迁移等。

Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在最新版Flink1.10版本,标志着对 Blink的整合宣告完成,达到了对 Hive 的生产级别集成,Hive作为数据仓库系统的绝对核心,承担着绝大多数的离线数据ETL计算和数据管理,期待Flink未来对Hive的完美支持。

而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。

添加依赖

要与Hive集成,需要在Flink的lib目录下添加额外的依赖jar包,以使集成在Table API程序或SQL Client中的SQL中起作用。或者,可以将这些依赖项放在文件夹中,并分别使用Table API程序或SQL Client 的 -C -l 选项将它们添加到classpath中。本文使用第一种方式,即将jar包直接复制到$FLINK_HOME/lib目录下。本文使用的Hive版本为2.3.4(对于不同版本的Hive,可以参照官网选择不同的jar包依赖),总共需要3个jar包,如下:

  • flink-connector-hive_2.11-1.10.0.jar

  • flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

  • hive-exec-2.3.4.jar

其中hive-exec-2.3.4.jar在hive的lib文件夹下,另外两个需要自行下载,下载地址:flink-connector-hive_2.11-1.10.0.jar

[ ]

flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

[ ]

切莫拔剑四顾心茫然,话不多说,直接上代码。

Flink1.10集成Hive快速入门

构建程序

添加Maven依赖

  <!-- Flink Dependency -->  
< dependency >
   < groupId > org.apache.flink </ groupId >
   < artifactId > flink-connector-hive_2.11 </ artifactId >
   < version > 1.10.0 </ version >
   < scope > provided </ scope >
</ dependency >

< dependency >
   < groupId > org.apache.flink </ groupId >
   < artifactId > flink-table-api-java-bridge_2.11 </ artifactId >
   < version > 1.10.0 </ version >
   < scope > provided </ scope >
</ dependency >

<!-- Hive Dependency -->
< dependency >
     < groupId > org.apache.hive </ groupId >
     < artifactId > hive-exec </ artifactId >
     < version > ${hive.version} </ version >
     < scope > provided </ scope >
</ dependency >   

实例代码

  package  com.flink.sql.hiveintegration; 

import  org.apache.flink.table.api.EnvironmentSettings;
import  org.apache.flink.table.api.TableEnvironment;
import  org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 *   @Created  with IntelliJ IDEA.
 *   @author  : jmx
 *   @Date : 2020/3/31
 *   @Time : 13:22
 *  
 */

public   class   FlinkHiveIntegration   {

     public   static   void   main (String[] args)   throws  Exception  {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()  // 使用BlinkPlanner
                .inBatchMode()  // Batch模式,默认为StreamingMode
                .build();

         //使用StreamingMode
        /* EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 使用BlinkPlanner
                .inStreamingMode() // StreamingMode
                .build();*/


        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name =  "myhive" ;       // Catalog名称,定义一个唯一的名称表示
        String defaultDatabase =  "qfbap_ods" ;   // 默认数据库名称
        String hiveConfDir =  "/opt/modules/apache-hive-2.3.4-bin/conf" ;   // hive-site.xml路径
        String version =  "2.3.4" ;        // Hive版本号

        HiveCatalog hive =  new  HiveCatalog(name, defaultDatabase, hiveConfDir, version);

        tableEnv.registerCatalog( "myhive" , hive);
        tableEnv.useCatalog( "myhive" );
         // 创建数据库,目前不支持创建hive表
        String createDbSql =  "CREATE DATABASE IF NOT EXISTS myhive.test123" ;

        tableEnv.sqlUpdate(createDbSql);  

    }
}

Flink SQL Client集成Hive

Flink的表和SQL API可以处理用SQL语言编写的查询,但是这些查询需要嵌入到用Java或Scala编写的程序中。此外,这些程序在提交到集群之前需要与构建工具打包。这或多或少地限制了Java/Scala程序员对Flink的使用。

SQL客户端旨在提供一种简单的方式,无需一行Java或Scala代码,即可将表程序编写、调试和提交到Flink集群。Flink SQL客户端CLI允许通过命令行的形式运行分布式程序。使用Flink SQL cli访问Hive,需要配置sql-client-defaults.yaml文件。

sql-client-defaults.yaml配置

目前 HiveTableSink 不支持流式写入(未实现 AppendStreamTableSink)。需要将执行模式改成 batch
模式,否则会报如下错误:

 org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink. 

需要修改的配置内容如下:

  #...省略的配置项...  

#==============================================================================
# Catalogs
#==============================================================================
# 配置catalogs,可以配置多个.
catalogs:  # empty list
  - name: myhive
     type : hive
    hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
    hive-version: 2.3.4
    default-database: qfbap_ods

#...省略的配置项...

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
   # select the implementation responsible for planning table programs
   # possible values are 'blink' (used by default) or 'old'
  planner: blink
   # 'batch' or 'streaming' execution
   type : batch

启动Flink SQL Cli

 bin/sql-client.sh  embedded 

在启动之前,确保Hive的metastore已经开启了,否则会报 Failed to create Hive Metastore client 异常。启动成功,如下图:

Flink1.10集成Hive快速入门

启动之后,就可以在此Cli下执行SQL命令访问Hive的表了,基本的操作如下:

  -- 命令行帮助  
Flink SQL>  help
-- 查看当前会话的catalog,其中myhive为自己配置的,default_catalog为默认的
Flink  SQL show  catalogs;
default_catalog
myhive
-- 使用catalog
Flink SQL>  use   catalog  myhive;
-- 查看当前catalog的数据库
Flink SQL>  show   databases ;
-- 创建数据库
Flink SQL>  create   database  testdb;
-- 删除数据库
Flink SQL>  drop   database  testdb;
-- 创建表
Flink SQL>  create   table  tbl( id   int , name   string );
-- 删除表
Flink SQL>  drop   table  tbl;
-- 查询表
Flink SQL>  select  *  from   code_city;
-- 插入数据
Flink SQL>  insert  overwrite code_city  select   id ,city,province,event_time  from  code_city_delta ;
Flink SQL>  INSERT   into  code_city  values ( 1 , '南京' , '江苏' , '' );

小结

本文以最新版本的Flink为例,对Flink集成Hive进行了实操。首先通过代码的方式与Hive进行集成,然后介绍了如何使用Flink SQL 客户端访问Hive,并对其中会遇到的坑进行了描述,最后给出了Flink SQL Cli的详细使用。相信在未来的版本中Flink SQL会越来越完善,期待Flink未来对Hive的完美支持。

文章来源:智云一二三科技

文章标题:Flink1.10集成Hive快速入门

文章地址:https://www.zhihuclub.com/199535.shtml

关于作者: 智云科技

热门文章

网站地图