您的位置 首页 golang

golang nsq源码分析&添加中文注释系列(二):Nsqd入口主流程

以往看网上的源码分析时,基本都是前面一段讲解,后面跟一大段代码,没有上下文分析,我就暗暗的想,如果一个函数或一段逻辑能有中文注释(俺小本毕业英语不太好)带有上下文分析,这样读源码岂不是会更快顺畅。。。不废话了,我们开始吧。

上一篇大概讲解了基本介绍,我们也把Nsqd一步一步跑起来了(假设您已动手尝试过),本篇则从源码入口开始讲解

前言

  • 针对特殊的包或者方法,会单独开一篇博客讲解,请注意代码里面的链接地址,建议手动尝试一下
  • 文章宗旨是学习大神的每一行代码,所以看起来会比较啰嗦,建议您一边看代码一边读文章(效果更佳)

NSQ整体流程

NSQ由3个守护进程组成:

  • nsqd 是接收、保存和传送消息到客户端的守护进程
  • nsqlookupd 是管理的拓扑信息,维护着所有nsqd的状态,并提供了最终一致发现服务的守护进程
  • nsqadmin 是一个web ui的实时监控集群和执行各种管理任务

nsqd入口文件:nsq/apps/nsqd/main.go

废话不多说,都在酒里了(代码里),直接看注释就能理解

 package main

import (
	"flag"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	// toml开源包
	"github.com/BurntSushi/toml"
	// go-options开源包
	"github.com/mreiferson/go-options"
	// 内部版本号
	"github.com/nsqio/nsq/internal/version"
	// 命令行控制包svc 服务控制
	"github.com/judwhite/go-svc/svc"

	// 内部包 日志中间件 log
	"github.com/nsqio/nsq/internal/lg"

	// nsqd真正工作的区域
	"github.com/nsqio/nsq/nsqd"
)

/*
定义业务program结构体
*/
type program struct {

	// once能确保实例化对象Do方法在多线程环境只运行一次,内部通过互斥锁实现
	once sync.Once
	nsqd *nsqd.NSQD
}

/*
采用SVC包进行服务控制,主要是统一管理服务,对于信号控制不用每次都写在业务上,在ctrl+c时,能正常监听defer结束,方便获取很多日志,参数等
*/
func main() {
	// 实例化
	prg := &program{}
	/*
	服务控制实践-svc包转/
	
	// Implement this interface and pass it to the Run function to start your program.
type Service interface {
	// Init is called before the program/service is started and after it's
	// determined if the program is running as a Windows Service.
	Init(Environment) error

	// Start is called after Init. This method must be non-blocking.
	Start() error

	// Stop is called in response to os.Interrupt, os.Kill, or when a
	// Windows Service is stopped.
	Stop() error
}
	svc 第一个参数需要实现Service接口才可以正常运行,这也就是大伙看到的program 实现的init/start/stop三个函数

	使用svc启动相关程序
	*/
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		logFatal("%s", err)
	}
}

func (p *program) Init(env svc.Environment) error {
	// 检查是否是windows 服务。。。目测一般时候也用不到
	if env.IsWindowsService() {
		dir := filepath.Dir(os.Args[0])
		return os.Chdir(dir)
	}
	return nil
}

func (p *program) Start() error {
	/*
	实例化并初始一些配置和默认值
	/nsq/nsqd/options.go
	*/
	opts := nsqd.NewOptions()

	/*
	封装了命令行的一些检查项,设置检查项的默认值
	使用apps目录:/nsq/apps/nsqd/options.go
	然后parse解析
	*/
	flagSet := nsqdFlagSet(opts)
	flagSet.Parse(os.Args[1:])

	// 生成随机数 time.Now().UnixNano()  单位纳秒
	rand.Seed(time.Now().UTC().UnixNano())

	// 打印版本号,接收命令行参数version  默认值:false
	/*
	执行效果
	bj-m-server:nsqd yixia$ go run ./ --version=true
	nsqd v1.2.1-alpha (built w/go1.12.2)
	*/
	if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
		// 打印版本号  %!V(string=nsqd v1.2.1-alpha (built w/go1.12.2))
		fmt.Println(version.String("nsqd"))
		os.Exit(0)
	}

	// 获取外部的配置文件,解析toml文件格式
	var cfg config
	/*
	bj-m-server:nsqd yixia$ go run ./ --config=config.toml
	*/
	configFile := flagSet.Lookup("config").Value.String()
	// 如果不为空
	if configFile != "" {
		// 加载,读出的数据采用空_  抛弃,赋值给cfg
		_, err := toml.DecodeFile(configFile, &cfg)
		// 抛错
		if err != nil {
			logFatal("failed to load config file %s - %s", configFile, err)
		}
	}
	// 检查配置文件
	cfg.Validate()

	// 采用优先级从高到低依次进行解析,最终
	options.Resolve(opts, flagSet, cfg)
	/*
	传入用户自定义配置,实例化nsqd

	nsqd.new以后做了那些事情,大概捋一下,后续看的时候能加深印象
	1、检查命令行cli   opts.DataPath 、  opts.Logger没设置  设置默认值
	2、实例NSQD主对象
	3、监听tcp   net.Listen("tcp", opts.TCPAddress)
	4、监听http  net.Listen("tcp", opts.HTTPAddress)
	5、监听https tls.Listen("tcp", opts.HTTPSAddress, n.tlsConfig)

	综合以上了解,基本做的事情就是实例化主对象,并对cli 自定义的命令一顿操作。。。然后就这样了,return (*NSQD, error)
	*/
	nsqd, err := nsqd.New(opts)
	if err != nil {
		logFatal("failed to instantiate nsqd - %s", err)
	}
	p.nsqd = nsqd

	/*
	加载历史数据,数据来源nsqd.dat -> 历史数据格式{"topics":[],"version":"1.2.1-alpha"}
	1、获取历史数据
	2、解析成对应的结构体 json.Unmarshal(data, &m)
	3、遍历 for _, t := range m.Topics , 解析每个topic -> channel
	4、启动N个topic.Start()(重点代码中有一个GetTopic,采用线程线程安全方式,重点学习)
	5、func (n *NSQD) LoadMetadata() error
	*/
	err = p.nsqd.LoadMetadata()
	if err != nil {
		logFatal("failed to load metadata - %s", err)
	}

	/*
	持久化最新数据
	1、获取原始数据文件名 fileName := newMetadataFile(n.getOpts())
	2、遍历 nsqd.topicMap -> ndqd.channelMap  ,这是对topicMap和channelMap加了互斥锁
	3、将最新数据写入到临时文件中,明文明文件名为:nsqd.dat.333569681738193261.tmp
	4、func (n *NSQD) PersistMetadata() error
	*/
	err = p.nsqd.PersistMetadata()
	if err != nil {
		logFatal("failed to persist metadata - %s", err)
	}

	/*
	开启协程进入nsqd.Main主函数
	Main方法里重点使用了封装的WaitGroup
	下列出现的n.waitGroup.Wrap均采用了封装groutine
	可以看我另一篇文章讲解了封装的流程和使用方法:积累-waitgroup包装/

	方法:
	func (n *NSQD) Main() error {

	大体执行思路
	1、实例化context
	2、建立退出通道,保证退出函数只运行一次,创建了匿名函数exitFunc
	3、初始化并监听TCPServer
	3.1、exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	3.2、TCPServer采用无限循环方式监听tcp client长连接,当有一个client连接,分配一个groutine进行处理
	for -> listener.Accept() -> groutine
	4、初始化HTTPServer
	4.1、使用httprouter进行路由设置,然后初始化各种接口

	5、初始化HttpsServer
	6、监控循环队列:n.waitGroup.Wrap(n.queueScanLoop)
	7、节点信息管理:n.waitGroup.Wrap(n.lookupLoop)
	8、统计信息:n.waitGroup.Wrap(n.statsdLoop)
	*/
	go func() {
		err := p.nsqd.Main()
		if err != nil {
			p.Stop()
			os.Exit(1)
		}
	}()

	return nil
}

func (p *program) Stop() error {
	/*
		/*
	底层源码
	func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 1 {
		return
	}
	// Slow-path.
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

	可以看成这样的链式操作
	p.once.Do == program.once.Do

	确保在执行时只执行一次退出操作
	*/

	p.once.Do(func() {
		p.nsqd.Exit()
	})
	return nil
}

func logFatal(f string, args ...interface{}) {
	lg.LogFatal("[nsqd] ", f, args...)
}

  

Nsqd流程图

偷个懒,从网上摘录的流程图直接拿下来了大家先看看

小结

nsqd代码逻辑清晰,利用Go协程高效并发处理分布式多节点nsqd的生产和消费,学习并发处理nsqd是最佳项目,每行代码都值得学习,坚持读每一行代码相信大伙一定会受益匪浅的,等我们把这项目2000多行代码都读差不多了,在回头看看成长,绝对比看几本书学的快,学以致用,多动手,多练习。

下一章具体分析nsqd主程序。

文章来源:智云一二三科技

文章标题:golang nsq源码分析&添加中文注释系列(二):Nsqd入口主流程

文章地址:https://www.zhihuclub.com/101044.shtml

关于作者: 智云科技

热门文章

网站地图