I think that it is normal to use a single Socket.IO connection in the browser since I have a single Socket.IO server which handles both channels of information that I include in the messages.
The code below does not work well, it is not logically correct. I am a beginner when it comes to complex asynchronous programming.
I don't get errors, but the behavior of the code is not good enough.
I have this code that I tried but it does not receive messages from the server always when it should and I am sure it has some bugs. The following is a part of my code, modified a little bit so that I can post it here.
The types
export interface INotificationsChannelInitParams {
userIds: string[];
}
export interface ITimersChannelInitParams {
tids: string[];
}
export interface IGetNotificationsQueryParams {
authToken: string | null;
userId: string | null;
limit?: number;
}
export interface IGetTimersQueryParams {
tids: string[];
}
The store
let lastConnectHandler: ReturnType<typeof connectHandler> | null = null;
export const getFollowedThingIds = () => {
return uniq(followedThingIds); // `uniq` is from the `lodash` library
};
const connectHandler = (
initAs: "timers" | "notifications",
arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
const fn = () => {
if (initAs === "timers") {
const argument = arg as ITimersChannelInitParams;
followedThingIds = argument.tids.concat(followedThingIds);
argument.tids = uniq(followedThingIds);
myIo!.emit("configure timers channel", argument);
timersAreSetUp = true;
}
if (initAs === "notifications") {
const argument = arg as INotificationsChannelInitParams;
followedUserIds = argument.userIds.concat(followedUserIds);
argument.userIds = followedUserIds;
myIo!.emit("configure notifications channel", argument);
notificationsAreSetUp = true;
}
};
lastConnectHandler = fn;
return fn;
};
let myIo: Socket | null = null;
let timersAreSetUp = false;
let notificationsAreSetUp = false;
let followedUserIds: string[] = [];
let followedThingIds: string[] = [];
export const getWebSocketConnection = (
initAs: "timers" | "notifications",
arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
if (myIo === null) {
// TODO: move this replacement set to its own function next to toAbsoluteUrl
const wsUrl = toAbsoluteUrl("/").replace(/\:(\d)+(\/)+$/g, ":8080/");
myIo = io(wsUrl, { autoConnect: false, port: "8080" });
myIo.onAny((event, ...args) => {
// console.log("S.IO", event, args);
});
// TODO: use this somewhere so it is not dead code
let invalidAuthToken = false;
myIo.on("connect_error", err => {
if (err.message === "invalid auth token") {
invalidAuthToken = true;
}
});
}
console.log('5.16.', store.getState().auth.authToken);
myIo.auth = { authToken: store.getState().auth.authToken };
myIo.on("connect", connectHandler(initAs, arg));
myIo.connect();
return myIo;
};
export const resetFollowedUserIds = (userIds: string[]) => {
// followedUserIds = userIds;
// const argument = { userIds: followedUserIds } as INotificationsChannelInitParams;
// myIo!.emit("configure notifications channel", argument);
// notificationsAreSetUp = true;
};
// TODO: use this function so that the followed things (for timers) and
// users (for notifications) don't add up
const closeWebSocketConnection = (uninitAs: "timers" | "notifications") => {
if (myIo === null) {
return;
}
if (uninitAs === "timers") {
const argument = { tids: [] } as ITimersChannelInitParams;
myIo.emit("configure timers channel", argument);
timersAreSetUp = false;
}
if (uninitAs === "notifications") {
const argument = { userIds: [] } as INotificationsChannelInitParams;
myIo.emit("configure notifications channel", argument);
notificationsAreSetUp = false;
}
// if (!timersAreSetUp && !notificationsAreSetUp) {
// myIo.off("connect_error");
// myIo.disconnect();
// myIo = null;
// }
};
The RTKQ's
getTimers: build.query<
{ [index: string]: number },
IGetTimersQueryParams
>({
query: (params: IGetTimersQueryParams) => ({
url: `timers?things=` + params.tids.join(","),
method: "GET"
}),
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const myIo = getWebSocketConnection("timers", clone(arg));
// when data is received from the socket connection to the server,
// if it is a valid JSON object, update our query result with the
// received message
const listener = (eventData: number[]) => {
updateCachedData(draft => {
getFollowedThingIds().forEach((x: string, i: number) => {
draft[x] = eventData[i];
});
// while (draft.length > 0) {
// draft.pop();
// }
// eventData.forEach((x: number, idx: number) => {
// // TODO: cleanup dead timers (<= 0, maybe use a call like
// // ws.send(JSON.stringify(arg)))
// draft.push(x);
// });
});
};
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded;
myIo.on("timers", listener);
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved;
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
// closeWebSocketConnection("timers");
myIo.off("timers", listener);
myIo.off("connect", lastConnectHandler as any);
}
}),
getNotifications: build.query<
IDbThing[],
IGetNotificationsQueryParams
>({
query: (params: IGetNotificationsQueryParams) => ({
url: `notifications?authToken=${params.authToken || ""}&limit=${
typeof params.limit === "number" ? params.limit : 5
}&userId=${params.userId || ""}`,
method: "GET"
}),
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// TODO: notifications for not-logged-in users?
if (arg.userId === null) {
return;
}
// TODO: here keep the previous user ID set up in the notifications
// channel, besides the new one, and make sure each notification
// returned by the WS server tells the user to which it should be
// served
const myIo = getWebSocketConnection(
"notifications",
clone({
userIds: [arg.userId]
})
);
// when data is received from the socket connection to the server,
// if it is a valid JSON object, update our query result with the
// received message
const listener = (eventData: IDbNotification) => {
// if receiving a user notification
updateCachedData(draft => {
draft.unshift(eventData);
if (draft.length > 5) {
draft.pop();
}
});
};
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded;
myIo.on("notifications", listener);
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved;
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
// closeWebSocketConnection("notifications");
myIo.off("notifications", listener);
myIo.off("connect", lastConnectHandler as any);
}
})
})
Example usage of RTKQ's
const getNotificationsQueryParams = useMemo(
() => ({
authToken: user?.authToken!,
userId: params.userId,
limit: 0,
}),
[user, params.userId]
);
const {
isLoading: notificationsIsLoading,
isError: notificationsIsError,
data: shownNotifications
} = useGetNotificationsQuery(getNotificationsQueryParams, {
refetchOnMountOrArgChange: true
});
Update 1
Sometimes a second connection from the browser is created as I see in the Network tab in the DevTools.
I have checked and the issue seems to be somehow on the Socket.IO server: the client sends to the server "configure timers channel" message but it receives nothing. And the server sends to the client an array of numbers but... to the wrong client it seems, although I have only one instance of the app open in one single tab!
Update 2
I receive messages like this mostly at the start of a connection from the client. What are they?
Update 3
The problem is on the server.
If I replace this line:
socket.to(socket.sessionUserId!).emit("timers", rv);
with this line:
io.of('/').emit("timers", rv);
it sends the result of the computation to all the clients, including the one I can see, of which I think it is the only client. If not, the message does not reach the client I am testing with. I have no idea why, because I checked socket.rooms
and the socket is in the room identified by the socket.sessionUserId
and I call this for every socket on the server:
io.on("connection", (normalSocket: Socket) => {
// a user connected
const socket = normalSocket as SocketData;
// join the room named by the user ID that started the socket
socket.join(socket.sessionUserId!);
...