一、基础环境搭建
1.1 Java环境
安装OpenJDK8
yum search java|grep jdk #查看yum支持的版本
yum install -y java-1.8.0-openjdk #使用yum安装jdk8
java -version #检查是否成功
安装Maven
yum -y install maven
二、安装
先贴出官网文档
Apache RocketMQ官网 https://rocketmq.apache.org/
Apache RocketMQ官网文档 https://rocketmq.apache.org/docs/quick-start/
Apache RocketMQ GitHub文档中文版 https://github.com/apache/rocketmq/tree/master/docs/cn
2.1从发行版下载和构建
主要参考 Apache RocketMQ官网文档 https://rocketmq.apache.org/docs/quick-start/
wget https://mirror.bit.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip #从官网下载4.8.0版本源码
unzip rocketmq-all-4.8.0-source-release.zip #解压
cd rocketmq-all-4.8.0/
mvn -Prelease-all -DskipTests clean install -U #开始构建
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0
2.2修改配置
1)添加JAVA路径的绝对位置
需要添加的文件有两个:
tools.sh:创建Topic时会用到这个文件,不添加可能会无法创建Topic
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.275.b01-1.el8_3.x86_64/jre/lib/ext"
runbroker.sh:这个文件和broker息息相关,不添加的话,使用ACL时可能会无法签名
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.275.b01-1.el8_3.x86_64/jre/lib/ext"
文件路径在: /RocketMQ解压后的路径/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/bin
2)修改JVM相关配置
默认配置下的JVM最大内存为8G,需要根据服务器情况修改配置。可以参考 https://rocketmq.apache.org/docs/system-config/
tools.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"
runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
3)修改broker相关配置
文件路径在: /RocketMQ解压后的路径/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf
主要参考 https://github.com/apache/rocketmq/blob/release-4.8.0/docs/cn/acl/user_guide.md
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
#开启允许创建topic
autoCreateTopicEnable = true
#开启允许创建group
autoCreateSubscriptionGroup=true
## if acl is open,the flag will be true
aclEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876
4)修改权限控制存储
参考 https://github.com/apache/rocketmq/blob/release-4.8.0/docs/cn/acl/user_guide.md
三、启动
启动mqnamesrv、mqbroker前,先建好用于输出的日志文件
cd ~
mkdir logs
cd logs
mkdir rocketmqlogs
cd rocketmqlogs
touch broker.log
touch namesrv.log
启动mqnamesrv
nohup sh bin/mqnamesrv > ~/logs/rocketmqlogs/mqnamesrv.log 2>&1 & #要注意执行命令的路径
启动mqbroker
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf > ~/logs/rocketmqlogs/broker.log 2>&1 & #要注意执行的路径
启动成功后日志文件中有
…… boot success. serializeType=JSON and name server is localhost:9876
四、测试发送和接收消息
4.1在安装RocketMQ本地测试
在安装RocketMQ的机器上本地测试时,需在第二步的broker相关配置中关闭ACL(aclEnable=false)
启动生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
启动消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
4.2远程ACL测试
RocketMQ源码中有example,简单修改即可测试
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * Demo client that exercises a RocketMQ cluster through ACL-signed requests.
 *
 * <p>{@link #main(String[])} runs three steps in order: a producer that sends
 * 128 messages, a push consumer, and a pull consumer. Every client is built
 * with the RPC hook returned by {@link #getAclRPCHook()}.
 *
 * <p>The access key, secret key, name server address, topic names and group
 * names are placeholders and must be adapted to the target environment.
 */
public class AclClient {

    /** Next offset to pull from, per message queue; used only by {@link #pullConsumer()}. */
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();

    /** ACL access key; replace with the value configured for your own account. */
    private static final String ACL_ACCESS_KEY = "RocketMQ";

    /** ACL secret key; replace with the value configured for your own account. */
    private static final String ACL_SECRET_KEY = "1234567";

    /**
     * Runs the producer, then the push consumer, then the pull consumer.
     *
     * @param args unused
     * @throws MQClientException if one of the clients fails to start
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        producer();
        pushConsumer();
        pullConsumer();
    }

    /**
     * Sends 128 "Hello world" messages to {@code topicA} with tag {@code TagA}
     * and key {@code OrderID188}, printing each send result.
     *
     * <p>A failed send is reported and the loop moves on to the next message.
     * The producer is shut down even if the loop terminates abnormally.
     *
     * @throws MQClientException if the producer cannot be started
     */
    public static void producer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("groupA", getAclRPCHook());
        // Name server address; replace with the address of your own server.
        producer.setNamesrvAddr("xx.xx.xx.xx:9876");
        producer.start();
        try {
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("topicA",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // 5000L is passed as the send timeout argument.
                    SendResult sendResult = producer.send(msg, 5000L);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to the caller and stop sending.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } finally {
            producer.shutdown();
        }
    }

    /**
     * Starts a push consumer subscribed to every tag of {@code TopicTest} that
     * prints each received batch and acknowledges it as consumed.
     *
     * <p>NOTE(review): {@link #producer()} publishes to {@code topicA} while this
     * consumer subscribes to {@code TopicTest}; confirm whether both are meant
     * to use the same topic (and that the ACL account may access it).
     *
     * @throws MQClientException if the subscription or the consumer start fails
     */
    public static void pushConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
            "please_rename_unique_group_name_5", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("xx.xx.xx.xx:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20180422221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                printBody(msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    /**
     * Pulls messages from every queue of {@code TopicTest}, at most 32 per
     * request, printing each pull result and the message bodies, and moves to
     * the next queue once a pull reports {@code NO_NEW_MSG}.
     *
     * <p>The consumer is shut down when the method exits, including on an
     * interrupt or an unexpected error.
     *
     * <p>NOTE(review): same topic mismatch as in {@link #pushConsumer()} —
     * {@link #producer()} writes to {@code topicA}.
     *
     * @throws MQClientException if the consumer cannot be started or the queue
     *         list cannot be fetched
     */
    public static void pullConsumer() throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAclRPCHook());
        consumer.setNamesrvAddr("xx.xx.xx.xx:9876");
        consumer.start();
        try {
            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
            for (MessageQueue mq : mqs) {
                System.out.printf("Consume from the queue: %s%n", mq);
                SINGLE_MQ:
                while (true) {
                    try {
                        PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        System.out.printf("%s%n", pullResult);
                        // Remember where the next pull for this queue has to start.
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                        printBody(pullResult);
                        switch (pullResult.getPullStatus()) {
                            case NO_NEW_MSG:
                                // Done with this queue; continue with the next one.
                                break SINGLE_MQ;
                            case FOUND:
                            case NO_MATCHED_MSG:
                            case OFFSET_ILLEGAL:
                            default:
                                break;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        if (e instanceof InterruptedException) {
                            // Keep the interrupt visible to the caller and stop pulling.
                            Thread.currentThread().interrupt();
                            return;
                        }
                        // NOTE(review): any other failure is retried immediately and without
                        // limit; consider a back-off or a retry cap if the broker may be down.
                    }
                }
            }
        } finally {
            consumer.shutdown();
        }
    }

    /**
     * Prints the bodies of the messages carried by a pull result.
     *
     * @param pullResult result whose found-message list is printed
     */
    private static void printBody(PullResult pullResult) {
        printBody(pullResult.getMsgFoundList());
    }

    /**
     * Prints the id and body of every non-null message in the list.
     *
     * <p>Bodies are decoded as UTF-8 instead of the platform default charset so
     * that output does not depend on the JVM's locale settings.
     *
     * @param msg messages to print; {@code null} or empty prints nothing
     */
    private static void printBody(List<MessageExt> msg) {
        if (msg == null || msg.isEmpty()) {
            return;
        }
        for (MessageExt m : msg) {
            if (m != null) {
                System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(),
                    new String(m.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            }
        }
    }

    /**
     * Returns the stored pull offset for a queue.
     *
     * @param mq queue to look up
     * @return the stored offset, or 0 when none has been recorded yet
     */
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }

    /**
     * Records the offset the next pull for a queue should start from.
     *
     * @param mq queue the offset belongs to
     * @param offset offset to store
     */
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }

    /**
     * Builds the RPC hook carrying the configured access key and secret key.
     *
     * @return a new {@link AclClientRPCHook} wrapping the session credentials
     */
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
    }
}
五、关闭服务
关闭broker
sh bin/mqshutdown broker
关闭namesrv
sh bin/mqshutdown namesrv