RocketMQ

课程约定

资料格式

配置文件:

1
<groupId>com.itheima</groupId>

Java代码:

1
Statement stat = con.createStatement();

示例:

1
<groupId>com.itheima</groupId>

命令:

1
mvn test

1. MQ简介

1.1 项目工程弊端

image-20201211144649579

1.2 MQ简介

  1. MQ(Message Queue)消息队列,是一种用来保存消息数据的队列.

    1. 队列:数据结构的一种,特征为 “先进先出”

      image-20201211144854222

    2. 何为消息

      1. 服务器间的业务请求
        1. 原始架构:
          1. 服务器中的A功能需要调用B、C模块才能完成
        2. 微服务架构:
          1. 服务器A向服务器B发送要执行的操作(视为消息)
          2. 服务器A向服务器C发送要执行的操作(视为消息)
  2. 小节:MQ概念

1.3 MQ作用

` 1. 应用解耦

2. 快速应用变更维护
3. 流量削锋

1.4 MQ基本工作模式

image-20201211145144063

应用解耦:(异步消息发送)

image-20201211145528832

image-20201211145540523

image-20201211145546139

快速应用变更维护:(异步消息发送)

image-20201211151126206

流量削锋:(异步消息发送)

1.5 MQ优缺点分析

优点(作用):

  1. 应用解耦
  2. 快速应用变更维护
  3. 流量削锋

缺点:

1. 系统可用性降低
2. 系统复杂度提高
3. 异步消息机制
      1. 消息顺序性
      2. 消息丢失
      3. 消息一致性
      4. 消息重复使用

image-20201211151329710

1.6 MQ产品介绍

  1. ActiveMQ
    1. java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
  2. RabbitMQ
    1. erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
  3. RocketMQ
    1. java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
  4. kafka
    1. scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

RocketMQ

  1. RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会 作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛 的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿 级,峰值TPS达到5600万)

  2. 解决所有缺点

2. 环境搭建

image-20201211151842484

2.1 基础概念

  1. 生产者
  2. 消费者
  3. 消息服务器
  4. 命名服务器
  5. 消息
    1. 主题
    2. 标签
  6. 心跳
  7. 监听器
  8. 拉取消费、推动消费
  9. 注册

2.2 安装

  1. 命名服务器
  2. 消息服务器

image-20201211152332969

2.3 下载

https://www.apache.org/

2.4 安装过程

  1. 步骤1:安装JDK(1.8)

  2. 步骤2:上传压缩包(zip)

    1
    yum -y install lrzsz 
    1
    rz
  3. 步骤3:解压缩

    1
    unzip rocketmq-all-4.5.2-bin-release.zip
  4. 步骤4:修改目录名称

    1
    mv rocketmq-all-4.5.2-bin-release rocketmq

2.5 启动服务器

  1. 步骤1:启动命名服务器(bin目录下)
1
sh mqnamesrv
  1. 步骤2:启动消息服务器(bin目录下)

    1
    sh mqbroker -n localhost:9876
  2. 修改 runserver.sh文件中有关内存的配置(调整的与当前虚拟机内存匹配即可,推荐256m-128m)

    1
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m 

    修改runbroker.sh文件中有关内存的配置(调整的与当前虚拟机内存匹配即可,推荐256m-128m)

    1
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xms256m -Xmn128m"

    ​ 同时修改文件内74行的数据,改为以下内容:

    1
    JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=256m"

2.6 测试服务器环境

  1. 步骤1:配置命名服务器地址

    1
    export NAMESRV_ADDR=localhost:9876
  2. 步骤2:启动生产者程序客户端(bin目录下)

    1
    sh tools.sh org.apache.rocketmq.example.quickstart.Producer

    启动后产生大量日志信息(注意该信息是测试程序中自带的,不具有通用性,仅供学习查阅参考)

  3. 步骤3:启动消费者程序客户端(bin目录下)

    1
    sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

    启动后产生大量日志信息

3. 消息发送(重点)

3.1 主要内容

  1. 基于Java环境构建消息发送与消息接收基础程序
    1. 单生产者单消费者
    2. 单生产者多消费者
    3. 多生产者多消费者
  2. 发送不同类型的消息
    1. 同步消息
    2. 异步消息
    3. 单向消息
  3. 特殊的消息发送
    1. 延时消息
    2. 批量消息
  4. 特殊的消息接收
    1. 消息过滤
  5. 消息发送与接收顺序控制
  6. 事务消息

image-20201211153150220

3.2 消息发送与接收开发流程

  1. 谁来发?
  2. 发给谁?
  3. 怎么发?
  4. 发什么?
  5. 发的结果是什么?
  6. 打扫战场

3.3 单生产者单消费者消息发送(OneToOne)

  1. 导入RocketMQ客户端坐标

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
    </dependency>

3.4 单生产者单消费者消息发送(OneToOne)

  1. 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.184.128:9876");
//3.1启动发送的服务
producer.start();
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
//3.2发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
//5.关闭连接
producer.shutdown();
  1. 注意:关闭服务器防火墙

    1
    systemctl stop firewalld.service 

3.5 单生产者单消费者消息发送(OneToOne)

  1. 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    //1.创建一个接收消息的对象Consumer
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    //2.设定接收的命名服务器地址
    consumer.setNamesrvAddr("192.168.184.128:9876");
    //3.设置接收消息对应的topic,对应的sub标签为任意*
    consumer.subscribe("topic1","*");
    //3.开启监听,用于接收消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    for(MessageExt msg : list){
    System.out.println("消息:"+new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }});
    //4.启动接收消息的服务
    consumer.start();

3.6 单生产者多消费者消息发送(OneToMany)

  1. 消费者(负载均衡模式:默认模式)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("192.168.184.128:9876");
    consumer.subscribe("topic1","*");
    //设置当前消费者的消费模式(默认模式:负载均衡)
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    for(MessageExt msg : list){
    System.out.println("消息:"+new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //4.启动接收消息的服务
    consumer.start();

3.7 单生产者多消费者消息发送(OneToMany)

  1. 消费者(广播模式)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("192.168.184.128:9876");
    consumer.subscribe("topic1","*");
    //设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    for(MessageExt msg : list){
    System.out.println("消息:"+new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //4.启动接收消息的服务
    consumer.start();

3.8 多生产者多消费者消息发送(ManyToMany)

  1. 多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

3.9 小节

  1. 消息发送
    1. One-To-One(基础发送与基础接收)
    2. One-To-Many(负载均衡模式与广播模式)
    3. Many-To-Many

3.10 消息类别

  1. 同步消息
  2. 异步消息
  3. 单向消息

3.11 同步消息

特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

image-20201211154023471

3.12 异步消息

特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息

image-20201211154108518

3.13 单向消息

特征:不需要有回执的消息,例如日志类消息

image-20201211154145199

3.14 单向消息

  1. 同步消息

    1
    SendResult result = producer.send(msg);
  2. 异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    producer.send(msg, new SendCallback() {
    //表示成功返回结果
    public void onSuccess(SendResult sendResult) {
    System.out.println(sendResult);
    }
    //表示发送消息失败
    public void onException(Throwable t) {
    System.out.println(t);
    }
    });

    //如果加了关闭producer的话 一定要在关闭前加个线程休眠 否则执行到回调函数 producer已经关了 获取不到回调的数据
    Thread.sleep(2000);
    producer.shutdown();
  3. 单向消息

    1
    producer.sendOneway(msg);

3.15 延时消息

  1. 消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

    1
    2
    3
    4
    5
    Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
    //设置当前消息的延时效果
    msg.setDelayTimeLevel(3);
    SendResult result = producer.send(msg);
    System.out.println("返回结果:"+result);
  2. 目前支持的消息时间

    1. 秒级:1,5,10,30
    2. 分级:1~10,20,30
    3. 时级:1,2
    4. 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

3.16 批量消息

  1. 发送批量消息

    1
    2
    3
    4
    5
    List<Message> msgList = new ArrayList<Message>();
    //注:批量发送消息时 一个list只能存放一样的topic 例如 一个list不能既存topic1 又存topic2
    msgList.add(new Message("topic2","hello1".getBytes()));
    msgList.add(new Message("topic2","hello2".getBytes()));
    SendResult send = producer.send(msgList);
  2. 消息内容总长度不超过4M

  3. 消息内容总长度包含如下:

    1. topic(字符串字节数)
    2. body (字节数组长度)
    3. 消息追加的属性(key与value对应字符串字节数)
    4. 日志(固定20字节)

3.17 消息过滤

  1. 分类过滤

    1. 生产者

      1
      Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2" 标题 Tag ).getBytes("UTF-8"));
    2. 消费者

      1
      2
      //接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
      consumer.subscribe("topic6","tag1 || tag2");
  2. 语法过滤(属性过滤/语法过滤/SQL过滤)

    1. 生产者

      1
      2
      3
      //为消息添加属性
      msg.putUserProperty("vip","1");
      msg.putUserProperty("age","20");
    2. 消费者

      1
      2
      //使用消息选择器来过滤对应的属性,语法格式为类SQL语法
      consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
    3. 注意:SQL过滤需要依赖服务器的功能支持,在broker配置文件中添加对应的功能项,并开启对应功能

      1
      enablePropertyFilter=true
    4. 启动服务器使启用对应配置文件

      1
      sh mqbroker -n localhost:9876 -c ../conf/broker.conf

3.18 错乱的消息顺序

image-20201211155020542

image-20201211160226139

3.19 顺序消息

1.实体类(Order)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.itheima.order.domain;

public class Order {
private String id;
private String msg;

@Override
public String toString() {
return "Order{ id='" + id + ", msg='" + msg + '}';
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}

  1. 发送消息(producer)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    package com.itheima.order;

    import com.itheima.order.domain.Order;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;

    import java.util.ArrayList;
    import java.util.List;

    //测试顺序消息
    public class Producer {
    public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    producer.setNamesrvAddr("192.168.184.128:9876");
    producer.start();

    //创建要执行的业务队列
    List<Order> orderList = new ArrayList<Order>();

    Order order11 = new Order();
    order11.setId("a");
    order11.setMsg("主单-1");
    orderList.add(order11);

    Order order12 = new Order();
    order12.setId("a");
    order12.setMsg("子单-2");
    orderList.add(order12);

    Order order13 = new Order();
    order13.setId("a");
    order13.setMsg("支付-3");
    orderList.add(order13);

    Order order14 = new Order();
    order14.setId("a");
    order14.setMsg("推送-4");
    orderList.add(order14);

    Order order21 = new Order();
    order21.setId("b");
    order21.setMsg("主单-1");
    orderList.add(order21);

    Order order22 = new Order();
    order22.setId("b");
    order22.setMsg("子单-2");
    orderList.add(order22);

    Order order31 = new Order();
    order31.setId("c");
    order31.setMsg("主单-1");
    orderList.add(order31);

    Order order32 = new Order();
    order32.setId("c");
    order32.setMsg("子单-2");
    orderList.add(order32);

    Order order33 = new Order();
    order33.setId("c");
    order33.setMsg("支付-3");
    orderList.add(order33);

    //设置消息进入到指定的消息队列中
    for(final Order order : orderList){
    Message msg = new Message("orderTopic",order.toString().getBytes());
    //发送时要指定对应的消息队列选择器
    SendResult result = producer.send(msg, new MessageQueueSelector() {
    //设置当前消息发送时使用哪一个消息队列
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
    //根据发送的信息不同,选择不同的消息队列
    //根据id来选择一个消息队列的对象,并返回->id得到int值
    int mqIndex = order.getId().hashCode() % list.size();
    return list.get(mqIndex);
    }
    }, null);

    System.out.println(result);
    }

    producer.shutdown();
    }
    }
    //设置消息进入到指定的消息队列中
    for(final Order order : orderList){
    Message msg = new Message("orderTopic",order.toString().getBytes());
    //发送时要指定对应的消息队列选择器
    SendResult result = producer.send(msg, new MessageQueueSelector() {
    //设置当前消息发送时使用哪一个消息队列
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
    //根据发送的信息不同,选择不同的消息队列
    //根据id来选择一个消息队列的对象,并返回->id得到int值
    int mqIndex = order.getId().hashCode() % list.size();
    return list.get(mqIndex);
    }
    }, null);
    System.out.println(result);
    }

3.20 顺序消息

  1. 接收消息(consumer)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    package com.itheima.order;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.*;
    import org.apache.rocketmq.common.message.MessageExt;

    import java.util.List;

    public class Consumer {
    public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("192.168.184.128:9876");
    consumer.subscribe("orderTopic","*");

    //使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
    consumer.registerMessageListener(new MessageListenerOrderly() {
    //使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
    for(MessageExt msg : list){
    System.out.println(Thread.currentThread().getName()+" 消息:"+new String(msg.getBody()));
    }
    return ConsumeOrderlyStatus.SUCCESS;
    }
    });

    consumer.start();
    System.out.println("接收消息服务已开启运行");
    }
    }

3.21 事务消息

  1. 正常事务过程
  2. 事务补偿过程

image-20201211160454763

3.22 事务消息状态

  1. 提交状态:允许进入队列,此消息与非事务消息无区别
  2. 回滚状态:不允许进入队列,此消息等同于未发送过
  3. 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
  4. 注意:事务消息仅与生产者有关,与消费者无关

3.23 事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//事务消息使用的生产者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
//添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener() {
//正常事务过程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.COMMIT_MESSAGE;
}
//事务补偿过程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果:"+result);
producer.shutdown();

4. 集群搭建

4.1 RocketMQ集群分类

  1. 单机
    1. 一个broker提供服务(宕机后服务瘫痪)
  2. 集群
    1. 多个broker提供服务(单机宕机后消息无法及时被消费)
    2. 多个master多个slave
      1. master到slave消息同步方式为同步(较异步方式性能略低,消息无延迟)
      2. master到slave消息同步方式为异步(较同步方式性能略高,数据略有延迟)

image-20201211160840336

4.2 RocketMQ集群特征

image-20201211160927121

RocketMQ集群工作流程

  1. 步骤1:NameServer启动,开启监听,等待broker、producer与consumer连接
  2. 步骤2:broker启动,根据配置信息,连接所有的NameServer,并保持长连接
  3. 步骤2补充:如果broker中有现存数据, NameServer将保存topic与broker关系
  4. 步骤3:producer发信息,连接某个NameServer,并建立长连接
  5. 步骤4:producer发消息
    1. 步骤4.1若果topic存在,由NameServer直接分配
    2. 步骤4.2如果topic不存在,由NameServer创建topic与broker关系,并分配
  6. 步骤5:producer在broker的topic选择一个消息队列(从列表中选择)
  7. 步骤6:producer与broker建立长连接,用于发送消息
  8. 步骤7:producer发送消息

comsumer工作流程同producer

image-20201211161127465

双主双从集群搭建:

image-20201211161235249

image-20201211161300053

  1. 配置服务器环境:

    1
    vim /etc/hosts
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # nameserver
    192.168.184.128 rocketmq-nameserver1
    192.168.184.129 rocketmq-nameserver2
    # broker
    192.168.184.128 rocketmq-master1
    192.168.184.129 rocketmq-slave2
    192.168.184.129 rocketmq-master2
    192.168.184.128 rocketmq-slave1

  2. 配置完毕后重启网卡,应用配置

1
systemctl restart network
  1. 关闭防火墙或者开发指定端口对外提供服务

    1
    2
    3
    4
    5
    6
    # 关闭防火墙
    systemctl stop firewalld.service
    # 查看防火墙的状态
    firewall-cmd --state
    # 禁止firewall开机启动
    systemctl disable firewalld.service
  2. 开启指定端口对外提供服务

    1
    2
    3
    4
    5
    6
    7
    8
    # 开放name server默认端口
    firewall-cmd --remove-port=9876/tcp --permanent
    # 开放master默认端口
    firewall-cmd --remove-port=10911/tcp --permanent
    # 开放slave默认端口 (当前集群模式可不开启)
    firewall-cmd --remove-port=11011/tcp --permanent
    # 重启防火墙
    firewall-cmd --reload
  3. 配置服务器环境

    1
    vim /etc/profile
    1
    2
    3
    4
    #set rocketmq
    ROCKETMQ_HOME=/rocketmq
    PATH=$PATH:$ROCKETMQ_HOME/bin
    export ROCKETMQ_HOME PATH
  4. 配置完毕后重启网卡,应用配置

    1
    source /etc/profile
  5. 创建集群服务器的数据存储目录

    1
    2
    3
    4
    mkdir /rocketmq/store
    mkdir /rocketmq/store/commitlog
    mkdir /rocketmq/store/consumequeue
    mkdir /rocketmq/store/index
  6. 注意master与slave如果在同一个虚拟机中部署,需要将存储目录区分开

  7. broker-a.properties

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128

  8. broker-b-s.properties

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-b
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=11011
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/rocketmq/store-slave
    #commitLog 存储路径
    storePathCommitLog=/rocketmq/store-slave/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/rocketmq/store-slave/consumequeue
    #消息索引存储路径
    storePathIndex=/rocketmq/store-slave/index
    #checkpoint 文件存储路径
    storeCheckpoint=/rocketmq/store-slave/checkpoint
    #abort 文件存储路径
    abortFile=/rocketmq/store-slave/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
  9. 检查启动内存

    1
    vim /rocketmq/bin/runbroker.sh
    1
    2
    # 开发环境配置 JVM Configuration
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
  10. 启动服务器(在bin目录下依次启动)

    1
    2
    nohup sh mqnamesrv &

    1
    2
    nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &

    1
    nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &

broker-b.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

broker-a-s.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store-slave
#commitLog 存储路径
storePathCommitLog=/rocketmq/store-slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store-slave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store-slave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store-slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
1
nohup sh mqnamesrv &
1
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &
1
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &

4.3 rocketmq-console集群监控平台搭建

  1. incubator-rocketmq-externals是一个基于rocketmq的基础之上扩展开发的开源项目
  2. 获取地址:https://github.com/apache/rocketmq-externals
  3. rocketmq-console是一款基于java环境开发的(springboot)的管理控制台工具

5. 高级特性(重点)

5.1 消息的存储

  1. 消息生成者发送消息到MQ
  2. MQ返回ACK给生产者
  3. MQ push 消息给对应的消费者
  4. 消息消费者返回ACK给MQ

说明:ACK(Acknowledge character)

image-20201211162105593

5.2 消息的存储

  1. 消息生成者发送消息到MQ
  2. MQ收到消息,将消息进行持久化,存储该消息
  3. MQ返回ACK给生产者
  4. MQ push 消息给对应的消费者
  5. 消息消费者返回ACK给MQ
  6. MQ删除消息

注意:

  1. 第⑤步MQ在指定时间内接到消息消费者返回ACK,MQ认定消息消费成功,执行⑥
  2. 第⑤步MQ在指定时间内未接到消息消费者返回ACK,MQ认定消息消费失败,重新执行④⑤⑥

image-20201211162231313

5.3 消息的存储介质

  1. 数据库
    1. ActiveMQ
    2. 缺点:数据库瓶颈将成为MQ瓶颈
  2. 文件系统
    1. RocketMQ/Kafka/RabbitMQ
    2. 解决方案:采用消息刷盘机制进行数据存储
    3. 缺点:硬盘损坏的问题无法避免

image-20201211162334322

5.4 高效的消息存储与读写方式

  1. SSD(Solid State Disk)

    image-20201211162416020

    1. 随机写(100KB/s)

      image-20201211162448564

    2. 顺序写 (600MB/s)1秒1部电影

image-20201211162541076

image-20201211162638525

  1. Linux系统发送数据的方式

  2. “零拷贝”技术

    1. 数据传输由传统的4次复制简化成3次复制,减少1次复制过程
    2. Java语言中使用MappedByteBuffer类实现了该技术
    3. 要求:预留存储空间,用于保存数据(1G存储空间起步)

    image-20201211162729614

5.5 消息存储结构

  1. MQ数据存储区域包含如下内容
    1. 消息数据存储区域
      1. topic
      2. queueId
      3. message
    2. 消费逻辑队列
      1. minOffset
      2. maxOffset
      3. consumerOffset
    3. 索引
      1. key索引
      2. 创建时间索引

image-20201211162857722

5.6 刷盘机制

  1. 同步刷盘

    1. 生产者发送消息到MQ,MQ接到消息数据

    2. MQ挂起生产者发送消息的线程

    3. MQ将消息数据写入内存

    4. 内存数据写入硬盘

    5. 磁盘存储后返回SUCCESS

    6. MQ恢复挂起的生产者线程

    7. 发送ACK到生产者

      image-20201211163159680

  2. 异步刷盘

    1. 生产者发送消息到MQ,MQ接到消息数据
    2. MQ将消息数据写入内存
    3. 发送ACK到生产者

image-20201211163238738

  1. 同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
  2. 异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)

配置方式

1
2
3
4
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH

5.7 高可用性

  1. nameserver
    1. 无状态+全服务器注册
  2. 消息服务器
    1. 主从架构(2M-2S)
  3. 消息生产
    1. 生产者将相同的topic绑定到多个group组,保障master挂掉后,其他master仍可正常进行消 息接收
  4. 消息消费
    1. RocketMQ自身会根据master的压力确认是否由master承担消息读取的功能,当master繁忙 时候,自动切换由slave承担数据读取的工作

5.8 主从数据复制

  1. 同步复制

    1. master接到消息后,先复制到slave,然后反馈给生产者写操作成功
    2. 优点:数据安全,不丢数据,出现故障容易恢复
    3. 缺点:影响数据吞吐量,整体性能低
  2. 异步复制

    1. master接到消息后,立即返回给生产者写操作成功,当消息达到一定量后再异步复制到slave
    2. 优点:数据吞吐量大,操作延迟低,性能高
    3. 缺点:数据不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据将丢失
  3. 配置方式

    1
    2
    3
    4
    5
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER

5.9 负载均衡

  1. Producer负载均衡
    1. 内部实现了不同broker集群中对同一topic对应消息队列的负载均衡
  2. Consumer负载均衡
    1. 平均分配
    2. 循环平均分配
  3. 广播模式(不参与负载均衡)

image-20201211163654002

image-20201211163706995

image-20201211163736770

5.10 消息重试

  1. 当消息消费后未正常返回消费成功的信息将启动消息重试机制
  2. 消息重试机制
    1. 顺序消息
    2. 无序消息
5.10.1 顺序消息重试

image-20201211163848104

image-20201211163854405

  1. 当消费者消费消息失败后,RocketMQ会自动进行消息重试(每次间隔时间为 1 秒)
  2. 注意:应用会出现消息消费被阻塞的情况,因此,要对顺序消息的消费情况进行监控,避免阻塞现象的发生

image-20201211163910587

image-20201211163928369

5.10.2 无序消息重试
  1. 无序消息包括普通消息、定时消息、延时消息、事务消息
  2. 无序消息重试仅适用于负载均衡(集群)模型下的消息消费,不适用于广播模式下的消息消费
  3. 为保障无序消息的消费,MQ设定了合理的消息重试间隔时长

image-20201211164005887

5.11 死信队列

  1. 当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)
  2. 死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)\
  3. 死信队列特征
    1. 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
    2. 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
    3. 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
  4. 死信队列中消息特征
    1. 不会被再次重复消费
    2. 死信队列中的消息有效期为3天,达到时限后将被清除

5.12 死信处理

  1. 在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费

5.13 消息重复消费

  1. 消息重复消费原因
    1. 生产者发送了重复的消息
      1. 网络闪断
      2. 生产者宕机
    2. 消息服务器投递了重复的消息
      1. 网络闪断
    3. 动态的负载均衡过程
      1. 网络闪断/抖动
      2. broker重启
      3. 订阅方应用重启(消费者)
      4. 客户端扩容
      5. 客户端缩容

image-20201211164244534

5.14 消息幂等

  1. 对同一条消息,无论消费多少次,结果保持一致,称为消息幂等性
  2. 解决方案
    1. 使用业务id作为消息的key
    2. 在消费消息时,客户端对key做判定,未使用过放行,使用过抛弃
  3. 注意:messageId由RocketMQ产生,messageId并不具有唯一性,不能作用幂等判定条件