0

我正在尝试构建一个 STOMP 库,该库可用于从 chrome 应用程序连接到 RabbitMQ。我从外部 chrome 进行的初步实验效果很好。但是,我无法将 Socket 代码转换为在 chrome 应用程序中工作。可以修改下面的客户端 api 代码以在 chrome 应用程序中工作吗?

4

1 回答 1

1

我最初的实验是成功的。我希望在客户端 dart 库中看到完整的 Socket API,以便我们可以直接从 chrome 应用程序中使用此类代码。虽然使用 chrome 的 js 套接字功能提供了一个 Socket API,但它不像 dart.io 包中可用于服务器端代码的那样干净。

这是代码。要使用它,请部署 RabbitMQ 并启用 STOMP 插件。

import 'dart:io';
import 'dart:async';

void main() {
  List<String> versions = ["1.2","1.1","1.0"];
  String host = "127.0.0.1"; // localhost
  int port = 61613; // rabbitmq default port
  Socket.connect(host, 61613).then((connection) {
    String hostpath="/";
    String login = "guest"; // rabbitmq default login
    String passcode = "guest"; // rabbitmq default passcode
    stomp(connection, versions, hostpath, login, passcode); // stomp connect
    connection
      .transform(new StompTransformer())
      .listen((frame) {
          if(frame.headers.containsKey("ack")) {
            ack(connection, frame.headers["ack"]);
          }
          dumpStompFrame(frame);
          stdout.write("enter a message> ");
        },
        onDone: () { print("done"); },
        onError: (e) { print (e); } );
    subscribe(connection, "/queue/a", 1, "client", true);
    stdin
      .transform(new StringDecoder())
      .transform(new LineTransformer())
      .listen((line) {
        send(connection, "/queue/a", line);
      });
  });
}

/* 
 * STOMP
 * accept-version:1.0,1.1,2.0
 * host:/
 * 
 * ^@
 */

void stomp(Socket connection,
  List<String> versions,
  String hostpath,
  String login, String passcode) {
  connection.writeln("STOMP");
  if(versions.length > 0) {
    connection.writeln("accept-version:${versions.join(',')}");
  }
  connection.writeln("host:$hostpath");
  connection.writeln("login:$login");
  connection.writeln("passcode:$passcode");
  connection.writeln();
  connection.add([0x00]);
}

/*
 * SUBSCRIBE
 * id:0
 * destination:/queue/foo
 * ack:client
 * 
 * ^@
 */
void subscribe(Socket connection,
  String destination,
  int id, String ack, bool persistent) {
  connection.writeln("SUBSCRIBE");
  connection.writeln("id:$id");
  connection.writeln("destination:$destination");
  connection.writeln("ack:$ack");
  connection.writeln("persistent:$persistent");
  connection.writeln();
  connection.add([0x00]);
}

/*
 * UNSUBSCRIBE
 * id:0
 * 
 * ^@
*/
void unsubscribe(Socket connection, int id) {
  connection.writeln("UNSUBSCRIBE");
  connection.writeln("id:$id");
  connection.writeln();
  connection.add([0x00]);
}

/*
 * ACK
 * id:12345
 * transaction:tx1
 * 
 * ^@
*/
void ack(Socket connection, String id, [String transaction]) {
  connection.writeln("ACK");
  connection.writeln("id:$id");
  if(?transaction) {
    connection.writeln("transaction:$transaction");
  }
  connection.writeln();
  connection.add([0x00]);
}

/*
 * NACK
 * id:12345
 * transaction:tx1
 * 
 * ^@
*/
void nack(Socket connection, String id, [String transaction]) {
  connection.writeln("NACK");
  connection.writeln("id:$id");
  if(?transaction) {
    connection.writeln("transaction:$transaction");
  }
  connection.writeln();
  connection.add([0x00]);
}

/*
 * BEGIN
 * transaction:tx1
 * 
 * ^@
 */
void begin(Socket connection, String transaction) {
  connection.writeln("BEGIN");
  connection.writeln("transaction:$transaction");
  connection.writeln();
  connection.add([0x00]);
}

/*
 * COMMIT
 * transaction:tx1
 * 
 * ^@
 */
void commit(Socket connection, String transaction) {
  connection.writeln("COMMIT");
  connection.writeln("transaction:$transaction");
  connection.writeln();
  connection.add([0x00]);
}

/*
 * ABORT
 * transaction:tx1
 * 
 * ^@
 */
void abort(Socket connection, String transaction) {
  connection.writeln("ABORT");
  connection.writeln("transaction:$transaction");
  connection.writeln();
  connection.add([0x00]);
}

/*
 * SEND
 * destination:/queue/a
 * content-type:text/plain
 * 
 * hello queue a
 * ^@
 */
void send(Socket connection, String queue, String message) {
  connection.writeln("SEND");
  connection.writeln("destination:$queue");
  connection.writeln("content-type:text/plain");
  connection.writeln();
  connection.write(message);
  connection.add([0x00]);
}

class StompServerFrame {
  String frame;
  Map<String, String> headers = new Map<String, String>();
  List<int> body = new List<int>();
  String toString() {
    StringBuffer sb = new StringBuffer();
    sb.writeln(frame);
    for(String key in headers.keys) {
      sb.writeln("$key=${headers[key]}");
    }
    sb.writeln(new String.fromCharCodes(body));
    return sb.toString();
  }
}

void dumpStompFrame(StompServerFrame frame) {
  print("BEGIN STOMP FRAME DUMP");
  print(frame.toString());
  print("END STOMP FRAME DUMP");
}

class StompTransformer extends StreamEventTransformer<List<int>, StompServerFrame> {
  List<String> serverFrames = ['CONNECTED', 'MESSAGE', 'RECEIPT', 'ERROR'];
  String state = 'COMMAND'; // 'COMMAND', 'HEADERS', 'BODY'
  List<int> token = new List<int>();
  StompServerFrame stompServerFrame = new StompServerFrame();
  StompTransformer() {}
  int lastValue = -1;
  void handleData(List<int> intList, EventSink<StompServerFrame> sink) {
    for(int b in intList) {
      switch(state) {
        case 'COMMAND':
          if(b == 0x0a) { // done with command
            stompServerFrame.frame = new String.fromCharCodes(token);
            state = 'HEADERS';
            token.clear();
          } else {
            token.add(b);
          }
          lastValue = b;
          break;
        case 'HEADERS':
          if(b == 0x0a && lastValue == 0x0a) { // done with all headers
            state = 'BODY';
            token.clear();
            lastValue = -1;
          } else if(b == 0x0a && lastValue != 0x0a) { // done with a header
            String tokenString = new String.fromCharCodes(token);
            List<String> tokenStringParts = tokenString.split(":");
            if(tokenStringParts.length == 2) {
              stompServerFrame.headers.putIfAbsent(tokenStringParts.elementAt(0),
                  () => tokenStringParts.elementAt(1));
            } else {
              // possible header format error
              print("was here with $tokenString");
            }
            token.clear();
            lastValue = b;
          } else {
            token.add(b);
            lastValue = b;
          }
          break;
        case 'BODY':
          if(b == 0x00) { // done with body
            sink.add(stompServerFrame);
            stompServerFrame = new StompServerFrame();
            state = 'COMMAND';
            token.clear();
            lastValue = -1;
          } else {
            stompServerFrame.body.add(b);
          }
          break;
        default:
          break;
      }
    }
  }
}
于 2013-07-15T06:15:53.873 回答