您可以使用我们的aktos-dcs和aktos-dcs-cs库。我已经成功地在生产中使用了这些库,以使 RFID 阅读器(来自 Impinj)与我们的遥测系统进行通信(实际上,集成在其中)。RFID 阅读器有一个 C# API,我们在遥测系统中大量使用 Python。
最简单的测试用例是一个 pinger-ponger 应用程序,下面是这些库的样子:
pinger.cs:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using aktos_dcs_cs;
namespace pinger
{
class Pinger : Actor
{
public void handle_PingMessage(Dictionary<string, object> msg)
{
Console.WriteLine("Pinger handled PingMessage: {0} ", msg["text"]);
string msg_ser = @"
{""PongMessage"":
{""text"": ""this is proper message from csharp implementation""}
}
";
System.Threading.Thread.Sleep(2000);
send(msg_ser);
}
}
class Program
{
static void Main(string[] args)
{
Pinger x = new Pinger();
Actor.wait_all();
}
}
}
ponger.py:
from aktos_dcs import *
class Pinger(Actor):
def handle_PingMessage(self, msg_raw):
msg = get_msg_body(msg_raw)
print "Pinger got ping message: ", msg['text'], (time.time() - msg_raw['timestamp'])
sleep(2)
self.send({'PongMessage': {'text': "Hello ponger, this is pinger 1!"}})
if __name__ == "__main__":
ProxyActor()
pinger = Pinger()
pinger.send({'PongMessage': {'text': "Hello ponger, this is STARTUP MESSAGE!"}})
wait_all()