1

这里有 WebSocket 协议文件和服务器文件。该如何启动 WebSocketSyncServer?抱歉,如果这是个很基础的问题——我是 Node.js 新手。

在让服务器跑起来之后(至少它启动时没有报错),客户端现在报错:db.hasBeenClosed 不是一个函数(db.hasBeenClosed is not a function)。

客户端代码:

/**
 * Client entry point: opens two Dexie databases, connects each one to a sync
 * endpoint through the "websocket" protocol, logs sync status changes, and then
 * runs a small read/write demo against the `friends` table of "test1".
 *
 * Relies on the globals `Dexie` and `Dexie.Syncable` being loaded by the page.
 */
function start(){
    console.log('start')

    // Shared listener: prints the human readable name of the new sync status.
    function logStatus(newStatus, url) {
        console.log ("Sync Status changed: " + Dexie.Syncable.StatusTexts[newStatus]);
    }

    // NOTE(review): this database has no version()/stores() definition and its URL has
    // no scheme; it is kept only because the original code opened it. Confirm whether
    // it is needed at all.
    var schemalessDb = new Dexie("test");
    schemalessDb.syncable.connect ("websocket", "localhost:8075");
    schemalessDb.syncable.on('statusChanged', logStatus);

    var db = new Dexie("test1");
    db.version(1).stores({
            friends: "$$oid,name,shoeSize",
            pets: "$$oid,name,kind"
    });

    // NOTE(review): the URL is handed to the sync protocol as-is; presumably it should
    // be a ws:// or wss:// address matching the server's port - confirm.
    db.syncable.connect ("websocket", "https://localhost:8089");
    db.syncable.on('statusChanged', logStatus);

    db.transaction('rw', db.friends, function (friends) {
        friends.add({name: "Arne", shoeSize: 47});
        // where() takes the NAME of an index declared in stores(). The previous code
        // passed a local variable holding "11", which is not an index of `friends`.
        friends.where("shoeSize").above(40).each(function (friend) {
            console.log("Friend with shoeSize over 40: " + friend.name);
        });
    });
}

服务器代码:

const WebSocket = require('ws');

// WebSocket server listening on port 8089. The 'connection' handler further down
// in this file is registered on this instance.
const wss = new WebSocket.Server({ port: 8089 });

// Numeric codes stored in the `type` field of every change record.
var CREATE = 1;
var UPDATE = 2;
var DELETE = 3;

// Second WebSocket server, listening on port 9078.
// NOTE(review): no event handlers are attached to this instance anywhere in this
// file, so clients connecting to 9078 get no sync behaviour - confirm it is needed.
const server = new WebSocket.Server({ port: 9078 });

/**
 * Minimal in-memory database used by the sync server.
 *
 * Every mutation (create / update / delete) is applied to `tables`, appended to
 * `changes` with a freshly incremented revision number, and followed by a call to
 * `trigger()` so that subscribers get notified.
 */
var db = {
    tables: {},  // Tables: each key is a table name; its value is an object mapping primary key -> stored record (kept in RAM).
    changes: [], // Log of all changes made to the db. In this simple sample it grows infinitely; a real server would regularly clean up old changes.
    uncommittedChanges: {}, // Map<clientID, Array<change>>: changes received with partial=true, buffered until the final chunk arrives.
    revision: 0, // Current revision of the database.
    subscribers: [], // Callbacks invoked after the database changed. Used by server connections to push changes out to their clients.

    /**
     * Stores `obj` under `key` in `table` (creating the table on first use) and logs a CREATE change.
     * @param {string} table - Table name.
     * @param {*} key - Primary key of the record.
     * @param {Object} obj - Record to store.
     * @param {*} clientIdentity - Identity of the client that made the change (recorded as `source`).
     */
    create: function (table, key, obj, clientIdentity) {
        // Create table if it doesn't exist:
        db.tables[table] = db.tables[table] || {};
        // Put the obj into the table
        db.tables[table][key] = obj;
        // Register the change:
        db.changes.push({
            rev: ++db.revision,
            source: clientIdentity,
            type: CREATE,
            table: table,
            key: key,
            obj: obj
        });
        db.trigger();
    },

    /**
     * Applies `modifications` (keyPath -> value) to an existing record and logs an UPDATE change.
     * Does nothing when the table or the record does not exist.
     */
    update: function (table, key, modifications, clientIdentity) {
        if (db.tables[table]) {
            var obj = db.tables[table][key];
            if (obj) {
                applyModifications(obj, modifications);
                db.changes.push({
                    rev: ++db.revision,
                    source: clientIdentity,
                    type: UPDATE,
                    table: table,
                    key: key,
                    mods: modifications
                });
                db.trigger();
            }
        }
    },

    /**
     * Removes a record and logs a DELETE change.
     * Does nothing when the table or the record does not exist.
     */
    'delete': function (table, key, clientIdentity) {
        if (db.tables[table]) {
            if (db.tables[table][key]) {
                delete db.tables[table][key];
                db.changes.push({
                    rev: ++db.revision,
                    source: clientIdentity,
                    type: DELETE,
                    table: table,
                    key: key,
                });
                db.trigger();
            }
        }
    },

    /**
     * Schedules one notification of all subscribers. The pending timer handle is kept on
     * the function itself so that a burst of changes results in a single notification.
     */
    trigger: function () {
        if (!db.trigger.delayedHandle) {
            // Delay the trigger so that it's only called once per bunch of changes instead of being called for each single change.
            db.trigger.delayedHandle = setTimeout(function () {
                delete db.trigger.delayedHandle;
                db.subscribers.forEach(function (subscriber) {
                    // One failing subscriber must not prevent the others from being notified,
                    // but the failure is reported instead of being dropped silently.
                    try { subscriber(); } catch (e) { console.error(e); }
                });
            }, 0);
        }
    },

    /** Registers `fn` to be called after the database changed. */
    subscribe: function (fn) {
        db.subscribers.push(fn);
    },

    /** Removes a previously registered callback; unknown callbacks are ignored. */
    unsubscribe: function (fn) {
        var index = db.subscribers.indexOf(fn);
        // indexOf() yields -1 for a callback that was never registered, and
        // splice(-1, 1) would then remove the LAST subscriber - i.e. someone else's.
        if (index !== -1) {
            db.subscribers.splice(index, 1);
        }
    }
};

// Identity handed out to the next client that connects without one. Lives outside the
// connection handler so that identities are unique across connections, and starts at 1
// because the handler treats a falsy clientIdentity as "client has none yet".
var nextClientIdentity = 1;

/**
 * Per-connection sync protocol handler.
 *
 * Messages are JSON objects with a `type`:
 *  - "clientIdentity": client announces its identity, or asks for one.
 *  - "subscribe":      client wants all changes after `syncedRevision`, now and in future.
 *  - "changes":        client uploads its own changes (possibly in partial chunks).
 *
 * Replies are sent with the `ws` package API (`send()` / 'message' event).
 */
wss.on('connection', function (ws) {
    console.log('test')

    var syncedRevision = 0;   // Last revision already sent to THIS client.
    var isSubscribed = false; // Whether sendAnyChanges is currently registered with db.

    // Serializes `payload` and sends it to the client.
    function send(payload) {
        ws.send(JSON.stringify(payload));
    }

    // Reports a protocol error to the client and drops the connection.
    function fail(requestId, message) {
        send({
            type: "error",
            requestId: requestId,
            message: message
        });
        ws.close();
    }

    // Sends every change newer than syncedRevision that did not originate from this client.
    function sendAnyChanges() {
        var changes = db.changes.filter(function (change) { return change.rev > syncedRevision && change.source !== ws.clientIdentity; });
        // Compact changes so that multiple changes on the same object are merged into a single change.
        var reducedSet = reduceChanges(changes);
        // Convert the reduced set into an array again.
        var reducedArray = Object.keys(reducedSet).map(function (key) { return reducedSet[key]; });
        // Remember the current revision; the client uses it to know what to ask for next time.
        var currentRevision = db.revision;

        send({
            type: "changes",
            changes: reducedArray,
            currentRevision: currentRevision,
            partial: false // These are all the changes we are aware of: the in-memory DB is synchronous, so they come in one chunk.
        });

        syncedRevision = currentRevision; // Only send newer revisions next time instead of resending the above.
    }

    // Merges any buffered partial chunks with the final chunk, resolves conflicts against
    // server changes made since the client's base revision, and applies the result.
    function commitChanges(request) {
        if (db.uncommittedChanges[ws.clientIdentity]) {
            request.changes = db.uncommittedChanges[ws.clientIdentity].concat(request.changes);
            delete db.uncommittedChanges[ws.clientIdentity];
        }

        var baseRevision = request.baseRevision || 0;
        var serverChanges = db.changes.filter(function (change) { return change.rev > baseRevision });
        var reducedServerChangeSet = reduceChanges(serverChanges);
        var resolved = resolveConflicts(request.changes, reducedServerChangeSet);

        resolved.forEach(function (change) {
            switch (change.type) {
                case CREATE:
                    db.create(change.table, change.key, change.obj, ws.clientIdentity);
                    break;
                case UPDATE:
                    db.update(change.table, change.key, change.mods, ws.clientIdentity);
                    break;
                case DELETE:
                    db.delete(change.table, change.key, ws.clientIdentity);
                    break;
            }
        });
    }

    ws.on('message', function incoming(message) {
        console.log('received: %s', message);

        var request;
        try {
            // The payload may arrive as a Buffer; convert explicitly before parsing.
            request = JSON.parse(String(message));
        } catch (e) {
            // Malformed JSON must not escape the event handler and take the server down.
            fail(undefined, e.toString());
            return;
        }
        if (!request || typeof request !== 'object') {
            fail(undefined, "Message must be a JSON object");
            return;
        }

        var type = request.type;
        if (type == "clientIdentity") {
            if (request.clientIdentity) {
                // Client has an identity that we have given earlier
                ws.clientIdentity = request.clientIdentity;
            } else {
                // Client requests an identity. Provide one.
                ws.clientIdentity = nextClientIdentity++;
                send({
                    type: "clientIdentity",
                    clientIdentity: ws.clientIdentity
                });
            }
        } else if (type == "subscribe") {
            // Client wants changes that happened, or will happen, after the given syncedRevision
            syncedRevision = request.syncedRevision || 0;
            // Send any changes we have currently:
            sendAnyChanges();
            // Start subscribing for additional changes (once per connection):
            if (!isSubscribed) {
                db.subscribe(sendAnyChanges);
                isSubscribed = true;
            }
        } else if (type == "changes") {
            // Client sends its changes to us.
            var requestId = request.requestId;

            // `!x instanceof Array` parses as `(!x) instanceof Array`, which is always
            // false, so the array check has to be spelled out with Array.isArray.
            if (!Array.isArray(request.changes)) {
                fail(requestId, "Property 'changes' must be provided and must be an array");
                return;
            }
            if (!("baseRevision" in request)) {
                fail(requestId, "Property 'baseRevision' missing");
                return;
            }

            try {
                if (request.partial) {
                    // Partial chunk: buffer it until the final chunk arrives.
                    db.uncommittedChanges[ws.clientIdentity] =
                        (db.uncommittedChanges[ws.clientIdentity] || []).concat(request.changes);
                } else {
                    commitChanges(request);
                }

                send({
                    type: "ack",
                    requestId: requestId,
                });
            } catch (e) {
                fail(requestId, e.toString());
            }
        }
    });

    ws.on("close", function () {
        // Only unregister what this connection actually registered.
        if (isSubscribed) {
            db.unsubscribe(sendAnyChanges);
            isSubscribed = false;
        }
    });
});

/**
 * Collapses a list of change records into a set keyed by "<table>:<key>".
 * When several changes hit the same object, only one merged change remains for it.
 *
 * @param {Array<Object>} changes - Change records in the order they were made.
 * @returns {Object} Map from "<table>:<key>" to the (merged) change for that object.
 */
function reduceChanges(changes) {
    // Decides which single change represents `earlier` followed by `later`.
    function mergePair(earlier, later) {
        var was = earlier.type;
        var now = later.type;

        if (was === CREATE) {
            // Another CREATE replaces the previous one.
            if (now === CREATE) return later;
            // Fold the update's modifications into the created object.
            if (now === UPDATE) return combineCreateAndUpdate(earlier, later);
            // Created and then deleted: the DELETE is kept on purpose. If the CREATE had
            // already been sent earlier, skipping the pair would leave a ghost object;
            // deleting a non-existing object is harmless.
            if (now === DELETE) return later;
            return undefined;
        }
        if (was === UPDATE) {
            // Another CREATE replaces the previous update.
            if (now === CREATE) return later;
            // Add the additional modifications to the existing modification set.
            if (now === UPDATE) return combineUpdateAndUpdate(earlier, later);
            // Only the delete matters; earlier updates are no longer of interest.
            if (now === DELETE) return later;
            return undefined;
        }
        if (was === DELETE) {
            // A resurrection occurred: only the create change is of interest.
            if (now === CREATE) return later;
            // Updating or deleting a deleted object changes nothing: keep the delete.
            if (now === UPDATE || now === DELETE) return earlier;
            return undefined;
        }
        return undefined;
    }

    var merged = {};
    changes.forEach(function (incoming) {
        var slot = incoming.table + ":" + incoming.key;
        var existing = merged[slot];
        // First change seen for this object is taken as-is; later ones are merged in.
        merged[slot] = existing ? mergePair(existing, incoming) : incoming;
    });
    return merged;
}

/**
 * Filters (and, where needed, trims) the client's changes against the reduced set of
 * server changes, returning only those client changes that should be applied.
 *
 * @param {Array<Object>} clientChanges - Changes uploaded by the client.
 * @param {Object} serverChangeSet - Map "<table>:<key>" -> server change.
 * @returns {Array<Object>} The client changes that survive conflict resolution, in order.
 */
function resolveConflicts(clientChanges, serverChangeSet) {
    // Removes from `clientMods` every keyPath the server also modified, as well as
    // every keyPath nested below such a server keyPath.
    function stripOverlaps(clientMods, serverMods) {
        var serverPaths = Object.keys(serverMods);
        Object.keys(clientMods).forEach(function (clientPath) {
            var overlaps = serverPaths.some(function (serverPath) {
                return clientPath === serverPath || clientPath.indexOf(serverPath + '.') == 0;
            });
            if (overlaps) {
                delete clientMods[clientPath];
            }
        });
    }

    return clientChanges.filter(function (candidate) {
        var rival = serverChangeSet[candidate.table + ":" + candidate.key];

        // No server change on the same object: totally conflict free.
        if (!rival) return true;

        // Only a server UPDATE leaves room for a client change; when the server
        // created or deleted the object, the client change is dropped.
        if (rival.type != UPDATE) return false;

        switch (candidate.type) {
            case CREATE:
                // Client recreated an object the server updated: let the recreation
                // through, with the server's modifications applied on top. Mutating
                // candidate.obj in place is fine; it belongs to the current request only.
                applyModifications(candidate.obj, rival.mods);
                return true;
            case UPDATE:
                // Both sides updated the object: keep only the non-conflicting parts,
                // and drop the change entirely if nothing is left.
                stripOverlaps(candidate.mods, rival.mods);
                return Object.keys(candidate.mods).length > 0;
            case DELETE:
                // Delete always wins over update, even client over server.
                return true;
            default:
                return false;
        }
    });
}

/**
 * Returns a deep copy of `obj` made by a JSON round trip, so only JSON-representable
 * data is carried over.
 */
function deepClone(obj) {
    var serialized = JSON.stringify(obj);
    return JSON.parse(serialized);
}

/**
 * Writes every keyPath -> value pair of `modifications` into `obj` (in place).
 *
 * @param {Object} obj - Object to modify.
 * @param {Object} modifications - Map from keyPath (dot notation) to new value.
 * @returns {Object} The same `obj` that was passed in.
 */
function applyModifications(obj, modifications) {
    var paths = Object.keys(modifications);
    for (var i = 0; i < paths.length; i++) {
        var path = paths[i];
        setByKeyPath(obj, path, modifications[path]);
    }
    return obj;
}

/**
 * Merges a CREATE change followed by an UPDATE change into a single CREATE change
 * whose object already contains the update's modifications.
 *
 * @param {Object} prevChange - The earlier change; its `obj` is the created object.
 * @param {Object} nextChange - The later change; its `mods` are applied.
 * @returns {Object} A new change object; `prevChange` is left untouched.
 */
function combineCreateAndUpdate(prevChange, nextChange) {
    // Work on a copy: prevChange is still referenced from db.changes[] and must not be altered.
    var mergedCreate = deepClone(prevChange);
    var createdObject = mergedCreate.obj;
    applyModifications(createdObject, nextChange.mods);
    return mergedCreate;
}

/**
 * Merges two consecutive UPDATE changes on the same object into one UPDATE change.
 *
 * @param {Object} prevChange - The earlier update (has `mods`).
 * @param {Object} nextChange - The later update (has `mods`).
 * @returns {Object} A new change object; `prevChange` is left untouched.
 */
function combineUpdateAndUpdate(prevChange, nextChange) {
    // True when `path` lies strictly below `ancestor` in dot notation.
    function isNestedUnder(path, ancestor) {
        return path.indexOf(ancestor + '.') === 0;
    }

    // Work on a copy: prevChange is still referenced from db.changes[] and must not be altered.
    var combined = deepClone(prevChange);
    var earlierPaths = Object.keys(prevChange.mods);
    var laterPaths = Object.keys(nextChange.mods);

    for (var i = 0; i < laterPaths.length; i++) {
        var laterPath = laterPaths[i];
        var laterValue = nextChange.mods[laterPath];

        // If the earlier update replaced a parent of this keyPath, patch the value
        // inside that parent's new value instead of adding a separate keyPath.
        var parents = earlierPaths.filter(function (earlierPath) {
            return isNestedUnder(laterPath, earlierPath);
        });
        for (var p = 0; p < parents.length; p++) {
            setByKeyPath(combined.mods[parents[p]], laterPath.substr(parents[p].length + 1), laterValue);
        }
        if (parents.length === 0) {
            // Add or replace this keyPath and its new value.
            combined.mods[laterPath] = laterValue;
        }

        // Earlier modifications below this keyPath are overwritten by the new value.
        for (var e = 0; e < earlierPaths.length; e++) {
            if (isNestedUnder(earlierPaths[e], laterPath)) {
                delete combined.mods[earlierPaths[e]];
            }
        }
    }
    return combined;
}

/**
 * Sets `value` at `keyPath` (dot notation, e.g. "address.city") inside `obj`,
 * creating intermediate objects where they are missing.
 *
 * Key paths come from client-supplied modifications, so the walk is restricted to the
 * object's OWN properties and never touches `__proto__`; otherwise a path such as
 * "__proto__.x" or "constructor.prototype.x" would modify Object.prototype.
 *
 * A single trailing dot is ignored ("a." behaves like "a").
 * Does nothing when `obj` is falsy, `keyPath` is not a string, or the path contains
 * a `__proto__` segment.
 *
 * @param {Object} obj - Object to modify in place.
 * @param {string} keyPath - Property name or dot-separated path.
 * @param {*} value - Value to store.
 */
function setByKeyPath(obj, keyPath, value) {
    if (!obj || typeof keyPath !== 'string') return;

    var segments = keyPath.split('.');
    // "a." means "set a": drop the empty segment produced by the trailing dot.
    if (segments.length > 1 && segments[segments.length - 1] === '') {
        segments.pop();
    }
    // Assigning through `__proto__` would replace or pollute a prototype.
    if (segments.indexOf('__proto__') !== -1) return;

    var target = obj;
    for (var i = 0; i < segments.length - 1; i++) {
        var segment = segments[i];
        // Inherited members (e.g. `constructor`) are treated as absent so that the
        // walk can never leave the record and reach shared built-in objects.
        var inner = Object.prototype.hasOwnProperty.call(target, segment) ? target[segment] : undefined;
        if (!inner) inner = (target[segment] = {});
        target = inner;
    }
    target[segments[segments.length - 1]] = value;
}
4

2 回答 2

1

该示例使用“nodejs-websocket”。在您的服务器代码中,您正在使用“ws”,这是另一个包。尝试按原样使用示例,但向其中添加 package.json。

于 2017-08-28T11:25:20.920 回答
0

我知道这是一个老问题,但对于刚接触 Node.js 的人来说,如果您想运行 WebSocketSyncServer.js 中提供的示例 WebSocket 服务器,我是这样做的:

  1. 使用名为 main.js 的空文件创建一个新目录
  2. 把 WebSocketSyncServer.js 也放在那里
  3. 创建一个 package.json
  4. npm install --save nodejs-websocket
  5. 将以下代码复制到 main.js,然后用 node main.js 运行它:
// main.js - launcher for the sample sync server.
'use strict';

// Loads the sample server from the file placed next to this script.
const SyncServer = require('./WebSocketSyncServer');
// NOTE(review): 8080 is presumably the port the server will listen on - confirm
// against WebSocketSyncServer.js.
const server = new SyncServer(8080);
server.start();
于 2019-08-05T15:39:07.450 回答