1

我是 DDS 的新手,并尝试在 Intellij-IDEA 中编写一个简单的 Java 程序,该程序由 3 个部分组成:

  1. 发送数据的客户端模拟器。
  2. 我的程序模拟器从客户端接收数据,对其进行操作并将其发送回客户端。
  3. 读取操作数据的客户端模拟器。

我在示例中尝试发送的所有数据都是一个简单的字符串。

我正在使用 RTI Code Gen 自动生成大部分代码。

没有和unboundedSupport标志(字符串限制为 255 个字符)一切正常。但是,在应用unboundedSupport标志时,我收到以下内存不足错误:

java.lang.OutOfMemoryError: Java heap space
    at com.rti.dds.cdr.CdrBuffer.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.infrastructure.EntityImpl.DDS_Entity_enable(Native Method)
    at com.rti.dds.infrastructure.EntityImpl.enable(Unknown Source)
    at com.rti.dds.infrastructure.NativeFactoryMixin.create_entityI(Unknown Source)
    at com.rti.dds.subscription.SubscriberImpl.create_datareader(Unknown Source)
    at json_dds.JsonMessageSubscriber.<init>(JsonMessageSubscriber.java:71)
    at results_consumers.ResultsConsumersMain.main(ResultsConsumersMain.java:10)
create_datareader error

我正在激活首先读取数据的客户端模拟器。

这是我的 .idl 文件:

struct JsonMessage {
    string msg;
};

这是我的主程序(第 10 行是 的初始化subscriber1):

public static void main(String... args) {
    ClientResultsConsumer clientResultsConsumer = new ClientResultsConsumer();
    JsonMessageSubscriber subscriber1 = new JsonMessageSubscriber(0, clientResultsConsumer,
                                                                               Topics.CLIENT_TOPIC_OUTPUT_1);
    subscriber1.consume();
    ClientResultsConsumer2 clientResultsConsumer2 = new ClientResultsConsumer2();
    JsonMessageSubscriber subscriber2 = new JsonMessageSubscriber(0, clientResultsConsumer2,
                                                                               Topics.CLIENT_TOPIC_OUTPUT_1);
    subscriber2.consume();
    ClientResultsConsumer3 clientResultsConsumer3 = new ClientResultsConsumer3();
    JsonMessageSubscriber subscriber3 =
        new JsonMessageSubscriber(0, clientResultsConsumer3, Topics.CLIENT_TOPIC_OUTPUT_2);
    subscriber3.consume();
  }

这是我的 ClientResultsConsumer 类:

public class ClientResultsConsumer implements Consumer {

  @Override
  public void consume(String msg) {
    System.out.println("Client results consumer got " + msg);
  }
}

这是我的 JsonMessageSubscriber 类(第 71 行是subscriber.create_datareader():

public class JsonMessageSubscriber implements DataConsumer {

  ExecutorService executor = Executors.newSingleThreadExecutor();

  public JsonMessageSubscriber(int domainId, Consumer consumer, String topicName) {

    DomainParticipant participant = DomainParticipantFactory.TheParticipantFactory
        .create_participant(domainId,
                            DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                            null /* listener */,
                            StatusKind.STATUS_MASK_NONE);
    if (participant == null) {
      System.err.println("create_participant error\n");
      System.exit(-1);
    }

    // --- Create subscriber --- //

            /* To customize subscriber QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    Subscriber subscriber = participant.create_subscriber(
        DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
        StatusKind.STATUS_MASK_NONE);
    if (subscriber == null) {
      System.err.println("create_subscriber error\n");
      System.exit(-1);
    }

    // --- Create topic --- //

    /* Register type before creating topic */
    String typeName = JsonMessageTypeSupport.get_type_name();
    JsonMessageTypeSupport.register_type(participant, typeName);

            /* To customize topic QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    Topic topic = participant.create_topic(
        topicName,
        typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
        null /* listener */, StatusKind.STATUS_MASK_NONE);
    if (topic == null) {
      System.err.println("create_topic error\n");
      System.exit(-1);
    }

    // --- Create reader --- //

    DataReaderListener listener = new JsonMessageListener(consumer);

            /* To customize data reader QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    JsonMessageDataReader reader = (JsonMessageDataReader)
        subscriber.create_datareader(
            topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
            StatusKind.STATUS_MASK_ALL);
    if (reader == null) {
      System.err.println("create_datareader error\n");
      System.exit(-1);
    }
  }

  // -----------------------------------------------------------------------

  @Override
  public void consume() {
    final long scanTimeMillis = 1000;
    Runnable task = () -> {
      while (true) {
        try {
          TimeUnit.MILLISECONDS.sleep(scanTimeMillis);
        } catch (Exception e) {
          System.err.println(e.getMessage());
        }
      }
    };
    executor.submit(task);
  }
}

不幸的是,除了限制序列大小之外,我没有找到解决方案,但我知道将它限制在足够大的数量会解决我的问题,它也需要大量内存,我宁愿它不要占用更多超过每条消息所需的最小值。

任何帮助将不胜感激,谢谢

4

2 回答 2

2

使用 -unboundedSupport 时,必须在 QoS 文件中设置一些内存阈值。这些阈值在用户手册中进行了描述它们定义了阈值,在该阈值中,样本的内存要么是动态分配的,要么是从预先分配的源中​​重用的。这些必须在 DataReader 和 DataWriter 中设置。

这些阈值的设置实际上取决于您的数据大小,并且根据您的描述,我没有足够的信息来为您提供在您的场景中有意义的示例。基本上,您不想为每个样本动态分配内存。这可能会对性能产生影响,具体取决于您的数据速率。您想要选择大多数样本使用预分配内存的值。用户手册中“处理大数据时的写入端内存管理”部分下提供的示例是视频流,其中包含较大频率较低的 I 帧和较小频率较高的 P 帧。您可以查看该部分和相应的DataReader 部分以获取示例 XML 文件。

于 2018-07-18T19:15:09.177 回答
0

我设法使用此处的示例解决了问题

只需将自动生成的 qos 文件路径传递给订阅者/发布者构造函数,然后在初始化域参与者之前编写这些行(这与上面链接中提供的示例不同,提供的示例对我不起作用) :

DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
DomainParticipantFactory.TheParticipantFactory.get_qos(factoryQos);
factoryQos.profile.url_profile.add(0, qosPolicyPath);
factoryQos.profile.url_profile.setMaximum(1);
DomainParticipantFactory.TheParticipantFactory.set_qos(factoryQos);
于 2019-01-01T12:55:50.380 回答