ActiveMQ
1.概述
ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。
2.特点
2.⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
●完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
●对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
●通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
●支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
●支持通过JDBC和journal提供高速的消息持久化
● 从设计上保证了高性能的集群,客户端-服务器,点对点
●支持Ajax
●支持与Axis的整合
●可以很容易的调用内嵌JMS provider,进行测试
3.安装测试
ActiveMQ服务安装
开发环境
windows64 PC
下载最新版本5.15.0release, 解压apache-activemq-5.15.0-bin.zip(或者apache-activemq-5.15.0-bin.tar.gz)目录如下:
+bin (windows下面的bat和unix/linux下面的sh)
+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
+data (默认是空的)
+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)
+example (几个例子
+lib (activemMQ使用到的lib)
-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)
-LICENSE.txt
-NOTICE.txt
-README.txt
-user-guide.html
你可以使用bin\win64\activemq.bat(activemq) 启动,如果一切顺利的话,你就会看见类似下面的信息:
但若出现如下类似信息
则查看或修改配置文件E:\apache-activemq-5.15.0\apache-activemq-5.15.0\conf\jetty中的host\port是否有误
服务已经启动。。。。。。。。。。。。。。。。。。。
我们可以web访问一下下 --------->192.168.8.107:8161
接着我们在一块开发板上安装c++实现的ActiveMQ客户端库ActiveMQ_cpp,可搜索下载
开发环境
RK3288(ubuntu)
安装步骤:
依赖安装
下载apr-1.6.3.tar apr-iconv-1.2.2.tar apr-util-1.6.1.tar cppunit-1.12.1.tar
分别解压默认安装进入目录执行./configure 执行 make && make install
下载activemq-cpp-library-3.9.4-src.tar
解压执行./configure --with-apr=/usr/local/apr/ --with-cppunit=/usr/local/cppunit/
执行make && make install
一切准备就绪,则我们实现一个demo直接贴代码
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleProducer : public Runnable {
private:
CountDownLatch latch;
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
unsigned int numMessages;
std::string brokerURI;
std::string destURI;
private:
SimpleProducer( const SimpleProducer& );
SimpleProducer& operator= ( const SimpleProducer& );
public:
SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
const std::string& destURI, bool useTopic = false, bool clientAck = false ) :
latch(1),
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
useTopic(useTopic),
clientAck(clientAck),
numMessages(numMessages),
brokerURI(brokerURI),
destURI(destURI) {
}
virtual ~SimpleProducer(){
cleanup();
}
void close() {
this->cleanup();
}
void waitUntilReady() {
latch.await();
}
virtual void run() {
try {
auto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory( brokerURI ) );
try{
connection = connectionFactory->createConnection();
connection->start();
} catch( CMSException& e ) {
e.printStackTrace();
printf("%s..........\n",e.getStackTraceString());
throw e;
}
if( clientAck )
{
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else
{
session = connection->createSession(Session::AUTO_ACKNOWLEDGE );
}
if( useTopic )
{
destination = session->createTopic( destURI );
} else
{
destination = session->createQueue( destURI );
}
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::PERSISTENT );
string threadIdStr = Long::toString( Thread::currentThread()->getId() );
string text = (string)"Hello world! from thread " + threadIdStr;
while(1)
{
unsigned int ix=0;
for(ix = 0 ; ix<numMessages; ++ix )
{
TextMessage* message = session->createTextMessage( text );
message->setIntProperty( "Integer", ix );
printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
producer->send( message );
delete message;
}
}
}catch ( CMSException& e ) {
printf("%s..........\n",e.getStackTraceString());
e.printStackTrace();
}
}
private:
void cleanup(){
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) { e.printStackTrace(); }
destination = NULL;
try
{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) { e.printStackTrace(); }
producer = NULL;
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) { e.printStackTrace(); }
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) { e.printStackTrace(); }
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) { e.printStackTrace(); }
connection = NULL;
}
};
#if 1
int main(int argc , char* argv[])
{
std::cout<<"init ActiveMQCPP library"<<std::endl;
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting produce message:" << std::endl;
std::cout << "-----------------------------------------------------\n";
std::string brokerURI =("failover:(tcp://192.168.8.107:61616)");
unsigned int numMessages = 10;
std::string destURI = "ckjiaoc@.isstech.com";
bool useTopics = true;
bool clientAck = false;
SimpleProducer producer( brokerURI, numMessages, destURI, useTopics ,clientAck);
producer.run();
//producer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished test" << std::endl;
std::cout << "=====================================================\n";
producer.waitUntilReady();
std::cout << "delete ActiveMQCPP library=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}
#endif
./Serverservice
init ActiveMQCPP library
=====================================================
Starting produce message:
-----------------------------------------------------
Sent message #1 from thread -1225509283
Sent message #2 from thread -1225509283
Sent message #3 from thread -1225509283
Sent message #4 from thread -1225509283
Sent message #5 from thread -1225509283
Sent message #6 from thread -1225509283
Sent message #7 from thread -1225509283
Sent message #8 from thread -1225509283
Sent message #9 from thread -1225509283
Sent message #10 from thread -1225509283
Sent message #1 from thread -1225509283
Sent message #2 from thread -1225509283
Sent message #3 from thread -1225509283
Sent message #4 from thread -1225509283
Sent message #5 from thread -1225509283
Sent message #6 from thread -1225509283
这时你会发现ActiveMQ的服务端的主题里就有你刚才发送的信息。。。。
主题名字可自定义,489值所对应的内容表示入队消息MessagesEnqueued 489条,0 MessageDequeued出队消息为0条,表示,此主题没有被订阅,下面,我们再实现一个可消费的ActiveMQ demo
代码如下
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleAsyncConsumer : public ExceptionListener,
public MessageListener,
public DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
bool useTopic ;
std::string brokerURI;
std::string destURI;
bool clientAck;
private:
SimpleAsyncConsumer( const SimpleAsyncConsumer& );
SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
public:
SimpleAsyncConsumer( const std::string& brokerURI,
const std::string& destURI,
bool useTopic = false,
bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
useTopic(useTopic),
brokerURI(brokerURI),
destURI(destURI),
clientAck(clientAck) {
}
virtual ~SimpleAsyncConsumer() {
this->cleanup();
}
void close() {
this->cleanup();
}
void runConsumer() {
try {
ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI );
connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
if( amqConnection != NULL )
{
amqConnection->addTransportListener( this );
}
connection->start();
connection->setExceptionListener(this);
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onMessage( const Message* message ) {
static int count = 0;
try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
if( clientAck ) {
message->acknowledge();
}
printf( "Message #%d Received: %s\n", count, text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
printf("CMS Exception occurred. Shutting down client.\n");
exit(1);
}
virtual void transportInterrupted() {
std::cout << "The Connection's Transport has been Interrupted." << std::endl;
}
virtual void transportResumed() {
std::cout << "The Connection's Transport has been Restored." << std::endl;
}
private:
void cleanup(){
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {}
consumer = NULL;
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {}
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {}
connection = NULL;
}
};
#if 1
int main(int argc, char* argv[]) {
std::cout<<"init ActiveMQCPP library"<<std::endl;
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting recieve message:" << std::endl;
std::cout << "-----------------------------------------------------\n";
std::string brokerURI =("failover:(tcp://192.168.8.107:61616)");
std::string destURI = "ckjiaoc@.isstech.com";
bool useTopics = true;
bool clientAck = false;
SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
consumer.runConsumer();
std::cout << "Press 'q' to quit" << std::endl;
while( std::cin.get() != 'q') {}
consumer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}
#endif // 0
./Consumer
init ActiveMQCPP library
=====================================================
Starting recieve message:
-----------------------------------------------------
The Connection's Transport has been Restored.
Press 'q' to quit
Message #1 Received: Hello world! from thread -1225255331
Message #2 Received: Hello world! from thread -1225255331
Message #3 Received: Hello world! from thread -1225255331
Message #4 Received: Hello world! from thread -1225255331
Message #5 Received: Hello world! from thread -1225255331
Message #6 Received: Hello world! from thread -1225255331
Message #7 Received: Hello world! from thread -1225255331
Message #8 Received: Hello world! from thread -1225255331
Message #9 Received: Hello world! from thread -1225255331
Message #10 Received: Hello world! from thread -1225255331
Message #11 Received: Hello world! from thread -1225255331
Message #12 Received: Hello world! from thread -1225255331
这时我们再看看ActiveMQ服务端会有什么变化
没错,结果表明,入队2254条信息,出队1136条信息
特别说明一下,ActiveMQ 的订阅消息模式
点对点
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。这里要注意:
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
发布/订阅
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。发布订阅模式适用于1个消息生产者,多个消费者场景,首先启动消息订阅方,在消息发布方开始执行后,接收该消息进行处理。在ActiveMQ管理界面会动态跟进消息产生-消费(入队、出队)情况;以及生产者个数,消费者个数。
这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅)
注意:ActiveMQ 内容还有很多,如消息持久化和非持久化处理,不同的应用场景,其消息模式也有所不同,值得大家去学习
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)