有没有人有在独立junit 测试中使用 Apache Qpid 的示例。
理想情况下,我希望能够动态创建一个队列,我可以在我的测试中放置/获取消息。所以我没有在我的测试中测试 QPid,我将为此使用集成测试,但是对于测试处理 msgs 的方法非常有用,因为它必须模拟大量服务。
这是我用于 QPID 0.30 的设置方法(我在 Spock 测试中使用它,但应该可以毫无问题地移植到 Junit 的 Java 中)。这支持 SSL 连接、HTTP 管理,并且仅使用内存中启动。启动时间是亚秒级。与将 ActiveMQ 用于相同目的相比,QPID 的配置很尴尬,但 QPID 符合 AMQP 并允许对 AMQP 客户端进行平滑、中立的测试(显然,交换的使用不能模仿 RabbitMQs 的实现,但对于基本目的来说就足够了)
首先,我创建了一个最小的 test-config.json,将其放入资源文件夹中:
{
"name": "${broker.name}",
"modelVersion": "2.0",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
"name" : "passwordFile",
"type" : "PlainPasswordFile",
"path" : "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
"preferencesproviders" : [{
"name": "fileSystemPreferences",
"type": "FileSystemPreferences",
"path" : "${qpid.work_dir}${file.separator}user.preferences.json"
}]
} ],
"ports" : [ {
"name" : "AMQP",
"port" : "${qpid.amqp_port}",
"authenticationProvider" : "passwordFile",
"keyStore" : "default",
"protocols": ["AMQP_0_10", "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1" ],
"transports" : [ "SSL" ]
}, {
"name" : "HTTP",
"port" : "${qpid.http_port}",
"authenticationProvider" : "passwordFile",
"protocols" : [ "HTTP" ]
}],
"virtualhostnodes" : [ {
"name" : "default",
"type" : "JSON",
"virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"
} ],
"plugins" : [ {
"type" : "MANAGEMENT-HTTP",
"name" : "httpManagement"
}],
"keystores" : [ {
"name" : "default",
"password" : "password",
"path": "${qpid.home_dir}${file.separator}keystore.jks"
}]
}
II 还需要为 localhost 创建一个 keystore.jks 文件,因为 QPID 代理和 RabbitMQ 客户端不喜欢通过未加密的通道进行通信。我还在“integTest/resources/etc”中添加了一个名为“passwd”的文件,其中包含以下内容:
客人:密码
以下是单元测试设置中的代码:
def tmpFolder = Files.createTempDir()
Broker broker
def amqpPort = PortFinder.findFreePort()
def httpPort = PortFinder.findFreePort()
def qpidHomeDir = 'src/integTest/resources/'
def configFileName = "/test-config.json"
def setup() {
broker = new Broker();
def brokerOptions = new BrokerOptions()
File file = new File(qpidHomeDir)
String homePath = file.getAbsolutePath();
log.info(' qpid home dir=' + homePath)
log.info(' qpid work dir=' + tmpFolder.absolutePath)
brokerOptions.setConfigProperty('qpid.work_dir', tmpFolder.absolutePath);
brokerOptions.setConfigProperty('qpid.amqp_port',"${amqpPort}")
brokerOptions.setConfigProperty('qpid.http_port', "${httpPort}")
brokerOptions.setConfigProperty('qpid.home_dir', homePath);
brokerOptions.setInitialConfigurationLocation(homePath + configFileName)
broker.startup(brokerOptions)
log.info('broker started')
}
broker.shutdown()
从 Rabbit MQ 客户端建立 AMQP 连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://guest:password@localhost:${amqpPort}");
factory.useSslProtocol()
log.info('about to make connection')
def connection = factory.newConnection();
//get a channel for sending the "kickoff" message
def channel = connection.createChannel();
Qpid 项目有许多使用嵌入式代理进行测试的测试。虽然我们使用基本案例来处理启动关闭,但您可以执行以下操作来简单地将代理集成到您的测试中:
public void setUp()
{
int port=1;
// Config is actually a Configuaration File App Registry object, or Configuration Application Registry.
ApplicationRegistry.initialise(config, port);
TransportConnection.createVMBroker(port);
}
public void test()
{...}
public void tearDown()
{
TransportConnection.killVMBroker(port);
ApplicationRegistry.remove(port);
}
然后对于连接,您需要为代理指定连接 URL。即 borkerlist='vm://1'
我在 qpid-broker @ 6.1.1 上的解决方案,将下面添加到 pom.xml
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker</artifactId>
<version>6.1.1</version>
<scope>test</scope>
</dependency>
qpid 配置文件为:
{
"name" : "${broker.name}",
"modelVersion" : "6.1",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
"name" : "anonymous",
"type" : "Anonymous"
} ],
"ports" : [ {
"name" : "AMQP",
"port" : "${qpid.amqp_port}",
"authenticationProvider" : "anonymous",
"virtualhostaliases" : [ {
"name" : "defaultAlias",
"type" : "defaultAlias"
} ]
} ],
"virtualhostnodes" : [ {
"name" : "default",
"type" : "JSON",
"defaultVirtualHostNode" : "true",
"virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"
} ]
}
启动 qpid 服务器的代码
Broker broker = new Broker();
BrokerOptions brokerOptions = new BrokerOptions();
// I use fix port number
brokerOptions.setConfigProperty("qpid.amqp_port", "20179");
brokerOptions.setConfigurationStoreType("Memory");
// work_dir for qpid's log, configs, persist data
System.setProperty("qpid.work_dir", "/tmp/qpidworktmp");
// init config of qpid. Relative path for classloader resource or absolute path for non-resource
System.setProperty("qpid.initialConfigurationLocation", "qpid/qpid-config.json");
brokerOptions.setStartupLoggedToSystemOut(false);
broker.startup(brokerOptions);
停止 qpid 服务器的代码
broker.shutdown();
由于我使用匿名模式,客户端应该这样做:
SaslConfig saslConfig = new SaslConfig() {
public SaslMechanism getSaslMechanism(String[] mechanisms) {
return new SaslMechanism() {
public String getName() {
return "ANONYMOUS";
}
public LongString handleChallenge(LongString challenge, String username, String password) {
return LongStringHelper.asLongString("");
}
};
}
};
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(20179);
factory.setSaslConfig(saslConfig);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
就这样。更多关于如何在其他版本上做到这一点。
您可以从官方网站下载 qpid-broker 二进制包。下载并解压缩后,您可以运行它以针对您的案例作为服务器进行测试。在您的案例连接好服务器后,使用命令行生成或仅复制 QPID_WORK 中的初始配置文件,删除无用的 id 文件并将其用于上述嵌入式服务器。
最复杂的是身份验证。您可以选择 PLAIN 模式,但您必须在初始配置中添加用户名和密码。我选择匿名模式,连接时需要一点代码。对于其他身份验证模式,您已经指定了密码文件或密钥/证书存储,我没有尝试过。
如果仍然无法正常工作,您可以阅读 qpid-broker 工件中的 qpid-borker 文档和 Main 类代码,其中显示了命令行如何为每个设置工作。
我能想到的最好的办法是:
PropertiesConfiguration properties = new PropertiesConfiguration();
properties.addProperty("virtualhosts.virtualhost.name", "test");
properties.addProperty("security.principal-databases.principal-database.name", "testPasswordFile");
properties.addProperty("security.principal-databases.principal-database.class", "org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase");
ServerConfiguration config = new ServerConfiguration(properties);
ApplicationRegistry.initialise(new ApplicationRegistry(config) {
@Override
protected void createDatabaseManager(ServerConfiguration configuration) throws Exception {
Properties users = new Properties();
users.put("guest","guest");
users.put("admin","admin");
_databaseManager = new PropertiesPrincipalDatabaseManager("testPasswordFile", users);
}
});
TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
使用以下网址:
amqp://admin:admin@/test?brokerlist='vm://:1?sasl_mechs='PLAIN''
最大的痛苦在于配置和授权。里程可能会有所不同。