RocketMQ
RocketMQ
课程约定
资料格式
配置文件:
1 | <groupId>com.itheima</groupId> |
Java代码:
1 | Statement stat = con.createStatement(); |
示例:
1 | <groupId>com.itheima</groupId> |
命令:
1 | mvn test |
1. MQ简介
1.1 项目工程弊端
1.2 MQ简介
MQ(Message Queue)消息队列,是一种用来保存消息数据的队列.
队列:数据结构的一种,特征为 “先进先出”
何为消息
- 服务器间的业务请求
- 原始架构:
- 服务器中的A功能需要调用B、C模块才能完成
- 微服务架构:
- 服务器A向服务器B发送要执行的操作(视为消息)
- 服务器A向服务器C发送要执行的操作(视为消息)
- 原始架构:
- 服务器间的业务请求
小节:MQ概念
1.3 MQ作用
` 1. 应用解耦
2. 快速应用变更维护
3. 流量削锋
1.4 MQ基本工作模式
应用解耦:(异步消息发送)
快速应用变更维护:(异步消息发送)
流量削锋:(异步消息发送)
1.5 MQ优缺点分析
优点(作用):
- 应用解耦
- 快速应用变更维护
- 流量削锋
缺点:
1. 系统可用性降低
2. 系统复杂度提高
3. 异步消息机制
1. 消息顺序性
2. 消息丢失
3. 消息一致性
4. 消息重复使用
1.6 MQ产品介绍
- ActiveMQ
- java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
- RabbitMQ
- erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
- RocketMQ
- java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
- kafka
- scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
RocketMQ
RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会 作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛 的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿 级,峰值TPS达到5600万)
解决所有缺点
2. 环境搭建
2.1 基础概念
- 生产者
- 消费者
- 消息服务器
- 命名服务器
- 消息
- 主题
- 标签
- 心跳
- 监听器
- 拉取消费、推动消费
- 注册
2.2 安装
- 命名服务器
- 消息服务器
2.3 下载
2.4 安装过程
步骤1:安装JDK(1.8)
步骤2:上传压缩包(zip)
1
yum -y install lrzsz
1
rz
步骤3:解压缩
1
unzip rocketmq-all-4.5.2-bin-release.zip
步骤4:修改目录名称
1
mv rocketmq-all-4.5.2-bin-release rocketmq
2.5 启动服务器
- 步骤1:启动命名服务器(bin目录下)
1 | sh mqnamesrv |
步骤2:启动消息服务器(bin目录下)
1
sh mqbroker -n localhost:9876
修改 runserver.sh文件中有关内存的配置(调整的与当前虚拟机内存匹配即可,推荐256m-128m)
1
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m
修改runbroker.sh文件中有关内存的配置(调整的与当前虚拟机内存匹配即可,推荐256m-128m)
1
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xms256m -Xmn128m"
同时修改文件内74行的数据,改为以下内容:
1
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=256m"
2.6 测试服务器环境
步骤1:配置命名服务器地址
1
export NAMESRV_ADDR=localhost:9876
步骤2:启动生产者程序客户端(bin目录下)
1
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
启动后产生大量日志信息(注意该信息是测试程序中自带的,不具有通用性,仅供学习查阅参考)
步骤3:启动消费者程序客户端(bin目录下)
1
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
启动后产生大量日志信息
3. 消息发送(重点)
3.1 主要内容
- 基于Java环境构建消息发送与消息接收基础程序
- 单生产者单消费者
- 单生产者多消费者
- 多生产者多消费者
- 发送不同类型的消息
- 同步消息
- 异步消息
- 单向消息
- 特殊的消息发送
- 延时消息
- 批量消息
- 特殊的消息接收
- 消息过滤
- 消息发送与接收顺序控制
- 事务消息
3.2 消息发送与接收开发流程
- 谁来发?
- 发给谁?
- 怎么发?
- 发什么?
- 发的结果是什么?
- 打扫战场
3.3 单生产者单消费者消息发送(OneToOne)
导入RocketMQ客户端坐标
1
2
3
4
5<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
3.4 单生产者单消费者消息发送(OneToOne)
- 生产者
1 | //1.创建一个发送消息的对象Producer |
注意:关闭服务器防火墙
1
systemctl stop firewalld.service
3.5 单生产者单消费者消息发送(OneToOne)
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.184.128:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1","*");
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt msg : list){
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}});
//4.启动接收消息的服务
consumer.start();
3.6 单生产者多消费者消息发送(OneToMany)
消费者(负载均衡模式:默认模式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.184.128:9876");
consumer.subscribe("topic1","*");
//设置当前消费者的消费模式(默认模式:负载均衡)
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt msg : list){
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
3.7 单生产者多消费者消息发送(OneToMany)
消费者(广播模式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.184.128:9876");
consumer.subscribe("topic1","*");
//设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt msg : list){
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
3.8 多生产者多消费者消息发送(ManyToMany)
- 多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费
3.9 小节
- 消息发送
- One-To-One(基础发送与基础接收)
- One-To-Many(负载均衡模式与广播模式)
- Many-To-Many
3.10 消息类别
- 同步消息
- 异步消息
- 单向消息
3.11 同步消息
特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
3.12 异步消息
特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息
3.13 单向消息
特征:不需要有回执的消息,例如日志类消息
3.14 单向消息
同步消息
1
SendResult result = producer.send(msg);
异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)
1
2
3
4
5
6
7
8
9
10
11
12
13
14producer.send(msg, new SendCallback() {
//表示成功返回结果
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示发送消息失败
public void onException(Throwable t) {
System.out.println(t);
}
});
//如果加了关闭producer的话 一定要在关闭前加个线程休眠 否则执行到回调函数 producer已经关了 获取不到回调的数据
Thread.sleep(2000);
producer.shutdown();单向消息
1
producer.sendOneway(msg);
3.15 延时消息
消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用
1
2
3
4
5Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
//设置当前消息的延时效果
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);目前支持的消息时间
- 秒级:1,5,10,30
- 分级:1~10,20,30
- 时级:1,2
- 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
3.16 批量消息
发送批量消息
1
2
3
4
5List<Message> msgList = new ArrayList<Message>();
//注:批量发送消息时 一个list只能存放一样的topic 例如 一个list不能既存topic1 又存topic2
msgList.add(new Message("topic2","hello1".getBytes()));
msgList.add(new Message("topic2","hello2".getBytes()));
SendResult send = producer.send(msgList);消息内容总长度不超过4M
消息内容总长度包含如下:
- topic(字符串字节数)
- body (字节数组长度)
- 消息追加的属性(key与value对应字符串字节数)
- 日志(固定20字节)
3.17 消息过滤
分类过滤
生产者
1
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2" 标题 Tag ).getBytes("UTF-8"));
消费者
1
2//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
语法过滤(属性过滤/语法过滤/SQL过滤)
生产者
1
2
3//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");消费者
1
2//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));注意:SQL过滤需要依赖服务器的功能支持,在broker配置文件中添加对应的功能项,并开启对应功能
1
enablePropertyFilter=true
启动服务器使启用对应配置文件
1
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
3.18 错乱的消息顺序
3.19 顺序消息
1.实体类(Order)
1 | package com.itheima.order.domain; |
发送消息(producer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102package com.itheima.order;
import com.itheima.order.domain.Order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.ArrayList;
import java.util.List;
//测试顺序消息
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
producer.start();
//创建要执行的业务队列
List<Order> orderList = new ArrayList<Order>();
Order order11 = new Order();
order11.setId("a");
order11.setMsg("主单-1");
orderList.add(order11);
Order order12 = new Order();
order12.setId("a");
order12.setMsg("子单-2");
orderList.add(order12);
Order order13 = new Order();
order13.setId("a");
order13.setMsg("支付-3");
orderList.add(order13);
Order order14 = new Order();
order14.setId("a");
order14.setMsg("推送-4");
orderList.add(order14);
Order order21 = new Order();
order21.setId("b");
order21.setMsg("主单-1");
orderList.add(order21);
Order order22 = new Order();
order22.setId("b");
order22.setMsg("子单-2");
orderList.add(order22);
Order order31 = new Order();
order31.setId("c");
order31.setMsg("主单-1");
orderList.add(order31);
Order order32 = new Order();
order32.setId("c");
order32.setMsg("子单-2");
orderList.add(order32);
Order order33 = new Order();
order33.setId("c");
order33.setMsg("支付-3");
orderList.add(order33);
//设置消息进入到指定的消息队列中
for(final Order order : orderList){
Message msg = new Message("orderTopic",order.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//设置当前消息发送时使用哪一个消息队列
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//根据发送的信息不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
producer.shutdown();
}
}
//设置消息进入到指定的消息队列中
for(final Order order : orderList){
Message msg = new Message("orderTopic",order.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//设置当前消息发送时使用哪一个消息队列
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//根据发送的信息不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
3.20 顺序消息
接收消息(consumer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30package com.itheima.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.184.128:9876");
consumer.subscribe("orderTopic","*");
//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : list){
System.out.println(Thread.currentThread().getName()+" 消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("接收消息服务已开启运行");
}
}
3.21 事务消息
- 正常事务过程
- 事务补偿过程
3.22 事务消息状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
- 回滚状态:不允许进入队列,此消息等同于未发送过
- 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
- 注意:事务消息仅与生产者有关,与消费者无关
3.23 事务消息
1 | //事务消息使用的生产者是TransactionMQProducer |
4. 集群搭建
4.1 RocketMQ集群分类
- 单机
- 一个broker提供服务(宕机后服务瘫痪)
- 集群
- 多个broker提供服务(单机宕机后消息无法及时被消费)
- 多个master多个slave
- master到slave消息同步方式为同步(较异步方式性能略低,消息无延迟)
- master到slave消息同步方式为异步(较同步方式性能略高,数据略有延迟)
4.2 RocketMQ集群特征
RocketMQ集群工作流程
- 步骤1:NameServer启动,开启监听,等待broker、producer与consumer连接
- 步骤2:broker启动,根据配置信息,连接所有的NameServer,并保持长连接
- 步骤2补充:如果broker中有现存数据, NameServer将保存topic与broker关系
- 步骤3:producer发信息,连接某个NameServer,并建立长连接
- 步骤4:producer发消息
- 步骤4.1若果topic存在,由NameServer直接分配
- 步骤4.2如果topic不存在,由NameServer创建topic与broker关系,并分配
- 步骤5:producer在broker的topic选择一个消息队列(从列表中选择)
- 步骤6:producer与broker建立长连接,用于发送消息
- 步骤7:producer发送消息
comsumer工作流程同producer
双主双从集群搭建:
配置服务器环境:
1
vim /etc/hosts
1
2
3
4
5
6
7
8
9# nameserver
192.168.184.128 rocketmq-nameserver1
192.168.184.129 rocketmq-nameserver2
# broker
192.168.184.128 rocketmq-master1
192.168.184.129 rocketmq-slave2
192.168.184.129 rocketmq-master2
192.168.184.128 rocketmq-slave1配置完毕后重启网卡,应用配置
1 | systemctl restart network |
关闭防火墙或者开发指定端口对外提供服务
1
2
3
4
5
6# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd --state
# 禁止firewall开机启动
systemctl disable firewalld.service开启指定端口对外提供服务
1
2
3
4
5
6
7
8# 开放name server默认端口
firewall-cmd --remove-port=9876/tcp --permanent
# 开放master默认端口
firewall-cmd --remove-port=10911/tcp --permanent
# 开放slave默认端口 (当前集群模式可不开启)
firewall-cmd --remove-port=11011/tcp --permanent
# 重启防火墙
firewall-cmd --reload配置服务器环境
1
vim /etc/profile
1
2
3
4#set rocketmq
ROCKETMQ_HOME=/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH配置完毕后重启网卡,应用配置
1
source /etc/profile
创建集群服务器的数据存储目录
1
2
3
4mkdir /rocketmq/store
mkdir /rocketmq/store/commitlog
mkdir /rocketmq/store/consumequeue
mkdir /rocketmq/store/index注意master与slave如果在同一个虚拟机中部署,需要将存储目录区分开
broker-a.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128broker-b-s.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store-slave
#commitLog 存储路径
storePathCommitLog=/rocketmq/store-slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store-slave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store-slave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store-slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128检查启动内存
1
vim /rocketmq/bin/runbroker.sh
1
2# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"启动服务器(在bin目录下依次启动)
1
2nohup sh mqnamesrv &
1
2nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &
1
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
broker-b.properties
1 | #所属集群名字 |
broker-a-s.properties
1 | #所属集群名字 |
1 | nohup sh mqnamesrv & |
1 | nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties & |
1 | nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties & |
4.3 rocketmq-console集群监控平台搭建
- incubator-rocketmq-externals是一个基于rocketmq的基础之上扩展开发的开源项目
- 获取地址:https://github.com/apache/rocketmq-externals
- rocketmq-console是一款基于java环境开发的(springboot)的管理控制台工具
5. 高级特性(重点)
5.1 消息的存储
- 消息生成者发送消息到MQ
- MQ返回ACK给生产者
- MQ push 消息给对应的消费者
- 消息消费者返回ACK给MQ
说明:ACK(Acknowledge character)
5.2 消息的存储
- 消息生成者发送消息到MQ
- MQ收到消息,将消息进行持久化,存储该消息
- MQ返回ACK给生产者
- MQ push 消息给对应的消费者
- 消息消费者返回ACK给MQ
- MQ删除消息
注意:
- 第⑤步MQ在指定时间内接到消息消费者返回ACK,MQ认定消息消费成功,执行⑥
- 第⑤步MQ在指定时间内未接到消息消费者返回ACK,MQ认定消息消费失败,重新执行④⑤⑥
5.3 消息的存储介质
- 数据库
- ActiveMQ
- 缺点:数据库瓶颈将成为MQ瓶颈
- 文件系统
- RocketMQ/Kafka/RabbitMQ
- 解决方案:采用消息刷盘机制进行数据存储
- 缺点:硬盘损坏的问题无法避免
5.4 高效的消息存储与读写方式
SSD(Solid State Disk)
随机写(100KB/s)
顺序写 (600MB/s)1秒1部电影
Linux系统发送数据的方式
“零拷贝”技术
- 数据传输由传统的4次复制简化成3次复制,减少1次复制过程
- Java语言中使用MappedByteBuffer类实现了该技术
- 要求:预留存储空间,用于保存数据(1G存储空间起步)
5.5 消息存储结构
- MQ数据存储区域包含如下内容
- 消息数据存储区域
- topic
- queueId
- message
- 消费逻辑队列
- minOffset
- maxOffset
- consumerOffset
- 索引
- key索引
- 创建时间索引
- 消息数据存储区域
5.6 刷盘机制
同步刷盘
生产者发送消息到MQ,MQ接到消息数据
MQ挂起生产者发送消息的线程
MQ将消息数据写入内存
内存数据写入硬盘
磁盘存储后返回SUCCESS
MQ恢复挂起的生产者线程
发送ACK到生产者
异步刷盘
- 生产者发送消息到MQ,MQ接到消息数据
- MQ将消息数据写入内存
- 发送ACK到生产者
- 同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
- 异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)
配置方式
1 | #刷盘方式 |
5.7 高可用性
- nameserver
- 无状态+全服务器注册
- 消息服务器
- 主从架构(2M-2S)
- 消息生产
- 生产者将相同的topic绑定到多个group组,保障master挂掉后,其他master仍可正常进行消 息接收
- 消息消费
- RocketMQ自身会根据master的压力确认是否由master承担消息读取的功能,当master繁忙 时候,自动切换由slave承担数据读取的工作
5.8 主从数据复制
同步复制
- master接到消息后,先复制到slave,然后反馈给生产者写操作成功
- 优点:数据安全,不丢数据,出现故障容易恢复
- 缺点:影响数据吞吐量,整体性能低
异步复制
- master接到消息后,立即返回给生产者写操作成功,当消息达到一定量后再异步复制到slave
- 优点:数据吞吐量大,操作延迟低,性能高
- 缺点:数据不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据将丢失
配置方式
1
2
3
4
5#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
5.9 负载均衡
- Producer负载均衡
- 内部实现了不同broker集群中对同一topic对应消息队列的负载均衡
- Consumer负载均衡
- 平均分配
- 循环平均分配
- 广播模式(不参与负载均衡)
5.10 消息重试
- 当消息消费后未正常返回消费成功的信息将启动消息重试机制
- 消息重试机制
- 顺序消息
- 无序消息
5.10.1 顺序消息重试
- 当消费者消费消息失败后,RocketMQ会自动进行消息重试(每次间隔时间为 1 秒)
- 注意:应用会出现消息消费被阻塞的情况,因此,要对顺序消息的消费情况进行监控,避免阻塞现象的发生
5.10.2 无序消息重试
- 无序消息包括普通消息、定时消息、延时消息、事务消息
- 无序消息重试仅适用于负载均衡(集群)模型下的消息消费,不适用于广播模式下的消息消费
- 为保障无序消息的消费,MQ设定了合理的消息重试间隔时长
5.11 死信队列
- 当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)
- 死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)\
- 死信队列特征
- 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
- 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
- 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
- 死信队列中消息特征
- 不会被再次重复消费
- 死信队列中的消息有效期为3天,达到时限后将被清除
5.12 死信处理
- 在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费
5.13 消息重复消费
- 消息重复消费原因
- 生产者发送了重复的消息
- 网络闪断
- 生产者宕机
- 消息服务器投递了重复的消息
- 网络闪断
- 动态的负载均衡过程
- 网络闪断/抖动
- broker重启
- 订阅方应用重启(消费者)
- 客户端扩容
- 客户端缩容
- 生产者发送了重复的消息
5.14 消息幂等
- 对同一条消息,无论消费多少次,结果保持一致,称为消息幂等性
- 解决方案
- 使用业务id作为消息的key
- 在消费消息时,客户端对key做判定,未使用过放行,使用过抛弃
- 注意:messageId由RocketMQ产生,messageId并不具有唯一性,不能作用幂等判定条件