beehive是kubeedge中的一个通信框架,整个kubeEdge的cloud和edge模块都依赖于beehive框架。
不弄清楚beehive的工作原理,就很难分析kubeEdge整体的逻辑。
目录结构
├── pkg│ ├── common│ │ ├── config│ │ │ ├── config.go│ │ │ └── config_test.go│ │ └── util│ │ ├── conn.go│ │ ├── conn_test.go│ │ ├── file_util.go│ │ └── parse_resource.go│ └── core│ ├── context│ │ ├── context.go│ │ ├── context_channel.go│ │ ├── context_channel_test.go│ │ ├── context_factory.go│ │ └── context_unixsocket.go│ ├── core.go│ ├── model│ │ └── message.go│ └── module.go
beehive的整体的代码量是比较少的,主要就是core中的几个文件
;初步看beehive实现了两种通信机制,一种是通过unixsocket,另一种是通过golang的channel。
使用
beehive并不是单独能运行的模块,而是直接被其他模块引用的。
core.Register(MODULE) // start all modules core.Run()
首先对需要使用beehive的模块进行注册,都注册完成以后,调用Run
init
init是加载的时候默认调用的,
func init() { modules = make(map[string]Module) disabledModules = make(map[string]Module) config.AddConfigChangeCallback(moduleChangeCallback{}) eventListener := config.EventListener{Name: "eventListener1"} config.CONFIG.RegisterListener(eventListener, "modules.enabled")}
modules被保存到一个map中。disabledModules没啥用,就是可以配置禁用哪些模块。
其他和config相关的内容也没啥用,就是监控配置文件变化来控制模块的启用、停用的。
Register
register只是把模块以名称为key放到map中。
Run
func StartModules() { beehiveContext.InitContext(beehiveContext.MsgCtxTypeChannel) modules := GetModules() for name, module := range modules { //Init the module beehiveContext.AddModule(name) //Assemble typeChannels for sendToGroup beehiveContext.AddModuleGroup(name, module.Group()) go module.Start() klog.Infof("Starting module %v", name) }}
run方法调用了StartModules方法,这里详细分析一下
InitContext
参数MsgCtxTypeChannel是硬编码在源码中的,配的是"channel"
func InitContext(contextType string) { once.Do(func() { ctx, cancel := gocontext.WithCancel(gocontext.Background()) context = &beehiveContext{ ctx: ctx, cancel: cancel, } switch contextType { case MsgCtxTypeChannel: channelContext := NewChannelContext() context.messageContext = channelContext context.moduleContext = channelContext default: klog.Fatalf("Do not support context type:%s", contextType) } })}
创建了一个空的cancel context,并把context的引用和cancel方法保存到一个全局的beehiveContext变量中,定义如下:
type beehiveContext struct { moduleContext ModuleContext messageContext MessageContext ctx gocontext.Context cancel gocontext.CancelFunc}
messageContext和moduleContext指向的是同一个用NewChannelContext创建的channelContext。
func NewChannelContext() *ChannelContext { channelMap := make(map[string]chan model.Message) moduleChannels := make(map[string]map[string]chan model.Message) anonChannels := make(map[string]chan model.Message) return &ChannelContext{ channels: channelMap, typeChannels: moduleChannels, anonChannels: anonChannels, }}
可以看到,channelContext实际上就包含3个空的map,map的key是一个string类型(从后面的代码看,是一个uuid),
channelMap和anonChannels比较简单,value就是一个类型为model.Message的channel;
moduleChannels比较复杂,value又是一个map,这个map的key是string,value是类型为model.Message的channel。
—
回到run中,接下来就是遍历每个之前注册的module
beehiveContext.AddModule
// AddModule adds module into module contextfunc AddModule(module string) { context.moduleContext.AddModule(module)}
//AddModule adds module into module contextfunc (ctx *ChannelContext) AddModule(module string) { channel := ctx.newChannel() ctx.addChannel(module, channel)}
// New Channelfunc (ctx *ChannelContext) newChannel() chan model.Message { channel := make(chan model.Message, ChannelSizeDefault) return channel}// addChannel return chanfunc (ctx *ChannelContext) addChannel(module string, moduleCh chan model.Message) { ctx.chsLock.Lock() defer ctx.chsLock.Unlock() ctx.channels[module] = moduleCh}
这里实际上就是给beehiveContext中的moduleContext添加了一个类型为model.Message的channel,并保存到channels这个map中。
beehiveContext.AddModuleGroup
// AddModuleGroup adds module into module context groupfunc AddModuleGroup(module, group string) { context.moduleContext.AddModuleGroup(module, group)}
//AddModuleGroup adds modules into module context groupfunc (ctx *ChannelContext) AddModuleGroup(module, group string) { if channel := ctx.getChannel(module); channel != nil { ctx.addTypeChannel(module, group, channel) return } klog.Warningf("Get bad module name %s when addmodulegroup", module)}
这里就是把刚才创建的channel以group的名字为key创建一个map,然后把整个map放到moduleContext的typeChannels中。
group的名字是硬编码在实现module接口的具体module上的,比如cloudhub的的Group返回的就是"cloudhub"
func (a *cloudHub) Group() string { return "cloudhub"}
module.Start
最后就是调用每个module的start方法了。
Start是一个接口,每个具体的module都实现了自己的Start方法
具体module的Start方法,后续放到各个模块的分析中,这里就不仔细看了。
context
context目录下主要有context、context_channel以及context_factory。context中定义了接口,context_factory提供对外的操作接口实现,context_channel则是在context_factory中调用。context_factory与context_channel存在同名的接口。
Done
Done方法比较简单,直接调用了go context的Done方法
Cancel
Cancel调用的是beehiveContext的cancel方法,而beehiveContext的cancel方法就是创建go context时候返回的cancel方法
Cleanup
Cleanup调用的是context.moduleContext.Cleanup(module),也就是ChannelContext的Cleanup方法
func (ctx *ChannelContext) Cleanup(module string) { if channel := ctx.getChannel(module); channel != nil { ctx.delChannel(module) // decrease probable exception of channel closing time.Sleep(20 * time.Millisecond) close(channel) }}
// deleteChannel by module namefunc (ctx *ChannelContext) delChannel(module string) { // delete module channel from channels map ctx.chsLock.Lock() _, exist := ctx.channels[module] if !exist { klog.Warningf("Failed to get channel, module:%s", module) return } delete(ctx.channels, module) ctx.chsLock.Unlock() // delete module channel from typechannels map ctx.typeChsLock.Lock() for _, moduleMap := range ctx.typeChannels { if _, exist := moduleMap[module]; exist { delete(moduleMap, module) break } } ctx.typeChsLock.Unlock()}
这里就是根据module的名称,在ChannelContext的那几个map中删掉对应的channel的引用,
最后关闭channel.
Send
Send方法是调用context.messageContext.Send(module, message),也就是调用了ChannelContext的Send方法。
// Send send msg to a module. Todo: do not stuckfunc (ctx *ChannelContext) Send(module string, message model.Message) { // avoid exception because of channel colsing // TODO: need reconstruction defer func() { if exception := recover(); exception != nil { klog.Warningf("Recover when send message, exception: %+v", exception) } }() if channel := ctx.getChannel(module); channel != nil { channel <- message return } klog.Warningf("Get bad module name :%s when send message, do nothing", module)}
ChannelContext的Send方法根据module的名字先取出channel,然后放入message。
Receive
调用message, err := context.messageContext.Receive(module)。
和上面的Send对应,就是从module名称对应的channel中取出message
SendSync
// SendSync sends message in a sync wayfunc (ctx *ChannelContext) SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error) { // avoid exception because of channel colsing // TODO: need reconstruction defer func() { if exception := recover(); exception != nil { klog.Warningf("Recover when sendsync message, exception: %+v", exception) } }() if timeout <= 0 { timeout = MessageTimeoutDefault } deadline := time.Now().Add(timeout) // make sure to set sync flag message.Header.Sync = true // check req/resp channel reqChannel := ctx.getChannel(module) if reqChannel == nil { return model.Message{}, fmt.Errorf("bad request module name(%s)", module) } sendTimer := time.NewTimer(timeout) select { case reqChannel <- message: case <-sendTimer.C: return model.Message{}, errors.New("timeout to send message") } sendTimer.Stop() // new anonymous channel for response anonChan := make(chan model.Message) anonName := getAnonChannelName(message.GetID()) ctx.anonChsLock.Lock() ctx.anonChannels[anonName] = anonChan ctx.anonChsLock.Unlock() defer func() { ctx.anonChsLock.Lock() delete(ctx.anonChannels, anonName) close(anonChan) ctx.anonChsLock.Unlock() }() var resp model.Message respTimer := time.NewTimer(time.Until(deadline)) select { case resp = <-anonChan: case <-respTimer.C: return model.Message{}, errors.New("timeout to get response") } respTimer.Stop() return resp, nil}
上半部分就是在Send的基础上加了一个计时器,如果向通道写入消息超时,那么久返回空消息体和error信息。
下半部分创建了一个匿名的通道(实际上也是有名字的),并保存到ChannelContext的ctx.anonChannels这个map中。
需要注意的是,这里所谓的匿名通道,实际上是有名字的(message.GetID()),只是这个名字可能是message在创建时候生成的一个随机的uuid。
这个名字我们放到分析message的时候再看。
接下来就是在这个新的匿名通道上等待消息,并且也设置了超时时间,如果超时了,就返回空的消息和error。
总结一下,sendSync就是先发送一个message了,并以这个message的id创建一个新的通道,然后在这个通道上等待回复。
SendResponse
调用context.messageContext.SendResp(resp)。
// SendResp send resp for this message when using sync modefunc (ctx *ChannelContext) SendResp(message model.Message) { anonName := getAnonChannelName(message.GetParentID()) ctx.anonChsLock.RLock() defer ctx.anonChsLock.RUnlock() if channel, exist := ctx.anonChannels[anonName]; exist { channel <- message return } klog.Warningf("Get bad anonName:%s when sendresp message, do nothing", anonName)}
sendResp和之前的sendSync是对应的,就是向sendSync中创建的匿名通道中,发送消息。
这里的通道名词是通过message.GetParentID()获取的,注意创建的时候,名词用的是GetID,这里怎么对应也放到message模块再分析。
SendToGroup
调用context.messageContext.SendToGroup(moduleType, message)
这里就是根据module type的名字查出对应的通道列表,然后分别调用Send方法
SendToGroupSync
调用context.messageContext.SendToGroupSync(moduleType, message, timeout)
// SendToGroupSync : broadcast the message to echo module channel, the module send response back anon channel// check timeout and the size of anon channelfunc (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error { // avoid exception because of channel colsing // TODO: need reconstruction defer func() { if exception := recover(); exception != nil { klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception) } }() if timeout <= 0 { timeout = MessageTimeoutDefault } deadline := time.Now().Add(timeout) channelList := ctx.getTypeChannel(moduleType) if channelList == nil { return fmt.Errorf("failed to get module type(%s) channel list", moduleType) } // echo module must sync a response, // let anonchan size be module number channelNumber := len(channelList) anonChan := make(chan model.Message, channelNumber) anonName := getAnonChannelName(message.GetID()) ctx.anonChsLock.Lock() ctx.anonChannels[anonName] = anonChan ctx.anonChsLock.Unlock() cleanup := func() error { ctx.anonChsLock.Lock() delete(ctx.anonChannels, anonName) close(anonChan) ctx.anonChsLock.Unlock() var uninvitedGuests int // cleanup anonchan and check parentid for resp for resp := range anonChan { if resp.GetParentID() != message.GetID() { uninvitedGuests++ } } if uninvitedGuests != 0 { klog.Errorf("Get some unexpected:%d resp when sendToGroupsync message", uninvitedGuests) return fmt.Errorf("got some unexpected(%d) resp", uninvitedGuests) } return nil } // make sure to set sync flag before sending message.Header.Sync = true var timeoutCounter int32 send := func(ch chan model.Message) { sendTimer := time.NewTimer(time.Until(deadline)) select { case ch <- message: sendTimer.Stop() case <-sendTimer.C: atomic.AddInt32(&timeoutCounter, 1) } } for _, channel := range channelList { go send(channel) } sendTimer := time.NewTimer(time.Until(deadline)) ticker := time.NewTicker(TickerTimeoutDefault) for { // annonChan is full if len(anonChan) == channelNumber { break } select { case <-ticker.C: case <-sendTimer.C: cleanup() if timeoutCounter != 0 { errInfo := fmt.Sprintf("timeout to send message, several %d timeout when send", timeoutCounter) return fmt.Errorf(errInfo) } klog.Error("Timeout to sendToGroupsync message") return fmt.Errorf("Timeout to send message") } } return cleanup()}
SendToGroupSync并没有调用SendToGroup方法,而是直接调用了Send方法。
首先创建了一个size和group中channel数量一样的匿名通道,然后遍历通道,调用send发送消息,然后等待匿名通道收到消息的数量等于size。
还有个要注意的地方,这里并没有吧匿名通道的消息取出来返回出去,而是调用了cleanup方法直接清理通道。也就是说,用这种方式发送消息,只能得到是否消息都得到回复,而不能得到具体回复的内容。
model/message
message.go中定义了通道中使用的消息格式,
// Message structtype Message struct { Header MessageHeader `json:"header"` Router MessageRoute `json:"route,omitempty"` Content interface{} `json:"content"`}//MessageRoute contains structure of messagetype MessageRoute struct { // where the message come from Source string `json:"source,omitempty"` // where the message will broadcasted to Group string `json:"group,omitempty"` // what's the operation on resource Operation string `json:"operation,omitempty"` // what's the resource want to operate Resource string `json:"resource,omitempty"`}//MessageHeader defines message header detailstype MessageHeader struct { // the message uuid ID string `json:"msg_id"` // the response message parentid must be same with message received // please use NewRespByMessage to new response message ParentID string `json:"parent_msg_id,omitempty"` // the time of creating Timestamp int64 `json:"timestamp"` // the flag will be set in sendsync Sync bool `json:"sync,omitempty"`}
Message由MessageRoute、MessageHeader以及Content接口组成。 整个message都是json转换过来的。
这些类型上关联的方法,这里就不一一看了,主要都是些get、set方法,看几个特殊的:
UpdateID
//UpdateID returns message object updating its IDfunc (msg *Message) UpdateID() *Message { msg.Header.ID = uuid.NewV4().String() return msg}
可以看出,updateId是新生成了一个uuid放到header中。
NewMessage
func NewMessage(parentID string) *Message { msg := &Message{} msg.Header.ID = uuid.NewV4().String() msg.Header.ParentID = parentID msg.Header.Timestamp = time.Now().UnixNano() / 1e6 return msg}
生成一个新的message结构,需要传入parentID,自己的ID则是随机生成的
Clone
生成一个新的ID,其他字段与被clone的message一致
NewRespByMessage
// NewRespByMessage returns a new response message by a message receivedfunc (msg *Message) NewRespByMessage(message *Message, content interface{}) *Message { return NewMessage(message.GetID()).SetRoute(message.GetSource(), message.GetGroup()). SetResourceOperation(message.GetResource(), ResponseOperation). FillBody(content)}
注意到NewMessage的参数是parentID,所以这个Response Message中,实际上就是用了收到的message的id作为parenet的id。
route则是用收到的message的source。
文章来源:智云一二三科技
文章标题:KubeEdge分析-beehive
文章地址:https://www.zhihuclub.com/305.shtml