JEE7 + WildFly(HornetQ) – 从应用程序暂停队列

我们使用WildFly + HornetQ作为我们的应用程序服务器和JMS消息队列,并且要求能够从应用程序暂停/恢复队列。 这可能吗?

这可以使用JMX或使用hornetq核心管理API来完成。

出于本示例的目的,使用了wildfly 8.1.0.Final运行独立完整ha配置文件。

必需的Maven依赖项:

 org.hornetq hornetq-jms-client 2.4.1.Final   org.wildfly wildfly-jmx 8.1.0.Final  

这是一个测试类,演示了如何通过JMX使用JmsQueueControl:

 package test.jmx.hornetq; import org.hornetq.api.jms.management.JMSQueueControl; import javax.management.*; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; public class WildflyJmsControl { public static void main(String[] args) { try { //Get a connection to the WildFly 8 MBean server on localhost String host = "localhost"; int port = 9990; // management-web port String urlString = System.getProperty("jmx.service.url","service:jmx:http-remoting-jmx://" + host + ":" + port); JMXServiceURL serviceURL = new JMXServiceURL(urlString); JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL, null); MBeanServerConnection connection = jmxConnector.getMBeanServerConnection(); String queueName = "testQueue"; // use your queue name here String mbeanObjectName = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName; ObjectName objectName = ObjectName.getInstance(mbeanObjectName); JMSQueueControl jmsQueueControl = (JMSQueueControl) MBeanServerInvocationHandler.newProxyInstance(connection, objectName, JMSQueueControl.class, false); assert jmsQueueControl != null; long msgCount = jmsQueueControl.countMessages(null); System.out.println(mbeanObjectName + " message count: " + msgCount); jmsQueueControl.pause(); System.out.println("queue paused"); jmsQueueControl.resume(); System.out.println("queue resumed"); jmxConnector.close(); } catch (Exception e) { e.printStackTrace(); } } } 

要通过JMS访问hornetq管理,请使用:

 package test.jms.hornetq; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.*; import org.hornetq.api.core.management.ManagementHelper; import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; public class HornetqService { public void testPauseResumeQueue() { // this class needs to run in the same jvm as the wildfly server (ie not a remote jvm) try { ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration( InVMConnectorFactory.class.getName())); ClientSession session = locator.createSessionFactory().createSession(); session.start(); ClientRequestor requester = new ClientRequestor(session, "jms.queue.hornetq.management"); String queueName = "testQueue"; // use your queue name here // get queue message count ClientMessage message = session.createMessage(false); ManagementHelper.putAttribute(message, queueName, "messageCount"); ClientMessage reply = requester.request(message); int count = (Integer) ManagementHelper.getResult(reply); System.out.println("There are " + count + " messages in exampleQueue"); // pause the queue message = session.createMessage(false); ManagementHelper.putOperationInvocation(message, queueName, "pause"); requester.request(message); // get queue paused message = session.createMessage(false); ManagementHelper.putAttribute(message, queueName, "paused"); reply = requester.request(message); Object result = ManagementHelper.getResult(reply); System.out.println("result: " + result.getClass().getName() + " : " + result.toString()); // resume queue message = session.createMessage(false); ManagementHelper.putOperationInvocation(message, queueName, "resume"); requester.request(message); // get queue paused message = session.createMessage(false); ManagementHelper.putAttribute(message, queueName, "paused"); reply = requester.request(message); Object result2 = ManagementHelper.getResult(reply); System.out.println("result2: " + result2.getClass().getName() + " : " + result2.toString()); requester.close(); session.close(); }catch (Exception e){ System.out.println("Error pausing queue" + e.getMessage()); } } } 

您是否正在寻找停止和开始传递消息的方法? 如果是,则JMS定义connection.Stop方法以暂停消息的传递。 可以使用connection.Start方法恢复邮件传递。

所以HornetQ JMS客户端将实现这些方法。 您将需要使用这些方法。