nsqd源码分析之nsqd 进程的数据结构 (基于 1.2.0 版本)

本文以 nsqd 1.2.0 版本的源码为准.

涉及的数据结构

  • NSQD
  • Topic
  • Channel
  • Message
  • inFlightPqueue
  • PriorityQueue
  • diskQueue
  • dummyBackendQueue

以上数据结构的关系如下图:
nsqd 数据结构关系图

NSQD

/nsqd/nsqd.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// NSQD groups the process-wide state shown below: the topic registry, the
// connected-client table, network listeners, and notification/exit channels.
type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
clientIDSequence int64

// Embedded read/write lock.
// NOTE(review): presumably guards topicMap — confirm against the methods.
sync.RWMutex

// NOTE(review): stored as atomic.Value, so presumably swapped/read atomically
// at run time — confirm the concrete type held.
opts atomic.Value

dl *dirlock.DirLock
isLoading int32
errValue atomic.Value
startTime time.Time

// Topics are stored here, in a map from string key to *Topic
// (author's marker for this field: "one").
topicMap map[string]*Topic

// NOTE(review): clientLock is declared directly above clients and presumably
// guards that map — confirm.
clientLock sync.RWMutex
clients map[int64]Client

lookupPeers atomic.Value

// Network-facing fields: the TCP server plus TCP/HTTP/HTTPS listeners and
// the TLS configuration.
tcpServer *tcpServer
tcpListener net.Listener
httpListener net.Listener
httpsListener net.Listener
tlsConfig *tls.Config

poolSize int

// Channels used for notification and shutdown signalling, and a wait group
// wrapper (usage is not shown in this snippet).
notifyChan chan interface{}
optsNotificationChan chan struct{}
exitChan chan int
waitGroup util.WaitGroupWrapper

ci *clusterinfo.ClusterInfo
}

Topic

/nsqd/topic.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Topic holds per-topic state: message counters, the map of its channels,
// an in-memory message channel alongside a BackendQueue, and the fields used
// for lifecycle and pause signalling.
type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messageCount uint64
messageBytes uint64

// Embedded read/write lock.
// NOTE(review): presumably guards channelMap — confirm against the methods.
sync.RWMutex

name string
// Channels of this topic, in a map from string key to *Channel.
channelMap map[string]*Channel
// Two message stores: a BackendQueue and a Go channel of *Message.
// NOTE(review): presumably memoryMsgChan is tried first and backend takes the
// overflow — confirm against the put path.
backend BackendQueue
memoryMsgChan chan *Message
startChan chan int
exitChan chan int
channelUpdateChan chan int
waitGroup util.WaitGroupWrapper
exitFlag int32
idFactory *guidFactory

// NOTE(review): deleteCallback/deleter mirror the same pair on Channel;
// presumably deleter ensures the callback runs once — confirm.
ephemeral bool
deleteCallback func(*Topic)
deleter sync.Once

paused int32
pauseChan chan int

ctx *context
}

Channel

/nsqd/channel.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Channel represents the concrete type for a NSQ channel (and also
// implements the Queue interface)
//
// There can be multiple channels per topic, each with their own unique set
// of subscribers (clients).
//
// Channels maintain all client and message metadata, orchestrating in-flight
// messages, timeouts, requeuing, etc.
type Channel struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
requeueCount uint64
messageCount uint64
timeoutCount uint64

// Embedded read/write lock.
// NOTE(review): presumably guards the clients map — confirm against the methods.
sync.RWMutex

topicName string
name string
ctx *context

// Secondary message storage (see the BackendQueue interface).
backend BackendQueue

// In-memory message channel, plus exit state with its own lock.
memoryMsgChan chan *Message
exitFlag int32
exitMutex sync.RWMutex

// state tracking
clients map[int64]Consumer
paused int32
ephemeral bool
deleteCallback func(*Channel)
deleter sync.Once

// Stats tracking
e2eProcessingLatencyStream *quantile.Quantile

// TODO: these can be DRYd up
// Deferred and in-flight messages are each tracked by a map keyed by
// MessageID, a priority queue, and a dedicated mutex.
deferredMessages map[MessageID]*pqueue.Item
deferredPQ pqueue.PriorityQueue
deferredMutex sync.Mutex
inFlightMessages map[MessageID]*Message
inFlightPQ inFlightPqueue
inFlightMutex sync.Mutex
}

Message

/nsqd/message.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// Message is a single message: its ID, body, timestamp and attempt count,
// plus unexported bookkeeping fields used for in-flight handling.
type Message struct {
ID MessageID
Body []byte
// NOTE(review): integer timestamp; the unit is not visible here — confirm.
Timestamp int64
Attempts uint16

// for in-flight handling
deliveryTS time.Time
clientID int64
// NOTE(review): pri and index presumably hold the priority and the slot
// inside inFlightPqueue — confirm against in_flight_pqueue.go.
pri int64
index int
deferred time.Duration
}

Queue

inFlightPqueue

/nsqd/in_flight_pqueue.go

1
// inFlightPqueue is a slice of *Message.
// NOTE(review): the name indicates a priority queue of in-flight messages;
// the ordering logic lives in its methods, which are not shown here.
type inFlightPqueue []*Message

PriorityQueue

/internal/pqueue/pqueue.go

1
2
3
4
5
6
7
8
9
// Item is one element of a PriorityQueue: an arbitrary value together with
// an int64 priority and an int index.
// NOTE(review): Index is presumably the element's position in the queue
// slice — confirm against the heap methods.
type Item struct {
Value interface{}
Priority int64
Index int
}

// PriorityQueue is a slice of *Item.
//
// this is a priority queue as implemented by a min heap
// ie. the 0th element is the *lowest* value
type PriorityQueue []*Item

BackendQueue

/nsqd/backend_queue.go

1
2
3
4
5
6
7
8
9
10
// BackendQueue represents the behavior for the secondary message
// storage system
type BackendQueue interface {
// Put accepts one []byte payload and reports failure through the error.
Put([]byte) error
// ReadChan exposes a receive-only channel of []byte payloads.
ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
Close() error
Delete() error
// Depth reports an int64.
// NOTE(review): presumably the number of queued items — confirm.
Depth() int64
// NOTE(review): Empty returns an error rather than a bool, so it presumably
// clears the queue rather than testing for emptiness — confirm.
Empty() error
}

diskQueue

/diskqueue.go (位于独立仓库 github.com/nsqio/go-diskqueue, 不在 nsq 仓库内)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// diskQueue implements a filesystem backed FIFO queue
//
// Its fields fall into the groups marked below: persisted read/write
// positions, construction-time settings, read-ahead bookkeeping, open file
// handles and buffers, and the channels it communicates over.
type diskQueue struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms

// run-time state (also persisted to disk)
readPos int64
writePos int64
readFileNum int64
writeFileNum int64
depth int64

// Embedded read/write lock (what it guards is not visible in this snippet).
sync.RWMutex

// instantiation time metadata
name string
dataPath string
maxBytesPerFile int64 // currently this cannot change once created
minMsgSize int32
maxMsgSize int32
syncEvery int64 // number of writes per fsync
syncTimeout time.Duration // duration of time per fsync
exitFlag int32
needSync bool

// keeps track of the position where we have read
// (but not yet sent over readChan)
nextReadPos int64
nextReadFileNum int64

// Open file handles for each direction, a buffered reader, and a write buffer.
readFile *os.File
writeFile *os.File
reader *bufio.Reader
writeBuf bytes.Buffer

// exposed via ReadChan()
readChan chan []byte

// internal channels
// NOTE(review): the write*/empty* channels come in request/response pairs,
// presumably serviced by a single internal loop — confirm.
depthChan chan int64
writeChan chan []byte
writeResponseChan chan error
emptyChan chan int
emptyResponseChan chan error
exitChan chan int
exitSyncChan chan int

logf AppLogFunc
}

dummyBackendQueue

/nsqd/dummy_backend_queue.go

1
2
3
// dummyBackendQueue carries a single field, a channel of []byte.
// NOTE(review): presumably a no-op BackendQueue implementation — confirm
// against its methods, which are not shown here.
type dummyBackendQueue struct {
readChan chan []byte
}

###

NSQ 处理的消息流被称为 Topic. 每个 Topic 可以对应多个 Channel. 客户端 (Consumer) 通过 Channel 订阅消息 (Message). 发布到某个 Topic 的每一条 Message 都会被复制并投递到该 Topic 下的每一个 Channel.

Topic 和 Channel 都是即用即创建: 当 Producer 向 NSQ 注册某个命名 Topic, 或某个 Consumer 订阅某个 Topic 下的 Channel 时, nsqd 进程才会创建对应的 Topic 对象.

当客户端向 nsqd 进程订阅某个 topic 下的 channel 时,这个 channel 才会被 nsqd 进程创建.