以 1.2.0 版本为准.
/apps/nsqd/main.go 中的 Start 方法即是 nsqd daemon 的启动过程.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
   | func (p *program) Start() error {   opts := nsqd.NewOptions()   
 
     	nsqd, err := nsqd.New(opts) 	if err != nil { 		logFatal("failed to instantiate nsqd - %s", err) 	} 	p.nsqd = nsqd
  	err = p.nsqd.LoadMetadata() 	if err != nil { 		logFatal("failed to load metadata - %s", err) 	} 	err = p.nsqd.PersistMetadata() 	if err != nil { 		logFatal("failed to persist metadata - %s", err) 	}
  	go func() { 		err := p.nsqd.Main() 		if err != nil { 			p.Stop() 			os.Exit(1) 		}f 	}()
  	return nil }
  | 
 
在执行到 p.nsqd.Main 之前, nsqd 先调用了 LoadMetadata 和 PersistMetadata 方法, 以恢复上一次 nsqd 运行时保存的运行数据.
下面来看看 /nsqd/nsqd.go 中 Main 方法里的具体过程.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
   | func (n *NSQD) Main() error { 	ctx := &context{n}
  	exitCh := make(chan error) 	var once sync.Once 	exitFunc := func(err error) { 		once.Do(func() { 			if err != nil { 				n.logf(LOG_FATAL, "%s", err) 			} 			exitCh <- err 		}) 	}
  	n.tcpServer.ctx = ctx 	n.waitGroup.Wrap(func() { 		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) 	})
  	httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) 	n.waitGroup.Wrap(func() { 		exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) 	})
  	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { 		httpsServer := newHTTPServer(ctx, true, true) 		n.waitGroup.Wrap(func() { 			exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) 		}) 	}
  	n.waitGroup.Wrap(n.queueScanLoop) 	n.waitGroup.Wrap(n.lookupLoop) 	if n.getOpts().StatsdAddress != "" { 		n.waitGroup.Wrap(n.statsdLoop) 	}
  	err := <-exitCh 	return err }
  | 
 
其中nsqd 创建了 tcpServer 和 httpServer. tcpServer 负责监听客户端的请求, httpServer 负责对外提供 api 服务. 如果 nsqd 启动时有配置 https, 也会创建 httpsServer.
另外还创建了 queueScanLoop 和 lookupLoop goroutine. queueScanLoop 是 nsqd 内部处理消息的定时任务, lookupLoop 是向 nsqlookupd 同步节点信息的定时任务.
如果 nsqd 启动时还配置了 -statsd-address 参数, 会创建 statsLoop 的 goroutine. 负责定时推送这个 nsqd 进程的内存信息.
以上这些服务都是以新的 goroutine 创建的, 这里采用了sync.WaitGroup这个同步工具. 这里 nsqd 对 WaitGroup 进行了一层包装.代码在
/internal/util/wait_group_wrapper.go.
1 2 3 4 5 6 7 8 9 10 11
   | type WaitGroupWrapper struct { 	sync.WaitGroup }
  func (w *WaitGroupWrapper) Wrap(cb func()) { 	w.Add(1) 	go func() { 		cb() 		w.Done() 	}() }
  |