您的位置 首页 golang

Etcd服务注册与发现封装实现–golang

  • 服务注册 register.go
 package register

import (
	"fmt"
	"time"
	etcd3 "github.com/coreos/etcd/clientv3"
	"context"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"Test/common/service"
	"log"
)

// RegisterClient holds the state needed to keep one service instance
// registered in etcd. It is created by Register and torn down by UnRegister.
type RegisterClient struct {
	// etcdClient is the etcd v3 client created in Register from EtcdEndpoints.
	etcdClient      *etcd3.Client
	// EtcdEndpoints is the endpoint list handed to the etcd client config.
	EtcdEndpoints   []string
	// refreshInterval is the ticker period of the background registration loop.
	refreshInterval time.Duration
	// keyTTL is the TTL requested for each lease granted by the registration loop.
	keyTTL          int
	// serviceName, host and port identify the instance; together they form the
	// etcd key (see getKey), and host:port is the stored value.
	serviceName     string
	host            string
	port            int
	// stopSignal tells the background registration loop to exit; UnRegister
	// sends on it.
	stopSignal      chan bool
}

// Register creates a RegisterClient for the current service instance and
// starts the background loop that keeps it registered in etcd.
//
// The instance is stored under "/{prefix}/{serviceName}/{host}:{port}" (see
// getKey) with the value "{host}:{port}". Only the calling service itself is
// registered, so registration happens immediately.
//
// Tip: if richer metadata is needed, the value could be encoded as JSON.
//
// refreshInterval is how often the key is rewritten and must be positive.
// keyTTL is the lease TTL in seconds; when it is not strictly longer than
// refreshInterval (rounded up to whole seconds) it is replaced by twice the
// interval so the key cannot expire between two refreshes.
//
// It returns an error when refreshInterval is not positive or when the etcd
// client cannot be created.
func Register(serviceName string, host string, port int, EtcdEndpoints []string, refreshInterval time.Duration, keyTTL int) (*RegisterClient, error) {
	// time.NewTicker panics on a non-positive duration; reject it here instead
	// of crashing later inside the background goroutine.
	if refreshInterval <= 0 {
		return nil, fmt.Errorf("refreshInterval must be positive, got %s", refreshInterval)
	}

	// Keep the TTL strictly longer than the refresh interval. Round the
	// interval up so that e.g. 1.5s is treated as 2s rather than 1s.
	refreshSeconds := int((refreshInterval + time.Second - 1) / time.Second)
	if keyTTL <= refreshSeconds {
		keyTTL = 2 * refreshSeconds
	}

	etcdClient, err := etcd3.New(etcd3.Config{
		Endpoints: EtcdEndpoints,
	})
	if err != nil {
		return nil, fmt.Errorf("Create etcd client failed: %s", err)
	}

	client := &RegisterClient{
		etcdClient:      etcdClient,
		EtcdEndpoints:   EtcdEndpoints,
		refreshInterval: refreshInterval,
		keyTTL:          keyTTL,
		serviceName:     serviceName,
		host:            host,
		port:            port,
		stopSignal:      make(chan bool, 1),
	}

	if err := client.register(); err != nil {
		// Best-effort cleanup; the registration error is the one worth reporting.
		_ = etcdClient.Close()
		return nil, err
	}
	return client, nil
}

// register starts the background goroutine that keeps this instance's key
// present in etcd. The key is written immediately and then again on every
// refreshInterval tick until a value arrives on stopSignal. It always returns
// nil; failures of individual refreshes are logged and retried on the next tick.
func (c *RegisterClient) register() error {
	go func() {
		ticker := time.NewTicker(c.refreshInterval)
		// Release the ticker's resources once the loop is stopped.
		defer ticker.Stop()
		for {
			c.refresh()
			select {
			case <-c.stopSignal: // registration stopped manually
				return
			case <-ticker.C: // wait for the next refresh
			}
		}
	}()
	return nil
}

// refresh performs one registration attempt: it grants a fresh lease of keyTTL
// seconds and writes the instance key under that lease.
func (c *RegisterClient) refresh() {
	lease, err := c.etcdClient.Grant(context.Background(), int64(c.keyTTL))
	if err != nil {
		log.Printf("Register service failed. serviceName: %s, err: %s", c.serviceName, err.Error())
		// Without a lease there is nothing to attach the key to; using the nil
		// response would panic. Try again on the next tick.
		return
	}

	key := c.getKey()
	value := fmt.Sprintf("%s:%d", c.host, c.port)

	// A "key not found" result is not a failure: the key simply has to be
	// created. Any other read error skips this round.
	if _, err := c.etcdClient.Get(context.Background(), key); err != nil && err != rpctypes.ErrKeyNotFound {
		log.Printf("Get Value from etcd failed: %s", err)
		return
	}

	if _, err := c.etcdClient.Put(context.Background(), key, value, etcd3.WithLease(lease.ID)); err != nil {
		log.Printf("Register service failed. serviceName: %s, value: %s, err: %s", c.serviceName, value, err.Error())
		return
	}
	log.Printf("Register current service: %s, value: %s", c.serviceName, value)
}

// UnRegister stops the background registration loop and deletes this
// instance's key from etcd. A service can only be unregistered after it has
// been registered.
//
// It returns the error from reading or deleting the key, or nil on success.
func (c *RegisterClient) UnRegister() error {
	// Non-blocking send: if a stop signal is already pending (for example on a
	// repeated call after the loop has exited) there is nothing more to do, and
	// blocking here would hang the caller forever.
	select {
	case c.stopSignal <- true:
	default:
	}

	key := c.getKey()
	if _, err := c.etcdClient.Get(context.Background(), key); err != nil {
		log.Printf("Get Value from etcd failed: %s", err.Error())
		return err
	}
	// Return the Delete error explicitly so it is not lost to variable shadowing.
	if _, err := c.etcdClient.Delete(context.Background(), key); err != nil {
		log.Printf("UnRegister service failed. serviceName: %s, err: %s", c.serviceName, err.Error())
		return err
	}
	return nil
}

// getKey returns the etcd key of this instance:
// "/{service.Prefix}/{serviceName}/{host}:{port}".
func (c *RegisterClient) getKey() string {
	endpoint := fmt.Sprintf("%s:%d", c.host, c.port)
	return fmt.Sprintf("/%s/%s/%s", service.Prefix, c.serviceName, endpoint)
}
  
  • 服务发现 discovery.go
 /* Copyright 2018 Bruce Liu.  All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package discovery

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"strings"
	"sync"

	"Test/common/service"
	etcd3 "github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

// DiscoveryClient resolves service names to instance addresses.
//
// Approach: discovery results are cached locally and served from the cache
// first. The first lookup of a service finds nothing cached, so the addresses
// are fetched from etcd, stored locally, and a watcher for that service is
// started so that later changes update the cache.
//
// Tip: one process usually needs to discover several other services, so the
// state lives in a client and all lookups go through it.
//
// A DiscoveryClient is safe for concurrent use: the cache is shared between
// callers of GetService and the watch goroutines and is guarded by mu.
type DiscoveryClient struct {
	etcdClient    *etcd3.Client
	EtcdEndpoints []string

	// mu guards serviceMap and watcher.
	mu sync.RWMutex

	// serviceMap is the local cache of addresses, keyed by service name.
	// TODO: the cache should also expire, and there should be a way to report
	// a failed instance; this can be done later together with service degradation.
	serviceMap map[string][]string

	// watcher records, per service name, whether a watch goroutine is running.
	watcher map[string]bool
}

// NewClient creates a DiscoveryClient connected to the given etcd endpoints.
// It returns an error when the etcd client cannot be created.
func NewClient(etcdEndpoints []string) (*DiscoveryClient, error) {
	etcdClient, err := etcd3.New(etcd3.Config{
		Endpoints: etcdEndpoints,
	})
	if err != nil {
		return nil, fmt.Errorf("Create register etcdClient failed: %s", err.Error())
	}

	return &DiscoveryClient{
		etcdClient:    etcdClient,
		EtcdEndpoints: etcdEndpoints,
		serviceMap:    make(map[string][]string),
		watcher:       make(map[string]bool),
	}, nil
}

// GetService returns the address of one randomly chosen instance of
// serviceName. The local cache is consulted first; on a miss the instances are
// read from etcd, cached, and a watcher is started to keep the cache current.
//
// It returns an error when serviceName is empty, when the etcd read fails, or
// when no instance is registered.
func (c *DiscoveryClient) GetService(serviceName string) (string, error) {
	if serviceName == "" {
		return "", errors.New("Service name should not be null")
	}

	// Serve from the local cache when possible.
	if addr, ok := c.cachedAddress(serviceName); ok {
		return addr, nil
	}

	// Cache miss: read the instances from etcd.
	resp, err := c.etcdClient.Get(context.Background(), c.servicePrefix(serviceName), etcd3.WithPrefix())
	if err != nil {
		log.Printf("Get service failed: %s, error: %s", serviceName, err.Error())
		return "", err
	}

	serviceList := service.ExtractAddress(resp)

	var addr string
	if len(serviceList) > 0 {
		// Choose before publishing the slice: once it is in the cache the
		// watcher may modify it in place.
		addr = serviceList[rand.Intn(len(serviceList))]
		c.mu.Lock()
		c.serviceMap[serviceName] = serviceList
		c.mu.Unlock()
	}

	// Start the watcher (at most one per service) even when nothing was found,
	// so instances registered later are picked up.
	c.ensureWatch(serviceName)

	if len(serviceList) == 0 {
		return "", errors.New("Service not found.")
	}
	return addr, nil
}

// getKey returns the etcd key under which the instances of serviceName are
// registered, without a trailing separator.
func (c *DiscoveryClient) getKey(serviceName string) string {
	return fmt.Sprintf("/%s/%s", service.Prefix, serviceName)
}

// servicePrefix returns the etcd prefix that matches only the instance keys of
// serviceName. The trailing "/" prevents a service named "foo" from also
// matching the keys of a service named "foobar".
func (c *DiscoveryClient) servicePrefix(serviceName string) string {
	return c.getKey(serviceName) + "/"
}

// cachedAddress returns a random cached address of serviceName, or false when
// nothing is cached for it.
func (c *DiscoveryClient) cachedAddress(serviceName string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	cached := c.serviceMap[serviceName]
	if len(cached) == 0 {
		return "", false
	}
	return cached[rand.Intn(len(cached))], true
}

// ensureWatch starts a watch goroutine for serviceName unless one is already
// running. The flag is checked and set under the lock so that concurrent
// callers cannot start two watchers for the same service.
func (c *DiscoveryClient) ensureWatch(serviceName string) {
	c.mu.Lock()
	running := c.watcher[serviceName]
	c.watcher[serviceName] = true
	c.mu.Unlock()

	if !running {
		go c.watch(serviceName)
	}
}

// watch follows the registration keys of serviceName and applies every change
// to the local cache. It blocks until the watch channel is closed.
func (c *DiscoveryClient) watch(serviceName string) {
	c.mu.Lock()
	c.watcher[serviceName] = true
	c.mu.Unlock()
	defer func() {
		// Once this function exits a new watcher may be started. The cached
		// list is dropped as well: with nobody updating it, it would otherwise
		// be served stale forever; the next GetService re-reads it from etcd.
		c.mu.Lock()
		c.watcher[serviceName] = false
		delete(c.serviceMap, serviceName)
		c.mu.Unlock()
	}()

	// prefix is the etcd key prefix to watch.
	prefix := c.servicePrefix(serviceName)
	log.Printf("Watch service : %s", serviceName)

	// Create the watch.
	rch := c.etcdClient.Watch(context.Background(), prefix, etcd3.WithPrefix())
	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			log.Printf("Watch service failed: %s, error: %s", serviceName, err.Error())
		}
		for _, ev := range wresp.Events {
			log.Printf("Catch changes: %s, event: %d", serviceName, ev.Type)
			switch ev.Type {
			case mvccpb.PUT:
				c.addAddress(serviceName, string(ev.Kv.Value))
			case mvccpb.DELETE:
				// Take the address from the key suffix rather than from the
				// event's value. TrimPrefix cannot go out of range the way
				// manual slicing could.
				c.removeAddress(serviceName, strings.TrimPrefix(string(ev.Kv.Key), prefix))
			}
		}
	}
}

// addAddress adds addr to the cached list of serviceName unless it is already
// present.
func (c *DiscoveryClient) addAddress(serviceName, addr string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, v := range c.serviceMap[serviceName] {
		if v == addr {
			return
		}
	}
	log.Printf("Add new service: %s, value: %s", serviceName, addr)
	c.serviceMap[serviceName] = append(c.serviceMap[serviceName], addr)
}

// removeAddress removes addr from the cached list of serviceName; it does
// nothing when addr is not cached.
func (c *DiscoveryClient) removeAddress(serviceName, addr string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	list := c.serviceMap[serviceName]
	for i, v := range list {
		if v == addr {
			log.Printf("Delete service: %s, value: %s", serviceName, addr)
			c.serviceMap[serviceName] = append(list[:i], list[i+1:]...)
			return
		}
	}
}
  

文章来源:智云一二三科技

文章标题:Etcd服务注册与发现封装实现–golang

文章地址:https://www.zhihuclub.com/97250.shtml

关于作者: 智云科技

热门文章

网站地图