您的位置 首页 golang

KubeEdge分析-beehive

beehive是kubeedge中的一个通信框架,整个kubeEdge的cloud和edge模块都依赖于beehive框架。

不弄清楚beehive的工作原理,就很难分析kubeEdge整体的逻辑。

目录结构

├── pkg│   ├── common│   │   ├── config│   │   │   ├── config.go│   │   │   └── config_test.go│   │   └── util│   │       ├── conn.go│   │       ├── conn_test.go│   │       ├── file_util.go│   │       └── parse_resource.go│   └── core│       ├── context│       │   ├── context.go│       │   ├── context_channel.go│       │   ├── context_channel_test.go│       │   ├── context_factory.go│       │   └── context_unixsocket.go│       ├── core.go│       ├── model│       │   └── message.go│       └── module.go

beehive的整体的代码量是比较少的,主要就是core中的几个文件
;初步看beehive实现了两种通信机制,一种是通过unixsocket,另一种是通过golang的channel。

使用

beehive并不是单独能运行的模块,而是直接被其他模块引用的。

    core.Register(MODULE)    // start all modules    core.Run()

首先对需要使用beehive的模块进行注册,都注册完成以后,调用Run

init

init是加载的时候默认调用的,

func init() {    modules = make(map[string]Module)    disabledModules = make(map[string]Module)    config.AddConfigChangeCallback(moduleChangeCallback{})    eventListener := config.EventListener{Name: "eventListener1"}    config.CONFIG.RegisterListener(eventListener, "modules.enabled")}

modules被保存到一个map中。disabledModules没啥用,就是可以配置禁用哪些模块。
其他和config相关的内容也没啥用,就是监控配置文件变化来控制模块的启用、停用的。

Register

register只是把模块以名称为key放到map中。

Run

func StartModules() {    beehiveContext.InitContext(beehiveContext.MsgCtxTypeChannel)    modules := GetModules()    for name, module := range modules {        //Init the module        beehiveContext.AddModule(name)        //Assemble typeChannels for sendToGroup        beehiveContext.AddModuleGroup(name, module.Group())        go module.Start()        klog.Infof("Starting module %v", name)    }}

run方法调用了StartModules方法,这里详细分析一下

InitContext

参数MsgCtxTypeChannel是硬编码在源码中的,配的是"channel"

func InitContext(contextType string) {    once.Do(func() {        ctx, cancel := gocontext.WithCancel(gocontext.Background())        context = &beehiveContext{            ctx:    ctx,            cancel: cancel,        }        switch contextType {        case MsgCtxTypeChannel:            channelContext := NewChannelContext()            context.messageContext = channelContext            context.moduleContext = channelContext        default:            klog.Fatalf("Do not support context type:%s", contextType)        }    })}

创建了一个空的cancel context,并把context的引用和cancel方法保存到一个全局的beehiveContext变量中,定义如下:

type beehiveContext struct {    moduleContext  ModuleContext    messageContext MessageContext    ctx            gocontext.Context    cancel         gocontext.CancelFunc}

messageContext和moduleContext指向的是同一个用NewChannelContext创建的channelContext。

func NewChannelContext() *ChannelContext {    channelMap := make(map[string]chan model.Message)    moduleChannels := make(map[string]map[string]chan model.Message)    anonChannels := make(map[string]chan model.Message)    return &ChannelContext{        channels:     channelMap,        typeChannels: moduleChannels,        anonChannels: anonChannels,    }}

可以看到,channelContext实际上就包含3个空的map,map的key是一个string类型(从后面的代码看,是一个uuid),
channelMap和anonChannels比较简单,value就是一个类型为model.Message的channel;
moduleChannels比较复杂,value又是一个map,这个map的key是string,value是类型为model.Message的channel。

回到run中,接下来就是遍历每个之前注册的module

beehiveContext.AddModule

// AddModule adds module into module contextfunc AddModule(module string) {    context.moduleContext.AddModule(module)}
//AddModule adds module into module contextfunc (ctx *ChannelContext) AddModule(module string) {    channel := ctx.newChannel()    ctx.addChannel(module, channel)}
// New Channelfunc (ctx *ChannelContext) newChannel() chan model.Message {    channel := make(chan model.Message, ChannelSizeDefault)    return channel}// addChannel return chanfunc (ctx *ChannelContext) addChannel(module string, moduleCh chan model.Message) {    ctx.chsLock.Lock()    defer ctx.chsLock.Unlock()    ctx.channels[module] = moduleCh}

这里实际上就是给beehiveContext中的moduleContext添加了一个类型为model.Message的channel,并保存到channels这个map中。

beehiveContext.AddModuleGroup

// AddModuleGroup adds module into module context groupfunc AddModuleGroup(module, group string) {    context.moduleContext.AddModuleGroup(module, group)}
//AddModuleGroup adds modules into module context groupfunc (ctx *ChannelContext) AddModuleGroup(module, group string) {    if channel := ctx.getChannel(module); channel != nil {        ctx.addTypeChannel(module, group, channel)        return    }    klog.Warningf("Get bad module name %s when addmodulegroup", module)}

这里就是把刚才创建的channel以group的名字为key创建一个map,然后把整个map放到moduleContext的typeChannels中。

group的名字是硬编码在实现module接口的具体module上的,比如cloudhub的的Group返回的就是"cloudhub"

func (a *cloudHub) Group() string {    return "cloudhub"}

module.Start

最后就是调用每个module的start方法了。

Start是一个接口,每个具体的module都实现了自己的Start方法

具体module的Start方法,后续放到各个模块的分析中,这里就不仔细看了。

context

context目录下主要有context、context_channel以及context_factory。context中定义了接口,context_factory提供对外的操作接口实现,context_channel则是在context_factory中调用。context_factory与context_channel存在同名的接口。

Done

Done方法比较简单,直接调用了go context的Done方法

Cancel

Cancel调用的是beehiveContext的cancel方法,而beehiveContext的cancel方法就是创建go context时候返回的cancel方法

Cleanup

Cleanup调用的是context.moduleContext.Cleanup(module),也就是ChannelContext的Cleanup方法

func (ctx *ChannelContext) Cleanup(module string) {    if channel := ctx.getChannel(module); channel != nil {        ctx.delChannel(module)        // decrease probable exception of channel closing        time.Sleep(20 * time.Millisecond)        close(channel)    }}
// deleteChannel by module namefunc (ctx *ChannelContext) delChannel(module string) {    // delete module channel from channels map    ctx.chsLock.Lock()    _, exist := ctx.channels[module]    if !exist {        klog.Warningf("Failed to get channel, module:%s", module)        return    }    delete(ctx.channels, module)    ctx.chsLock.Unlock()    // delete module channel from typechannels map    ctx.typeChsLock.Lock()    for _, moduleMap := range ctx.typeChannels {        if _, exist := moduleMap[module]; exist {            delete(moduleMap, module)            break        }    }    ctx.typeChsLock.Unlock()}

这里就是根据module的名称,在ChannelContext的那几个map中删掉对应的channel的引用,
最后关闭channel.

Send

Send方法是调用context.messageContext.Send(module, message),也就是调用了ChannelContext的Send方法。

// Send send msg to a module. Todo: do not stuckfunc (ctx *ChannelContext) Send(module string, message model.Message) {    // avoid exception because of channel colsing    // TODO: need reconstruction    defer func() {        if exception := recover(); exception != nil {            klog.Warningf("Recover when send message, exception: %+v", exception)        }    }()    if channel := ctx.getChannel(module); channel != nil {        channel <- message        return    }    klog.Warningf("Get bad module name :%s when send message, do nothing", module)}

ChannelContext的Send方法根据module的名字先取出channel,然后放入message。

Receive

调用message, err := context.messageContext.Receive(module)。
和上面的Send对应,就是从module名称对应的channel中取出message

SendSync

// SendSync sends message in a sync wayfunc (ctx *ChannelContext) SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error) {    // avoid exception because of channel colsing    // TODO: need reconstruction    defer func() {        if exception := recover(); exception != nil {            klog.Warningf("Recover when sendsync message, exception: %+v", exception)        }    }()    if timeout <= 0 {        timeout = MessageTimeoutDefault    }    deadline := time.Now().Add(timeout)    // make sure to set sync flag    message.Header.Sync = true    // check req/resp channel    reqChannel := ctx.getChannel(module)    if reqChannel == nil {        return model.Message{}, fmt.Errorf("bad request module name(%s)", module)    }    sendTimer := time.NewTimer(timeout)    select {    case reqChannel <- message:    case <-sendTimer.C:        return model.Message{}, errors.New("timeout to send message")    }    sendTimer.Stop()    // new anonymous channel for response    anonChan := make(chan model.Message)    anonName := getAnonChannelName(message.GetID())    ctx.anonChsLock.Lock()    ctx.anonChannels[anonName] = anonChan    ctx.anonChsLock.Unlock()    defer func() {        ctx.anonChsLock.Lock()        delete(ctx.anonChannels, anonName)        close(anonChan)        ctx.anonChsLock.Unlock()    }()    var resp model.Message    respTimer := time.NewTimer(time.Until(deadline))    select {    case resp = <-anonChan:    case <-respTimer.C:        return model.Message{}, errors.New("timeout to get response")    }    respTimer.Stop()    return resp, nil}

上半部分就是在Send的基础上加了一个计时器,如果向通道写入消息超时,那么久返回空消息体和error信息。

下半部分创建了一个匿名的通道(实际上也是有名字的),并保存到ChannelContext的ctx.anonChannels这个map中。

需要注意的是,这里所谓的匿名通道,实际上是有名字的(message.GetID()),只是这个名字可能是message在创建时候生成的一个随机的uuid。

这个名字我们放到分析message的时候再看。

接下来就是在这个新的匿名通道上等待消息,并且也设置了超时时间,如果超时了,就返回空的消息和error。

总结一下,sendSync就是先发送一个message了,并以这个message的id创建一个新的通道,然后在这个通道上等待回复。

SendResponse

调用context.messageContext.SendResp(resp)。

// SendResp send resp for this message when using sync modefunc (ctx *ChannelContext) SendResp(message model.Message) {    anonName := getAnonChannelName(message.GetParentID())    ctx.anonChsLock.RLock()    defer ctx.anonChsLock.RUnlock()    if channel, exist := ctx.anonChannels[anonName]; exist {        channel <- message        return    }    klog.Warningf("Get bad anonName:%s when sendresp message, do nothing", anonName)}

sendResp和之前的sendSync是对应的,就是向sendSync中创建的匿名通道中,发送消息。

这里的通道名词是通过message.GetParentID()获取的,注意创建的时候,名词用的是GetID,这里怎么对应也放到message模块再分析。

SendToGroup

调用context.messageContext.SendToGroup(moduleType, message)
这里就是根据module type的名字查出对应的通道列表,然后分别调用Send方法

SendToGroupSync

调用context.messageContext.SendToGroupSync(moduleType, message, timeout)

// SendToGroupSync : broadcast the message to echo module channel, the module send response back anon channel// check timeout and the size of anon channelfunc (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error {    // avoid exception because of channel colsing    // TODO: need reconstruction    defer func() {        if exception := recover(); exception != nil {            klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception)        }    }()    if timeout <= 0 {        timeout = MessageTimeoutDefault    }    deadline := time.Now().Add(timeout)    channelList := ctx.getTypeChannel(moduleType)    if channelList == nil {        return fmt.Errorf("failed to get module type(%s) channel list", moduleType)    }    // echo module must sync a response,    // let anonchan size be module number    channelNumber := len(channelList)    anonChan := make(chan model.Message, channelNumber)    anonName := getAnonChannelName(message.GetID())    ctx.anonChsLock.Lock()    ctx.anonChannels[anonName] = anonChan    ctx.anonChsLock.Unlock()    cleanup := func() error {        ctx.anonChsLock.Lock()        delete(ctx.anonChannels, anonName)        close(anonChan)        ctx.anonChsLock.Unlock()        var uninvitedGuests int        // cleanup anonchan and check parentid for resp        for resp := range anonChan {            if resp.GetParentID() != message.GetID() {                uninvitedGuests++            }        }        if uninvitedGuests != 0 {            klog.Errorf("Get some unexpected:%d resp when sendToGroupsync message", uninvitedGuests)            return fmt.Errorf("got some unexpected(%d) resp", uninvitedGuests)        }        return nil    }    // make sure to set sync flag before sending    message.Header.Sync = true    var timeoutCounter int32    send := func(ch chan model.Message) {        sendTimer := time.NewTimer(time.Until(deadline))        select {        case ch <- message:            sendTimer.Stop()        case <-sendTimer.C:            atomic.AddInt32(&timeoutCounter, 1)        }    }    for _, channel := range channelList {        go send(channel)    }    sendTimer := time.NewTimer(time.Until(deadline))    ticker := time.NewTicker(TickerTimeoutDefault)    for {        // annonChan is full        if len(anonChan) == channelNumber {            break        }        select {        case <-ticker.C:        case <-sendTimer.C:            cleanup()            if timeoutCounter != 0 {                errInfo := fmt.Sprintf("timeout to send message, several %d timeout when send", timeoutCounter)                return fmt.Errorf(errInfo)            }            klog.Error("Timeout to sendToGroupsync message")            return fmt.Errorf("Timeout to send message")        }    }    return cleanup()}

SendToGroupSync并没有调用SendToGroup方法,而是直接调用了Send方法。
首先创建了一个size和group中channel数量一样的匿名通道,然后遍历通道,调用send发送消息,然后等待匿名通道收到消息的数量等于size。

还有个要注意的地方,这里并没有吧匿名通道的消息取出来返回出去,而是调用了cleanup方法直接清理通道。也就是说,用这种方式发送消息,只能得到是否消息都得到回复,而不能得到具体回复的内容。

model/message

message.go中定义了通道中使用的消息格式,

// Message structtype Message struct {    Header  MessageHeader `json:"header"`    Router  MessageRoute  `json:"route,omitempty"`    Content interface{}   `json:"content"`}//MessageRoute contains structure of messagetype MessageRoute struct {    // where the message come from    Source string `json:"source,omitempty"`    // where the message will broadcasted to    Group string `json:"group,omitempty"`    // what's the operation on resource    Operation string `json:"operation,omitempty"`    // what's the resource want to operate    Resource string `json:"resource,omitempty"`}//MessageHeader defines message header detailstype MessageHeader struct {    // the message uuid    ID string `json:"msg_id"`    // the response message parentid must be same with message received    // please use NewRespByMessage to new response message    ParentID string `json:"parent_msg_id,omitempty"`    // the time of creating    Timestamp int64 `json:"timestamp"`    // the flag will be set in sendsync    Sync bool `json:"sync,omitempty"`}

Message由MessageRoute、MessageHeader以及Content接口组成。 整个message都是json转换过来的。

这些类型上关联的方法,这里就不一一看了,主要都是些get、set方法,看几个特殊的:

UpdateID

//UpdateID returns message object updating its IDfunc (msg *Message) UpdateID() *Message {    msg.Header.ID = uuid.NewV4().String()    return msg}

可以看出,updateId是新生成了一个uuid放到header中。

NewMessage

func NewMessage(parentID string) *Message {    msg := &Message{}    msg.Header.ID = uuid.NewV4().String()    msg.Header.ParentID = parentID    msg.Header.Timestamp = time.Now().UnixNano() / 1e6    return msg}

生成一个新的message结构,需要传入parentID,自己的ID则是随机生成的

Clone

生成一个新的ID,其他字段与被clone的message一致

NewRespByMessage

// NewRespByMessage returns a new response message by a message receivedfunc (msg *Message) NewRespByMessage(message *Message, content interface{}) *Message {    return NewMessage(message.GetID()).SetRoute(message.GetSource(), message.GetGroup()).        SetResourceOperation(message.GetResource(), ResponseOperation).        FillBody(content)}

注意到NewMessage的参数是parentID,所以这个Response Message中,实际上就是用了收到的message的id作为parenet的id。
route则是用收到的message的source。


文章来源:智云一二三科技

文章标题:KubeEdge分析-beehive

文章地址:https://www.zhihuclub.com/305.shtml

关于作者: 智云科技

热门文章

网站地图