RocketMQ
# RocketMQ
# 应用场景
- 异步解藕
- 削峰填谷
- 消息分发
# 环境搭建
-
上传 rocketmq-all-4.4.0-bin-release.zip 到家目录
-
使用解压命令进行解压
1
unzip /usr/local/rocketmq-all-4.4.0-bin-release.zip
-
软件重命名
1
mv /usr/local/rocketmq-all-4.4.0-bin-release/ /usr/local/rocketmq-4.4/
-
修改启动参数配置
JAVA_OPT=”${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g“
两个文件
1
2
3vi /usr/local/rocketmq-4.4/bin/runbroker.sh
vi /usr/local/rocketmq-4.4/bin/runserver.sh -
启动名字服务和代理服务
1
2
3
4
5nohup sh /usr/local/rocketmq-4.4/bin/mqnamesrv &
# -n localhost:9876 指定名称服务的地址, 类似于zk的地址
nohup sh /usr/local/rocketmq-4.4/bin/mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf & -
检验是否启动正常
使用 java 的内置命令: jps 可以看到 BrokerStartup 和 NamesrvStartup 进程
使用 Linux 命令 **: netstat-ntlp 可以看到 9876 的端口和 10911 的端口 **
使用 ps-ef |grep java
查看启动日志:
tail -100f ~/logs/rocketmqlogs/namesrv.log
tail -100f ~/logs/rocketmqlogs/broker.log
-
关闭 RocketMQ
1
2
3
4
5
6
7# 1.关闭NameServer
sh /usr/local/rocketmq-4.4/bin/mqshutdown namesrv
# 2.关闭Broker
sh /usr/local/rocketmq-4.4/bin/mqshutdown broker
# 编写 sh 脚本文件
-
启动 (startRocketMQ.sh)
1
2
3
4
5
6
7
8
9# !/bin/bash
echo '------------------rocketmq-nameServer-starter-------------------------'
nohup sh /usr/local/rocketmq-4.4/bin/mqnamesrv &
echo '------------------rocketmq-nameServer-started-------------------------'
echo '------------------rocketmq-brokerServer-starter-----------------------'
nohup sh /usr/local/rocketmq-4.4/bin/mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &
echo '------------------rocketmq-brokerServer-started-----------------------' -
关闭 (stutdownRocketMQ.sh)
1
2
3
4
5
6
7
8
9# !/bin/bash
echo '------------------rocketmq-nameServer-shutdown-------------------------'
sh /usr/local/rocketmq-4.4/bin/mqshutdown namesrv
echo '------------------rocketmq-nameServer-shutdowned-------------------------'
echo '------------------rocketmq-brokerServer-shutdown-----------------------'
sh /usr/local/rocketmq-4.4/bin/mqshutdown broker
echo '------------------rocketmq-brokerServer-shutdowned-----------------------'
# 监控平台
使用 jar
1 | nohup java -jar rocketmq-console-ng-1.0.1.jar & |
# SpringBoot 集成
# 依赖
1 | <dependency> |
# 配置
-
生产者
1
2rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group -
消费者
1
rocketmq.name-server=127.0.0.1:9876
# 编码
-
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14@RestController
public class HelloController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("01-hello")
public String sendMsg(String message,String age) throws Exception{
//发送消息
SendResult sendResult = rocketMQTemplate.syncSend("01-boot:", message);
System.out.println(sendResult.getMsgId());
System.out.println(sendResult.getSendStatus());
return "success";
}
} -
消费者
1
2
3
4
5
6
7
8
9
10
11@Component
@RocketMQMessageListener(
topic = "01-boot",
consumerGroup = "wolfcode-consumer"
)
public class HelloConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println("消费消息"+messageExt);
}
}
# 发送消息方式 (生产者)
# 发送类型
-
同步消息
1
2
3SendResult sendResult = rocketMQTemplate.syncSend("020-boot", msg);
System.out.println(sendResult.getMsgId());
System.out.println(sendResult.getSendStatus()); -
异步消息
1
2
3
4
5
6
7
8
9
10
11
12rocketMQTemplate.asyncSend("020-boot", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getMsgId());
System.out.println(sendResult.getSendStatus());
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
}); -
一次性消息
1
rocketMQTemplate.sendOneWay("020-boot", msg);
# 发送时间
默认立即发送
-
延时发送
1
2// 参数1:主题 2:消息 3:rocket发送最大允许时间 4:延时级别(18级)
SendResult sendResult = rocketMQTemplate.syncSend("020-boot", MessageBuilder.withPayload(msg).build(),100000,3);
# 消费模式 (消费者)
以组为单位 默认为集群模式
-
集群模式 (每组只有一个可以收到)
1
2
3
4
5
6
7
8
9
10
11
12
13@Component
@RocketMQMessageListener(
topic = "020-boot",
messageModel = MessageModel.CLUSTERING,
consumerGroup = "wolfcode-consumer"
)
public class MqListenner implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("今天上映:"+s);
}
} -
广播模式 (每组的所有消费者都可以收到)
1
2
3
4
5
6
7
8
9
10
11
12
13@Component
@RocketMQMessageListener(
topic = "020-boot",
messageModel = MessageModel.BROADCASTING,
consumerGroup = "wolfcode-consumer"
)
public class MqListenner implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("今天上映:"+s);
}
}
# 消息过滤
# Tag 标签模式
在发送的消息 Topic:Tag 中间使用冒号隔开
-
生产者
1
2
3
4
5@RequestMapping("/sendTagMsg")
public String sendTagMsg(String msg) {
rocketMQTemplate.convertAndSend("020-boot:TagB",msg);
return "success";
} -
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14@Component
@RocketMQMessageListener(
topic = "020-boot",
selectorType = SelectorType.TAG,
//接收TagB或TagA
secretKey = "TagB || TagA",
consumerGroup = "wolfcode-consumer"
)
public class MqListenner implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("今天上映:"+s);
}
}
# SQL92 过滤
注意:在使用 SQL 过滤的时候,需要配置参数 enablePropertyFilter=true
-
生产者
1
2
3
4
5
6
7
8
9
10
11
12//Sql92过滤
@RequestMapping("/sendSQLMsg")
public String sendSQLMsg(int age,String msg) {
Map<String,Object> map=new HashMap<>();
//用户自定义属性
map.put("age", age);
map.put("name", "hesj");
//也可以设置系统属性
map.put(MessageConst.PROPERTY_KEYS,age);
template.convertAndSend("02-RocketMQ-Top7",msg,map);
return "success";
} -
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14@Component
@RocketMQMessageListener(
topic = "02-RocketMQ-Top7",
messageModel = MessageModel.CLUSTERING,
selectorType = SelectorType.SQL92,
selectorExpression = "age > 16",
consumerGroup= "wolfcode-consumer7"
)
public class MqListiner7 implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("消费消息SQl92"+msg);
}
}
# 关于我
Brath 是一个热爱技术的 Java 程序猿,公众号「InterviewCoder」定期分享有趣有料的精品原创文章!
非常感谢各位人才能看到这里,原创不易,文章如果有帮助可以关注、点赞、分享或评论,这都是对我的莫大支持!