nsqd源码分析之 nsqd 进程的启动过程分析(基于 1.2.0 版本)

1.2.0 版本为准.

/apps/nsqd/main.go 中的 Start 方法即是 nsqd daemon 的启动过程.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (p *program) Start() error {
opts := nsqd.NewOptions()

// ...
// ...

nsqd, err := nsqd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd

err = p.nsqd.LoadMetadata()
if err != nil {
logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata()
if err != nil {
logFatal("failed to persist metadata - %s", err)
}

go func() {
err := p.nsqd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}f
}()

return nil
}

在执行到 p.nsqd.Main 之前, nsqd 先调用了 LoadMetadataPersistMetadata 方法, 以恢复上一次 nsqd 运行时保存的运行数据.

下面来看看 /nsqd/nsqd.goMain 方法里的具体过程.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (n *NSQD) Main() error {
ctx := &context{n}

exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
once.Do(func() {
if err != nil {
n.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}

n.tcpServer.ctx = ctx
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})

httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})

if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}

n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}

err := <-exitCh
return err
}

其中nsqd 创建了 tcpServerhttpServer. tcpServer 负责监听客户端的请求, httpServer 负责对外提供 api 服务. 如果 nsqd 启动时有配置 https, 也会创建 httpsServer.

另外还创建了 queueScanLooplookupLoop goroutine. queueScanLoop 是 nsqd 内部处理消息的定时任务, lookupLoop 是向 nsqlookupd 同步节点信息的定时任务.

如果 nsqd 启动时还配置了 -statsd-address 参数, 会创建 statsLoop 的 goroutine. 负责定时推送这个 nsqd 进程的内存信息.

以上这些服务都是以新的 goroutine 创建的, 这里采用了sync.WaitGroup这个同步工具. 这里 nsqd 对 WaitGroup 进行了一层包装.代码在
/internal/util/wait_group_wrapper.go.

1
2
3
4
5
6
7
8
9
10
11
type WaitGroupWrapper struct {
sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}