RocketMQ Note

Consumer

  1. consumer数量 < Queue队列数,否则多出的consumer无法消费消息

  2. 负载均衡策略

AllocateMessageQueueAveragely(默认)

AllocateMessageQueueByMachineRoom

AllocateMessageQueueAveragelyByCircle

AllocateMessageQueueConsistentHash

  1. 集群模式下,queue都是只允许分配只一个实例。这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。

  2. Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合

  3. 单Queue并行消费

  4. 尽量让consumer group下的consumer的数目和topic的queue的数目一致或成倍数关系。

  5. 在RocketMQ中,假如一个新消费组订阅了几个topic,按正常人或者正常业务的期望,新消费组应该从订阅topic的最后一个消息开始消费,但是实际情形不是如此,有时候新消费组会从这些topic的开头开始消费。这就是新消费组上线的风险点。

解决方案:

消费端配置。消费端集群消费时,消费端的默认配置是从topic的最后offset开始消费。具体配置代码在DefaultMQPullConsumerImpl的consumeFromWhere,CONSUME_FROM_LAST_OFFSET的含义是“一个新的订阅组第一次启动从队列的最后位置开始消费”,RocketMQ 3.2.6版本的代码注释中清晰的说明了,但是实际表现却不是如此。

  1. 消息消费模式有两种:集群消费(Clustering)和广播消费(Broadcasting)。默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。而广播消费消息会发给消费者组中的每一个消费者进行消费。

  2. 消息顺序(Message Order)有两种:顺序消费(Orderly)和并行消费(Concurrently)。顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

  3. 读老数据的问题

RocketMQ的数据是要落盘的,一般只有最新写入的数据才会在PageCache中。比如下游消费数据,因为一些原因停了一天之后,又突然起来消费数据。这个时候就需要读磁盘上的数据。然后RocketMQ的消息体是全部存储在一个append only的 commitlog 中的。如果这个集群中混杂了很多不同topic的数据的话,要读的两条消息就很有可能间隔很远。最坏情况就是一次磁盘IO读一条消息。这就基本等价于随机读取了。如果磁盘的IOPS(Input/Output Operations Per Second)扛不住,还会影响数据的写入,这个问题就严重了。
值得庆幸的是,RocketMQ提供了自动从Slave读取老数据的功能。这个功能主要由slaveReadEnable这个参数控制。默认是关的(slaveReadEnable = false bydefault)。推荐把它打开,主从都要开。这个参数打开之后,在客户端消费数据时,会判断,当前读取消息的物理偏移量跟最新的位置的差值,是不是超过了内存容量的一个百分比(accessMessageInMemoryMaxRatio= 40 by default)。如果超过了,就会告诉客户端去备机上消费数据。如果采用异步主从,也就是brokerRole等于ASYNC_AMSTER的时候,你的备机IO打爆,其实影响不太大。但是如果你采用同步主从,那还是有影响。所以这个时候,最好挂两个备机。因为RocketMQ的主从同步复制,只要一个备机响应了确认写入就可以了,一台IO打爆,问题不大。

  1. 过期数据删除

RocketMQ默认数据保留72个小时(fileReservedTime=72)。然后它默认在凌晨4点开始删过期数据(deleteWhen=”04”)。你可以设置多个值用分号隔开。因为数据都是定时删除的,所以在磁盘充足的情况,数据的最长保留会比你设置的还多一天。又由于默认都是同一时间,删除一整天的数据,如果用了机械硬盘,一般磁盘容量会比较大,需要删除的数据会特别多,这个就会导致在删除数据的时候,磁盘IO被打满。这个时候又要影响写入了。
为了解决这个问题,可以尝试多个方法,一个是设置文件删除的间隔,有两个参数可以设置,

  • deleteCommitLogFilesInterval = 100(毫秒)。每删除10个commitLog文件的时间间隔;
  • deleteConsumeQueueFilesInterval=100(毫秒)。每删除一个ConsumeQueue文件的时间间隔。
    另外一个就是增加删除频率,把00-23都写到deleteWhen,就可以实现每个小时都删数据。
  1. 索引

默认情况下,所有的broker都会建立索引(messageIndexEnable=true)。这个索引功能可以支持按照消息的uniqId,消息的key来查询消息体。索引文件实现的时候,本质上也就是基于磁盘的个一个hashmap。如果broker上消息数量比较多,查询的频率比较高,这也会造成一定的IO负载。所以我们的推荐方案是在Master上关掉了index功能,只在slave上打开。然后所有的index查询全部在slave上进行。当然这个需要简单修改一下MQAdminImpl里的实现。因为默认情况下,它会向Master发出请求。

  1. RabbitMQ 延迟消息的延迟极限是多少?

延迟 5000 毫秒。消息发送到 MQ 之后确实延迟了 5 秒之后才得到了消费,没有任何问题。
延迟 1 年( 31536000000 毫秒)。消息发送到 MQ 之后马上就被消费者消费了,完全没有延迟效果。
在 RabbitMQ 中,消息的过期时间必须是非负 32 位整数,即:0 <= n <= 2^32-1,以毫秒为单位。 其中,2^32-1 = 4294967295。可以尝试下面两个请求,分别设置延迟时间为 4294967295 何 4294967296:
curl localhost:8080/sendMessage?message=hello&delay=4294967295 和 curl localhost:8080/sendMessage?message=hello&delay=4294967296
可以发现,当延迟时间为 4294967295 毫秒的时候,延迟消息工作正常;当延迟时间为 4294967296 毫秒的时候,消息被直接消费,没有延迟效果。

Producer

  1. 每个Producer默认采用Roundbin方式轮训发送给每个Queue

  2. 可选模式:

Normal:普通,传统模式,不能保证消息的顺序一致性

Order:顺序,可以严格保证消息顺序进行消费

Transaction:事务

  1. 如何实现顺序消费

Producer发送消息到某个Topic的Queue-0,然后Consumer实现MessageOrder接口,指定去Queue-0获取数据,此时其他线程无法去该队列获取数据。

  1. 关于Topic的读写队列数量

读队列数量和写队列数量可以不一致:当我们使用updateTopic命令创建topic时,会发现新建的topic下会有默认的8个写对列和8个读对列(依赖于配置),并且读队列的数量和写队列的数量还可以不一致,这是为什么呢?难道在底层读写队列是在物理上分离的吗?抱着这个问题,我分析了相关的源代码,发现底层代码对于读写队列指的都是同一个队列,其中写队列的数量是针对的producer,读队列的数量针对的是consumer。

假设写队列有8个、读队列有4个,那么producer产生的消息会按轮训的方式写入到8个队列中,但是consumer却只能消费前4个队列,只有把读队列重新设置为8后,consumer可以继续消费后4个队列的历史消息;

假设写队列有4个、读队列有8个,那么producer产生的消息会按轮训的方式写入到4个队列中,但是consumer却能消费8个队列,只是后4个队列没有消息可以消费罢了。

  1. 集群方案

例如在2主2从异步架构下,a和b为master,as和bs为slaver,当a机宕机后,producer会将消息全部发往b机,consumer会消费as,b和bs上的消息,理论上只会丢失毫秒级别的消息,不会影响业务的正常使用。

MQ

store

  1. RocketMQ选择LinuxExt4文件系统,原因:Ext4文件系统删除1G大小的文件通常耗时小于50ms,而Ext3文件系统耗时约1s左右,且删除文件时,磁盘IO压力极大,会导致IO写入超时。

  2. 所有topic的数据混在一起进行存储,默认超过1G的话,则重新创建一个新的文件。消息的写入过程即写入该混杂的文件中,然后又有一个线程服务,在不断的读取分析该混杂文件,将消息进行分拣,然后存储在对应队列目录中(存储的是简要信息,如消息在混杂文件中的offset,消息大小等)

  3. Consumer将消费的offset定时存储到broker所在的机器上,这个broker优先是master,如果master挂了的话,则会选择slave来存储,broker也是将这些offset定时刷新到本地磁盘上,同时slave会定时的访问master来获取这些offset。

communication

  1. Producer和Name Server:每一个Producer会与Name Server集群中的一台机器建立TCP连接,会从这台Name Server上拉取路由信息。

  2. Producer和broker:Producer会和它要发送的topic相关的master类型的broker建立TCP连接,用于发送消息以及定时的心跳信息。broker中会记录该Producer的信息,供查询使用

  3. broker与Name Server:broker(不管是master还是slave)会和每一台Name Server机器来建立TCP连接。broker在启动的时候会注册自己配置的topic信息到Name Server集群的每一台机器中。即每一台Name Server都有该broker的topic的配置信息。master与master之间无连接,master与slave之间有连接

  4. Consumer和Name Server:每一个Consumer会和Name Server集群中的一台机器建立TCP连接,会从这台Name Server上拉取路由信息,进行负载均衡

  5. Consumer和broker:Consumer可以与master或者slave的broker建立TCP连接来进行消费消息,Consumer也会向它所消费的broker发送心跳信息,供broker记录。

broker 高可用

启动多个 Broker分组 形成 集群 实现高可用。Broker分组 = Master节点x1 + Slave节点xN。类似 MySQL,Master节点 提供读写服务,Slave节点 只提供读服务。

每个分组,Master节点 不断发送新的 CommitLog 给 Slave节点。 Slave节点 不断上报本地的 CommitLog 已经同步到的位置给 Master节点。

Broker分组 与 Broker分组 之间没有任何关系,不进行通信与数据同步。

消费进度 目前不支持 Master/Slave 同步。

集群内,Master节点 有两种类型:Master_SYNC、Master_ASYNC:前者在 Producer 发送消息时,等待 Slave节点 存储完毕后再返回发送结果,而后者不需要等待。

commitLog

commitLog是保存消息元数据的地方,所有消息到达Broker后都会保存到commitLog文件。

这里需要强调的是所有topic的消息都会统一保存在commitLog中,举个例子:当前集群有TopicA, TopicB,这两个Toipc的消息会按照消息到达的先后顺序保存到同一个commitLog中,而不是每个Topic有自己独立的commitLog。

每个commitLog大小上限为1G,满1G之后会自动新建CommitLog文件做保存数据用。

CommitLog的清理机制:按时间清理,rocketmq默认会清理3天前的commitLog文件;按磁盘水位清理,当磁盘使用量到达磁盘容量75%,开始清理最老的commitLog文件。

文件地址:/store/${commitlog}/${fileName}

ConsumerQueue

ConsumerQueue相当于CommitLog的索引文件,消费者消费时会先从ConsumerQueue中查找消息在commitLog中的offset,再去CommitLog中找元数据。如果某个消息只在CommitLog中有数据,没在ConsumerQueue中, 则消费者无法消费,Rocktet的事务消息就是这个原理。

consumequeue数据结构:消息在commitLog文件实际偏移量(commitLogOffset);消息大小;消息tag的哈希值

文件地址:/store/consumequeue/${topicName}/${queueId}/${fileName}

Refer

http://jm.taobao.org/2018/11/06/%E6%BB%B4%E6%BB%B4%E5%87%BA%E8%A1%8C%E5%9F%BA%E4%BA%8ERocketMQ%E6%9E%84%E5%BB%BA%E4%BC%81%E4%B8%9A%E7%BA%A7%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E6%9C%8D%E5%8A%A1%E7%9A%84%E5%AE%9E%E8%B7%B5/