您的位置 首页 golang

Go语言(二十)日志采集项目(二)Etcd的使用


日志采集项目(二)Etcd的使用

ETCD 介绍

  • 概念: 高可用的分布式key-value存储,实现配置共享和服务发现
  • 类似项目: zookeeper和consul
  • 开发语言: Go
  • 接口: 提供restful的http接口,使用简单
  • 实现算法: 基于raft算法的强一致性,高可用的服务存储目录

ETCD的应用场景

  • 服务发现和注册
  • 配置中心
  • 分布式锁
  • master选举

ETCD环境搭建

[root@centos7-node1 etcd]# nohup ./etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --listen-peer-urls http://0.0.0.0:2380 --initial-advertise-peer-urls http://0.0.0.0:2380 &        #启动etcd
  • etcdctl使用
[root@centos7-node1 ~]# cd /opt/application/etcd/[root@centos7-node1 etcd]# ./etcdctl --endpoints "http://localhost:2379" put /logagent/conf 333333[root@centos7-node1 etcd]# ./etcdctl --endpoints "http://localhost:2379" watch  /logagent/conf   [root@centos7-node1 etcd]# ./etcdctl --endpoints "http://localhost:2379" del /logagent/conf
  • go实现watch功能
安装v3插件go get go.etcd.io/etcd/clientv3

代码

package mainimport (   "context"   "fmt"   "go.etcd.io/etcd/clientv3"   "time")func main() {   client,err := clientv3.New(clientv3.Config{      Endpoints: []string{"192.168.56.11:2379"},      DialTimeout: time.Second*3,   })   defer client.Close()   fmt.Printf("conn succ\n")   for {      resultChan := client.Watch(context.Background(),"/logagent/conf")      for v := range resultChan{         if v.Err() != nil {            fmt.Printf("watch faild,err:%v\n",err)            continue         }         for _,e := range v.Events {            fmt.Printf("event_type:%v,key:%v,val:%v\n",e.Type,e.Kv.Key,string(e.Kv.Value))         }      }   }}
  • go 实现put功能
package mainimport (   "context"   "fmt"   "go.etcd.io/etcd/clientv3"   "time")func main() {   client,err := clientv3.New(clientv3.Config{      Endpoints: []string{"192.168.56.11:2379"},      DialTimeout: time.Second*3,   })   defer client.Close()   fmt.Printf("conn succ\n")   _,err = client.Put(context.Background(),"/logagent/conf","sddadas")   if err != nil {      fmt.Printf("Put faild,err:%v\n",err)   }}
  • kafka消费代码
package mainimport (   "fmt"   "github.com/Shopify/sarama"   "sync")var wg sync.WaitGroupfunc main() {   //连接配置   consumer,err := sarama.NewConsumer([]string{"192.168.56.11:9092"},nil)   if err != nil {      fmt.Printf("consumer message faild,error:%v\n",err)      return   }   fmt.Printf("conn succ\n")   pt,err := consumer.Partitions("nginx_log")   if err != nil {      fmt.Printf("get partions aild,err:%v\n",err)      return   }   for _,p := range pt {      pc, err := consumer.ConsumePartition("nginx_log",p,sarama.OffsetNewest)      if err !=  nil {         fmt.Printf("consumer faild,error:%v\n",err)         continue      }      wg.Add(1)      go func() {         for m := range pc.Messages() {            fmt.Printf("topic:%v,value:%v\n",m.Topic,string(m.Value))         }         wg.Done()      }()   }   wg.Wait()}

文章来源:智云一二三科技

文章标题:Go语言(二十)日志采集项目(二)Etcd的使用

文章地址:https://www.zhihuclub.com/319.shtml

关于作者: 智云科技

热门文章

网站地图