0

我正在使用反应库,我观察字节,当我得到它们时,我发布它们。我无法找出我应该在哪里等待收集所有字节。然后处理它们。

 public partial class Form1 : Form
        { 
     public Form1()
            {
                InitializeComponent();
                obj.SignatureAvailable.ObserveOn(SynchronizationContext.Current).Subscribe(HandlePinsAvailable);
            }

            void HandlePinsAvailable(byte[] signBytes)
            {
//here I will collect byte blocks.

                //MessageBox.Show(Encoding.ASCII.GetString(signBytes));
            }
    }

      public class SignatureListener
        {
             private Subject<byte[]> SignaturesAvailable = new Subject<byte[]>();

             public IObservable<byte[]> SignatureAvailable { get { return SignaturesAvailable.AsObservable(); } }

            private IDisposable SignatureSubscription;

            public SignatureListener()
            {
                SignatureSubscription = HidUtility.Messages.Subscribe(HandlePinMessageBytes);
            }
      public void HandlePinMessageBytes(byte[] signatureBytes)
            { 
    SignaturesAvailable.OnNext(sobj.RawData.ToArray());
    }
    public class data
    {
    public void get data()
    {
         private static Subject<byte[]> subject = new Subject<byte[]>();
      public static IObservable<byte[]> Messages { get { return subject.AsObservable(); } }
      subject.OnNext(bytes);//I have have the actuall blocks here.
        }
4

1 回答 1

0

我不完全理解你的问题,所以如果我的回答不恰当,请道歉。

我假设您拥有的是您正在观察的字节流。此蒸汽将向您“分块”字节,直到您到达流的末尾(如 a FileStream)。

如果是这种情况,那么您可以继续处理字节,直到可观察序列完成。

var allBytes = new List<byte>();
org.SignatureAvailable.Subscribe(
    chunk=>{allBytes.AddRange(chunk);},       //OnNext
    ()=>{/*Do something with allBytes list*/} //OnCompleted
    );

问题在于我们已经在异步序列之外泄露了这个列表。更好的选择可能是使用该Aggregate功能

org.SignatureAvailable
   .Aggregate(new List<byte>(), (acc, chunk)=>acc.AddRange(chunk))

现在,这将返回一个List<bytes>包含所有字节的单个SignatureAvailable序列OnCompletes。为了保持格式一致,您可能希望返回字节。我认为您的解决方案如下所示:

org.SignatureAvailable
   .Aggregate(new List<byte>(), (acc, chunk)=>acc.AddRange(chunk))
   .Select(list=>list.ToArray())
   .ObserveOn(SynchronizationContext.Current)
   .Subscribe(HandlePinsAvailable);          //Will only get called once when the sequence completes will all bytes.

我书中的这些链接可能会有所帮助:

遍历文件:

http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator

聚合:

http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#BuildYourOwn

于 2013-08-28T15:43:59.617 回答