Nsq原理分析(二)

上一篇文章中对nsq进行了简单的介绍,从nsq 的golang的客户端代码分析了一下nsq的使用,这篇文章会分析nsqd的代码

Nsqd代码分析

nsqd做了什么

  • nsqd接收对topic、channel的创建以及对消息的存储和分发
  • nsqd向nsqlookup注册自己的服务信息(ip 和端口),并向nsqlookup注册自己的元数据信息(topic、channel);nsqd也会向nsqlookup查询topic和channel信息

nsq.png
nsqadmin 是一个简单的管理界面,通过它可以查询topic、channel、消费者等等一些基本信息,nsqadmin是从 nsqlookup中获取信息的。通过nsqadmin也可以创建topic、channel,它们会被创建到nsqlookup中,并在nsqlookup的内存中维护着,nsqd 会在某一个合适的时刻将这些信息拉回本地然后创建
nsqd 启动

// Main launches the services of this nsqd instance and blocks until one of
// the network servers (TCP, HTTP, or HTTPS when configured) stops.
//
// It returns the value produced by the first server to stop; a non-nil value
// is also logged at LOG_FATAL. The background loops started here
// (queueScanLoop, lookupLoop and, when a statsd address is configured,
// statsdLoop) run on n.waitGroup and do not feed the return value.
func (n *NSQD) Main() error {
    var (
        shared   = &context{n}
        done     = make(chan error)
        stopOnce sync.Once
    )

    // report publishes a server's result on done. stopOnce makes sure only
    // the first result is ever sent, so later servers that stop never block
    // on the unbuffered channel after Main has returned.
    report := func(result error) {
        stopOnce.Do(func() {
            if result != nil {
                n.logf(LOG_FATAL, "%s", result)
            }
            done <- result
        })
    }

    // TCP listener.
    n.tcpServer.ctx = shared
    n.waitGroup.Wrap(func() {
        report(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    })

    // Plain HTTP listener.
    plainHTTP := newHTTPServer(shared, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        report(http_api.Serve(n.httpListener, plainHTTP, "HTTP", n.logf))
    })

    // HTTPS listener, only when TLS is configured and an address is set.
    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        secureHTTP := newHTTPServer(shared, true, true)
        n.waitGroup.Wrap(func() {
            report(http_api.Serve(n.httpsListener, secureHTTP, "HTTPS", n.logf))
        })
    }

    // Queue scanning; per the original author's note this deals with
    // timed-out and deferred messages.
    n.waitGroup.Wrap(n.queueScanLoop)
    // Registers this node's metadata with nsqlookupd.
    n.waitGroup.Wrap(n.lookupLoop)
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }

    return <-done
}
// TCPServer accepts connections from listener and runs handler.Handle for
// each one on its own goroutine.
//
// Accept errors are treated as follows:
//   - a net.Error reporting Temporary() is logged as a warning and retried;
//   - an error whose text contains "use of closed network connection" ends
//     the accept loop; TCPServer then waits for every running handler and
//     returns nil;
//   - any other error is returned immediately, wrapped with context, without
//     waiting for handlers that are still running.
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
    logf(lg.INFO, "TCP: listening on %s", listener.Addr())

    var active sync.WaitGroup

acceptLoop:
    for {
        // Block until the next client connects.
        conn, acceptErr := listener.Accept()
        if acceptErr == nil {
            // One goroutine per accepted connection.
            active.Add(1)
            go func() {
                handler.Handle(conn)
                active.Done()
            }()
            continue
        }

        netErr, isNetErr := acceptErr.(net.Error)
        switch {
        case isNetErr && netErr.Temporary():
            logf(lg.WARN, "temporary Accept() failure - %s", acceptErr)
            runtime.Gosched()
        case strings.Contains(acceptErr.Error(), "use of closed network connection"):
            // There is no direct way to detect this error because it is not
            // exposed, hence the string match.
            break acceptLoop
        default:
            return fmt.Errorf("listener.Accept() error - %s", acceptErr)
        }
    }

    // Do not return until all handler goroutines have completed.
    active.Wait()

    logf(lg.INFO, "TCP: closing %s", listener.Addr())

    return nil
}
unc (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

    // The client should initialize itself by sending a 4 byte sequence indicating
    // the version of the protocol that it intends to communicate, this will allow us
    // to gracefully upgrade the protocol away from text/line oriented to whatever...
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
        clientConn.Close()
        return
    }
    //协商协议版本
    protocolMagic := string(buf)

    p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx}
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    p.conns.Store(clientConn.RemoteAddr(), clientConn)
    // 开始一个死循环
    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
    }

    p.conns.Delete(clientConn.RemoteAddr())
}
// IOLoop runs the V2 protocol session for one connection.
//
// It registers a new client with nsqd, starts the client's messagePump
// goroutine, and then repeatedly reads one newline-terminated command,
// executes it via p.Exec and writes back the response or an error frame.
//
// The loop ends when reading fails (io.EOF counts as a clean shutdown), when
// a response or error frame cannot be sent, or when Exec returns a
// *protocol.FatalClientErr. On exit the connection is closed, client.ExitChan
// is closed, and the client is removed from its channel (if any) and from
// nsqd.
//
// Returns nil after a clean EOF; otherwise the error that ended the loop.
func (p *protocolV2) IOLoop(conn net.Conn) error {
    var err error
    var line []byte
    var zeroTime time.Time

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)
    p.ctx.nsqd.AddClient(client.ID, client)

    // synchronize the startup of messagePump in order
    // to guarantee that it gets a chance to initialize
    // goroutine local state derived from client attributes
    // and avoid a potential race with IDENTIFY (where a client
    // could have changed or disabled said attributes)
    //
    // NOTE(review): per the surrounding article, messagePump is the goroutine
    // that pushes messages to the consumer; its body is not shown here.
    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)
    // Block until messagePump signals that it has started.
    <-messagePumpStartedChan

    for {
        // Set the read deadline to twice the heartbeat interval. If the
        // client sends nothing within that window the read below fails and
        // the connection is torn down. A zero deadline disables the timeout.
        if client.HeartbeatInterval > 0 {
            client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
        } else {
            client.SetReadDeadline(zeroTime)
        }

        // ReadSlice does not allocate new space for the data each request
        // ie. the returned slice is only valid until the next call to it
        // Read the next command line sent by the producer or consumer.
        line, err = client.Reader.ReadSlice('\n')
        if err != nil {
            if err == io.EOF {
                err = nil
            } else {
                err = fmt.Errorf("failed to read command - %s", err)
            }
            break
        }

        // trim the '\n'
        line = line[:len(line)-1]
        // optionally trim the '\r'
        if len(line) > 0 && line[len(line)-1] == '\r' {
            line = line[:len(line)-1]
        }
        params := bytes.Split(line, separatorBytes)

        p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

        var response []byte
        // Dispatch on the command name and run the matching handler.
        response, err = p.Exec(client, params)
        if err != nil {
            ctx := ""
            // NOTE(review): unchecked type assertion; this panics if Exec ever
            // returns an error that does not implement protocol.ChildErr.
            if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
                ctx = " - " + parentErr.Error()
            }
            p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

            sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
            if sendErr != nil {
                p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
                break
            }

            // errors of type FatalClientErr should forceably close the connection
            if _, ok := err.(*protocol.FatalClientErr); ok {
                break
            }
            continue
        }

        if response != nil {
            err = p.Send(client, frameTypeResponse, response)
            if err != nil {
                err = fmt.Errorf("failed to send response - %s", err)
                break
            }
        }
    }

    // Teardown: close the socket, signal exit to whoever watches ExitChan,
    // then detach the client from its channel and from nsqd.
    p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
    conn.Close()
    close(client.ExitChan)
    if client.Channel != nil {
        client.Channel.RemoveClient(client.ID)
    }

    p.ctx.nsqd.RemoveClient(client.ID)
    return err
}

在继续向下看前,看一下生产者的 PUB 请求在nsqd中做了什么

// PUB handles the PUB command: it publishes one message to the topic named
// in params[1].
//
// The message body follows the command line on the wire as a length prefix
// and then that many bytes. The body is read in full before the
// authorization check runs.
//
// Returns okBytes on success. Every validation, read or publish failure is
// reported as a fatal client error; an authorization failure is returned
// exactly as CheckAuth produced it.
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    if len(params) < 2 {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
    }

    topicName := string(params[1])
    if valid := protocol.IsValidTopicName(topicName); !valid {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
            fmt.Sprintf("PUB topic name %q is not valid", topicName))
    }

    size, sizeErr := readLen(client.Reader, client.lenSlice)
    if sizeErr != nil {
        return nil, protocol.NewFatalClientErr(sizeErr, "E_BAD_MESSAGE", "PUB failed to read message body size")
    }

    // Reject empty/negative sizes first, then anything above the configured cap.
    switch {
    case size <= 0:
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
            fmt.Sprintf("PUB invalid message body size %d", size))
    case int64(size) > p.ctx.nsqd.getOpts().MaxMsgSize:
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
            fmt.Sprintf("PUB message too big %d > %d", size, p.ctx.nsqd.getOpts().MaxMsgSize))
    }

    payload := make([]byte, size)
    if _, readErr := io.ReadFull(client.Reader, payload); readErr != nil {
        return nil, protocol.NewFatalClientErr(readErr, "E_BAD_MESSAGE", "PUB failed to read message body")
    }

    if authErr := p.CheckAuth(client, "PUB", topicName, ""); authErr != nil {
        return nil, authErr
    }

    // Topics are created lazily: GetTopic creates the topic on first use,
    // i.e. when a producer first publishes to it.
    target := p.ctx.nsqd.GetTopic(topicName)
    if putErr := target.PutMessage(NewMessage(target.GenerateID(), payload)); putErr != nil {
        return nil, protocol.NewFatalClientErr(putErr, "E_PUB_FAILED", "PUB failed "+putErr.Error())
    }

    client.PublishedMessage(topicName, 1)

    return okBytes, nil
}
/ GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
    // most likely, we already have this topic, so try read lock first.
    n.RLock()
    // 当topic在nsqd中创建过时就直接返回该topic
    t, ok := n.topicMap[topicName]
    n.RUnlock()
    if ok {
        return t
    }

    n.Lock()

    t, ok = n.topicMap[topicName]
    if ok {
        n.Unlock()
        return t
    }
    deleteCallback := func(t *Topic) {
        n.DeleteExistingTopic(t.name)
    }
    //稍后看一下这个函数
    t = NewTopic(topicName, &context{n}, deleteCallback)
    n.topicMap[topicName] = t

    n.Unlock()

    n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
    // topic is created but messagePump not yet started

    // if loading metadata at startup, no lookupd connections yet, topic started after load
    if atomic.LoadInt32(&n.isLoading) == 1 {
        return t
    }

    // if using lookupd, make a blocking call to get the topics, and immediately create them.
    // this makes sure that any message received is buffered to the right channels
    //如果使用了nsqlookup,那么从nsqlookup中查询该topic的channel信息,如果没有在nsqd中创建就创建出来
    lookupdHTTPAddrs := n.lookupdHTTPAddrs()
    if len(lookupdHTTPAddrs) > 0 {
        channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
        if err != nil {
            n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
        }
        for _, channelName := range channelNames {
            if strings.HasSuffix(channelName, "#ephemeral") {
                continue // do not create ephemeral channel with no consumer client
            }
            t.GetChannel(channelName)
        }
    } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
        n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
    }

    // now that all channels are added, start topic messagePump
    t.Start()
    return t
}
// NewTopic constructs the Topic called topicName.
//
// The topic gets an in-memory queue only when the MemQueueSize option is
// positive. A name ending in "#ephemeral" marks the topic ephemeral and gives
// it a dummy backend; any other name gets a diskqueue backend under the
// configured DataPath. Before returning, NewTopic starts the topic's
// messagePump on the topic's wait group and calls nsqd.Notify with the topic.
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    // memoryMsgChan and paused are deliberately left at their zero values.
    t := &Topic{
        name:              topicName,
        ctx:               ctx,
        deleteCallback:    deleteCallback,
        channelMap:        make(map[string]*Channel),
        startChan:         make(chan int, 1),
        exitChan:          make(chan int),
        channelUpdateChan: make(chan int),
        pauseChan:         make(chan int),
        idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
    }

    // create mem-queue only if size > 0 (do not use unbuffered chan)
    if ctx.nsqd.getOpts().MemQueueSize > 0 {
        t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
    }

    if !strings.HasSuffix(topicName, "#ephemeral") {
        // Persistent backing store for this topic's messages.
        t.backend = diskqueue.New(
            topicName,
            ctx.nsqd.getOpts().DataPath,
            ctx.nsqd.getOpts().MaxBytesPerFile,
            int32(minValidMsgLength),
            int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
            ctx.nsqd.getOpts().SyncEvery,
            ctx.nsqd.getOpts().SyncTimeout,
            // Adapts diskqueue's logging callback to nsqd's logger, reading
            // the options on every call.
            func(level diskqueue.LogLevel, f string, args ...interface{}) {
                opts := ctx.nsqd.getOpts()
                lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
            },
        )
    } else {
        t.ephemeral = true
        t.backend = newDummyBackendQueue()
    }

    // The topic runs its own messagePump, distinct from the protocolV2 method
    // of the same name. Per the article, the protocol one pushes messages to
    // a consumer while this one feeds the topic's channels.
    t.waitGroup.Wrap(t.messagePump)
    // NOTE(review): Notify's body is not shown here; lookupLoop reads
    // notifyChan to (un)register topics with nsqlookupd, and the original
    // comment says this call also triggers persistence. Confirm.
    ctx.nsqd.Notify(t)

    return t
}

// Start signals the topic to start by attempting a non-blocking send on
// t.startChan. startChan is created with capacity 1 in NewTopic, so when a
// signal is already pending the default case makes this call a no-op rather
// than blocking.
// NOTE(review): presumably the topic's messagePump waits on startChan; its
// body is not shown here.
func (t *Topic) Start() {
    select {
    case t.startChan <- 1:
    default:
    }
}

看一下nsqd是如何向nsqlookup注册自己的元数据信息的:nsqd启动时会起一个名为 lookupLoop 的 goroutine

// lookupLoop keeps this nsqd's connections to the configured nsqlookupd
// peers and sends them heartbeats and registration updates. It runs until
// n.exitChan is signalled.
//
// Each iteration first connects to newly configured peers when needed, then
// waits for one of:
//   - the 15 second ticker: send a PING to every peer;
//   - n.notifyChan: register (or unregister, if the value is exiting) the
//     received *Channel or *Topic with every peer;
//   - n.optsNotificationChan: close peers no longer in the options and
//     schedule a reconnect pass for newly added ones;
//   - n.exitChan: log and return.
//
// If the local hostname cannot be determined the process exits with status 1.
func (n *NSQD) lookupLoop() {
    var lookupPeers []*lookupPeer
    var lookupAddrs []string
    connect := true

    hostname, err := os.Hostname()
    if err != nil {
        n.logf(LOG_FATAL, "failed to get hostname - %s", err)
        os.Exit(1)
    }

    // for announcements, lookupd determines the host automatically
    //
    // An explicit ticker (instead of time.Tick) lets us release it when the
    // loop exits.
    ticker := time.NewTicker(15 * time.Second)
    defer ticker.Stop()

    for {
        if connect {
            for _, host := range n.getOpts().NSQLookupdTCPAddresses {
                if in(host, lookupAddrs) {
                    continue
                }
                n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
                lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
                    connectCallback(n, hostname))
                lookupPeer.Command(nil) // start the connection
                lookupPeers = append(lookupPeers, lookupPeer)
                lookupAddrs = append(lookupAddrs, host)
            }
            n.lookupPeers.Store(lookupPeers)
            connect = false
        }

        select {
        case <-ticker.C:
            // send a heartbeat and read a response (read detects closed conns)
            for _, lookupPeer := range lookupPeers {
                n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)
                cmd := nsq.Ping()
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
                }
            }
        case val := <-n.notifyChan:
            var cmd *nsq.Command
            var branch string

            switch v := val.(type) {
            case *Channel:
                // notify all nsqlookupds that a new channel exists, or that it's removed
                branch = "channel"
                if v.Exiting() {
                    cmd = nsq.UnRegister(v.topicName, v.name)
                } else {
                    cmd = nsq.Register(v.topicName, v.name)
                }
            case *Topic:
                // notify all nsqlookupds that a new topic exists, or that it's removed
                branch = "topic"
                if v.Exiting() {
                    cmd = nsq.UnRegister(v.name, "")
                } else {
                    cmd = nsq.Register(v.name, "")
                }
            }

            // For any other value type cmd stays nil and is still passed to
            // every peer, exactly as before.
            for _, lookupPeer := range lookupPeers {
                n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
                }
            }
        case <-n.optsNotificationChan:
            // Keep peers that are still configured and close the rest; the
            // next iteration connects to any newly added addresses.
            var tmpPeers []*lookupPeer
            var tmpAddrs []string
            for _, lp := range lookupPeers {
                if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
                    tmpPeers = append(tmpPeers, lp)
                    tmpAddrs = append(tmpAddrs, lp.addr)
                    continue
                }
                n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)
                lp.Close()
            }
            lookupPeers = tmpPeers
            lookupAddrs = tmpAddrs
            connect = true
        case <-n.exitChan:
            n.logf(LOG_INFO, "LOOKUP: closing")
            return
        }
    }
}

在nsqd启动 lookupLoop 这个goroutine时,还启动了另一个 queueScanLoop goroutine,主要用来处理超时消息。
总结一下

  • nsqd启动时分别监听tcp、http端口
  • 启动 lookupLoop goroutine 向nsqlookup 注册自己的相关信息
  • 启动 queueScanLoop goroutine 对超时消息进行处理
  • 启动 statsdLoop goroutine 对性能以及topic、channel等一些参数进行统计
  • 当有 producer client 通过 PUB 命令接入进来时,nsqd 会启动一个单独的 goroutine 进行处理,此时会创建 topic、channel,topic会启动一个 messagePump 的 goroutine,将消息发送给下面的各个channel
  • 当有 consumer client 接入进来时, 启动单独 goroutine 进行处理,会启动一个 messagepump goroutine 将消息发送给各个consumer

注意,consumer 消费消息是有超时配置的:消费者对每一条消息的处理都要在超时时间内完成,否则会导致一些问题。


发表评论

电子邮件地址不会被公开。 必填项已用*标注