我创建了一个非常简单的逻辑门模拟器,它可以在输入端采用 IEnumerable 馈送,并流经一系列门,在输出端准确地产生高/低位。
这个想法是将概念扩展到多门网络,将存储在内存数据库中的数组中的值馈送到网络的输入,并在逻辑门网络中复制场景评估规则集。假设是,Rx 允许的流式操作应该提供一种机制来及时评估 400,000 个场景,从而避免搜索和数组迭代。大多数输入只是前馈,但也有一些需要与网络的输出一起工作,因为下一个场景的决策依赖于前一个场景的输出。
一切正常,直到我尝试在网络中创建一个反馈回路:把网络的输出流用作某个门的输入流,而这个门的输出又参与决定网络的最终输出。
我想知道如何获取输出流并以这种方式使用它。我确实看过“Recursive / fan-out in Reactive Extensions”,感觉它可能会有所帮助,但我对 Rx 很陌生,并没有完全理解它。
这是我到目前为止的代码……非常简单,有点混乱(抱歉),并且目前无法运行(不过,如果去掉反馈连接、改成前馈连接,它是可以工作的)。
// Demo entry point for the first (feedback) attempt: builds two small decimal test
// streams, thresholds them into Bit streams, and tries to wire an AND gate whose
// second input is an XOR gate fed by the AND gate's own output.
// The surrounding text describes this listing as currently broken.
class Program
{
// Gate references held as statics. _xorGate is declared but never assigned in this listing.
private static AndGate _andGate;
private static XorGate _xorGate;
static void Main(string[] args)
{
// Raw test data for the two input streams.
var data = new List<decimal>();
var data2 = new List<decimal>();
// output and rnd are not used by any live statement below (rnd only appears in commented-out code).
var output = new List<Tuple<Bit, Bit, Bit>>();
var rnd = new Random();
//for (var i = 0; i < 10000; i++)
//{
// data.Add(rnd.Next(0, 100));
// data2.Add(rnd.Next(0, 100));
//}
data.Add(10);
data.Add(30);
data.Add(80);
data.Add(5);
data.Add(34);
data.Add(27);
data2.Add(10);
data2.Add(5);
data2.Add(10);
data2.Add(34);
data2.Add(67);
data2.Add(80);
// Turn the lists into observable sequences.
var stream = data.ToObservable();
var stream2 = data2.ToObservable();
// Threshold into Bit streams: High when value > 27 (first stream) / value < 12 (second stream).
var filter = new DataFilter(stream, a => a > 27);
var filter2 = new DataFilter(stream2, a => a < 12);
// NOTE(review): an object initializer runs the parameterless AndGate constructor first and
// assigns InputA afterwards, so that constructor must tolerate unassigned inputs - verify
// against the AndGate definition.
_andGate = new AndGate {InputA = filter.OutputStream};
// Feedback attempt: _andGate.Output is read here, before InputB is assigned and before
// SetOutput() is called. Publish() returns a connectable observable, and no Connect() call
// appears anywhere in this method.
_andGate.InputB = new XorGate(filter2.OutputStream, _andGate.Output.Publish()).Output;
// NOTE(review): relies on AndGate exposing a SetOutput() method - confirm it exists.
_andGate.SetOutput();
//filter.OutputStream.Subscribe(x => Console.WriteLine(x));
//filter2.OutputStream.Subscribe(x => Console.WriteLine(x));
// Print every Bit produced by the AND gate.
_andGate.Output.Subscribe(x=>Console.WriteLine(x));
//var zippedSequence =
// Observable.When(
// filter.OutputStream.And(filter2.OutputStream)
// .Then((first, second) => new [] { first, second }));
//zippedSequence.Subscribe(x=>Console.WriteLine("{0}, {1}", x[0], x[1]));
// Keep the console open until Enter is pressed.
Console.ReadLine();
}
}
/// <summary>
/// Projects a stream of decimals onto a stream of <see cref="Bit"/> values:
/// each element becomes High when the supplied predicate accepts it and Low otherwise.
/// </summary>
public class DataFilter
{
    /// <summary>The projected Bit stream, built once in the constructor.</summary>
    protected IObservable<Bit> ValueStream;

    /// <summary>
    /// Builds the Bit stream from <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">Source values.</param>
    /// <param name="operation">Predicate deciding whether a value maps to High.</param>
    public DataFilter(IObservable<decimal> stream, Func<decimal, bool> operation )
    {
        ValueStream = stream.Select(value =>
        {
            if (operation(value))
            {
                return Bit.High;
            }

            return Bit.Low;
        });
    }

    /// <summary>Read-only access to the projected Bit stream.</summary>
    public IObservable<Bit> OutputStream
    {
        get { return this.ValueStream; }
    }
}
/// <summary>
/// Two-input AND gate over <see cref="Bit"/> streams. Values from both inputs are paired
/// with an Rx join pattern (And/Then/When) and each pair yields High only when both bits are High.
/// </summary>
/// <remarks>
/// The output is a deferred observable: the inputs are read when an observer subscribes,
/// not when the gate is constructed. This lets the gate be created with the parameterless
/// constructor (or an object initializer), have <see cref="InputA"/>/<see cref="InputB"/>
/// assigned afterwards, and still hand out a non-null <see cref="Output"/> in the meantime.
/// </remarks>
public class AndGate
{
    protected IObservable<Bit> StreamA;
    protected IObservable<Bit> StreamB;
    protected IObservable<Bit> OutputStream;

    /// <summary>First input stream. May be assigned after construction.</summary>
    public IObservable<Bit> InputA { get { return StreamA; } set { StreamA = value; } }

    /// <summary>Second input stream. May be assigned after construction.</summary>
    public IObservable<Bit> InputB { get { return StreamB; } set { StreamB = value; } }

    /// <summary>The gate's output stream. Never null once the gate is constructed.</summary>
    public IObservable<Bit> Output
    {
        get { return OutputStream; }
    }

    /// <summary>
    /// Creates a gate with no inputs wired yet. Assign <see cref="InputA"/> and
    /// <see cref="InputB"/> before anything subscribes to <see cref="Output"/>.
    /// </summary>
    public AndGate()
    {
        // Deferred so the join is built from whatever inputs are wired at subscription time;
        // building it eagerly here would dereference inputs that are still null.
        OutputStream = Observable.Defer(() => BuildOutput());
    }

    /// <summary>Creates a gate with both inputs wired.</summary>
    /// <param name="streamA">First input stream.</param>
    /// <param name="streamB">Second input stream.</param>
    /// <exception cref="ArgumentNullException">Either stream is null.</exception>
    public AndGate(IObservable<Bit> streamA, IObservable<Bit> streamB)
        : this()
    {
        if (streamA == null)
        {
            throw new ArgumentNullException("streamA");
        }

        if (streamB == null)
        {
            throw new ArgumentNullException("streamB");
        }

        // Store the inputs so InputA/InputB report what the gate is actually wired to.
        StreamA = streamA;
        StreamB = streamB;
    }

    /// <summary>
    /// Confirms that both inputs have been wired. Call after assigning
    /// <see cref="InputA"/> and <see cref="InputB"/> to fail fast on incomplete wiring.
    /// </summary>
    /// <exception cref="InvalidOperationException">An input has not been assigned.</exception>
    public void SetOutput()
    {
        EnsureInputsAssigned();
    }

    // Builds the join of both inputs; invoked once per subscription by the deferred output.
    private IObservable<Bit> BuildOutput()
    {
        EnsureInputsAssigned();
        return Observable.When(StreamA.And(StreamB).Then(Determine));
    }

    // Guards against using the gate before both inputs are wired.
    private void EnsureInputsAssigned()
    {
        if (StreamA == null || StreamB == null)
        {
            throw new InvalidOperationException(
                "AndGate requires both InputA and InputB to be assigned before its output is used.");
        }
    }

    // AND truth table: High only when both bits are High.
    private Bit Determine(Bit bitA, Bit bitB)
    {
        return bitA == Bit.High && bitB == Bit.High ? Bit.High : Bit.Low;
    }
}
/// <summary>
/// Two-input OR gate over <see cref="Bit"/> streams, built from an Rx join pattern.
/// </summary>
public class OrGate
{
    // Declared for subclasses; nothing in this class assigns or reads these two fields.
    protected IObservable<Bit> InputStreamA;
    protected IObservable<Bit> InputStreamB;

    /// <summary>Joined output; stays unassigned when the parameterless constructor is used.</summary>
    protected IObservable<Bit> OutputStream;

    /// <summary>Creates a gate without wiring any streams.</summary>
    public OrGate()
    {
    }

    /// <summary>Creates a gate whose output pairs values from both streams and ORs them.</summary>
    /// <param name="streamA">First input stream.</param>
    /// <param name="streamB">Second input stream.</param>
    public OrGate(IObservable<Bit> streamA, IObservable<Bit> streamB)
    {
        var pairedInputs = streamA.And(streamB);
        var plan = pairedInputs.Then<Bit>(Determine);
        OutputStream = Observable.When(plan);
    }

    /// <summary>The gate's output stream.</summary>
    public IObservable<Bit> Output
    {
        get { return this.OutputStream; }
    }

    // OR truth table: High when at least one bit is High.
    private Bit Determine(Bit bitA, Bit bitB)
    {
        if (bitA == Bit.High || bitB == Bit.High)
        {
            return Bit.High;
        }

        return Bit.Low;
    }
}
/// <summary>
/// Two-input XOR gate over <see cref="Bit"/> streams, built from an Rx join pattern.
/// </summary>
public class XorGate
{
    // Declared for subclasses; nothing in this class assigns or reads these two fields.
    protected IObservable<Bit> StreamA;
    protected IObservable<Bit> StreamB;

    /// <summary>Joined output assigned by the constructor.</summary>
    protected IObservable<Bit> OutputStream;

    /// <summary>Creates a gate whose output pairs values from both streams and XORs them.</summary>
    /// <param name="streamA">First input stream.</param>
    /// <param name="streamB">Second input stream.</param>
    public XorGate(IObservable<Bit> streamA, IObservable<Bit> streamB)
    {
        var pairedInputs = streamA.And(streamB);
        var plan = pairedInputs.Then<Bit>(Determine);
        OutputStream = Observable.When(plan);
    }

    /// <summary>The gate's output stream.</summary>
    public IObservable<Bit> Output
    {
        get { return this.OutputStream; }
    }

    // XOR truth table: High when exactly one of the bits is High.
    private Bit Determine(Bit bitA, Bit bitB)
    {
        bool firstIsHigh = bitA == Bit.High;
        bool secondIsHigh = bitB == Bit.High;

        if (firstIsHigh != secondIsHigh)
        {
            return Bit.High;
        }

        return Bit.Low;
    }
}
/// <summary>Logic level carried on the gate streams.</summary>
public enum Bit
{
    /// <summary>Logic low (numeric value 0).</summary>
    Low = 0,

    /// <summary>Logic high (numeric value 1).</summary>
    High = 1
}
编辑:我添加了这个图表以帮助理解。红线是引起问题的线。;-)
另一个编辑:我继续研究这个小问题,发现借助 Rx 的 Generate 和 Publish/Connect 可以解决它。把门网络的输出发布(Publish)之后,就可以观察输出,并根据输出值去改变网络中的元件。Generate 方法让我可以构造一个带有两条内部比特流的异或门:一条恒为高,另一条(最初)为低,这种组合的输出为高。异或门带有一个状态位,可以通过已发布的输出流从门的外部设置。这种方式可以把异或门的输出锁存为高或低。
/// <summary>
/// Demo entry point for the latch-based network: two decimal test streams are thresholded
/// into Bit streams and fed through AND gates, with an XorGate latch that is driven from the
/// published network output.
/// </summary>
class Program
{
    private static AndGate _andGate1;
    private static AndGate _andGate2;
    private static XorGate _xorGate;

    static void Main(string[] args)
    {
        // Raw test data for the two input streams.
        var data = new List<decimal> { 10, 23, 80, 5, 34, 27 };
        var data2 = new List<decimal> { 10, 5, 10, 34, 67, 80 };

        // Raw data streams.
        var stream = data.ToObservable();
        var stream2 = data2.ToObservable();

        // Converted to Bit streams: High when value > 27 (first) / value < 12 (second).
        var filter = new DataFilter(stream, a => a > 27);
        var filter2 = new DataFilter(stream2, a => a < 12);

        // Gate network: (filter AND latch) AND filter2.
        _xorGate = new XorGate();
        _andGate1 = new AndGate(filter.OutputStream, _xorGate.Output);
        _andGate2 = new AndGate(_andGate1.Output, filter2.OutputStream);

        // Publish the network outcome so several observers share one subscription to the network.
        var observable = _andGate2.Output.SubscribeOn(NewThreadScheduler.Default).Publish();

        // Feedback: a High outcome raises the latch's status bit.
        observable.Subscribe(x => { if (x == Bit.High) { _xorGate.SetStatusBitHigh(x); } });

        // View the results.
        observable.Subscribe(x => Console.WriteLine(x));

        // Connect only after every observer is attached. The source is subscribed on a new
        // thread, so connecting first would let it start emitting while the observers above
        // are still being registered, and early values could be missed.
        using (observable.Connect())
        {
            Console.ReadLine();
        }
    }
}
/// <summary>
/// Maps every decimal of a source stream to a <see cref="Bit"/>: High when the predicate
/// holds for the value, Low when it does not.
/// </summary>
public class DataFilter
{
    /// <summary>The mapped Bit stream, assigned once by the constructor.</summary>
    protected IObservable<Bit> ValueStream;

    /// <summary>Exposes the mapped Bit stream.</summary>
    public IObservable<Bit> OutputStream
    {
        get { return this.ValueStream; }
    }

    /// <summary>Builds the mapped stream.</summary>
    /// <param name="stream">Source values.</param>
    /// <param name="operation">Predicate that selects the values mapped to High.</param>
    public DataFilter(IObservable<decimal> stream, Func<decimal, bool> operation )
    {
        Func<decimal, Bit> toBit = candidate =>
        {
            bool accepted = operation(candidate);
            return accepted ? Bit.High : Bit.Low;
        };

        ValueStream = stream.Select(toBit);
    }
}
/// <summary>
/// Two-input AND gate over <see cref="Bit"/> streams, built from an Rx join pattern.
/// </summary>
public class AndGate
{
    /// <summary>Joined output assigned by the constructor.</summary>
    protected IObservable<Bit> OutputStream;

    /// <summary>Creates a gate whose output pairs values from both streams and ANDs them.</summary>
    /// <param name="streamA">First input stream.</param>
    /// <param name="streamB">Second input stream.</param>
    public AndGate(IObservable<Bit> streamA, IObservable<Bit> streamB)
    {
        var pairedInputs = streamA.And(streamB);
        var plan = pairedInputs.Then<Bit>(Determine);
        OutputStream = Observable.When(plan);
    }

    /// <summary>The gate's output stream.</summary>
    public IObservable<Bit> Output
    {
        get { return this.OutputStream; }
    }

    // AND truth table: Low as soon as either bit is not High.
    private Bit Determine(Bit bitA, Bit bitB)
    {
        if (bitA != Bit.High || bitB != Bit.High)
        {
            return Bit.Low;
        }

        return Bit.High;
    }
}
/// <summary>
/// Not a general two-input XOR gate: both XOR inputs are generated internally.
/// One internal stream is held High; the other starts Low and is driven by the status bit,
/// which is raised when the network output produces a High signal.
/// Raising the status bit forces this gate's output Low (i.e. "take no action")
/// until the bit is sent Low again by another network (not done yet).
/// </summary>
public class XorGate
{
    // Declared for subclasses; nothing in this class assigns or reads these two fields.
    protected IObservable<Bit> StreamA;
    protected IObservable<Bit> StreamB;

    /// <summary>XOR of the two internal streams, assigned by the constructor.</summary>
    protected IObservable<Bit> OutputStream;

    /// <summary>Latch state read by the second internal stream.</summary>
    protected Bit StatusBit;

    /// <summary>
    /// Builds the two internal streams and joins them into the output.
    /// </summary>
    public XorGate()
    {
        // The latch starts Low.
        StatusBit = Bit.Low;

        // Pin 1: endless stream that always yields High.
        var alwaysHigh = Observable.Generate(
            Bit.High,
            state => true,
            state => state,
            state => state);

        // Pin 2: endless stream that starts Low and afterwards takes the value of StatusBit
        // at each iteration, so a High StatusBit produces a High stream.
        var followsStatus = Observable.Generate(
            Bit.Low,
            state => true,
            state => StatusBit,
            state => state);

        // Pair the pins and XOR each pair.
        var plan = alwaysHigh.And(followsStatus).Then<Bit>(Determine);
        OutputStream = Observable.When(plan);
    }

    /// <summary>
    /// Exposes the output stream.
    /// </summary>
    public IObservable<Bit> Output
    {
        get { return this.OutputStream; }
    }

    /// <summary>
    /// Raises the status bit, but only when it is currently Low and <paramref name="input"/> is High.
    /// Any other combination leaves the status bit untouched.
    /// </summary>
    /// <param name="input">Signal requesting the change.</param>
    public void SetStatusBitHigh(Bit input)
    {
        bool shouldRaise = StatusBit == Bit.Low && input == Bit.High;
        if (shouldRaise)
        {
            StatusBit = Bit.High;
        }
    }

    /// <summary>
    /// Lowers the status bit, but only when it is currently High and <paramref name="input"/> is Low.
    /// Any other combination leaves the status bit untouched.
    /// </summary>
    /// <param name="input">Signal requesting the change.</param>
    public void SetStatusBitLow(Bit input)
    {
        bool shouldLower = StatusBit == Bit.High && input == Bit.Low;
        if (shouldLower)
        {
            StatusBit = Bit.Low;
        }
    }

    // XOR truth table: High when exactly one of the bits is High.
    private Bit Determine(Bit bitA, Bit bitB)
    {
        bool firstIsHigh = bitA == Bit.High;
        bool secondIsHigh = bitB == Bit.High;
        return firstIsHigh != secondIsHigh ? Bit.High : Bit.Low;
    }
}
/// <summary>
/// Two-input OR gate over <see cref="Bit"/> streams, built from an Rx join pattern.
/// </summary>
public class OrGate
{
    /// <summary>Joined output assigned by the constructor.</summary>
    protected IObservable<Bit> OutputStream;

    /// <summary>Creates a gate whose output pairs values from both streams and ORs them.</summary>
    /// <param name="streamA">First input stream.</param>
    /// <param name="streamB">Second input stream.</param>
    public OrGate(IObservable<Bit> streamA, IObservable<Bit> streamB)
    {
        var pairedInputs = streamA.And(streamB);
        var plan = pairedInputs.Then<Bit>(Determine);
        OutputStream = Observable.When(plan);
    }

    /// <summary>The gate's output stream.</summary>
    public IObservable<Bit> Output
    {
        get { return this.OutputStream; }
    }

    // OR truth table: Low only when neither bit is High.
    private Bit Determine(Bit bitA, Bit bitB)
    {
        bool neitherHigh = bitA != Bit.High && bitB != Bit.High;
        return neitherHigh ? Bit.Low : Bit.High;
    }
}
/// <summary>Logic level carried on the gate streams.</summary>
public enum Bit
{
    /// <summary>Logic low (numeric value 0).</summary>
    Low = 0,

    /// <summary>Logic high (numeric value 1).</summary>
    High = 1
}
电路现在看起来像这样
欢迎大家评论、指正,也欢迎提出实现这一目标的其他更好方式,因为目前的做法仍然让我觉得有点 hacky。