代码结构
log4j.properties
zookeeper.root.logger=INFO, CONSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log
#
# ZooKeeper Logging Configuration
#
# Format is "<default threshold> (, <appender>)+"
# DEFAULT: console appender only
log4j.rootLogger=${zookeeper.root.logger}
# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
#
# Add ROLLINGFILE to rootLogger to get log file output
# Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}
# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
#
# Add TRACEFILE to rootLogger to get log file output
# Log TRACE level and above messages to a log file
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}
log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j's NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
APP
package com.huanfeng;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
public class App {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
CountDown latch latch=new CountDownLatch(1);
//watch是session级别的,和 node 没有关系
//3000表示3秒钟,也就是只要客户端断开连接,那么此时的 session 绑定的数据只能存在三秒钟
ZooKeeper zk=new ZooKeeper("192.168.25.128:2181", 3000, new Watcher() {
public void process(WatchedEvent event) {
Event.KeeperState state=event.getState();
Event.EventType type=event.getType();
String path=event.getPath();
System.out.println(event.toString()+"session");
//状态
switch (state) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
latch.countDown();
System.out.println("连接成功");
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
}
//类型
switch (type) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
});
latch.await();
//这是获取客户端的状态
ZooKeeper.States sta te=zk.getState();
switch (state) {
case CONNECTING:
System.out.println("这个是连接中");
break;
case ASSOCIATING:
break;
case CONNECTED:
System.out.println("这个是连接完成");
break;
case CONNECTEDREADONLY:
break;
case CLOSED:
break;
case AUTH_FAILED:
break;
case NOT_CONNECTED:
break;
}
//此时已经连接完成了
String nodeName = zk.create("/oooxxx", "dataaaa".get byte s(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Stat stat=new Stat();
//watcher是相当于观察的,有数据的时候会触发
byte[] data = zk.getData("/oooxxx", new Watcher() {
@ Override
public void process(WatchedEvent watchedEvent) {
System.out.println("gat /oooxxx");
System.out.println(watchedEvent.toString());
try {
//true表示new zk的watch
//this表示当前的watch
zk.getData("/oooxxx",true,stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, stat);
System.out.println(new String(data));
//源数据
Stat stat1 = zk.setData("/oooxxx", "newData".getBytes(), 0);
Stat stat2 = zk.setData("/oooxxx", "newData1".getBytes(), stat1.getVersion());
//异步的获取,可以 回调 获取数据,获取数据的回调方式
zk.getData("/oooxxx", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
System.out.println("回调获取");
//abc
System.out.println(o.toString());
System.out.println(new String(bytes));
}
},"abc");
Thread .sleep(2000000000);
}
}
DefaultWatch
package com.huanfeng.config;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
public class DefaultWatch implements Watcher {
CountDownLatch cc ;
public void setCc(CountDownLatch cc) {
this.cc = cc;
}
@Override
public void process(WatchedEvent event) {
System.out.println(event.toString());
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
cc.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
}
}
}
MyConf
package com.huanfeng.config;
/**
 * Holder for the current configuration string.
 *
 * <p>The value is written from ZooKeeper callbacks and polled from another
 * thread, so the field is volatile to make each write visible to readers.
 */
public class MyConf {

    private volatile String conf;

    /**
     * Returns the most recently stored configuration.
     *
     * @return the stored value, or {@code null} if none has been set yet
     */
    public String getConf() {
        return conf;
    }

    /**
     * Replaces the stored configuration.
     *
     * @param conf the new value
     */
    public void setConf(String conf) {
        this.conf = conf;
    }
}
TestConfig
package com.huanfeng.config;
import org.apache.zookeeper.ZooKeeper;
import org. junit .After;
import org.junit.Before;
import org.junit.Test;
public class TestConfig {
ZooKeeper zk;
@Before
public void conn (){
zk = ZKUtils.getZK();
}
@After
public void close (){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void getConf(){
WatchCallBack watch CallBack = new WatchCallBack();
watchCallBack.setZk(zk);
MyConf myConf = new MyConf();
watchCallBack.setConf(myConf);
watchCallBack.aWait();
//1,节点不存在
//2,节点存在
while(true){
if(myConf.getConf().equals("")){
watchCallBack.aWait();
}else{
System.out.println(myConf.getConf());
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
WatchCallBack
package com.huanfeng.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
 * Keeps a {@link MyConf} in step with the data stored at {@code /AppConf}.
 *
 * <p>One object serves as the node watcher, the {@code exists} callback and
 * the {@code getData} callback. {@link #aWait()} blocks the caller until a
 * value has been loaded into the {@link MyConf}.
 */
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    /** Node whose data is mirrored into {@link #conf}. */
    private static final String CONF_PATH = "/AppConf";

    ZooKeeper zk;
    MyConf conf;
    // Replaced inside process(...) and awaited by callers of aWait(), hence volatile.
    volatile CountDownLatch cc = new CountDownLatch(1);

    public MyConf getConf() {
        return conf;
    }

    public void setConf(MyConf conf) {
        this.conf = conf;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    /**
     * Registers an {@code exists} watch on the node and blocks until the data
     * callback has stored a value. If the thread is interrupted the interrupt
     * flag is restored and the method returns early.
     */
    public void aWait() {
        zk.exists(CONF_PATH, this, this, "ABC");
        try {
            cc.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * {@code getData} callback: stores the payload and releases waiters.
     * Calls that deliver no data are ignored.
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        if (data != null) {
            String s = new String(data, java.nio.charset.StandardCharsets.UTF_8);
            conf.setConf(s);
            cc.countDown();
        }
    }

    /**
     * {@code exists} callback: when a stat is delivered (the node exists),
     * fetch its data and leave a data watch behind.
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if (stat != null) {
            zk.getData(CONF_PATH, this, this, "sdfs");
        }
    }

    /**
     * Watcher callback: reloads the data on creation or change, and resets
     * the holder on deletion.
     */
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case NodeCreated:
            case NodeDataChanged:
                zk.getData(CONF_PATH, this, this, "sdfs");
                break;
            case NodeDeleted:
                // Tolerate deletion: install a fresh latch only if the current one
                // has already been released, so a caller still blocked on it is not
                // stranded. Publish the latch before clearing the value so a poller
                // that sees "" waits on the new latch.
                if (cc.getCount() == 0) {
                    cc = new CountDownLatch(1);
                }
                conf.setConf("");
                // Re-arm the exists watch so a later re-creation is noticed even if
                // nobody calls aWait() again.
                zk.exists(CONF_PATH, this, this, "ABC");
                break;
            case None:
            case NodeChildrenChanged:
            default:
                break;
        }
    }
}
ZKUtils
package com.huanfeng.config;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
 * Utility class that creates a ZooKeeper client and waits for it to connect
 * to the server.
 */
public class ZKUtils {

    private static final String ADDRESS = "192.168.25.128:2181/testLock";

    /**
     * Creates a new client and blocks until its watcher has observed the
     * connected state.
     *
     * <p>Each call uses its own watcher and latch, so a second call waits for
     * its own connection instead of returning at once on a latch that an
     * earlier call already released.
     *
     * @return the client; {@code null} if construction failed, or a client
     *         that may not be connected yet if the wait was interrupted
     */
    public static ZooKeeper getZK() {
        CountDownLatch connected = new CountDownLatch(1);
        DefaultWatch watch = new DefaultWatch();
        // Hand the latch to the watcher BEFORE creating the client: the connect
        // event is delivered on another thread and may arrive immediately.
        watch.setCc(connected);

        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(ADDRESS, 1000, watch);
            connected.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Kept broad on purpose: this method has never thrown to its callers.
            e.printStackTrace();
        }
        return zk;
    }
}