我目前正在尝试了解套接字是如何工作的。我正在使用 Flask-SocketIO 和 python-socketio 客户端运行一个基本示例。以下是我到目前为止所做的工作。
使用的模块：
-i https://pypi.org/simple
certifi==2019.6.16
chardet==3.0.4
click==7.0
confluent-kafka==1.1.0
dependency-injector==3.14.7
docker==4.0.2
flask-socketio==4.2.1
flask==1.1.1
idna==2.8
itsdangerous==1.1.0
jinja2==2.10.1
markupsafe==1.1.1
python-engineio==3.10.0
python-socketio==4.4.0
requests==2.22.0
six==1.12.0
urllib3==1.25.3
websocket-client==0.56.0
werkzeug==0.15.5
服务器.py
import json
import logging
import os
import sys
import threading
from threading import Lock
from containers import Configs, Consumers, Managers
from errors import ObjectNotFound
from flask import Response
from flask_socketio import SocketIO, emit, join_room, leave_room, \
close_room, rooms, disconnect
from flask import Flask, render_template, session, request, \
copy_current_request_context
import sys, traceback
# Logging: root logger at ERROR and above, rendered as
# "<logger name> - <level> - <message>".
logging.basicConfig(
    format='%(name)s - %(levelname)s - %(message)s',
    level=logging.ERROR,
)

# Broker address: the KAFKA_BROKER environment variable wins, otherwise the
# local default is used.
broker = os.environ.get('KAFKA_BROKER', 'localhost:9092')
if 'KAFKA_BROKER' not in os.environ and len(sys.argv) > 1 and sys.argv[1]:
    # Only reached when KAFKA_BROKER is unset.
    # NOTE(review): the first CLI argument is stored as node_id and does not
    # change `broker`; node_id is not read in the visible code -- confirm
    # whether it was meant to override the broker instead.
    node_id = sys.argv[1]

# Push the resolved broker and the group id into the shared configuration.
Configs.config.override({'broker': broker, 'groupId': 'grpactconsumer'})
# Flask application and the Socket.IO server wrapped around it.
# async_mode is passed through as None, so no async mode is forced here.
async_mode = None
api = Flask(__name__)
# NOTE(review): hard-coded secret key -- acceptable for a demo only.
api.config.update(SECRET_KEY='secret!')
socketio = SocketIO(
    api,
    async_mode=async_mode,
    async_handlers=False,
    always_connect=True,
    cors_allowed_origins="*",
    ping_timeout=60,
    engineio_logger=True,
)

# Module-level thread handle and lock; neither is used in the visible code.
thread = None
thread_lock = Lock()

# Restrict the 'engineio' logger to ERROR and above.
logging.getLogger('engineio').setLevel(logging.ERROR)

# Consumer instance used by the stats worker.
stat_consumer = Consumers.consumer()
def worker(topic):
    """Consume ``topic`` and relay each received value to Socket.IO clients.

    Every value handed to the callback is emitted unchanged as an
    ``object_stat`` event on the ``/event`` namespace.

    Args:
        topic: Topic name; passed as a one-element list to
            ``stat_consumer.consume``.
    """
    def relay_stat(value):
        # Zero-length sleep through the Socket.IO server before emitting --
        # presumably to yield to other tasks; confirm against the async mode.
        socketio.sleep(0)
        socketio.emit('object_stat', value, namespace='/event')

    topics = [topic]
    stat_consumer.consume(topics, relay_stat)


# Run the consumer on its own thread for the 'Stats' topic.
# NOTE(review): this is a plain threading.Thread; if the server runs under an
# eventlet/gevent async mode, confirm that emitting from it is supported.
worker_thread = threading.Thread(target=worker, args=('Stats',))
worker_thread.start()
def on_event_received():
    """Tell clients on the ``/event`` namespace that an object event occurred.

    Emits ``object_event`` with an empty dict as payload.
    """
    empty_payload = {}
    socketio.emit('object_event', empty_payload, namespace='/event')


# Build the object manager and register the notifier for 'Events'.
object_manager = Managers.object_manager()
object_manager.listen_events(
    ['Events'],
    on_event_received,
)
@socketio.on('disconnect', namespace='/event')
def test_disconnect():
    """Print the session id of a client that disconnected from ``/event``."""
    client_sid = request.sid
    print('Client disconnected', client_sid)
@socketio.on('my event')
def handle_my_custom_event(json):
    """Log and print the payload of a ``my event`` message.

    Registered without a namespace argument, unlike the ``/event`` handlers.

    Args:
        json: Event payload. NOTE(review): the name shadows the ``json``
            module inside this function.
    """
    message = 'received json: ' + str(json)
    logging.error(message)
    print(message)
# CORS policy: every HTTP response allows any origin.
@api.after_request
def after_request(response):
    """Add ``Access-Control-Allow-Origin: *`` to ``response`` and return it."""
    response.headers['Access-Control-Allow-Origin'] = '*'
    return response
# ObjectNotFound raised by an endpoint becomes an empty JSON object with 404.
@api.errorhandler(ObjectNotFound)
def not_found_exception(error):
    """Return ``{}`` with status 404 for an ``ObjectNotFound`` error.

    Args:
        error: The raised exception; not inspected.
    """
    body = json.dumps({})
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'.
    extra_headers = {'ContentType': 'application/json'}
    return body, 404, extra_headers
# Catch-all error handler for the API.
@api.errorhandler(Exception)
def unhandled_exception(error):
    """Log any otherwise unhandled exception and answer with ``{}`` / 500.

    The response is unchanged from before; the only addition is that the
    exception is recorded with its traceback instead of being discarded,
    so server-side failures stay diagnosable.

    Args:
        error: The exception raised while handling the request.

    Returns:
        A ``(body, status, headers)`` tuple with an empty JSON object and 500.
    """
    # exc_info takes the exception instance, so the traceback is logged
    # without relying on the handler being called from an except block.
    logging.error('Unhandled exception while handling request', exc_info=error)
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'.
    return json.dumps({}), 500, {'ContentType': 'application/json'}
# Node Endpoints
@api.route('/node/list', methods=['GET'])
def list_nodes():
    """Return all nodes as pretty-printed JSON under the ``nodes`` key."""
    payload = json.dumps({'nodes': object_manager.node.list()}, indent=4)
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'; the
    # browser client in this project parses the body as text with JSON.parse.
    extra_headers = {'ContentType': 'application/json'}
    return payload, 200, extra_headers
# Service Endpoints
@api.route('/service', methods=['POST'])
def create_service():
    """Create a service from the JSON request body and return its id.

    The body must provide ``name``, ``image``, ``command``, ``node_labels``,
    ``customer_id``, ``application_id`` and ``limit``. They are forwarded
    positionally, in that order, to ``object_manager.service.create``.

    Returns:
        A ``(body, status, headers)`` tuple whose body is the JSON object
        ``{"success": true, "id": <service id>}`` with status 200.
    """
    create = object_manager.service.create
    field_order = (
        'name',
        'image',
        'command',
        'node_labels',
        'customer_id',
        'application_id',
        'limit',
    )
    # Fields are read in order, so the first missing one raises.
    create_args = [request.json[field] for field in field_order]
    service_id = create(*create_args)

    body = json.dumps({'success': True, 'id': service_id})
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'.
    return body, 200, {'ContentType': 'application/json'}
@api.route('/service/<service_id>', methods=['GET'])
def get_service(service_id):
    """Return the service identified by ``service_id`` as pretty-printed JSON.

    Args:
        service_id: Identifier taken from the URL path.
    """
    serialized = json.dumps(object_manager.service.get(service_id), indent=4)
    return Response(serialized, mimetype='application/json')
@api.route('/service/<service_id>/migrate', methods=['PUT'])
def migrate_service(service_id):
    """Migrate a service using ``node_labels`` from the JSON request body.

    Args:
        service_id: Identifier taken from the URL path.

    Returns:
        A ``(body, status, headers)`` tuple with ``{"success": true}`` and 200.
    """
    object_manager.service.migrate(service_id, request.json['node_labels'])
    body = json.dumps({'success': True})
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'.
    return body, 200, {'ContentType': 'application/json'}
@api.route('/service/<service_id>/scale', methods=['PUT'])
def scale_service(service_id):
    """Scale a service to ``replica_count`` from the JSON request body.

    Args:
        service_id: Identifier taken from the URL path.

    Returns:
        A ``(body, status, headers)`` tuple with ``{"success": true}`` and 200.
    """
    object_manager.service.scale(service_id, request.json['replica_count'])
    body = json.dumps({'success': True})
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'.
    return body, 200, {'ContentType': 'application/json'}
@api.route('/service/<service_id>', methods=['DELETE'])
def remove_service(service_id):
    """Remove the service identified by ``service_id``.

    Args:
        service_id: Identifier taken from the URL path.

    Returns:
        A ``(body, status, headers)`` tuple with ``{"success": true}`` and 200.
    """
    object_manager.service.remove(service_id)
    body = json.dumps({'success': True})
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'.
    return body, 200, {'ContentType': 'application/json'}
@api.route('/service/list', methods=['GET'])
def list_services():
    """Return all services as pretty-printed JSON under the ``services`` key."""
    payload = json.dumps({'services': object_manager.service.list()}, indent=4)
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'.
    extra_headers = {'ContentType': 'application/json'}
    return payload, 200, extra_headers
# Container Endpoints
@api.route('/container/list', methods=['GET'])
def list_containers():
    """Return all containers as pretty-printed JSON under the ``containers`` key."""
    payload = json.dumps(
        {'containers': object_manager.container.list()},
        indent=4,
    )
    # NOTE(review): the header key is 'ContentType', not 'Content-Type'.
    extra_headers = {'ContentType': 'application/json'}
    return payload, 200, extra_headers
# Start the Socket.IO-enabled Flask server on all interfaces, with debug on
# and the auto-reloader off.
socketio.run(
    api,
    host='0.0.0.0',
    debug=True,
    use_reloader=False,
)
客户端.js
/**
 * Socket connection setup.
 * Any data received here is shared only with data.js.
 */
let namespace = 'http://0.0.0.0:5000/event';
let socket = io.connect(namespace, { forceNew: true });
// On every 'object_event': reload the node list and refresh each registered
// DataTable listener whose table is currently initialised.
socket.on('object_event', function (msg, cb) {
    $.get('http://0.0.0.0:5000/node/list', function (response) {
        // jQuery passes a string for text responses but an already-parsed
        // object when the server labels the body as JSON; accept both so the
        // handler does not throw on JSON.parse of an object.
        const payload = (typeof response === 'string') ? JSON.parse(response) : response;
        node_data_source = payload['nodes'];

        for (let data_table_id of Object.keys(data_table_listeners)) {
            if ($.fn.DataTable.isDataTable('#' + data_table_id)) {
                data_table_listeners[data_table_id]();
            }
        }
    });

    // Acknowledge the event when the server supplied a callback.
    if (cb) {
        cb();
    }
});
// On every 'object_stat': parse the JSON message, store it per node and
// container, then invoke every registered stat listener.
socket.on('object_stat', function (msg, cb) {
    const data = JSON.parse(msg);
    const node_key = data['node'];

    if (stat_data_source) {
        // Reuse the node's existing array, or start a fresh one.
        const existing = stat_data_source[node_key];
        const node_stats = (existing instanceof Array) ? existing : [];
        // NOTE(review): the array is indexed by the message's 'container'
        // value, which is a string id in the logged sample.
        node_stats[data['container']] = data;
        stat_data_source[node_key] = node_stats;
    }

    console.log("Object_stats Socket.js");
    console.log(data);

    Object.keys(stat_data_listeners).forEach(function (listener_key) {
        if (stat_data_listeners[listener_key]) {
            stat_data_listeners[listener_key]();
        }
    });

    // Acknowledge the event when the server supplied a callback.
    if (cb) {
        cb();
    }
});
错误
linkp-master.0.dey9j12wugl1@vm | INFO:engineio.server:3f2175ed5d72425da816e24ffeddb275: Sending packet MESSAGE data 2/event,["object_stat","{\"ram_limit\": 61254823575.552, \"customer_id\": \"-\", \"time\": 1575638732170, \"io_limit\": 0.0, \"container\": \"b703eadb700f\", \"node\": \"953mxmlwyvltfrx88ujlpkx3k\", \"io_usage\": 0.0, \"application_id\": \"-\", \"cpu_percent\": \"1.38\", \"ram_usage\": 407266918.4, \"network_limit\": 6144000.0, \"network_usage\": 3768000.0, \"pids\": \"25\"}"]
linkp-master.0.dey9j12wugl1@vm | f603fce7e2464586ab77636d127d92dc: Client is gone, closing socket
linkp-master.0.dey9j12wugl1@vm | INFO:engineio.server:f603fce7e2464586ab77636d127d92dc: Client is gone, closing socket
linkp-master.0.dey9j12wugl1@vm | f603fce7e2464586ab77636d127d92dc: Client is gone, closing socket
linkp-master.0.dey9j12wugl1@vm | INFO:engineio.server:f603fce7e2464586ab77636d127d92dc: Client is gone, closing socket
linkp-master.0.dey9j12wugl1@vm | emitting event "object_stat" to all [/event]
linkp-master.0.dey9j12wugl1@vm | INFO:socketio.server:emitting event "object_stat" to all [/event]