KubeEdge分析-beehive

beehive是kubeedge中的一个通信框架,整个kubeEdge的cloud和edge模块都依赖于beehive框架。

不弄清楚beehive的工作原理,就很难分析kubeEdge整体的逻辑。

目录结构

├── pkg
│   ├── common
│   │   ├── config
│   │   │   ├── config.go
│   │   │   └── config_test.go
│   │   └── util
│   │       ├── conn.go
│   │       ├── conn_test.go
│   │       ├── file_util.go
│   │       └── parse_resource.go
│   └── core
│       ├── context
│       │   ├── context.go
│       │   ├── context_channel.go
│       │   ├── context_channel_test.go
│       │   ├── context_factory.go
│       │   └── context_unixsocket.go
│       ├── core.go
│       ├── model
│       │   └── message.go
│       └── module.go

beehive的整体的代码量是比较少的,主要就是core中的几个文件
;初步看beehive实现了两种通信机制,一种是通过unixsocket,另一种是通过golang的channel。

使用

beehive并不是单独能运行的模块,而是直接被其他模块引用的。

    core.Register(MODULE)
    // start all modules
    core.Run()

首先对需要使用beehive的模块进行注册,都注册完成以后,调用Run

init

init是加载的时候默认调用的,

func init() {
    modules = make(map[string]Module)
    disabledModules = make(map[string]Module)
    config.AddConfigChangeCallback(moduleChangeCallback{})
    eventListener := config.EventListener{Name: "eventListener1"}
    config.CONFIG.RegisterListener(eventListener, "modules.enabled")
}

modules被保存到一个map中。disabledModules没啥用,就是可以配置禁用哪些模块。
其他和config相关的内容也没啥用,就是监控配置文件变化来控制模块的启用、停用的。

Register

register只是把模块以名称为key放到map中。

Run

func StartModules() {
    beehiveContext.InitContext(beehiveContext.MsgCtxTypeChannel)

    modules := GetModules()
    for name, module := range modules {
        //Init the module
        beehiveContext.AddModule(name)
        //Assemble typeChannels for sendToGroup
        beehiveContext.AddModuleGroup(name, module.Group())
        go module.Start()
        klog.Infof("Starting module %v", name)
    }
}

run方法调用了StartModules方法,这里详细分析一下

InitContext

参数MsgCtxTypeChannel是硬编码在源码中的,配的是"channel"

func InitContext(contextType string) {
    once.Do(func() {
        ctx, cancel := gocontext.WithCancel(gocontext.Background())
        context = &beehiveContext{
            ctx:    ctx,
            cancel: cancel,
        }
        switch contextType {
        case MsgCtxTypeChannel:
            channelContext := NewChannelContext()
            context.messageContext = channelContext
            context.moduleContext = channelContext
        default:
            klog.Fatalf("Do not support context type:%s", contextType)
        }
    })
}

创建了一个空的cancel context,并把context的引用和cancel方法保存到一个全局的beehiveContext变量中,定义如下:

type beehiveContext struct {
    moduleContext  ModuleContext
    messageContext MessageContext
    ctx            gocontext.Context
    cancel         gocontext.CancelFunc
}

messageContext和moduleContext指向的是同一个用NewChannelContext创建的channelContext。

func NewChannelContext() *ChannelContext {
    channelMap := make(map[string]chan model.Message)
    moduleChannels := make(map[string]map[string]chan model.Message)
    anonChannels := make(map[string]chan model.Message)
    return &ChannelContext{
        channels:     channelMap,
        typeChannels: moduleChannels,
        anonChannels: anonChannels,
    }
}

可以看到,channelContext实际上就包含3个空的map,map的key是一个string类型(从后面的代码看,是一个uuid),
channelMap和anonChannels比较简单,value就是一个类型为model.Message的channel;
moduleChannels比较复杂,value又是一个map,这个map的key是string,value是类型为model.Message的channel。

回到run中,接下来就是遍历每个之前注册的module

beehiveContext.AddModule

// AddModule adds module into module context
func AddModule(module string) {
    context.moduleContext.AddModule(module)
}
//AddModule adds module into module context
func (ctx *ChannelContext) AddModule(module string) {
    channel := ctx.newChannel()
    ctx.addChannel(module, channel)
}
// New Channel
func (ctx *ChannelContext) newChannel() chan model.Message {
    channel := make(chan model.Message, ChannelSizeDefault)
    return channel
}


// addChannel return chan
func (ctx *ChannelContext) addChannel(module string, moduleCh chan model.Message) {
    ctx.chsLock.Lock()
    defer ctx.chsLock.Unlock()

    ctx.channels[module] = moduleCh
}

这里实际上就是给beehiveContext中的moduleContext添加了一个类型为model.Message的channel,并保存到channels这个map中。

beehiveContext.AddModuleGroup

// AddModuleGroup adds module into module context group
func AddModuleGroup(module, group string) {
    context.moduleContext.AddModuleGroup(module, group)
}
//AddModuleGroup adds modules into module context group
func (ctx *ChannelContext) AddModuleGroup(module, group string) {
    if channel := ctx.getChannel(module); channel != nil {
        ctx.addTypeChannel(module, group, channel)
        return
    }
    klog.Warningf("Get bad module name %s when addmodulegroup", module)
}

这里就是把刚才创建的channel以group的名字为key创建一个map,然后把整个map放到moduleContext的typeChannels中。

group的名字是硬编码在实现module接口的具体module上的,比如cloudhub的的Group返回的就是"cloudhub"

func (a *cloudHub) Group() string {
    return "cloudhub"
}

module.Start

最后就是调用每个module的start方法了。

Start是一个接口,每个具体的module都实现了自己的Start方法

具体module的Start方法,后续放到各个模块的分析中,这里就不仔细看了。

context

context目录下主要有context、context_channel以及context_factory。context中定义了接口,context_factory提供对外的操作接口实现,context_channel则是在context_factory中调用。context_factory与context_channel存在同名的接口。

Done

Done方法比较简单,直接调用了go context的Done方法

Cancel

Cancel调用的是beehiveContext的cancel方法,而beehiveContext的cancel方法就是创建go context时候返回的cancel方法

Cleanup

Cleanup调用的是context.moduleContext.Cleanup(module),也就是ChannelContext的Cleanup方法

func (ctx *ChannelContext) Cleanup(module string) {
    if channel := ctx.getChannel(module); channel != nil {
        ctx.delChannel(module)
        // decrease probable exception of channel closing
        time.Sleep(20 * time.Millisecond)
        close(channel)
    }
}
// deleteChannel by module name
func (ctx *ChannelContext) delChannel(module string) {
    // delete module channel from channels map
    ctx.chsLock.Lock()
    _, exist := ctx.channels[module]
    if !exist {
        klog.Warningf("Failed to get channel, module:%s", module)
        return
    }
    delete(ctx.channels, module)

    ctx.chsLock.Unlock()

    // delete module channel from typechannels map
    ctx.typeChsLock.Lock()
    for _, moduleMap := range ctx.typeChannels {
        if _, exist := moduleMap[module]; exist {
            delete(moduleMap, module)
            break
        }
    }
    ctx.typeChsLock.Unlock()
}

这里就是根据module的名称,在ChannelContext的那几个map中删掉对应的channel的引用,
最后关闭channel.

Send

Send方法是调用context.messageContext.Send(module, message),也就是调用了ChannelContext的Send方法。

// Send send msg to a module. Todo: do not stuck
func (ctx *ChannelContext) Send(module string, message model.Message) {
    // avoid exception because of channel colsing
    // TODO: need reconstruction
    defer func() {
        if exception := recover(); exception != nil {
            klog.Warningf("Recover when send message, exception: %+v", exception)
        }
    }()

    if channel := ctx.getChannel(module); channel != nil {
        channel <- message
        return
    }
    klog.Warningf("Get bad module name :%s when send message, do nothing", module)
}

ChannelContext的Send方法根据module的名字先取出channel,然后放入message。

Receive

调用message, err := context.messageContext.Receive(module)。
和上面的Send对应,就是从module名称对应的channel中取出message

SendSync

// SendSync sends message in a sync way
func (ctx *ChannelContext) SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error) {
    // avoid exception because of channel colsing
    // TODO: need reconstruction
    defer func() {
        if exception := recover(); exception != nil {
            klog.Warningf("Recover when sendsync message, exception: %+v", exception)
        }
    }()

    if timeout <= 0 {
        timeout = MessageTimeoutDefault
    }
    deadline := time.Now().Add(timeout)

    // make sure to set sync flag
    message.Header.Sync = true

    // check req/resp channel
    reqChannel := ctx.getChannel(module)
    if reqChannel == nil {
        return model.Message{}, fmt.Errorf("bad request module name(%s)", module)
    }

    sendTimer := time.NewTimer(timeout)
    select {
    case reqChannel <- message:
    case <-sendTimer.C:
        return model.Message{}, errors.New("timeout to send message")
    }
    sendTimer.Stop()

    // new anonymous channel for response
    anonChan := make(chan model.Message)
    anonName := getAnonChannelName(message.GetID())
    ctx.anonChsLock.Lock()
    ctx.anonChannels[anonName] = anonChan
    ctx.anonChsLock.Unlock()
    defer func() {
        ctx.anonChsLock.Lock()
        delete(ctx.anonChannels, anonName)
        close(anonChan)
        ctx.anonChsLock.Unlock()
    }()

    var resp model.Message
    respTimer := time.NewTimer(time.Until(deadline))
    select {
    case resp = <-anonChan:
    case <-respTimer.C:
        return model.Message{}, errors.New("timeout to get response")
    }
    respTimer.Stop()

    return resp, nil
}

上半部分就是在Send的基础上加了一个计时器,如果向通道写入消息超时,那么久返回空消息体和error信息。

下半部分创建了一个匿名的通道(实际上也是有名字的),并保存到ChannelContext的ctx.anonChannels这个map中。

需要注意的是,这里所谓的匿名通道,实际上是有名字的(message.GetID()),只是这个名字可能是message在创建时候生成的一个随机的uuid。

这个名字我们放到分析message的时候再看。

接下来就是在这个新的匿名通道上等待消息,并且也设置了超时时间,如果超时了,就返回空的消息和error。

总结一下,sendSync就是先发送一个message了,并以这个message的id创建一个新的通道,然后在这个通道上等待回复。

SendResponse

调用context.messageContext.SendResp(resp)。

// SendResp send resp for this message when using sync mode
func (ctx *ChannelContext) SendResp(message model.Message) {
    anonName := getAnonChannelName(message.GetParentID())

    ctx.anonChsLock.RLock()
    defer ctx.anonChsLock.RUnlock()
    if channel, exist := ctx.anonChannels[anonName]; exist {
        channel <- message
        return
    }

    klog.Warningf("Get bad anonName:%s when sendresp message, do nothing", anonName)
}

sendResp和之前的sendSync是对应的,就是向sendSync中创建的匿名通道中,发送消息。

这里的通道名词是通过message.GetParentID()获取的,注意创建的时候,名词用的是GetID,这里怎么对应也放到message模块再分析。

SendToGroup

调用context.messageContext.SendToGroup(moduleType, message)
这里就是根据module type的名字查出对应的通道列表,然后分别调用Send方法

SendToGroupSync

调用context.messageContext.SendToGroupSync(moduleType, message, timeout)

// SendToGroupSync : broadcast the message to echo module channel, the module send response back anon channel
// check timeout and the size of anon channel
func (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error {
    // avoid exception because of channel colsing
    // TODO: need reconstruction
    defer func() {
        if exception := recover(); exception != nil {
            klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception)
        }
    }()

    if timeout <= 0 {
        timeout = MessageTimeoutDefault
    }
    deadline := time.Now().Add(timeout)

    channelList := ctx.getTypeChannel(moduleType)
    if channelList == nil {
        return fmt.Errorf("failed to get module type(%s) channel list", moduleType)
    }

    // echo module must sync a response,
    // let anonchan size be module number
    channelNumber := len(channelList)
    anonChan := make(chan model.Message, channelNumber)
    anonName := getAnonChannelName(message.GetID())
    ctx.anonChsLock.Lock()
    ctx.anonChannels[anonName] = anonChan
    ctx.anonChsLock.Unlock()

    cleanup := func() error {
        ctx.anonChsLock.Lock()
        delete(ctx.anonChannels, anonName)
        close(anonChan)
        ctx.anonChsLock.Unlock()

        var uninvitedGuests int
        // cleanup anonchan and check parentid for resp
        for resp := range anonChan {
            if resp.GetParentID() != message.GetID() {
                uninvitedGuests++
            }
        }
        if uninvitedGuests != 0 {
            klog.Errorf("Get some unexpected:%d resp when sendToGroupsync message", uninvitedGuests)
            return fmt.Errorf("got some unexpected(%d) resp", uninvitedGuests)
        }
        return nil
    }

    // make sure to set sync flag before sending
    message.Header.Sync = true

    var timeoutCounter int32
    send := func(ch chan model.Message) {
        sendTimer := time.NewTimer(time.Until(deadline))
        select {
        case ch <- message:
            sendTimer.Stop()
        case <-sendTimer.C:
            atomic.AddInt32(&timeoutCounter, 1)
        }
    }
    for _, channel := range channelList {
        go send(channel)
    }

    sendTimer := time.NewTimer(time.Until(deadline))
    ticker := time.NewTicker(TickerTimeoutDefault)
    for {
        // annonChan is full
        if len(anonChan) == channelNumber {
            break
        }
        select {
        case <-ticker.C:
        case <-sendTimer.C:
            cleanup()
            if timeoutCounter != 0 {
                errInfo := fmt.Sprintf("timeout to send message, several %d timeout when send", timeoutCounter)
                return fmt.Errorf(errInfo)
            }
            klog.Error("Timeout to sendToGroupsync message")
            return fmt.Errorf("Timeout to send message")
        }
    }

    return cleanup()
}

SendToGroupSync并没有调用SendToGroup方法,而是直接调用了Send方法。
首先创建了一个size和group中channel数量一样的匿名通道,然后遍历通道,调用send发送消息,然后等待匿名通道收到消息的数量等于size。

还有个要注意的地方,这里并没有吧匿名通道的消息取出来返回出去,而是调用了cleanup方法直接清理通道。也就是说,用这种方式发送消息,只能得到是否消息都得到回复,而不能得到具体回复的内容。

model/message

message.go中定义了通道中使用的消息格式,

// Message struct
type Message struct {
    Header  MessageHeader `json:"header"`
    Router  MessageRoute  `json:"route,omitempty"`
    Content interface{}   `json:"content"`
}

//MessageRoute contains structure of message
type MessageRoute struct {
    // where the message come from
    Source string `json:"source,omitempty"`
    // where the message will broadcasted to
    Group string `json:"group,omitempty"`

    // what's the operation on resource
    Operation string `json:"operation,omitempty"`
    // what's the resource want to operate
    Resource string `json:"resource,omitempty"`
}

//MessageHeader defines message header details
type MessageHeader struct {
    // the message uuid
    ID string `json:"msg_id"`
    // the response message parentid must be same with message received
    // please use NewRespByMessage to new response message
    ParentID string `json:"parent_msg_id,omitempty"`
    // the time of creating
    Timestamp int64 `json:"timestamp"`
    // the flag will be set in sendsync
    Sync bool `json:"sync,omitempty"`
}

Message由MessageRoute、MessageHeader以及Content接口组成。 整个message都是json转换过来的。

这些类型上关联的方法,这里就不一一看了,主要都是些get、set方法,看几个特殊的:

UpdateID

//UpdateID returns message object updating its ID
func (msg *Message) UpdateID() *Message {
    msg.Header.ID = uuid.NewV4().String()
    return msg
}

可以看出,updateId是新生成了一个uuid放到header中。

NewMessage

func NewMessage(parentID string) *Message {
    msg := &Message{}
    msg.Header.ID = uuid.NewV4().String()
    msg.Header.ParentID = parentID
    msg.Header.Timestamp = time.Now().UnixNano() / 1e6
    return msg
}

生成一个新的message结构,需要传入parentID,自己的ID则是随机生成的

Clone

生成一个新的ID,其他字段与被clone的message一致

NewRespByMessage

// NewRespByMessage returns a new response message by a message received
func (msg *Message) NewRespByMessage(message *Message, content interface{}) *Message {
    return NewMessage(message.GetID()).SetRoute(message.GetSource(), message.GetGroup()).
        SetResourceOperation(message.GetResource(), ResponseOperation).
        FillBody(content)
}

注意到NewMessage的参数是parentID,所以这个Response Message中,实际上就是用了收到的message的id作为parenet的id。
route则是用收到的message的source。


发表评论

您的电子邮箱地址不会被公开。