5

我正在尝试创建一个BodyPublisher可以反序列化我的 JSON 对象的自定义。我可以在创建请求并使用ofByteArray方法时反序列化 JSON,BodyPublishers但我宁愿使用自定义发布者。

public class CustomPublisher implements HttpRequest.BodyPublisher {
    private byte[] bytes;
    
    public CustomPublisher(ObjectNode jsonData) {
        ...
        // Deserialize jsonData to bytes
        ...
    }
    
    @Override
    public long contentLength() {
        if(bytes == null) return 0;
        return bytes.length
    }
    
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
        subscriber.onSubscribe(subscription);       
    }

    private CustomSubscription implements Flow.Subscription {
         private final Flow.Subscriber<? super ByteBuffer> subscriber;
         private boolean cancelled;
         private Iterator<Byte> byterator;

         private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
             this.subscriber = subscriber;
             this.cancelled = false;
             List<Byte> bytelist = new ArrayList<>();
             for(byte b : bytes) {
                 bytelist.add(b);
             }
             this.byterator = bytelist.iterator();
         }

         @Override
         public void request(long n) {
             if(cancelled) return;
             if(n < 0) {
                 subscriber.onError(new IllegalArgumentException());
             } else if(byterator.hasNext()) {
                 subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
             } else {
                 subscriber.onComplete();
             }
         }

         @Override
         public void cancel() {
             this.cancelled = true;
         }
    }
}

此实现有效,但request前提是使用 1 作为参数调用订阅方法。但这就是当我将它与 HttpRequest 一起使用时发生的情况。

我很确定这不是创建自定义订阅的任何首选或最佳方式,但我还没有找到更好的方法来使它工作。

如果有人能带领我走上更好的道路,我将不胜感激。

4

1 回答 1

0

避免从中创建字节数组是正确的,因为这会给大对象带来内存问题。

我不会尝试编写自定义发布者。相反,只需利用工厂方法HttpRequest.BodyPublishers.ofInputStream

HttpRequest.BodyPublisher publisher =
    HttpRequest.BodyPublishers.ofInputStream(() ->  {
        PipedInputStream in = new PipedInputStream();

        ForkJoinPool.commonPool().submit(() -> {
            try (PipedOutputStream out = new PipedOutputStream(in)) {
                objectMapper.writeTree(
                    objectMapper.getFactory().createGenerator(out),
                    jsonData);
            }
            return null;
        });

        return in;
    });

如您所述,您可以使用HttpRequest.BodyPublishers.ofByteArray. 这对于相对较小的对象来说很好,但我出于习惯而为可扩展性编程。假设代码不需要扩展的问题是其他开发人员会假设传递大对象是安全的,而没有意识到对性能的影响。

编写自己的身体出版商将是很多工作。它的subscribe方法继承自Flow.Publisher

subscribe方法的文档以此开头:

如果可能,添加给定的订阅者。

每次subscribe调用您的方法时,您都需要将订阅者添加到某种集合中,您需要创建Flow.Subscription的实现,并且需要立即将其传递给订阅者的onSubscribe方法。您的 Subscription 实现对象需要发送回一个或多个 ByteBuffers,只有在调用 Subscription 的request方法时,通过调用相应的 Subscriber(不仅仅是任何 Subscriber)的onNext方法,并且一旦您发送了所有数据,您必须调用相同的订阅者onComplete()方法。最重要的是,订阅实现对象需要处理cancel请求。

您可以通过扩展SubmissionPublisher(这是 Flow.Publisher 的默认实现),然后向其中添加一个contentLength()方法,从而使这一切变得更容易。但正如 SubmissionPublisher 文档所示,即使是最小的工作实现,您仍然有大量工作要做。

HttpRequest.BodyPublishers.of... 方法将为您完成所有这些工作。 ofByteArray适用于小物体,但ofInputStream适用于您可以通过的任何物体。

于 2020-08-09T22:26:59.100 回答