
This is an in-depth article; without having dug into the Kettle source code it will be hard to follow.

Let's get straight to the practical part.

Add the dependencies

 <properties>
    <pdi.version>8.2.0.0-342</pdi.version>
</properties>

<!-- kettle  dependency start-->
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-core</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-dbdialog</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-engine</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>metastore</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>pentaho-metadata</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-ui-swt</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-vfs-browser</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.pentaho.di.plugins</groupId>
            <artifactId>kettle-sap-plugin-core</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.pentaho.di.plugins</groupId>
            <artifactId>kettle-json-plugin-core</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <!-- big data plugin start  -->
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-api-hdfs</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-hadoop-shims-osgi-jaas</artifactId>
            <version>8.2.0.0-342</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-legacy</artifactId>
            <version>8.2.0.0-342</version>
            <exclusions> <!-- resolve dependency conflicts; same below -->
                <exclusion>
                    <groupId>hsqldb</groupId>
                    <artifactId>hsqldb</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>libthrift</groupId><!-- Hive errors are probably caused by this dependency conflict -->
                    <artifactId>libthrift</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-plugin</artifactId>
            <version>${pdi.version}</version>
            <type>pom</type>
            <exclusions>
                <exclusion>
                    <groupId>libthrift</groupId>
                    <artifactId>libthrift</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.hibernate</groupId>
                    <artifactId>hibernate-validator</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework.security</groupId>
                    <artifactId>spring-security-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>pentaho</groupId>
                    <artifactId>pentaho-big-data-assemblies-pmr-libraries</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>5.1.0.Final</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-kettle-plugins-hdfs</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-impl-cluster</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.osgi</groupId>
            <artifactId>org.osgi.compendium</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-impl-vfs-hdfs</artifactId>
            <version>8.2.0.0-342</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-impl-shim-initializer</artifactId>
            <version>8.2.0.0-342</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-impl-shim-hdfs</artifactId>
            <version>8.2.0.0-342</version>
            <exclusions>
                <exclusion>
                    <groupId>*</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-impl-shim-common</artifactId>
            <version>8.2.0.0-342</version>
        </dependency>
<!-- Corresponds to the hdp30 configuration under hadoop-configurations in the PDI client -->
        <dependency>
            <groupId>org.pentaho</groupId><!-- hdp30 shim; use a different shim for a different Hadoop distribution -->
            <artifactId>pentaho-hadoop-shims-hdp30-package</artifactId>
            <version>8.2.2018.11.00-342</version>
            <type>zip</type>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.netbeans</groupId>
                    <artifactId>mof</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty</groupId>
                    <artifactId>jetty-xml</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-impl-shim-shimTests</artifactId>
            <version>8.2.0.0-342</version>
        </dependency>
        <!-- big data plugin end -->
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>commons-xul-core</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>commons-xul-swt</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>commons-xul-swing</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.pentaho.di.plugins</groupId>
            <artifactId>pdi-xml-plugin-core</artifactId>
            <version>${pdi.version}</version>
        </dependency>
        <!--kettle dependency end-->

<dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>

<!-- Dependencies needed to bring in the big data plugin; they involve OSGi and BundleContext -->
<dependency>
            <groupId>org.apache.felix</groupId>
            <artifactId>org.apache.felix.framework</artifactId>
            <version>4.2.1</version>
        </dependency>
<dependency>
            <groupId>org.osgi</groupId>
            <artifactId>org.osgi.core</artifactId>
            <version>4.3.1</version>
            <scope>provided</scope>
        </dependency>

<!-- Hadoop client dependencies -->
<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>  

Repository configuration

 <repositories>
        <repository>
            <id>pentaho-public</id>
            <name>Pentaho Public</name>
            <url>https://nexus.pentaho.org/content/groups/omni</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>interval:15</updatePolicy>
            </snapshots>
        </repository>
        <repository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>https://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>

    </repositories>

Implementation

1. kettle-steps.xml

Copy the kettle-steps.xml file from the Kettle source code into src/main/resources/ and add the following content:

 <step id="Hadoop File Input">     <description>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.HadoopFileInput</description>  <classname>org.pentaho.big.data.kettle.plugins.hdfs.trans.HadoopFileInputMeta</classname><category>i18n:org.pentaho.di.trans.step:BaseStep.Category.BigData</category>  <tooltip>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.HadoopFileInput</tooltip>  <iconfile>ui/images/HDI.svg</iconfile> <documentation_url></documentation_url>  <cases_url/> <forum_url/></step>
<step id="HadoopFileOutput">  <description>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.HadoopFileOutput</description>  <classname>org.pentaho.big.data.kettle.plugins.hdfs.trans.HadoopFileOutputMeta</classname><category>i18n:org.pentaho.di.trans.step:BaseStep.Category.BigData</category>  <tooltip>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.HadoopFileOutput</tooltip>  <iconfile>ui/images/HDO.svg</iconfile> <documentation_url></documentation_url>  <cases_url/> <forum_url/>
</step>
        

2. Message bundles (i18n)

Create src/main/resources/org/pentaho/di/trans/step/messages/messages_en_US.properties and src/main/resources/org/pentaho/di/trans/step/messages/messages_zh_CN.properties, and add the following content:

 BaseStep.Category.BigData=Big Data
BaseStep.TypeLongDesc.HadoopFileInput=Hadoop file input
BaseStep.TypeLongDesc.HadoopFileOutput=Hadoop file output

3. Icons

Copy the HDI.svg and HDO.svg icons from the source code into src/main/resources/ui/images/.

4. Other configuration

Copy big-data-plugin-8.2/legacy/src/test/resources/plugin.properties into your project as src/main/resources/plugin.properties.

Copy big-data-plugin-8.2/assemblies/pmr-libraries/src/main/resources/classes/pmr.properties to src/main/resources/pmr.properties.

Copy big-data-plugin-8.2/legacy/src/main/resources/META-INF/version.properties to src/main/resources/META-INF/version.properties.

Copy the configuration files under pdi-ce-8.2.0.0-342/data-integration/plugins/pentaho-big-data-plugin/hadoop-configurations/ in the PDI client into src/main/resources/hadoop-configurations (the original article marks the required files in a screenshot).
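For reference, the two entries in plugin.properties that matter here are the active shim and the location of the shim configurations. A minimal sketch, assuming the hdp30 shim used throughout this article (property names as in the stock big-data-plugin plugin.properties):

 # which folder under hadoop-configurations to activate
active.hadoop.configuration=hdp30
# where the shim configurations are located, relative to the plugin
hadoop.configurations.path=hadoop-configurations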

5. Hadoop plugin initialization

 @Override
    public void contextInitialized(ServletContextEvent context) {
        try {
            // Log buffer: at most 5000 lines, kept for at most 720 seconds
            KettleLogStore.init(5000, 720);
            KettleEnvironment.init();

// Register the HadoopSpoonPlugin plugin (reads plugin.properties) -- start
            String classname = "org.pentaho.di.core.hadoop.HadoopSpoonPlugin";

            Class<LifecyclePluginType> pluginType = LifecyclePluginType.class;

            Map<Class<?>, String> classMap = new HashMap<Class<?>, String>();

            PluginMainClassType mainClassTypesAnnotation = pluginType.getAnnotation( PluginMainClassType.class );
            classMap.put( mainClassTypesAnnotation.value(), classname );

// Work around the resource URL problem when running from a packaged jar
            URL resource = getClass().getClassLoader()
                    .getResource("");
            String tempUri = resource.toString().replace("classes!", "classes");
            URL url = new URL(tempUri);
            System.out.println("new url " + url);
            // end

            Plugin plugin = new Plugin(new String[] {HadoopSpoonPlugin.PLUGIN_ID },
                    StepPluginType.class,
                    LifecyclePluginType.class.getAnnotation(PluginMainClassType.class).value(),
                    "", "", "", null, false, false,
                    classMap, new ArrayList<String>(), null, url);
            PluginRegistry.getInstance().registerPlugin(LifecyclePluginType.class, plugin);
            // HadoopSpoonPlugin registration -- end

            // Obtain the HadoopConfigurationBootstrap instance
             HadoopConfigurationBootstrap hadoopConfigurationBootstrap = HadoopConfigurationBootstrap.getInstance();
            // Set the prompter that selects which hadoop-configurations entry to use
             hadoopConfigurationBootstrap.setPrompter(new HadoopConfigurationPrompter() {
                 @Override
                 public String getConfigurationSelection(final List<HadoopConfigurationInfo> hadoopConfigurationInfos) {
                     return "hdp30";
                 }

                 @Override
                 public void promptForRestart() {

                 }
             });

            // Initialize the Hadoop environment; reads the shim configuration under hadoop-configurations selected by the active hadoop configuration property in plugin.properties
            hadoopConfigurationBootstrap.onEnvironmentInit();
// OSGi setup
             BundleContext bundleContext = SpringUtil.getBean("bundleContext");
            
             ShimBridgingServiceTracker shimBridgingServiceTracker = new ShimBridgingServiceTracker();
// Initializes some configuration
             HadoopFileSystemFactoryLoader hadoopFileSystemFactoryLoader =
                     new HadoopFileSystemFactoryLoader( bundleContext, shimBridgingServiceTracker, hadoopConfigurationBootstrap );


            PropsUI.init("KettleWebConsole", Props.TYPE_PROPERTIES_KITCHEN);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
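The contextInitialized method above belongs to a ServletContextListener; a minimal sketch of registering it with Spring Boot follows (the class name KettleInitListener is a placeholder for whatever class holds that method, not something from the original code):

 import org.springframework.boot.web.servlet.ServletListenerRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KettleListenerConfig {

    // Registers the listener that runs the Kettle/Hadoop plugin initialization from step 5.
    // KettleInitListener is an assumed name for that listener class.
    @Bean
    public ServletListenerRegistrationBean<KettleInitListener> kettleInitListener() {
        return new ServletListenerRegistrationBean<>(new KettleInitListener());
    }
}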

6. Kettle startup configuration

 @Component
public class KettleStart implements ApplicationContextAware, LifeEventHandler {


    private volatile static KettleStart kettleStart;
    public static KettleDatabaseRepositoryMeta meta;

    private LogChannelInterface log;
    private TransExecutionConfiguration transExecutionConfiguration;
    private TransExecutionConfiguration transPreviewExecutionConfiguration;
    private TransExecutionConfiguration transDebugExecutionConfiguration;
    private JobExecutionConfiguration jobExecutionConfiguration;
    public PropsUI props;

    // loads the lifecycle listeners
    private LifecycleSupport lifecycleSupport = new LifecycleSupport();


    private KettleStart() {
        // Initialize props and the log channel first so the metastore error handler below can use them.
        props = PropsUI.getPropsUIInstance();
        log = new LogChannel( PropsUI.getAppName() );
        loadSettings();

        metaStore = new DelegatingMetaStore();
        try {
            IMetaStore localMetaStore = MetaStoreConst.openLocalPentahoMetaStore();
            metaStore.addMetaStore( localMetaStore );
            metaStore.setActiveMetaStoreName( localMetaStore.getName() );
        } catch ( MetaStoreException e ) {
            log.logError( "Unable to open local Pentaho Metastore", e );
        }

        transExecutionConfiguration = new TransExecutionConfiguration();
        transExecutionConfiguration.setGatheringMetrics( true );
        transPreviewExecutionConfiguration = new TransExecutionConfiguration();
        transPreviewExecutionConfiguration.setGatheringMetrics( true );
        transDebugExecutionConfiguration = new TransExecutionConfiguration();
        transDebugExecutionConfiguration.setGatheringMetrics( true );

        jobExecutionConfiguration = new JobExecutionConfiguration();

        variables = new RowMetaAndData( new RowMeta() );

        try {
            lifecycleSupport.onStart( this );
        } catch ( LifecycleException e ) {
            e.printStackTrace();
        }
    }

    public void loadSettings() {
        LogLevel logLevel = LogLevel.getLogLevelForCode(props.getLogLevel());
        DefaultLogLevel.setLogLevel(logLevel);
        log.setLogLevel(logLevel);
        KettleLogStore.getAppender().setMaxNrLines(props.getMaxNrLinesInLog());

        DBCache.getInstance().setActive(props.useDBCache());
    }

    @Bean
    public static KettleStart getInstance() {
        if (kettleStart == null) {
            synchronized (KettleStart.class) {
                if(kettleStart == null) {
                    kettleStart = new KettleStart();
                }
            }
        }
        return kettleStart;
    }

    private Repository repository;

    public Repository getRepository() {
        return repository;
    }

    private Repository defaultRepository;


    public Repository getDefaultRepository() {
        return this.defaultRepository;
    }

    public void selectRepository(Repository repo) {
        if(repository != null) {
            repository.disconnect();
        }
        repository = repo;
    }

    private DelegatingMetaStore metaStore;

    public DelegatingMetaStore getMetaStore() {
        return metaStore;
    }

    public LogChannelInterface getLog() {
        return log;
    }

    private RowMetaAndData variables = null;
    private ArrayList<String> arguments = new ArrayList<String>();

    public String[] getArguments() {
        return arguments.toArray(new String[arguments.size()]);
    }

    public JobExecutionConfiguration getJobExecutionConfiguration() {
        return jobExecutionConfiguration;
    }

    public TransExecutionConfiguration getTransDebugExecutionConfiguration() {
        return transDebugExecutionConfiguration;
    }

    public TransExecutionConfiguration getTransPreviewExecutionConfiguration() {
        return transPreviewExecutionConfiguration;
    }

    public TransExecutionConfiguration getTransExecutionConfiguration() {
        return transExecutionConfiguration;
    }

    public RowMetaAndData getVariables() {
        return variables;
    }


}
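With this singleton in place, a transformation can be run through the standard Kettle API. A minimal sketch, assuming a connected repository and a transformation name of demo_trans (both placeholders):

 // Sketch: run a repository transformation via the KettleStart singleton.
KettleStart kettleStart = KettleStart.getInstance();
Repository repository = kettleStart.getRepository();
TransMeta transMeta = repository.loadTransformation("demo_trans",
        repository.findDirectory("/"), null, true, null);

Trans trans = new Trans(transMeta);
trans.setLogLevel(kettleStart.getLog().getLogLevel());
trans.execute(kettleStart.getArguments());
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
    kettleStart.getLog().logError("Transformation finished with errors");
}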

7. Bean configuration

Some classes of the big-data-plugin are normally instantiated by the Kettle client (Spoon), so here we have to construct them manually as Spring beans.

 // OSGi setup: boot an embedded Felix framework to obtain a BundleContext
@Bean
    @Scope("singleton")
    public BundleContext bundleContext() throws BundleException {
        FrameworkFactory frameworkFactory = ServiceLoader
                .load(FrameworkFactory.class).iterator().next();
        Framework framework = frameworkFactory.newFramework(new HashMap<>());
        framework.start();
        BundleContext bundleContext = framework.getBundleContext();
        return bundleContext;
    }

    @Bean(value = "namedClusterService" )
    @Scope("singleton")
    public NamedClusterService namedClusterService() {
        BundleContext bundleContext = SpringUtil.getBean("bundleContext");
        NamedClusterManager namedClusterManager = new NamedClusterManager();
        namedClusterManager.setBundleContext(bundleContext);
        return namedClusterManager;
    }

    @Bean(value = "baseMessagesMessageGetterFactoryImpl")
    @Scope("singleton")
    public BaseMessagesMessageGetterFactoryImpl baseMessagesMessageGetterFactoryImpl() {
        return new BaseMessagesMessageGetterFactoryImpl();
    }

    @Bean(value = "runtimeTestActionHandlers")
    @Scope("singleton")
    public RuntimeTestActionHandler runtimeTestActionHandlers() {
        BaseMessagesMessageGetterFactoryImpl baseMessagesMessageGetterFactoryImpl = SpringUtil.getBean("baseMessagesMessageGetterFactoryImpl");
        return new LoggingRuntimeTestActionHandlerImpl(baseMessagesMessageGetterFactoryImpl);
    }

    @Bean(value = "loggingRuntimeTestActionHandlerImpl")
    @Scope("singleton")
    public LoggingRuntimeTestActionHandlerImpl loggingRuntimeTestActionHandlerImpl() {
        BaseMessagesMessageGetterFactoryImpl baseMessagesMessageGetterFactoryImpl = SpringUtil.getBean("baseMessagesMessageGetterFactoryImpl");
        return new LoggingRuntimeTestActionHandlerImpl(baseMessagesMessageGetterFactoryImpl);
    }

    @Bean(value = "runtimeTestActionService")
    @Scope("singleton")
    public RuntimeTestActionService runtimeTestActionService() {
        LoggingRuntimeTestActionHandlerImpl runtimeTestActionHandler = SpringUtil.getBean("runtimeTestActionHandlers");
        List<RuntimeTestActionHandler> runtimeTestActionHandlers = new ArrayList<>();
        runtimeTestActionHandlers.add(runtimeTestActionHandler);
        LoggingRuntimeTestActionHandlerImpl loggingRuntimeTestActionHandlerImpl = SpringUtil.getBean("loggingRuntimeTestActionHandlerImpl");
        return new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, loggingRuntimeTestActionHandlerImpl);
    }


    @Bean(value = "runtimeTesterImpl")
    @Scope("singleton")
    public LoggingRuntimeTestActionHandlerImpl runtimeTesterImpl() {
        BaseMessagesMessageGetterFactoryImpl baseMessagesMessageGetterFactoryImpl = SpringUtil.getBean("baseMessagesMessageGetterFactoryImpl");
        return new LoggingRuntimeTestActionHandlerImpl(baseMessagesMessageGetterFactoryImpl);
    }

    @Bean(value = "runtimeTester")
    @Scope("singleton")
    public RuntimeTester runtimeTester() {
        List<String> orderedModules = new ArrayList<>();
        orderedModules.add("Hadoop Configuration");
        orderedModules.add("Hadoop File System");
        orderedModules.add("Map Reduce");
        orderedModules.add("Oozie");
        orderedModules.add("Zookeeper");
        return new RuntimeTesterImpl(null, null, orderedModules.toString());
    }

// HadoopFileInputMeta and HadoopFileOutputMeta only have parameterized constructors, while the Kettle core instantiates step metas with a no-arg constructor, so we create the beans here and inject them wherever they are needed
    @Bean(value = "hadoopFileInputMeta")
    @Scope("prototype")
    public HadoopFileInputMeta hadoopFileInputMeta() throws KettlePluginException {
        NamedClusterService namedClusterService = SpringUtil.getBean("namedClusterService");
        RuntimeTestActionService runtimeTestActionService = SpringUtil.getBean("runtimeTestActionService");
        RuntimeTester runtimeTester = SpringUtil.getBean("runtimeTester");
        return new HadoopFileInputMeta(namedClusterService, runtimeTestActionService, runtimeTester);
    }

    @Bean(value = "hadoopFileOutputMeta")
    @Scope("prototype")
    public HadoopFileOutputMeta hadoopFileOutputMeta() {
        NamedClusterService namedClusterService = SpringUtil.getBean("namedClusterService");
        RuntimeTestActionService runtimeTestActionService = SpringUtil.getBean("runtimeTestActionService");
        RuntimeTester runtimeTester = SpringUtil.getBean("runtimeTester");
        return new HadoopFileOutputMeta(namedClusterService, runtimeTestActionService, runtimeTester);
    }

    @Bean(value = "clusterInitializerProviders")
    @Scope("singleton")
    public ClusterInitializerProvider clusterInitializerProviders() {
        return new ClusterInitializerProviderImpl(HadoopConfigurationBootstrap.getInstance());
    }

    @Bean(value = "clusterInitializer")
    @Scope("singleton")
    public ClusterInitializer clusterInitializer() {
        ClusterInitializerProvider clusterInitializerProviders = SpringUtil.getBean("clusterInitializerProviders");
        List<ClusterInitializerProvider> list = new ArrayList<>();
        list.add(clusterInitializerProviders);
        return new ClusterInitializerImpl(list);
    }



    @Bean(value = "hdfsFileNameParser")
    @Scope("singleton")
    public HDFSFileNameParser hdfsFileNameParser() {
        return HDFSFileNameParser.getInstance();
    }

    @Bean(value = "hadoopFileSystemFactories")
    @Scope("singleton")
    public HadoopFileSystemFactory hadoopFileSystemFactories() throws ConfigurationException {
        HadoopConfigurationBootstrap hadoopConfigurationBootstrap = HadoopConfigurationBootstrap.getInstance();
        HadoopConfiguration activeConfiguration = hadoopConfigurationBootstrap.getProvider().getActiveConfiguration();
        return new HadoopFileSystemFactoryImpl(true, activeConfiguration, null);
    }

    @Bean(value = "hadoopFileSystemService")
    @Scope("singleton")
    public HadoopFileSystemLocator hadoopFileSystemService() {
        HadoopFileSystemFactory hadoopFileSystemFactory = SpringUtil.getBean("hadoopFileSystemFactories");
        List<HadoopFileSystemFactory> hadoopFileSystemFactories = new ArrayList<>();
        hadoopFileSystemFactories.add(hadoopFileSystemFactory);
        ClusterInitializer clusterInitializer = SpringUtil.getBean("clusterInitializer");
        return new HadoopFileSystemLocatorImpl(hadoopFileSystemFactories, clusterInitializer);
    }

// Instantiating this registers the hdfs type with the VFS plugin types; it is used when resolving an HDFS file provider by scheme
    @Bean(value = "hDFSFileProvider")
    @Scope("singleton")
    public HDFSFileProvider hDFSFileProvider() throws FileSystemException {
        HadoopFileSystemLocator hadoopFileSystemService = SpringUtil.getBean("hadoopFileSystemService");
        NamedClusterService namedClusterService = SpringUtil.getBean("namedClusterService");
        HDFSFileNameParser hdfsFileNameParser = SpringUtil.getBean("hdfsFileNameParser");
        return new HDFSFileProvider(hadoopFileSystemService, namedClusterService, hdfsFileNameParser, "hdfs");
    }

8. Source code modifications

(1) HadoopFileInputMeta.java

Fixes a problem where properties could not be read; the change is in the loadSourceRep method (shown as a screenshot in the original article).

(2) KettleDatabaseRepositoryStepDelegate.java

Modify the loadStepMeta method as follows:

 if ( sp == null ) {
    stepMeta.setStepMetaInterface( new MissingTrans( stepMeta.getName(), stepMeta.getStepID() ) );
} else {
    // Kettle instantiates step metas with a no-arg constructor, which would leave
    // HadoopFileInputMeta/HadoopFileOutputMeta without a NamedClusterService and fail,
    // so fetch the pre-built Spring beans instead.
    if ("HadoopFileInput".equals(stepMeta.getStepID())) {
        HadoopFileInputMeta hadoopFileInputMeta = SpringUtil.getBean("hadoopFileInputMeta");
        stepMeta.setStepMetaInterface(hadoopFileInputMeta);
    } else if ("HadoopFileOutput".equals(stepMeta.getStepID())) {
        HadoopFileOutputMeta hadoopFileOutputMeta = SpringUtil.getBean("hadoopFileOutputMeta");
        stepMeta.setStepMetaInterface(hadoopFileOutputMeta);
    } else {
        stepMeta.setStepMetaInterface( (StepMetaInterface) registry.loadClass( sp ) );
    }
}

(3) PluginPropertiesUtil.java

Fixes a "path not found" error when reading the files under hadoop-configurations after the application has been packaged as a jar.

getPath returns the path without the outer scheme, for example file:/D:/etl/target/classes/,

while toString keeps the scheme, for example jar:file:/D:/test/DC-etl-web.jar!/BOOT-INF/classes/.

The root cause of the error is illustrated with a screenshot in the original article.
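The patched method itself is only shown as a screenshot, but the gist is to read plugin.properties from the classpath as a stream rather than converting the resource URL into a file path. A rough sketch of that idea (class and method names here are illustrative, not the actual patch):

 import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Sketch only: resolve plugin.properties straight from the classpath, so the lookup also
// works inside a Spring Boot fat jar, where jar:file:/...!/BOOT-INF/classes/ URLs cannot
// be opened as plain file paths.
public final class PluginPropertiesLoader {

    public static Properties load() throws IOException {
        Properties props = new Properties();
        try (InputStream in = PluginPropertiesLoader.class.getClassLoader()
                .getResourceAsStream("plugin.properties")) {
            if (in != null) {
                props.load(in);
            }
        }
        return props;
    }
}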

(4) HDFSFileProvider.java

The modification is shown as a screenshot in the original article.

(5) HadoopFileSystemFactoryImpl.java

This is the most important modification. With the Kettle client, switching to a different Hadoop cluster means replacing the configuration files under hadoop-configurations (e.g. core-site.xml, hdfs-site.xml) and restarting the client. When Kettle is embedded in a Spring Boot project we cannot restart the application every time the cluster changes, so the goal is to be able to use any Hadoop cluster, with or without Kerberos authentication, without a restart. The source changes are shown below:

 @SneakyThrows
    @Override
    public HadoopFileSystem create( NamedCluster namedCluster, URI uri ) throws IOException {
        final URI finalUri = uri != null ? uri : URI.create( "" );
        final HadoopShim hadoopShim = hadoopConfiguration.getHadoopShim();
        final Configuration configuration = hadoopShim.createConfiguration();
        configuration.set("fs.defaultFS", uri.toString());
        // Allow clusters with simple (username) auth and Kerberos-secured clusters to be used from the same process
        configuration.set("ipc.client.fallback-to-simple-auth-allowed", "true");

        FileSystem fileSystem = null;
// When a hadoop cluster is added, its core-site.xml, hdfs-site.xml, hdfs.keytab, krb5.conf etc. are stored in the database; here we look up the authentication and configuration info by URL (e.g. hdfs://localhost:9000/)
        EtlElementAuthService rElementAuthService = SpringUtil.getBean(EtlElementAuthService.class);
        QueryWrapper<EtlElementAuthEntity> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("url", uri.toString());
        queryWrapper.eq("del_flag", "0");
        EtlElementAuthEntity etlElementAuthEntity = rElementAuthService.getOne(queryWrapper);

        if(ObjectUtil.isNull(etlElementAuthEntity)) {
            throw new Exception("No authentication info found for this hadoop cluster");
        }

        if(EtlElementAuthEntity.KERBEROS.equals(etlElementAuthEntity.getKerberosAuth())) {
            // Kerberos authentication
            org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
            // decode the base64-encoded files and write them to disk
            String kerberosConf = etlElementAuthEntity.getKerberosConf();
            String kerberosKeytab = etlElementAuthEntity.getKerberosKeytab();
            String kerberosUser = etlElementAuthEntity.getKerberosUser();
            String uuid = IdUtil.randomUUID();
            String confName = base64ToFile(kerberosConf,"conf" + uuid);
            String keytabName = base64ToFile(kerberosKeytab,"keytab" + uuid);

            System.setProperty("java.security.krb5.conf", confName);

            String coreSite = etlElementAuthEntity.getCoreSite();
            ByteArrayInputStream coreSiteInputStream = new ByteArrayInputStream(base64ToByte(coreSite));
            String hdfsSite = etlElementAuthEntity.getHdfsSite();
            ByteArrayInputStream hdfsSiteInputStream = new ByteArrayInputStream(base64ToByte(hdfsSite));
            conf.addResource(coreSiteInputStream);
            conf.addResource(hdfsSiteInputStream);

            conf.set("fs.defaultFS", uri.toString());

            UserGroupInformation.setConfiguration(conf);
            configuration.set("HADOOP_USER_NAME", "hadoop");
            System.out.println("Security enabled " + UserGroupInformation.isSecurityEnabled());
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosUser, keytabName);
            } catch (IOException e) {
                e.printStackTrace();
                throw new KettleDatabaseException("Kerberos authentication failed", e);
            }finally {
                // delete the temporary files after authentication
                File file=new File(confName);
                file.delete();
                File keytab=new File(keytabName);
                keytab.delete();
            }

            fileSystem = (FileSystem) hadoopShim.getFileSystem( configuration ).getDelegate();

        } else {
            if(StrUtil.isNotBlank(namedCluster.getHdfsUsername())) {
                fileSystem = (FileSystem) hadoopShim.getFileSystem(uri, configuration, namedCluster.getHdfsUsername() ).getDelegate();
            } else {
                fileSystem = (FileSystem) hadoopShim.getFileSystem( configuration ).getDelegate();
            }
        }


        if ( fileSystem instanceof LocalFileSystem ) {
            LOGGER.error(  "Got a local filesystem, was expecting an hdfs connection" );
            throw new IOException( "Got a local filesystem, was expecting an hdfs connection" );
        }

        return new HadoopFileSystemImpl(new HadoopFileSystemCallable() {
            @Override
            public FileSystem getFileSystem() {
                try {
                    if( EtlElementAuthEntity.USERPASSWORD.equals(etlElementAuthEntity.getKerberosAuth()) && StrUtil.isNotBlank(namedCluster.getHdfsUsername())) {
                        return (FileSystem) hadoopShim.getFileSystem( finalUri, configuration, namedCluster.getHdfsUsername() ).getDelegate();
                    } else {
                        return (FileSystem) hadoopShim.getFileSystem( finalUri, configuration, null ).getDelegate();
                    }

                } catch ( IOException e ) {
                    LOGGER.debug( "Error looking up/creating the file system ", e );
                    return null;
                } catch ( InterruptedException e ) {
                    LOGGER.debug( "Error looking up/creating the file system ", e );
                    return null;
                }
            }
        } );
    }


    @SneakyThrows
    public static byte[] base64ToByte(String base64) {
        // drop any data-URI style prefix before the comma, if present
        int i = base64.indexOf(",");
        if (i > 0) {
            base64 = base64.substring(i);
        }
        return Base64.getMimeDecoder().decode(base64);
    }


    // Decode a base64 string and write it to a file on disk
    @SneakyThrows
    public static String base64ToFile(String base64,String fileName) {
        File file = null;
        // drop any data-URI style prefix before the comma, if present
        int i = base64.indexOf(",");
        if(i>0){
            base64=base64.substring(i);
        }
        // write the decoded bytes to the target file
        BufferedOutputStream bos = null;
        java.io.FileOutputStream fos = null;
        try {
            byte[] bytes = Base64.getMimeDecoder().decode(base64);
            file=new File(fileName);
            if(file.exists()){
                file.delete();
            }
            fos = new java.io.FileOutputStream(file);
            bos = new BufferedOutputStream(fos);
            bos.write(bytes);
            file.createNewFile();
            return fileName;
        }finally {
            if (bos != null) {
                try {
                    bos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (fos != null) {
                try {
                    fos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
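The authentication information read in the create method above is stored in a table like the following: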
 CREATE TABLE `kettle_bigdata_auth` (
  `id_element` bigint NOT NULL COMMENT 'primary key',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'hadoop cluster url',
  `kerberos_auth` tinyint DEFAULT '0' COMMENT 'whether Kerberos authentication is used, 0: no 1: yes',
  `kerberos_conf` longblob COMMENT 'kerberos conf file',
  `kerberos_keytab` longblob COMMENT 'kerberos keytab file',
  `kerberos_user` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'kerberos login user',
  `principal` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'kerberos principal',
  `core_site` longblob COMMENT 'core-site.xml file',
  `hdfs_site` longblob COMMENT 'hdfs-site.xml file',
  `del_flag` char(1) COLLATE utf8mb4_general_ci DEFAULT '0' COMMENT 'delete flag',
  PRIMARY KEY (`id_element`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='hadoop cluster authentication table';

Troubleshooting

1. ClassNotFoundException: org.apache.commons.io.Charsets

Upgrade the commons-io version in pom.xml; I used version 2.5.
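For reference, the corresponding entry from the pom above:

 <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.5</version>
</dependency>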

2. No FileSystem for scheme: hdfs

Add the following bean; instantiating it registers the hdfs scheme:

 @Bean(value = "hDFSFileProvider")
    @Scope("singleton")
    public HDFSFileProvider hDFSFileProvider() throws FileSystemException {
        HadoopFileSystemLocator hadoopFileSystemService = SpringUtil.getBean("hadoopFileSystemService");
        NamedClusterService namedClusterService = SpringUtil.getBean("namedClusterService");
        HDFSFileNameParser hdfsFileNameParser = SpringUtil.getBean("hdfsFileNameParser");
        return new HDFSFileProvider(hadoopFileSystemService, namedClusterService, hdfsFileNameParser, "hdfs");
    }

3. Could not initialize class org.apache.hadoop.security.UserGroupInformation

The hadoop-common dependency is missing; for convenience, just add all of the following:

 <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>

4. Could not initialize class org.apache.hadoop.security.Credentials

A Hadoop authentication problem; it was covered in the sections above.

5. The pentaho-hadoop-shims-hdp30-package dependency cannot be downloaded

Excluding the following transitive dependencies fixes it:

 <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>pentaho-hadoop-shims-hdp30-package</artifactId>
            <version>8.2.2018.11.00-342</version>
            <type>zip</type>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.netbeans</groupId>
                    <artifactId>mof</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty</groupId>
                    <artifactId>jetty-xml</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>  

6. java.nio.file.FileSystemNotFoundException

See Source code modifications (3).

7. class org.apache.hive.service.rpc.thrift.TCLIService$Client has interface or…

As far as I recall this was a dependency conflict; add the dependencies and exclusions as in the pom.xml above.

8. Got a local filesystem, was expecting an hdfs connection

The path resolved to the file scheme rather than the hdfs scheme. The fix is again a path issue: make sure the URL uses the hdfs scheme, e.g. hdfs://localhost:9000/path/to/file instead of a bare local path.

9. SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]

A Kerberos authentication issue on the Hadoop cluster; see Source code modifications (5) above.

10. No rules applied to hdfs/xxx@FAYSON.COM

core-site.xml is missing a property:

 <property>
    <name>hadoop.security.auth_to_local</name>
    <value>RULE:[1:$1@$0](.*@\QFAYSON.COM\E$)s/@\QFAYSON.COM\E$//
RULE:[2:$1@$0](.*@\QFAYSON.COM\E$)s/@\QFAYSON.COM\E$//
DEFAULT</value>
</property>

11. Server asks us to fall back to SIMPLE auth, but this client is configured to only allow secure connections

Configure the client to allow Kerberos and simple (username/password) authentication at the same time:

 final Configuration configuration = hadoopShim.createConfiguration();
configuration.set("fs.defaultFS", uri.toString());
// allow simple-auth and Kerberos clusters to be accessed from the same process
configuration.set("ipc.client.fallback-to-simple-auth-allowed", "true");

Note

Wherever HadoopFileInput / HadoopFileOutput is used, the meta has to be assigned manually. For example:

 PluginInterface sp = PluginRegistry.getInstance().getPlugin(StepPluginType.class, pluginId);
if (sp != null) {
    if (plugin instanceof HadoopFileOutput) {
        StepMetaInterface stepMetaInterface = SpringUtil.getBean("hadoopFileOutputMeta");
        clazz = stepMetaInterface.getClass();
    } else {
        StepMetaInterface stepMetaInterface = PluginRegistry.getInstance().loadClass(sp, StepMetaInterface.class);
        clazz = stepMetaInterface.getClass();
    }
} else {
    sp = PluginRegistry.getInstance().getPlugin(JobEntryPluginType.class, pluginId);
    JobEntryInterface jobEntryInterface = PluginRegistry.getInstance().loadClass(sp, JobEntryInterface.class);
    clazz = jobEntryInterface.getClass();
}
