我正在做一个项目,我从 azure IotHub 读取设备事件。我使用了链接中提供的代码。我正在为我的项目使用spring boot。代码在我的windows机器上运行良好,但是当我在我的服务器(linux机器)上部署代码时,azure eventthub接收不起作用。我还在本地 linux 机器上尝试了代码,我遇到了同样的问题,应用程序抛出以下错误并且没有收到任何新事件。我还启用了端口 5671 和 9352。
Error:
"Operation not allowed after the com.microsoft.azure.servicebus.MessageReceiver instance is Closed."
**Code**:
public class ReceiveEventService {
public void startListening() {
EventHubClient client0 = receiveMessages("0");
EventHubClient client1 = receiveMessages("1");
try {
log.info(this.getClass(), "startListening", "Started listening to azure");
System.in.read();
client0.closeSync();
client1.closeSync();
System.exit(0);
} catch (ServiceBusException | IOException e) {
log.error(this.getClass(), "error in azure startListening", e);
}
}
private EventHubClient receiveMessages(final String partitionId) {
EventHubClient client = null;
try {
client = EventHubClient.createFromConnectionStringSync(deviceEndPointString);
} catch (Exception e) {
log.error(this.getClass(), "receiveMessages - Failed to create client", e);
System.exit(1);
}
try {
client.createReceiver(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, partitionId, Instant.now())
.thenAccept(new Consumer<PartitionReceiver>() {
public void accept(PartitionReceiver receiver) {
log.info(this.getClass(), "receiveMessages",
" *Created receiver on partition " + partitionId);
try {
while (true) {
Iterable<EventData> receivedEvents = receiver.receive(100).get();
int batchSize = 0;
if (receivedEvents != null) {
for (EventData receivedEvent : receivedEvents) {
log.info(this.getClass(), "receiveMessages", "Device ID:"
+ receivedEvent.getSystemProperties()
.get("iothub-connection-device-id")
+ " offset:" + receivedEvent.getSystemProperties().getOffset()
+ " EnqueueTime:"
+ receivedEvent.getSystemProperties().getEnqueuedTime() + "SeqNo:"
+ receivedEvent.getSystemProperties().getSequenceNumber());
batchSize++;
}
}
log.debug(this.getClass(), "receiveMessages:",
"Partition " + partitionId + " ReceivedBatch size" + batchSize);
}
} catch (Exception e) {
log.error(this.getClass(), "receiveMessages-Failed to receive messages", e);
}
}
});
} catch (Exception e) {
log.error(this.getClass(), "receiveMessages-Failed to create receiver", e);
}
return client;
}
}
我的 pom.xml 中的依赖项如下。
<dependency>
<groupId>com.microsoft.azure.sdk.iot</groupId>
<artifactId>iot-service-client</artifactId>
<version>1.3.19</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>0.13.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>0.13.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.sdk.iot</groupId>
<artifactId>iot-device-client</artifactId>
<version>1.1.26</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>0.9.7</version>
</dependency>
我的主要课程如下。我的代码执行在 linux 上也可以正常工作,但只有 azure eventthub 部分会抛出错误并且无法正常工作。
@SpringBootApplication
public class AzureApplication extends SpringBootServletInitializer{
public static void main(String[] args) {
SpringApplication.run(AzureApplication.class, args);
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(AzureApplication.class);
}
}