您的位置 首页 java

0069-如何使用Java连接Kerberos的Kafka

1.文档编写目的


Kafka从0.8版本以后出了新的API接口,用于异步方式发送消息,性能优于旧的API,本篇文章主要使用新的API接口进行测试。继上一篇文章

如何通过Cloudera Manager为 kafka 启用Kerberos及使用

,本篇文章主要讲述如何使用 Java 连接Kerberos的Kafka集群生产和消费消息。

  • 内容概述

1.环境准备

2.创建Java工程

3.编写生产消息代码

4.编写消费消息代码

5.测试

  • 测试环境

1.RedHat7.2

2.CM和CDH版本为5.11.2

3.Kafka2.2.0-0.10.2

  • 前置条件

1.Intellij已安装且正常运行

2.Maven环境正常

2.环境准备


1.创建topic,test3有3个replication,3个partition

[ec2-user@ip-172-31-22-86~] $ kafka-topics –create –zookeeper ip-172-31-22-86.ap-southeast-1.compute. internal :2181 –replication-factor 3 –partitions 3 –topic test3

0069-如何使用Java连接Kerberos的Kafka

2.krb5.conf配置(直接使用CDH集群的Kerberos配置)

# Configuration snippets may beplaced in this directory as well

includedir /etc/krb5.conf.d/

[logging]

default = FILE:/var/log/krb5libs.log

kdc =FILE:/var/log/krb5kdc.log

admin_server = FILE:/var/log/kadmind.log

[libdefaults]

dns_lookup_realm = false

ticket_lifetime = 24h

renew_lifetime = 7d

forwardable = true

rdns = false

default_realm = CLOUDERA.COM

#default_ccache_name = KEYRING:persistent:%{uid}

[realms]

CLOUDERA.COM = {

kdc =ip-172-31-22-86.ap-southeast-1.compute.internal

admin_server = ip-172-31-22-86.ap-southeast-1.compute.internal

}

[domain_realm]

.ip-172-31-22-86.ap-southeast-1.compute.internal= CLOUDERA.COM

ip-172-31-22-86.ap-southeast-1.compute.internal= CLOUDERA.COM

3.Kerberos的keytab文件

使用kadmin为Kerberos账号生成keytab,fayson.keytab文件生成在当前目录下。

[ec2-user@ip-172-31-22-86~] $ sudo kadmin.local

Authenticating as principal hdfs/admin@CLOUDERA.COM with password.

kadmin.local: xst -norandkey -k fayson.keytab fayson@CLOUDERA.COM

kadmin.local: exit

[ec2-user@ip-172-31-22-86~] $

0069-如何使用Java连接Kerberos的Kafka

4.jaas-cache.conf配置文件

KafkaClient{

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

keyTab= “/Volumes/Transcend/keytab/fayson.keytab”

principal= “fayson@CLOUDERA.COM” ;

};

5.在当前开发环境下配置集群的主机信息到hosts文件

在/etc/hosts文件中添加

0069-如何使用Java连接Kerberos的Kafka

提示:Fayson使用的AWS环境,所以使用公网IP和hostname对应。如果你的开发环境可以直连Hadoop集群,可以直接配置Hadoop内网IP和hostname对应即可。

3.创建Java工程


1.使用Intellij创建Java Maven工程

2.在pom.xml配置文件中增加Kafka API的Maven依赖

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.2.0</version>
</dependency>
 
0069-如何使用Java连接Kerberos的Kafka

4.编写生产消息代码


package com.cloudera ;

import org.apache.kafka.clients.producer.KafkaProducer ;

import org.apache.kafka.clients.producer.Producer ;

import org.apache.kafka.clients.producer.ProducerConfig ;

import org.apache.kafka.clients.producer.ProducerRecord ;

import java.util.Properties ;

/**

* Created by fayson on 2017/10/24.

*/

public class MyProducer {

public static String TOPIC_NAME = “test3” ;

public static void main( String [] args){

System . setProperty ( “java.security.krb5.conf” , “/Volumes/Transcend/keytab/krb5.conf” );

System . setProperty ( “java.security.auth.login.config” , “/Volumes/Transcend/keytab/jaas-cache.conf” );

System . setProperty ( “javax.security.auth.useSubjectCredsOnly” , “false” );

// System.setProperty(“sun.security.krb5.debug”,”true”);

Properties props = new Properties();

props.put( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , “ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020” );

props.put( ProducerConfig . ACKS_CONFIG , “all” );

props.put( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , “org.apache.kafka.common.serialization.StringSerializer” );

props.put( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , “org.apache.kafka.common.serialization.StringSerializer” );

props.put( “security.protocol” , “SASL_PLAINTEXT” );

props.put( “sasl.kerberos.service.name” , “kafka” );

Producer < String , String > producer = new KafkaProducer< String , String >(props);

for ( int i = 0 ; i < 10 ; i++) {

String key = “key-” + i;

String message = “Message-” + i;

ProducerRecord record= new ProducerRecord< String , String >( TOPIC_NAME , key, message);

producer.send(record);

System . out .println(key + “—-” + message);

}

producer.close();

}

}

5.编写消费消息代码


package com.cloudera ;

import org.apache.kafka.clients.consumer. *;

import org.apache.kafka.common.TopicPartition ;

import java.util.Arrays ;

import java.util.Properties ;

/**

* Created by fayson on 2017/10/24.

*/

public class MyConsumer {

private static String TOPIC_NAME = “test3” ;

public static void main( String [] args){

System . setProperty ( “java.security.krb5.conf” , “/Volumes/Transcend/keytab/krb5.conf” );

System . setProperty ( “java.security.auth.login.config” , “/Volumes/Transcend/keytab/jaas-cache.conf” );

System . setProperty ( “javax.security.auth.useSubjectCredsOnly” , “false” );

Properties props = new Properties();

props.put( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , “ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020” );

props.put( ConsumerConfig . GROUP_ID_CONFIG , “DemoConsumer” );

props.put( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG , “org.apache.kafka.common.serialization.StringDeserializer” );

props.put( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG , “org.apache.kafka.common.serialization.StringDeserializer” );

props.put( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , “true” );

props.put( ConsumerConfig . AUTO_COMMIT_INTERVAL_MS_CONFIG , “1000” );

props.put( “security.protocol” , “SASL_PLAINTEXT” );

props.put( “sasl.kerberos.service.name” , “kafka” );

KafkaConsumer < String , String > consumer = new KafkaConsumer< String , String >(props);

TopicPartition partition0= new TopicPartition( TOPIC_NAME , 0 );

TopicPartition partition1= new TopicPartition( TOPIC_NAME , 1 );

TopicPartition partition2= new TopicPartition( TOPIC_NAME , 2 );

consumer.assign( Arrays . asList (partition0,partition1, partition2));

ConsumerRecords < String , String > records = null ;

while ( true ){

try {

Thread . sleep ( 10000l );

System . out .println();

records = consumer.poll( Long . MAX_VALUE );

for ( ConsumerRecord < String , String > record : records) {

System . out .println( “Receivedmessage: (” + record.key() + “,” + record.value() + “) at offset ” + record.offset());

}

} catch ( InterruptedException e){

e.printStackTrace();

}

}

}

}

6.代码测试


1.执行消费程序,消费topic为test3的所有partition消息

0069-如何使用Java连接Kerberos的Kafka

启动成功,等待消费test3的消息

0069-如何使用Java连接Kerberos的Kafka

2.执行生产消息程序,向test3的topic生产消息

0069-如何使用Java连接Kerberos的Kafka

向test3的topic发送的消息

0069-如何使用Java连接Kerberos的Kafka

3.查看消费程序读取到的消息

0069-如何使用Java连接Kerberos的Kafka

7.总结


在开发环境下通过Java代码直接连接到已启用Kerberos的Kafka集群时,则需要将krb5.conf和jaas.conf配置加载到程序运行环境中。至于使用Kerberos密码的方式Fayson也不会。

测试使用的topic有3个partiton,如果没有将所有的broker列表配置到bootstrap.servers中,会导致部分消息丢失。

参考文档:

#api

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。


您可能还想看

安装


CENTOS6.5安装CDH5.12.1(一)

CENTOS6.5安装CDH5.12.1(二)

CENTOS7.2安装CDH5.10和Kudu1.2(一)

CENTOS7.2安装CDH5.10和Kudu1.2(二)

如何在CDH中安装Kudu&Spark2&Kafka

如何升级Cloudera Manager和CDH

如何卸载CDH(附一键卸载github源码)

如何迁移Cloudera Manager节点

如何在Windows Server2008搭建DNS服务并配置泛域名解析

安全


如何在CDH集群启用Kerberos

如何在Hue中使用Sentry

如何在CDH启用Kerberos的情况下安装及使用Sentry(一)

如何在CDH启用Kerberos的情况下安装及使用Sentry(二)

如何在CDH未启用认证的情况下安装及使用Sentry

如何使用Sentry管理Hive外部表权限

如何使用Sentry管理Hive外部表(补充)

如何在Kerberos与非Kerberos的CDH集群BDR不可用时复制数据

Windows Kerberos客户端配置并访问CDH

数据科学


如何在CDSW中使用R绘制直方图

如何使用Python Impyla客户端连接Hive和 Impala

如何在CDH集群安装Anaconda&搭建Python私有源

如何使用CDSW在CDH中分布式运行所有R代码

如何使用CDSW在CDH集群通过sparklyr提交R的Spark作业

如何使用R连接Hive与Impala

如何在Redhat中安装R的包及搭建R的私有源

如何在Redhat中配置R环境

什么是sparklyr

其他


CDH网络要求(Lenovo参考架构)

大数据售前的中年危机

如何实现CDH元数据库MySQL的主备

如何在CDH中使用HPLSQL实现存储过程

如何在Hive&Impala中使用UDF

Hive多分隔符支持示例


推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

文章来源:智云一二三科技

文章标题:0069-如何使用Java连接Kerberos的Kafka

文章地址:https://www.zhihuclub.com/190969.shtml

关于作者: 智云科技

热门文章

网站地图