独立Apache Qpid(amqp)Junit测试示例

有没有人有一个在独立的 junit测试中使用Apache Qpid的例子。

理想情况下,我希望能够动态创建一个队列,我可以在我的测试中添加/获取msgs。 所以我没有在我的测试中测试QPid,我将使用集成测试,但是测试处理msgs的方法非常有用,不得不模拟一大堆服务。

这是我用于QPID 0.30的设置方法(我在Spock测试中使用它,但应该可以移植到Junit的Java而没有问题)。 这支持SSL连接,HTTP管理,仅使用内存启动。 启动时间小于秒。 与使用ActiveMQ用于相同目的相比,QPID的配置很难,但QPID符合AMQP,允许对AMQP客户端进行平滑,中立的测试(显然交换的使用不能模仿RabbitMQs实现,但基本用途就足够了)

首先,我创建了一个最小的test-config.json,它放在resources文件夹中:

{ "name": "${broker.name}", "modelVersion": "2.0", "defaultVirtualHost" : "default", "authenticationproviders" : [ { "name" : "passwordFile", "type" : "PlainPasswordFile", "path" : "${qpid.home_dir}${file.separator}etc${file.separator}passwd", "preferencesproviders" : [{ "name": "fileSystemPreferences", "type": "FileSystemPreferences", "path" : "${qpid.work_dir}${file.separator}user.preferences.json" }] } ], "ports" : [ { "name" : "AMQP", "port" : "${qpid.amqp_port}", "authenticationProvider" : "passwordFile", "keyStore" : "default", "protocols": ["AMQP_0_10", "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1" ], "transports" : [ "SSL" ] }, { "name" : "HTTP", "port" : "${qpid.http_port}", "authenticationProvider" : "passwordFile", "protocols" : [ "HTTP" ] }], "virtualhostnodes" : [ { "name" : "default", "type" : "JSON", "virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }" } ], "plugins" : [ { "type" : "MANAGEMENT-HTTP", "name" : "httpManagement" }], "keystores" : [ { "name" : "default", "password" : "password", "path": "${qpid.home_dir}${file.separator}keystore.jks" }] } 

我还需要为localhost创建一个keystore.jks文件,因为QPID代理和RabbitMQ客户端不喜欢通过未加密的通道进行通信。 我还在“integTest / resources / etc”中添加了一个名为“passwd”的文件,其中包含以下内容:

嘉宾:密码

以下是unit testing设置的代码:

类级变量:

 def tmpFolder = Files.createTempDir() Broker broker def amqpPort = PortFinder.findFreePort() def httpPort = PortFinder.findFreePort() def qpidHomeDir = 'src/integTest/resources/' def configFileName = "/test-config.json" 

setup()方法的代码:

  def setup() { broker = new Broker(); def brokerOptions = new BrokerOptions() File file = new File(qpidHomeDir) String homePath = file.getAbsolutePath(); log.info(' qpid home dir=' + homePath) log.info(' qpid work dir=' + tmpFolder.absolutePath) brokerOptions.setConfigProperty('qpid.work_dir', tmpFolder.absolutePath); brokerOptions.setConfigProperty('qpid.amqp_port',"${amqpPort}") brokerOptions.setConfigProperty('qpid.http_port', "${httpPort}") brokerOptions.setConfigProperty('qpid.home_dir', homePath); brokerOptions.setInitialConfigurationLocation(homePath + configFileName) broker.startup(brokerOptions) log.info('broker started') } 

清理代码()

 broker.shutdown() 

要从Rabbit MQ客户端建立AMQP连接:

  ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://guest:password@localhost:${amqpPort}"); factory.useSslProtocol() log.info('about to make connection') def connection = factory.newConnection(); //get a channel for sending the "kickoff" message def channel = connection.createChannel(); 

Qpid项目有许多使用嵌入式代理进行测试的测试。 虽然我们使用基本案例来处理启动关闭,但您可以执行以下操作来简单地在您的测试中集成代理:

 public void setUp() { int port=1; // Config is actually a Configuaration File App Registry object, or Configuration Application Registry. ApplicationRegistry.initialise(config, port); TransportConnection.createVMBroker(port); } public void test() {...} public void tearDown() { TransportConnection.killVMBroker(port); ApplicationRegistry.remove(port); } 

然后,对于连接,您需要为代理指定conectionURL。 即borkerlist =’vm:// 1′

我在qpid-broker @ 6.1.1上的解决方案,下面添加到pom.xml

  org.apache.qpid qpid-broker 6.1.1 test  

qpid配置文件为:

 { "name" : "${broker.name}", "modelVersion" : "6.1", "defaultVirtualHost" : "default", "authenticationproviders" : [ { "name" : "anonymous", "type" : "Anonymous" } ], "ports" : [ { "name" : "AMQP", "port" : "${qpid.amqp_port}", "authenticationProvider" : "anonymous", "virtualhostaliases" : [ { "name" : "defaultAlias", "type" : "defaultAlias" } ] } ], "virtualhostnodes" : [ { "name" : "default", "type" : "JSON", "defaultVirtualHostNode" : "true", "virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }" } ] } 

用于启动qpid服务器的代码

 Broker broker = new Broker(); BrokerOptions brokerOptions = new BrokerOptions(); // I use fix port number brokerOptions.setConfigProperty("qpid.amqp_port", "20179"); brokerOptions.setConfigurationStoreType("Memory"); // work_dir for qpid's log, configs, persist data System.setProperty("qpid.work_dir", "/tmp/qpidworktmp"); // init config of qpid. Relative path for classloader resource or absolute path for non-resource System.setProperty("qpid.initialConfigurationLocation", "qpid/qpid-config.json"); brokerOptions.setStartupLoggedToSystemOut(false); broker.startup(brokerOptions); 

用于停止qpid服务器的代码

 broker.shutdown(); 

由于我使用匿名模式,客户端应该这样做:

 SaslConfig saslConfig = new SaslConfig() { public SaslMechanism getSaslMechanism(String[] mechanisms) { return new SaslMechanism() { public String getName() { return "ANONYMOUS"; } public LongString handleChallenge(LongString challenge, String username, String password) { return LongStringHelper.asLongString(""); } }; } }; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(20179); factory.setSaslConfig(saslConfig); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); 

就这样。 关于如何在其他版本上执行此操作的更多信息。

您可以从官方网站下载qpid-broker二进制包。 下载并解压缩后,您可以运行它来针对您的案例测试服务器。 在您的案例连接服务器之后,使用命令行生成或只是复制QPID_WORK中的初始配置文件,删除无用的id字段并将其用于上述嵌入式服务器。

最复杂的是身份validation。 您可以选择PLAIN模式,但必须在初始配置中添加用户名和密码。 我选择匿名模式,连接时需要一些代码。 对于其他身份validation模式,您已指定密码文件或密钥/证书存储区,我没有尝试过。

如果它仍然不起作用,您可以阅读qpid-broker工件中的qpid-borker doc和Main类代码,其中显示了命令行如何为每个设置工作。

我能想到的最好的是:

 PropertiesConfiguration properties = new PropertiesConfiguration(); properties.addProperty("virtualhosts.virtualhost.name", "test"); properties.addProperty("security.principal-databases.principal-database.name", "testPasswordFile"); properties.addProperty("security.principal-databases.principal-database.class", "org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase"); ServerConfiguration config = new ServerConfiguration(properties); ApplicationRegistry.initialise(new ApplicationRegistry(config) { @Override protected void createDatabaseManager(ServerConfiguration configuration) throws Exception { Properties users = new Properties(); users.put("guest","guest"); users.put("admin","admin"); _databaseManager = new PropertiesPrincipalDatabaseManager("testPasswordFile", users); } }); TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); 

使用以下URL:

 amqp://admin:admin@/test?brokerlist='vm://:1?sasl_mechs='PLAIN'' 

最大的痛苦在于配置和授权。 Milage可能会有所不同。