新闻媒体自出现以来,承担着传递消息的功能,或社会事实,或明星花边,或人物访谈。无论是民国时期传播新思潮的纸媒,还是借助互联网崛起的新媒体,虽然有不同的承载媒介,却有类似的工作流程,都要经历“来源-加工-发布”的工作流,才能呈现给读者。

在编程领域,有很多需求需要用到类似的工作流。例如新用户注册后,需要发送注册邮件或者短信通知,用户注册的功能通常属于一个进程,而发送邮件、短信通知的功能常常位于另一个进程甚至服务器中,就需要,通知服务进程(订阅者)去中转中心订阅信息,注册服务进程(生产者)将信息发布到中转中心,从而实现通讯。在这里,前者相当于记者、编辑,中转中心是报社,而后者则看作是读者。

同样地,电子商务等项目中的订单系统与库存、秒杀和日志系统,都需要用到这种消息发布、订阅的机制,目的在于异步处理、应用解耦、流量削峰和消息通讯。

在 Go 语言中,这样的基础设施由 NSQ 提供,消息的生产者和订阅者只需要使用 NSQ提供的对应语言的客户端,即可实现消息的传递,而不用操心具体的实现细节。

来源

话题(Topic)

新闻来源可能是电话、短信或者更经典的书信,经过记者、编辑的多方采集、交叉认证,写成文章,放入到不同的版面中。类似地,NSQ 提供两种发布消息的方式,一是通过 TCP 连接,使用命令式的自定义协议,二是通过 HTTP 请求,使用者通过这两种方式连接到消息的中转中心 —— nsqd

HTTP 请求方式除了简略地提供发布消息的功能之外,还可以用于统计数据和调试。以发布单条消息为例,生产者发送 POST http://host:port/pub?topic=topicName HTTP 请求到 nsqd,将消息内容放在 body 中。

post /pub?topic=topicName HTTP/1.1
Host: host:port

raw message bytes

TCP 连接提供消息队列可能需要的所有命令,无论是生产者还是消费者,通过 TCP 连接与 nsqd 建立连接。同样以发布单条消息为例,生产者通过已建立的连接,按照协议格式将 topicNameraw message bytes 发送到 nsqd

PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]

<topic_name> - a valid string (optionally having #ephemeral suffix)

就如报社一样,无论是通过何种渠道获得的消息,到达之后都需要经过整理加工,写成文章。nsqd 接收到消息之后,取出需要的数据,组装成 Message

func NewMessage(id MessageID, body []byte) *Message {
	return &Message{
		ID:        id,
		Body:      body,
		Timestamp: time.Now().UnixNano(),
	}
}

加工

就如报社有专栏、印刷多份一样,专注消息中转领域的 NSQ 也有同样的两个概念:话题(Topic)和频道(Channel)。话题归类类型相同的消息,频道用于将消息送达多个消费者。

话题(Topic)

报社的主编在编辑写好新闻之后,会根据新闻的内容,决定将文章放在哪个栏目中发表。前文提到的两种发布消息的方式,虽然所用技术不同,数据的格式也有很大差异,但是都有两个关键的信息 —— “topicName” 和 “raw message bytes”。后者是消息要传递的内容(新闻本体),前者则是决定消息去向(专栏名)。

我们周围每时每刻都会发生大量值得报道的新闻,报社根据新闻的轻重缓急,确定文章发布到专栏的时间,疫情等热点新闻可能先上,书评等可以延后一些时日。 对于 nsqd 整个系统而言,处于下游的消息消费者可能无法及时处理所有消息,就需要一种机制,将超出消费者承受能力的消息暂存起来。

消息到达 nsqd 后,将会被保存在一个有缓存的通道memoryMsgChan)中,也就是保存在内存中。虽然内存存取速度快,但是容量有限,这时候就需要价格较为便宜且容量更大的外部存储,来弥补内存的不足。NSQ 使用的是本地磁盘来做这部分的存储,不过理论上,只要是满足接口定义的外部存储,即使是远在千里之外的数据中心,都可以用于保存这部分额外的信息。

type BackendQueue interface {
	Put([]byte) error
	ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
	Close() error
	Delete() error
	Depth() int64
	Empty() error
}

外部存储不能简单地理解成内存的备胎,在 NSQ 中它们是平等的地位。在暂存消息时,程序会先判断内存通道是否已满,优先使用速度更快的内存通道,然后才考虑将消息缓存到外部存储。在取出消息的时候,程序会随机地从两条通道中取一条数据。这种规则,会导致部分后来的消息反而先得到处理,不过总体来说,所有的消息最终都会得到消费者的处理。

// 暂存时,优先考虑内存,再考虑外部存储
// 代码有精简
func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
	}
	return nil
}
// 随机地从两条通道中取一条数据
// 代码有精简
func (t *Topic) messagePump() {
	for {
		select {
		case msg = <-memoryMsgChan:
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
		}

		for i, channel := range chans {
			chanMsg := msg
			err := channel.PutMessage(chanMsg)
		}
	}

频道(Channel)

报社需要收入来生存发展,就需要尽可能地让报纸的销量更好,将同样的内容多次售卖,一份只印刷一份的报纸从财务角度来看是失败的。在编程领域,有大量的场景会出现一条消息有多个消费者的情况,例如根据订单扣除库存的时候,将这部分操作记录在日志中,以便日后查找问题。单台机器的性能终归有极限,当某种消息非常多的时候,我们希望可以部署多台一样的机器,横向拓展实现负载均衡,提供整体的处理能力。

频道的存在即是为了满足这个需求。不过,虽然和 Go 语言中的通道是同样的英文单词(channel),但它们是风牛马不相及,完全是不同的东西。

如果一条消息有多个消费者,根据墨菲定律,就必然会存在一个动作缓慢的消费者,拖累整个系统的处理效率。为了避免这种情况,消息从话题出来时到频道的时候,每个频道都会复制一次消息,得到一份完全一样的副本。

消息进入频道之后,会经过与话题结构一致的两条通道,即使遇到消费者不能及时处理消息的情况,也能保证消息有暂时的去处。如果频道只有一个订阅者,这个订阅者最终会接收到所有的消息。如果频道有多个订阅者,这些订阅者会以相同的概率收到消息,以实现负载均衡。

复制和两条通道

发布

消息交到消费者之后,会出现几种情况:

  1. 在规定的时间完成处理任务
  2. 处理时间超过了规定的时间
  3. 消费者不能正确处理消息

为了处理这些情况,保证每一条消息都能得到正确的处理,NSQ 为已发送到消费者但未确认完成的消息,引入了一个 InFlight 的概念。超过一定时间消费者仍然汇报消息处理结果,即可认为消费者没有正确处理消息(或许是崩溃了),NSQ 将此消息重新放入频道中,以便能够得到更多处理的机会。如果消费者处理失败,也需要向NSQ发送一个 REQ 请求,将此消息重新放入频道中。

在大多数情况下,消费者能够在未超时的情况下完成任务,则发送 FIN 请求告知 NSQ 此消息已被正确处理。

//随机从两条通道中取出发送消息到消费者,并且记录为 InFlight 状态
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
		select {
		case b := <-backendMsgChan:
			msg, err := decodeMessage(b)
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
      case msg := <-memoryMsgChan:
         subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
		}
	}
}