我正在尝试构建一个 STOMP 库,该库可用于从 chrome 应用程序连接到 RabbitMQ。我从外部 chrome 进行的初步实验效果很好。但是,我无法将 Socket 代码转换为在 chrome 应用程序中工作。可以修改下面的客户端 api 代码以在 chrome 应用程序中工作吗?
问问题
1809 次
1 回答
1
我最初的实验是成功的。我希望在客户端 dart 库中看到完整的 Socket API,以便我们可以直接从 chrome 应用程序中使用此类代码。虽然使用 chrome 的 js 套接字功能提供了一个 Socket API,但它不像 dart.io 包中可用于服务器端代码的那样干净。
这是代码。要使用它,请部署 RabbitMQ 并启用 STOMP 插件。
import 'dart:io';
import 'dart:async';
void main() {
List<String> versions = ["1.2","1.1","1.0"];
String host = "127.0.0.1"; // localhost
int port = 61613; // rabbitmq default port
Socket.connect(host, 61613).then((connection) {
String hostpath="/";
String login = "guest"; // rabbitmq default login
String passcode = "guest"; // rabbitmq default passcode
stomp(connection, versions, hostpath, login, passcode); // stomp connect
connection
.transform(new StompTransformer())
.listen((frame) {
if(frame.headers.containsKey("ack")) {
ack(connection, frame.headers["ack"]);
}
dumpStompFrame(frame);
stdout.write("enter a message> ");
},
onDone: () { print("done"); },
onError: (e) { print (e); } );
subscribe(connection, "/queue/a", 1, "client", true);
stdin
.transform(new StringDecoder())
.transform(new LineTransformer())
.listen((line) {
send(connection, "/queue/a", line);
});
});
}
/*
* STOMP
* accept-version:1.0,1.1,2.0
* host:/
*
* ^@
*/
void stomp(Socket connection,
List<String> versions,
String hostpath,
String login, String passcode) {
connection.writeln("STOMP");
if(versions.length > 0) {
connection.writeln("accept-version:${versions.join(',')}");
}
connection.writeln("host:$hostpath");
connection.writeln("login:$login");
connection.writeln("passcode:$passcode");
connection.writeln();
connection.add([0x00]);
}
/*
* SUBSCRIBE
* id:0
* destination:/queue/foo
* ack:client
*
* ^@
*/
void subscribe(Socket connection,
String destination,
int id, String ack, bool persistent) {
connection.writeln("SUBSCRIBE");
connection.writeln("id:$id");
connection.writeln("destination:$destination");
connection.writeln("ack:$ack");
connection.writeln("persistent:$persistent");
connection.writeln();
connection.add([0x00]);
}
/*
* UNSUBSCRIBE
* id:0
*
* ^@
*/
void unsubscribe(Socket connection, int id) {
connection.writeln("UNSUBSCRIBE");
connection.writeln("id:$id");
connection.writeln();
connection.add([0x00]);
}
/*
* ACK
* id:12345
* transaction:tx1
*
* ^@
*/
void ack(Socket connection, String id, [String transaction]) {
connection.writeln("ACK");
connection.writeln("id:$id");
if(?transaction) {
connection.writeln("transaction:$transaction");
}
connection.writeln();
connection.add([0x00]);
}
/*
* NACK
* id:12345
* transaction:tx1
*
* ^@
*/
void nack(Socket connection, String id, [String transaction]) {
connection.writeln("NACK");
connection.writeln("id:$id");
if(?transaction) {
connection.writeln("transaction:$transaction");
}
connection.writeln();
connection.add([0x00]);
}
/*
* BEGIN
* transaction:tx1
*
* ^@
*/
void begin(Socket connection, String transaction) {
connection.writeln("BEGIN");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* COMMIT
* transaction:tx1
*
* ^@
*/
void commit(Socket connection, String transaction) {
connection.writeln("COMMIT");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* ABORT
* transaction:tx1
*
* ^@
*/
void abort(Socket connection, String transaction) {
connection.writeln("ABORT");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* SEND
* destination:/queue/a
* content-type:text/plain
*
* hello queue a
* ^@
*/
void send(Socket connection, String queue, String message) {
connection.writeln("SEND");
connection.writeln("destination:$queue");
connection.writeln("content-type:text/plain");
connection.writeln();
connection.write(message);
connection.add([0x00]);
}
class StompServerFrame {
String frame;
Map<String, String> headers = new Map<String, String>();
List<int> body = new List<int>();
String toString() {
StringBuffer sb = new StringBuffer();
sb.writeln(frame);
for(String key in headers.keys) {
sb.writeln("$key=${headers[key]}");
}
sb.writeln(new String.fromCharCodes(body));
return sb.toString();
}
}
void dumpStompFrame(StompServerFrame frame) {
print("BEGIN STOMP FRAME DUMP");
print(frame.toString());
print("END STOMP FRAME DUMP");
}
class StompTransformer extends StreamEventTransformer<List<int>, StompServerFrame> {
List<String> serverFrames = ['CONNECTED', 'MESSAGE', 'RECEIPT', 'ERROR'];
String state = 'COMMAND'; // 'COMMAND', 'HEADERS', 'BODY'
List<int> token = new List<int>();
StompServerFrame stompServerFrame = new StompServerFrame();
StompTransformer() {}
int lastValue = -1;
void handleData(List<int> intList, EventSink<StompServerFrame> sink) {
for(int b in intList) {
switch(state) {
case 'COMMAND':
if(b == 0x0a) { // done with command
stompServerFrame.frame = new String.fromCharCodes(token);
state = 'HEADERS';
token.clear();
} else {
token.add(b);
}
lastValue = b;
break;
case 'HEADERS':
if(b == 0x0a && lastValue == 0x0a) { // done with all headers
state = 'BODY';
token.clear();
lastValue = -1;
} else if(b == 0x0a && lastValue != 0x0a) { // done with a header
String tokenString = new String.fromCharCodes(token);
List<String> tokenStringParts = tokenString.split(":");
if(tokenStringParts.length == 2) {
stompServerFrame.headers.putIfAbsent(tokenStringParts.elementAt(0),
() => tokenStringParts.elementAt(1));
} else {
// possible header format error
print("was here with $tokenString");
}
token.clear();
lastValue = b;
} else {
token.add(b);
lastValue = b;
}
break;
case 'BODY':
if(b == 0x00) { // done with body
sink.add(stompServerFrame);
stompServerFrame = new StompServerFrame();
state = 'COMMAND';
token.clear();
lastValue = -1;
} else {
stompServerFrame.body.add(b);
}
break;
default:
break;
}
}
}
}
于 2013-07-15T06:15:53.873 回答