go-nsq 源码阅读

NSQ 包括多个部分:nsqdnsqlookupdgo-nsq 等等,涉及到两个 git 仓库:

  1. NSQ - 包括服务端的 nsqdnsqlookupdnsqadmin 等等
  2. go-nsq - NSQ 的 Go 语言客户端,兼具生产者和消费者的功能

尽管生产者和消费者的行为天差地别,但是从整体来看,go-nsq 可以分为用户层、协议层和连接层,两种角色共享部分代码。

运行

下载源代码

git clone git@github.com:nsqio/nsq.git
git clone https://github.com/nsqio/go-nsq

编译

运行 NSQ 中的 test.sh 编译出可执行文件,目标文件位于 apps/<name>中。

nsq $ ./test.sh
...
building apps/nsq_stat
building apps/nsq_tail
building apps/nsq_to_file
building apps/nsq_to_http
building apps/nsq_to_nsq
building apps/nsqadmin
building apps/nsqd
building apps/nsqlookupd
building apps/to_nsq
building bench/bench_channels
building bench/bench_reader
building bench/bench_writer

启动

用第一个终端启动 nsqlookupd:

nsq $ apps/nsqlookupd/nsqlookupd 
[nsqlookupd] 2019/12/28 16:42:01.555209 INFO: nsqlookupd v1.2.1-alpha (built w/go1.13.4)
[nsqlookupd] 2019/12/28 16:42:01.556528 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2019/12/28 16:42:01.556584 INFO: HTTP: listening on [::]:4161

用第二个终端启动 nsqd:

nsq $ apps/nsqd/nsqd --lookupd-tcp-address=127.0.0.1:4160
[nsqd] 2019/12/28 16:43:25.409678 INFO: nsqd v1.2.1-alpha (built w/go1.13.4)
[nsqd] 2019/12/28 16:43:25.409906 INFO: ID: 151
[nsqd] 2019/12/28 16:43:25.454968 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/12/28 16:43:25.462583 INFO: TCP: listening on [::]:4150
[nsqd] 2019/12/28 16:43:25.462660 INFO: LOOKUP(127.0.0.1:4160): adding peer
[nsqd] 2019/12/28 16:43:25.462676 INFO: LOOKUP connecting to 127.0.0.1:4160
[nsqd] 2019/12/28 16:43:25.464903 INFO: HTTP: listening on [::]:4151

nsqlookupd 提供了目录服务,消费者可以在其中查找提供特定的主题的 nsqd 实例的地址。

生产者

// p.go
package main

import (
	"github.com/nsqio/go-nsq"
	"log"
)

func main() {
	producer, err := nsq.NewProducer("localhost:4150", nsq.NewConfig())
	if err != nil {
		log.Fatal(err)
	}

	messageBody := []byte("hello world")
	topicName := "topic"

	err = producer.Publish(topicName, messageBody)
	if err != nil {
		log.Fatal(err)
	}

	producer.Stop()
}

消费者

// c.go
package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"log"
	"sync"
)

type myMessageHandler struct{}
var wg sync.WaitGroup

func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		return nil
	}
	fmt.Println(string(m.Body))
	wg.Done()
	return nil
}

func main() {
	consumer, err := nsq.NewConsumer("topic", "channel", nsq.NewConfig())
	if err != nil {
		log.Fatal(err)
	}

	wg.Add(1)
	consumer.AddHandler(&myMessageHandler{})

	err = consumer.ConnectToNSQLookupd("localhost:4161")
	if err != nil {
		log.Fatal(err)
	}

	wg.Wait()
	consumer.Stop()
}

先运行 go run p.go 发布消息到 nsqd,然后运行 go run c.go 即可接收到先前发布的消息。

用户层

生产者与消费者拥有相同的协议格式,但是使用不同的命令,有不同的行为,所以涉及到不同的方法。

生产者

从用户的角度来看,发布消息的第一步,是调用 producer#NewProducer(address, config) 函数,提供 nsqd 的地址和配置项,去创建生产者对象,这个对象负责发布消息和接收结果。

获取到 producer 之后,用户可以使用其提供的 Publish() 以及一系列变种方法向 nsqd 发布消息。

  1. Publish - 同步发布
  2. PublishAsync - 异步发布
  3. MultiPublish - 同步发布多条消息
  4. MultiPublishAsync - 异步发布多条消息
  5. DeferredPublish - 同步发布延迟消息
  6. DeferredPublishAsync - 异步发布延迟消息

时序图1

同步的方法实质上是对异步的一个封装,在同步的方法中会创建接收结果的通道,并且等到结果返回之后才会结束方法。以 PublishPublishAsync为例,调用栈分别是:

  • Publish -> sendCommand -> sendCommandAsync
  • PublishAsync -> sendCommandAsync

最终都是调用到 sendCommandAsync

func (w *Producer) sendCommand(cmd *Command) error {
	doneChan := make(chan *ProducerTransaction)
	err := w.sendCommandAsync(cmd, doneChan, nil)
	if err != nil {
		close(doneChan)
		return err
	}
	t := <-doneChan
	return t.Error
}

producer 发现 TCP 连接尚未建立时,会创建并初始化 conn,并且会启动 readLoop 协程用于读取结果。接着将消息的发布请求通过transactionChan 通道发送到 router 协程,由此协程将命令写入到 TCP 流中。

在未来的某一时刻,readLoop 协程读取到数据之后,再将结果交回到 router 协程,后者将数据放入此请求的结果接收通道中。

消费者

消费者角色使用 nsq 的第一步是到 nsqlookupd 查找并订阅对应 TopicChannelnsqd

时序图2

成功订阅之后,每次调用 addHandler 会启动一个 handlerLoop 协程,readLoop 协程读取并解码 TCP 流中的消息,并且发送到handlerLoop 协程。如果消息处理成功, 则创建 FINISH 命令,如果消息处理失败,则创建 REQ 命令,交由 writeLoop 协程将命令写入 TCP 连接中。

协议层

NSQ 使用类似 memcached协议,协议可以分为两部分:

  1. 生产者/消费者到 nsqd
  2. nsqd 到生产者/消费者

第一部分用于客户端向服务器的通讯,就是各种各样的命令+数据,例如消费者向 nsqd 订阅 topic/channel 的 SUB 命令

SUB <topic_name> <channel_name>\n

<topic_name> - a valid string (optionally having #ephemeral suffix)
<channel_name> - a valid string (optionally having #ephemeral suffix)

成功时响应:

OK

失败时响应:

E_INVALID
E_BAD_TOPIC
E_BAD_CHANNEL

又例如生产者向 nsqd 发布消息的 PUB 命令

PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]

<topic_name> - a valid string (optionally having #ephemeral suffix)

成功时响应:

OK

失败时响应:

E_INVALID
E_BAD_TOPIC
E_BAD_MESSAGE
E_PUB_FAILED

第二部分用于服务器向客户端通讯,数据的格式为:

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size     frame type     data

其中 data 部分又有具体的格式:

[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
|       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
|       8-byte         ||    ||                 16-byte                      || N-byte
------------------------------------------------------------------------------------------...
  nanosecond timestamp    ^^                   message ID                       message body
                       (uint16)
                        2-byte
                       attempts

连接层

同样地,最底层的连接层也是生产者/消费者通用的,使用 net 包提供的 TCP 能力创建连接,然后在此连接上进行读写操作。

遗憾

生产者获取 nsqd 的地址

在 demo 阶段,这样的代码是没有大问题的,但是在实际的生产环境中,把nsqd地址写在代码中就不太妥当了,我们需要更灵活的方式获取此地址。

的确,可以将多个地址保存在本地配置文件中,但是本地配置文件缺乏灵活性,这样就牺牲了横向拓展的可能性,得不偿失。

好消息是,nsqd 启动之后会向 nsqlookupd 注册,而且 nsqlookupd 的地址应该是相对固定的。所以最好的方法是,利用 nsqlookupd 服务器提供的 get /nodes 获取所有目前可用的 nsqd地址,然后选择一个符合要求的地址。

~ $ curl http://127.0.0.1:4161/nodes | fx .
{
  "producers": [
    {
      "remote_address": "127.0.0.1:54927",
      "hostname": "nulls_rmbp",
      "broadcast_address": "nulls_rmbp",
      "tcp_port": 4180,
      "http_port": 4181,
      "version": "1.2.1-alpha",
      "tombstones": [],
      "topics": []
    },
    {
      "remote_address": "127.0.0.1:54934",
      "hostname": "nulls_rmbp",
      "broadcast_address": "nulls_rmbp",
      "tcp_port": 4190,
      "http_port": 4191,
      "version": "1.2.1-alpha",
      "tombstones": [],
      "topics": []
    },
    {
      "remote_address": "127.0.0.1:54916",
      "hostname": "nulls_rmbp",
      "broadcast_address": "nulls_rmbp",
      "tcp_port": 4150,
      "http_port": 4151,
      "version": "1.2.1-alpha",
      "tombstones": [],
      "topics": []
    }
  ]
}