登录|注册|帮助中心|联系我们

导航
首页 综合百科 生活常识 数码科技 明星名人 传统文化 互联网 健康 影视 美食 教育 旅游 汽车 职场 时尚 运动 游戏 家电 地理 房产 金融 节日 服饰 乐器 歌曲 动物 植物
当前位置:首页 > 生活常识

activemq端口怎么调(activemq详解)

发布时间:2023年1月6日责任编辑:张小云标签:暂无标签

一、下载与安装

直接去官网(
http://activemq.apache.org/)下载最新版本即可,由于这是免安装的,只需要解压就行了。安装完之后进入bin目录,双击 activemq.bat文件(linux下在bin目录下执行 activemq start)

二、访问控制台

在浏览器输入:http://ip:8161/admin/,出现如下界面表示启动成功,默认的用户名密码都是admin

???

三、修改端口号

61616为对外服务端口号

8161为控制器端口号

当端口号冲突时,可以修改这两个端口号。cd conf ,修改activemq.xml 修改里面的61616端口。修改jetty.xml,修改里面的8161端口。

queue队列模式:

和rabbitmq简单队列模式一样,若是有多个消费者消费同一个队列中的消息的话,默认也是轮询机制的消费

示例代码:

public class Productor { ???public static final String BORKER_URL = "tcp://127.0.0.1:61616"; ???public static final String QUEUE_NAME = "queue1"; ???public static void main(String[] args) throws JMSException { ???????//创建工厂 ???????ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL); ???????//创建tcp连接 ???????Connection connection = factory.createConnection(); ???????//建立连接 ???????connection.start(); ???????????????Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ???????//创建队列(消息的目的地) ???????Queue queue = session.createQueue(QUEUE_NAME); ???????//创建生产者 ???????MessageProducer producer = session.createProducer(queue); ???????//消息非持久化 ???????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); ???????//消息持久化 默认是持久化的// ???????producer.setDeliveryMode(DeliveryMode.PERSISTENT); ???????//创建消息 ???????TextMessage message = session.createTextMessage("你好吗"); ???????//发送消息 ???????producer.send(message); ???????producer.close(); ???????session.close(); ???????connection.close(); ???????System.out.println("发送成功!"); ???}}public class Consumer { ???public static final String BORKER_URL = "tcp://127.0.0.1:61616"; ???public static final String QUEUE_NAME = "queue1"; ???public static void main(String[] args) throws JMSException { ???????//创建工厂 ???????ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL); ???????//创建tcp连接 ???????Connection connection = factory.createConnection(); ???????//建立连接 ???????connection.start(); ???????????????Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ???????//创建/声明队列(消息的目的地) ???????Queue queue = session.createQueue(QUEUE_NAME); ???????//创建消费者 ???????MessageConsumer consumer = session.createConsumer(queue); ???????????????//监听的方式消费 ???????consumer.setMessageListener(message -> { ???????????TextMessage textMessage = (TextMessage)message; ???????????try { ???????????????System.out.println("1号接收到消息:" + textMessage.getText()); ???????????} catch (JMSException e) { ???????????????e.printStackTrace(); ???????????} ???????}); ???}}

topic队列模式:

称为发布订阅模式,生产者把消息发送给订阅给某个topic主题的消费者,是分发的模式,这种模式默认需要先启动消费者,不然就算生产者发布了某个topic主题的消息,消费者也消费不了;除非消费者提前订阅,并且做了消息持久化的处理,这样后启动消费者才能消费提前推送的消息。

代码:

public class Productor { ???public static final String BORKER_URL = "tcp://127.0.0.1:61616"; ???public static final String TOPIC_NAME = "topic1"; ???public static void main(String[] args) throws JMSException { ???????//创建工厂 ???????ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL); ???????//异步投递 ???????factory.setUseAsyncSend(true); ???????//创建tcp连接 ???????Connection connection = factory.createConnection(); ???????????????Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ???????//创建/声明topic(消息的目的地) ???????Topic topic = session.createTopic(TOPIC_NAME); ???????//创建生产者 ???????ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic); ???????//持久化 ???????producer.setDeliveryMode(DeliveryMode.PERSISTENT); ???????//建立连接 ???????connection.start(); ???????//创建消息 ???????TextMessage message = session.createTextMessage("你好吗"); ???????//发送消息,异步发送回调函数 ???????producer.send(message, new AsyncCallback() { ???????????@Override ???????????public void onSuccess() { ???????????????System.out.println("success"); ???????????} ???????????@Override ???????????public void onException(JMSException e) { ???????????????System.out.println("fail"); ???????????} ???????}); ???????producer.close(); ???????session.close(); ???????connection.close(); ???????System.out.println("发送成功!"); ???}}public class Consumer1 { ???public static final String BORKER_URL = "tcp://127.0.0.1:61616"; ???public static final String TOPIC_NAME = "topic1"; ???public static void main(String[] args) throws JMSException { ???????//创建工厂 ???????ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL); ???????//创建tcp连接 ???????Connection connection = factory.createConnection(); ???????//制定clientId ???????connection.setClientID("my"); ???????????????Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ???????//创建/声明topic(消息的目的地) ???????Topic topic = session.createTopic(TOPIC_NAME); ???????//订阅主题 ???????TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark"); ???????//建立连接 ???????connection.start(); ???????while (true) { ???????????//receive会阻塞线程 ???????????//接收订阅的消息 ???????????TextMessage message = (TextMessage) subscriber.receive(); ???????????System.out.println("接收到消息:" + message.getText()); ???????} ???????????}}

如何保证消息的可靠性

回答这个问题主要从持久化,事务,签收这几个方面入手

消息持久化的核心代码:

//queue模式的消息持久化 默认是持久化的 producer.setDeliveryMode(DeliveryMode.PERSISTENT); ?Topic topic = session.createTopic(TOPIC_NAME);ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();

事务的核心代码(偏生产者):

//参数设置成trueconnection.createSession(false, Session.AUTO_ACKNOWLEDGE);//事务提交session.commit();

签收的核心代码(偏消费者):

//参数设置成手动提交connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//消息签收message.acknowledge();

注意:若是既开启事务,又开启手动签收,以事务为准,只要事务被提交了也默认消息被签收了

性能提升:

1.利用nio的协议比tcp的性能高,

??配置方式:在conf目录下activemq.xml照着下面配置 ?<broker> ?... ?<transportConnectors> ???<transportConnector name="nio" uri="nio://0.0.0.0:61616"/> ???</<transportConnectors> ?...</broker> ??第二步是代码访问方式由tcp改为nio ?//创建工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio://127.0.0.1:61616");

2.jdbc+Journaling提高只有jdbc持久化的性能,它在做持久化入数据库之前,会先将数据保存到Journaling文件中,之后才慢慢同步到数据库中,等于中间加了一层缓冲层。

??把数据库mysql的驱动包放到lib目录下 ?配置方式:在conf目录下activemq.xml照着下面配置,其中有个createTablesOnStartup属性,默认值是true,表示每次启动后去数据库自动建表 ?<persistenceAdapter> ?<kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> //上面是默认配置找到改成下面的配置<persistenceAdapter> ??<journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="${basedir}/activemq-data" dataSource="#mysql-ds"/></persistenceAdapter> //下面的代码写在<beans>节点中<bean ??destroy-method="close"> ?<property name="driverClassName" value="com.mysql.jdbc.Driver"/> ?<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> ?<property name="username" value="activemq"/> ?<property name="password" value="activemq"/> ?<property name="poolPreparedStatements" value="true"/></bean>

其它知识推荐

溜溜百科知识网——分享日常生活学习工作各类知识。 垃圾信息处理邮箱 tousu589@163.com
icp备案号 闽ICP备14012035号-2 互联网安全管理备案 不良信息举报平台 Copyright 2023 www.6za.net All Rights Reserved