您的位置 首页 golang

Nsq原理分析(二)

上一篇文章中对nsq进行了简单的介绍,从nsq 的golang的客户端代码分析了一下nsq的使用,这篇文章会分析nsqd的代码

Nsqd代码分析

nsqd做了什么

  • nsqd负责topic、channel的创建,以及消息的接收、存储和分发
  • nsqd向nsqlookup注册自己的服务信息(ip 和端口)以及自己的元数据信息(topic、channel),nsqd也会向nsqlookup查询topic和channel信息

nsq.png
nsqadmin 是一个简单的管理界面,通过它可以查询topic、channel、消费者等基本信息。nsqadmin是从 nsqlookup中获取信息的;通过nsqadmin也可以创建topic、channel,它们会被创建到nsqlookup中,并在nsqlookup的内存中维护着,nsqd 会在某一个合适的时刻将这些信息拉回本地然后创建
nsqd 启动

func (n *NSQD) Main() error {    ctx := &context{n}    exitCh := make(chan error)    var once sync.Once    exitFunc := func(err error) {        once.Do(func() {            if err != nil {                n.logf(LOG_FATAL, "%s", err)            }            exitCh <- err        })    }    n.tcpServer.ctx = ctx    // 启动 tcp监听    n.waitGroup.Wrap(func() {        exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))    })    // 启动http监听    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)    n.waitGroup.Wrap(func() {        exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))    })    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {        httpsServer := newHTTPServer(ctx, true, true)        n.waitGroup.Wrap(func() {            exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))        })    }   // 队列扫描,处理超时、延迟等信息    n.waitGroup.Wrap(n.queueScanLoop)    // 向nsqlookup注册自己的元数据信息    n.waitGroup.Wrap(n.lookupLoop)    if n.getOpts().StatsdAddress != "" {        n.waitGroup.Wrap(n.statsdLoop)    }    err := <-exitCh    return err}
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {    logf(lg.INFO, "TCP: listening on %s", listener.Addr())    var wg sync.WaitGroup    for {       //等待请求的到来        clientConn, err := listener.Accept()        if err != nil {            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {                logf(lg.WARN, "temporary Accept() failure - %s", err)                runtime.Gosched()                continue            }            // theres no direct way to detect this error because it is not exposed            if !strings.Contains(err.Error(), "use of closed network connection") {                return fmt.Errorf("listener.Accept() error - %s", err)            }            break        }        wg.Add(1)        // 每当到来一个请求都启动一个goroutine进行处理        go func() {            handler.Handle(clientConn)            wg.Done()        }()    }    // wait to return until all handler goroutines complete    wg.Wait()    logf(lg.INFO, "TCP: closing %s", listener.Addr())    return nil}
unc (p *tcpServer) Handle(clientConn net.Conn) {    p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())    // The client should initialize itself by sending a 4 byte sequence indicating    // the version of the protocol that it intends to communicate, this will allow us    // to gracefully upgrade the protocol away from text/line oriented to whatever...    buf := make([]byte, 4)    _, err := io.ReadFull(clientConn, buf)    if err != nil {        p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)        clientConn.Close()        return    }    //协商协议版本    protocolMagic := string(buf)    p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",        clientConn.RemoteAddr(), protocolMagic)    var prot protocol.Protocol    switch protocolMagic {    case "  V2":        prot = &protocolV2{ctx: p.ctx}    default:        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))        clientConn.Close()        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",            clientConn.RemoteAddr(), protocolMagic)        return    }    p.conns.Store(clientConn.RemoteAddr(), clientConn)    // 开始一个死循环    err = prot.IOLoop(clientConn)    if err != nil {        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)    }    p.conns.Delete(clientConn.RemoteAddr())}
func (p *protocolV2) IOLoop(conn net.Conn) error {    var err error    var line []byte    var zeroTime time.Time    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)    client := newClientV2(clientID, conn, p.ctx)    p.ctx.nsqd.AddClient(client.ID, client)    // synchronize the startup of messagePump in order    // to guarantee that it gets a chance to initialize    // goroutine local state derived from client attributes    // and avoid a potential race with IDENTIFY (where a client    // could have changed or disabled said attributes)    messagePumpStartedChan := make(chan bool)    go p.messagePump(client, messagePumpStartedChan)    // 消息分发,向消费者发送消息    <-messagePumpStartedChan    for {    // 设置socket读取超时,如果consumer未在指定的时间内发送过来,那么会断开连接,导致consumer退出        if client.HeartbeatInterval > 0 {            client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))        } else {            client.SetReadDeadline(zeroTime)        }        // ReadSlice does not allocate new space for the data each request        // ie. 
the returned slice is only valid until the next call to it        //读取生产者或者消费者发送过来的请求        line, err = client.Reader.ReadSlice('\n')        if err != nil {            if err == io.EOF {                err = nil            } else {                err = fmt.Errorf("failed to read command - %s", err)            }            break        }        // trim the '\n'        line = line[:len(line)-1]        // optionally trim the '\r'        if len(line) > 0 && line[len(line)-1] == '\r' {            line = line[:len(line)-1]        }        params := bytes.Split(line, separatorBytes)        p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)        var response []byte        // 根据不同的命令执行不同的动作        response, err = p.Exec(client, params)        if err != nil {            ctx := ""            if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {                ctx = " - " + parentErr.Error()            }            p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)            sendErr := p.Send(client, frameTypeError, []byte(err.Error()))            if sendErr != nil {                p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)                break            }            // errors of type FatalClientErr should forceably close the connection            if _, ok := err.(*protocol.FatalClientErr); ok {                break            }            continue        }        if response != nil {            err = p.Send(client, frameTypeResponse, response)            if err != nil {                err = fmt.Errorf("failed to send response - %s", err)                break            }        }    }    p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)    conn.Close()    close(client.ExitChan)    if client.Channel != nil {        client.Channel.RemoveClient(client.ID)    }    p.ctx.nsqd.RemoveClient(client.ID)    return err}

在继续向下看前,看一下生产者的 PUB 请求在nsqd中做了什么

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {    var err error    if len(params) < 2 {        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")    }    topicName := string(params[1])    if !protocol.IsValidTopicName(topicName) {        return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",            fmt.Sprintf("PUB topic name %q is not valid", topicName))    }    bodyLen, err := readLen(client.Reader, client.lenSlice)    if err != nil {        return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")    }    if bodyLen <= 0 {        return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",            fmt.Sprintf("PUB invalid message body size %d", bodyLen))    }    if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize {        return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",            fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize))    }    messageBody := make([]byte, bodyLen)    _, err = io.ReadFull(client.Reader, messageBody)    if err != nil {        return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")    }    if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil {        return nil, err    }    // topic 在nsqd中的创建的lazy create,只有当某个生产者向该topic中发送消息时才会创建topic,    topic := p.ctx.nsqd.GetTopic(topicName)    msg := NewMessage(topic.GenerateID(), messageBody)    err = topic.PutMessage(msg)    if err != nil {        return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())    }    client.PublishedMessage(topicName, 1)    return okBytes, nil}
/ GetTopic performs a thread safe operation// to return a pointer to a Topic object (potentially new)func (n *NSQD) GetTopic(topicName string) *Topic {    // most likely, we already have this topic, so try read lock first.    n.RLock()    // 当topic在nsqd中创建过时就直接返回该topic    t, ok := n.topicMap[topicName]    n.RUnlock()    if ok {        return t    }    n.Lock()    t, ok = n.topicMap[topicName]    if ok {        n.Unlock()        return t    }    deleteCallback := func(t *Topic) {        n.DeleteExistingTopic(t.name)    }    //稍后看一下这个函数    t = NewTopic(topicName, &context{n}, deleteCallback)    n.topicMap[topicName] = t    n.Unlock()    n.logf(LOG_INFO, "TOPIC(%s): created", t.name)    // topic is created but messagePump not yet started    // if loading metadata at startup, no lookupd connections yet, topic started after load    if atomic.LoadInt32(&n.isLoading) == 1 {        return t    }    // if using lookupd, make a blocking call to get the topics, and immediately create them.    // this makes sure that any message received is buffered to the right channels    //如果使用了nsqlookup,那么从nsqlookup中查询该topic的channel信息,如果没有在nsqd中创建就创建出来    lookupdHTTPAddrs := n.lookupdHTTPAddrs()    if len(lookupdHTTPAddrs) > 0 {        channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)        if err != nil {            n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)        }        for _, channelName := range channelNames {            if strings.HasSuffix(channelName, "#ephemeral") {                continue // do not create ephemeral channel with no consumer client            }            t.GetChannel(channelName)        }    } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {        n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)    }    // now that all channels are added, start topic messagePump    t.Start()    return t}
// Topic constructorfunc NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {    t := &Topic{        name:              topicName,        channelMap:        make(map[string]*Channel),        memoryMsgChan:     nil,        startChan:         make(chan int, 1),        exitChan:          make(chan int),        channelUpdateChan: make(chan int),        ctx:               ctx,        paused:            0,        pauseChan:         make(chan int),        deleteCallback:    deleteCallback,        idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),    }    // create mem-queue only if size > 0 (do not use unbuffered chan)    if ctx.nsqd.getOpts().MemQueueSize > 0 {        t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)    }    if strings.HasSuffix(topicName, "#ephemeral") {        t.ephemeral = true        t.backend = newDummyBackendQueue()    } else {        dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {            opts := ctx.nsqd.getOpts()            lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)        }        //持久化的结构        t.backend = diskqueue.New(            topicName,            ctx.nsqd.getOpts().DataPath,            ctx.nsqd.getOpts().MaxBytesPerFile,            int32(minValidMsgLength),            int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,            ctx.nsqd.getOpts().SyncEvery,            ctx.nsqd.getOpts().SyncTimeout,            dqLogf,        )    }   // topic中也启动了一个messagePump,在protocolv2中也启动了一个同名函数,前一个是为了向consumer推送消息,这个是向topic下的一个或者多个队列中发送消息    t.waitGroup.Wrap(t.messagePump)    // 通知持久化    t.ctx.nsqd.Notify(t)    return t}func (t *Topic) Start() {    select {    case t.startChan <- 1:    default:    }}

看一下nsqd是如何向nsqlookup注册自己的元数据信息的:nsqd启动时会起一个名为 lookupLoop 的goroutine

func (n *NSQD) lookupLoop() {    var lookupPeers []*lookupPeer    var lookupAddrs []string    connect := true    hostname, err := os.Hostname()    if err != nil {        n.logf(LOG_FATAL, "failed to get hostname - %s", err)        os.Exit(1)    }    // for announcements, lookupd determines the host automatically    ticker := time.Tick(15 * time.Second)    for {        if connect {            for _, host := range n.getOpts().NSQLookupdTCPAddresses {                if in(host, lookupAddrs) {                    continue                }                n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)                lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,                    connectCallback(n, hostname))                lookupPeer.Command(nil) // start the connection                lookupPeers = append(lookupPeers, lookupPeer)                lookupAddrs = append(lookupAddrs, host)            }            n.lookupPeers.Store(lookupPeers)            connect = false        }        select {        case <-ticker:        // 向nsqlookup发送心跳信息            // send a heartbeat and read a response (read detects closed conns)            for _, lookupPeer := range lookupPeers {                n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)                cmd := nsq.Ping()                _, err := lookupPeer.Command(cmd)                if err != nil {                    n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)                }            }        case val := <-n.notifyChan:            var cmd *nsq.Command            var branch string            switch val.(type) {            // 注册channel            case *Channel:                // notify all nsqlookupds that a new channel exists, or that it's removed                branch = "channel"                channel := val.(*Channel)                if channel.Exiting() == true {                    cmd = nsq.UnRegister(channel.topicName, channel.name)                } else {        
            cmd = nsq.Register(channel.topicName, channel.name)                }            // 注册topic            case *Topic:                           // notify all nsqlookupds that a new topic exists, or that it's removed                branch = "topic"                topic := val.(*Topic)                if topic.Exiting() == true {                    cmd = nsq.UnRegister(topic.name, "")                } else {                    cmd = nsq.Register(topic.name, "")                }            }            for _, lookupPeer := range lookupPeers {                n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)                _, err := lookupPeer.Command(cmd)                if err != nil {                    n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)                }            }        case <-n.optsNotificationChan:            var tmpPeers []*lookupPeer            var tmpAddrs []string            for _, lp := range lookupPeers {                if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {                    tmpPeers = append(tmpPeers, lp)                    tmpAddrs = append(tmpAddrs, lp.addr)                    continue                }                n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)                lp.Close()            }            lookupPeers = tmpPeers            lookupAddrs = tmpAddrs            connect = true        case <-n.exitChan:            goto exit        }    }exit:    n.logf(LOG_INFO, "LOOKUP: closing")}

在nsqd启动 lookupLoop 这个goroutine时,还启动了另一个 queueScanLoop goroutine,主要用来监控超时消息的处理。
总结一下

  • nsqd启动时分别监听tcp、http端口
  • 启动 lookupLoop goroutine 向nsqlookup 注册自己的相关信息
  • 启动 queueScanLoop goroutine 对超时消息进行处理
  • 启动 statsdLoop goroutine 对性能以及topic、channel等一些参数进行统计
  • 当有 producer client 通过 PUB 命令接入进来时,nsqd 会启动一个单独的 goroutine 进行处理,此时会创建 topic、channel,topic会启动一个 messagepump 的 goroutine,将消息发送给下面的各个channel
  • 当有 consumer client 接入进来时, 启动单独 goroutine 进行处理,会启动一个 messagepump goroutine 将消息发送给各个consumer

注意,consumer 消费消息是有超时配置的:消费者对每一条消息的处理都要在超时时间内完成,否则该消息会被nsqd视为超时并重新投递,可能导致消息被重复消费。


文章来源:智云一二三科技

文章标题:Nsq原理分析(二)

文章地址:https://www.zhihuclub.com/1205.shtml

关于作者: 智云科技

热门文章

网站地图