您的位置 首页 java

Java中使用Kafka

Kafka 客户端api分为5大类,常用的是前3类

AdminClient API:管理topic、broker之类的信息

Producer API:发布消息到一个或多个topic

Consumer API:订阅1个或多个topic,处理接收到的消息

Stream API:流处理,将输入流经过一些处理转换为输出流

Connector API:从一些源系统|应用中拉取数据到kafka

kafka -clients

依赖

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>2.8.0</version>

</dependency>

AdminClient API

可以在zk上查看已存在的topic

brokers/topics:包含topic_id、partitions、replicas、version

conig/topics:包含version、config

win班kafka删除topic时会出现kafka闪退的问题,可以在根据出错提示,在zk上删除对应的topic,并在kafka的本地日志中删除对应topic的文件,重启kafka即可。

[2021-05-30 16:40:52,583] ERROR Error while renaming dir for topic3-0 in log dir D:tmpkafka-logs (kafka.server.LogDirFailureChannel)

java .nio. File .AccessDenied Exception : D:tmpkafka-logstopic3-0 -> D:tmpkafka-logstopic3-0.c1d7bfc008ab43dfaf4506df1a063e3d-delete

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)

at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)

at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)

at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)

at java.nio.file.Files.move(Files.java:1395)

at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:904)

at kafka.log.Log.a n o n f u n anonfunanonfunrenameDir2 ( L o g . s c a l a : 1072 ) a t k a f k a . l o g . L o g . r e n a m e D i r ( L o g . s c a l a : 2453 ) a t k a f k a . l o g . L o g M a n a g e r . a s y n c D e l e t e ( L o g M a n a g e r . s c a l a : 991 ) a t k a f k a . l o g . L o g M a n a g e r . 2(Log.scala:1072) at kafka.log.Log.renameDir(Log.scala:2453) at kafka.log.LogManager.asyncDelete(LogManager.scala:991) at kafka.log.LogManager.2(Log.scala:1072)atkafka.log.Log.renameDir(Log.scala:2453)atkafka.log.LogManager.asyncDelete(LogManager.scala:991)atkafka.log.LogManager.anonfun$asyncDelete3 ( L o g M a n a g e r . s c a l a : 1026 ) a t s c a l a . O p t i o n . f o r e a c h ( O p t i o n . s c a l a : 437 ) a t k a f k a . l o g . L o g M a n a g e r . 3(LogManager.scala:1026) at scala.Option.foreach(Option.scala:437) at kafka.log.LogManager.3(LogManager.scala:1026)atscala.Option.foreach(Option.scala:437)atkafka.log.LogManager.anonfun$asyncDelete2 ( L o g M a n a g e r . s c a l a : 1024 ) a t k a f k a . l o g . L o g M a n a g e r . 2(LogManager.scala:1024) at kafka.log.LogManager.2(LogManager.scala:1024)atkafka.log.LogManager.anonfun$asyncDelete2 22adapted(LogManager.scala:1022)

at scala.collection.mutable.HashSet$Node.foreach(HashSet.scala:435)

at scala.collection.mutable.HashSet.foreach(HashSet.scala:361)

at kafka.log.LogManager.asyncDelete(LogManager.scala:1022)

at kafka.server.ReplicaManager.stopPartitions(ReplicaManager.scala:489)

at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:427)

at kafka.server.KafkaApis.handleStopReplica Request (KafkaApis.scala:284)

at kafka.server.KafkaApis.handle(KafkaApis.scala:172)

at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)

at java.lang.Thread.run(Thread.java:748)

Suppressed: java.nio.file.AccessDeniedException: D:tmpkafka-logstopic3-0 -> D:tmpkafka-logstopic3-0.c1d7bfc008ab43dfaf4506df1a063e3d-delete

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)

at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)

at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)

at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)

at java.nio.file.Files.move(Files.java:1395)

at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:901)

… 16 more

Producer

Producer是线程安全的,批量发送消息,并非逐条发送。

Producer客户端实现了负载均衡,会随机发送消息到某个par 分区上。可以自定义par规则,路由特定的消息到指定的 pat 上。

使用最多的是带 回调函数 的发送。

Consumer

Stream

stream用于处理kafka中的数据

Connect

用于kafka与 hadoop 、db等数据源之间的直接操作,类似于elk,日志从应用服务器经过一些处理到es服务器,都是配置,不用写代码。用得不多。

文章来源:智云一二三科技

文章标题:Java中使用Kafka

文章地址:https://www.zhihuclub.com/190958.shtml

关于作者: 智云科技

热门文章

网站地图