I think it is normal to use a single Socket.IO connection in the browser, since I have a single Socket.IO server that handles both channels of information (timers and notifications) carried in the messages.

The code below is not logically correct. I am a beginner when it comes to complex asynchronous programming.

I don't get any errors, but the client does not always receive messages from the server when it should, and I am sure the code has some bugs. The following is part of my code, slightly modified so that I can post it here.

The types

export interface INotificationsChannelInitParams {
  userIds: string[];
}

export interface ITimersChannelInitParams {
  tids: string[];
}

export interface IGetNotificationsQueryParams {
  authToken: string | null;
  userId: string | null;
  limit?: number;
}

export interface IGetTimersQueryParams {
  tids: string[];
}

The store

let lastConnectHandler: ReturnType<typeof connectHandler> | null = null;

export const getFollowedThingIds = () => {
  return uniq(followedThingIds); // `uniq` is from the `lodash` library
};

const connectHandler = (
  initAs: "timers" | "notifications",
  arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
  const fn = () => {
    if (initAs === "timers") {
      const argument = arg as ITimersChannelInitParams;
      followedThingIds = argument.tids.concat(followedThingIds);
      argument.tids = uniq(followedThingIds);
      myIo!.emit("configure timers channel", argument);
      timersAreSetUp = true;
    }

    if (initAs === "notifications") {
      const argument = arg as INotificationsChannelInitParams;
      followedUserIds = argument.userIds.concat(followedUserIds);
      argument.userIds = followedUserIds;
      myIo!.emit("configure notifications channel", argument);
      notificationsAreSetUp = true;
    }
  };

  lastConnectHandler = fn;

  return fn;
};

let myIo: Socket | null = null;
let timersAreSetUp = false;
let notificationsAreSetUp = false;
let followedUserIds: string[] = [];
let followedThingIds: string[] = [];
export const getWebSocketConnection = (
  initAs: "timers" | "notifications",
  arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
  if (myIo === null) {
    // TODO: move this replacement set to its own function next to toAbsoluteUrl
    const wsUrl = toAbsoluteUrl("/").replace(/\:(\d)+(\/)+$/g, ":8080/");

    myIo = io(wsUrl, { autoConnect: false, port: "8080" });

    myIo.onAny((event, ...args) => {
      // console.log("S.IO", event, args);
    });

    // TODO: use this somewhere so it is not dead code
    let invalidAuthToken = false;

    myIo.on("connect_error", err => {
      if (err.message === "invalid auth token") {
        invalidAuthToken = true;
      }
    });
  }

  console.log('5.16.', store.getState().auth.authToken);
  myIo.auth = { authToken: store.getState().auth.authToken };

  myIo.on("connect", connectHandler(initAs, arg));

  myIo.connect();

  return myIo;
};

export const resetFollowedUserIds = (userIds: string[]) => {
  // followedUserIds = userIds;
  // const argument = { userIds: followedUserIds } as INotificationsChannelInitParams;
  // myIo!.emit("configure notifications channel", argument);
  // notificationsAreSetUp = true;
};

// TODO: use this function so that the followed things (for timers) and
// users (for notifications) don't add up
const closeWebSocketConnection = (uninitAs: "timers" | "notifications") => {
  if (myIo === null) {
    return;
  }

  if (uninitAs === "timers") {
    const argument = { tids: [] } as ITimersChannelInitParams;
    myIo.emit("configure timers channel", argument);
    timersAreSetUp = false;
  }

  if (uninitAs === "notifications") {
    const argument = { userIds: [] } as INotificationsChannelInitParams;
    myIo.emit("configure notifications channel", argument);
    notificationsAreSetUp = false;
  }

  // if (!timersAreSetUp && !notificationsAreSetUp) {
  //   myIo.off("connect_error");
  //   myIo.disconnect();
  //   myIo = null;
  // }
};
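
Regarding the TODO about moving the port replacement into its own function: a minimal sketch, assuming toAbsoluteUrl("/") returns an absolute URL string. The name withPort is hypothetical and not part of the code above. It uses the standard URL API instead of a regular expression, so it also covers URLs that carry no explicit port.

```typescript
// Hypothetical helper (not part of the original store): returns the same
// absolute URL with its port replaced.
const withPort = (absoluteUrl: string, port: number): string => {
  // `new URL` throws a TypeError on relative or malformed input.
  const url = new URL(absoluteUrl);
  url.port = String(port);
  return url.toString();
};

// withPort("http://localhost:3000/", 8080) === "http://localhost:8080/"
// withPort("http://example.com/", 8080)    === "http://example.com:8080/"
```

With such a helper, the wsUrl line could become withPort(toAbsoluteUrl("/"), 8080), under the same assumption about what toAbsoluteUrl returns.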

The RTK Query endpoints

getTimers: build.query<
    { [index: string]: number },
    IGetTimersQueryParams
  >({
    query: (params: IGetTimersQueryParams) => ({
      url: `timers?things=` + params.tids.join(","),
      method: "GET"
    }),
    async onCacheEntryAdded(
      arg,
      { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
    ) {
      const myIo = getWebSocketConnection("timers", clone(arg));

      // when data is received from the socket connection to the server,
      // if it is a valid JSON object, update our query result with the
      // received message
      const listener = (eventData: number[]) => {
        updateCachedData(draft => {
          getFollowedThingIds().forEach((x: string, i: number) => {
            draft[x] = eventData[i];
          });

          // while (draft.length > 0) {
          //   draft.pop();
          // }
          // eventData.forEach((x: number, idx: number) => {
          //   // TODO: cleanup dead timers (<= 0, maybe use a call like
          //   // ws.send(JSON.stringify(arg)))
          //   draft.push(x);
          // });
        });
      };

      try {
        // wait for the initial query to resolve before proceeding
        await cacheDataLoaded;

        myIo.on("timers", listener);
      } catch {
        // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
        // in which case `cacheDataLoaded` will throw
      }

      // cacheEntryRemoved will resolve when the cache subscription is no longer active
      await cacheEntryRemoved;

      // perform cleanup steps once the `cacheEntryRemoved` promise resolves
      // closeWebSocketConnection("timers");

      myIo.off("timers", listener);
      myIo.off("connect", lastConnectHandler as any);
    }
  }),
  getNotifications: build.query<
    IDbThing[],
    IGetNotificationsQueryParams
  >({
    query: (params: IGetNotificationsQueryParams) => ({
      url: `notifications?authToken=${params.authToken || ""}&limit=${
        typeof params.limit === "number" ? params.limit : 5
      }&userId=${params.userId || ""}`,
      method: "GET"
    }),
    async onCacheEntryAdded(
      arg,
      { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
    ) {
      // TODO: notifications for not-logged-in users?
      if (arg.userId === null) {
        return;
      }

      // TODO: here keep the previous user ID set up in the notifications
      // channel, besides the new one, and make sure each notification
      // returned by the WS server tells the user to which it should be
      // served
      const myIo = getWebSocketConnection(
        "notifications",
        clone({
          userIds: [arg.userId]
        })
      );

      // when data is received from the socket connection to the server,
      // if it is a valid JSON object, update our query result with the
      // received message
      const listener = (eventData: IDbNotification) => {
        // if receiving a user notification

        updateCachedData(draft => {
          draft.unshift(eventData);
          if (draft.length > 5) {
            draft.pop();
          }
        });
      };

      try {
        // wait for the initial query to resolve before proceeding
        await cacheDataLoaded;

        myIo.on("notifications", listener);
      } catch {
        // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
        // in which case `cacheDataLoaded` will throw
      }

      // cacheEntryRemoved will resolve when the cache subscription is no longer active
      await cacheEntryRemoved;

      // perform cleanup steps once the `cacheEntryRemoved` promise resolves
      // closeWebSocketConnection("notifications");
      myIo.off("notifications", listener);
      myIo.off("connect", lastConnectHandler as any);
    }
  })
})
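
One thing that stands out in the code above: lastConnectHandler holds only the most recently created handler, so when one endpoint cleans up, myIo.off("connect", lastConnectHandler) may remove the handler that belongs to the other channel. A minimal sketch of keeping one handler per channel follows. The names registerConnectHandler, unregisterConnectHandler and ConnectEmitter are hypothetical, and the interface only assumes the on/off calls already used above.

```typescript
type Channel = "timers" | "notifications";
type Handler = () => void;

// Structural type covering just the two calls used here (an assumption,
// not a type exported by socket.io-client).
interface ConnectEmitter {
  on(event: "connect", handler: Handler): unknown;
  off(event: "connect", handler: Handler): unknown;
}

// One "connect" handler per channel instead of a single "last" handler.
const connectHandlers = new Map<Channel, Handler>();

const registerConnectHandler = (
  socket: ConnectEmitter,
  channel: Channel,
  handler: Handler
): void => {
  // Replace any previous handler for this channel so handlers don't pile up.
  const previous = connectHandlers.get(channel);
  if (previous) {
    socket.off("connect", previous);
  }
  connectHandlers.set(channel, handler);
  socket.on("connect", handler);
};

const unregisterConnectHandler = (
  socket: ConnectEmitter,
  channel: Channel
): void => {
  // Remove only the handler that belongs to this channel.
  const handler = connectHandlers.get(channel);
  if (handler) {
    socket.off("connect", handler);
    connectHandlers.delete(channel);
  }
};
```

With this shape, the timers endpoint would call unregisterConnectHandler(myIo, "timers") during cleanup and leave the notifications handler in place.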

Example usage of the RTK Query endpoints

const getNotificationsQueryParams = useMemo(
  () => ({
    authToken: user?.authToken!,
    userId: params.userId,
    limit: 0,
  }),
  [user, params.userId]
);

const {
  isLoading: notificationsIsLoading,
  isError: notificationsIsError,
  data: shownNotifications
} = useGetNotificationsQuery(getNotificationsQueryParams, {
  refetchOnMountOrArgChange: true
});

Update 1

Sometimes a second connection from the browser is created, as I can see in the Network tab of DevTools.

I have checked, and the issue seems to be on the Socket.IO server: the client sends the "configure timers channel" message to the server but receives nothing back. The server does send an array of numbers, but apparently to the wrong client, even though I have only one instance of the app open in a single tab!

Update 2

I receive messages like this mostly at the start of a connection from the client. What are they?

screenshot

Update 3

The problem is on the server.

If I replace this line:

socket.to(socket.sessionUserId!).emit("timers", rv);

with this line:

io.of('/').emit("timers", rv);

it sends the result of the computation to all clients, including the one I am testing with, which I believe is the only client. With the original line, the message does not reach that client. I have no idea why: I checked socket.rooms, and the socket is in the room identified by socket.sessionUserId. I run this for every socket on the server:

io.on("connection", (normalSocket: Socket) => {
  // a user connected

  const socket = normalSocket as SocketData;

  // join the room named by the user ID that started the socket
  socket.join(socket.sessionUserId!);

...

1 Answer

Instead of this:

socket.to(socket.sessionUserId!).emit("timers", rv);

or this:

io.of('/').emit("timers", rv);

I only have to use this:

io.to(socket.sessionUserId!).emit("timers", rv);

Relevant documentation is here.
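
The reason, as far as I understand it: socket.to(room) broadcasts to every socket in the room except the socket it is called on, while io.to(room) reaches every socket in the room. With a single tab open, the sending socket is the only member of its room, so socket.to(...) reaches nobody. The toy model below sketches that rule. ToyServer and ToySocket are hypothetical names for illustration only, not Socket.IO's implementation or API.

```typescript
// Toy model of room broadcasting; not part of the socket.io package.
type Delivery = { socketId: string; event: string; payload: unknown };

class ToyServer {
  private rooms = new Map<string, Set<string>>();
  readonly delivered: Delivery[] = [];

  join(socketId: string, room: string): void {
    if (!this.rooms.has(room)) {
      this.rooms.set(room, new Set<string>());
    }
    this.rooms.get(room)!.add(socketId);
  }

  // Models io.to(room).emit(...): every member of the room receives the
  // event, unless an explicit sender ID is passed to be skipped.
  emitTo(room: string, event: string, payload: unknown, exceptId?: string): void {
    const members = this.rooms.get(room);
    if (!members) {
      return;
    }
    members.forEach(id => {
      if (id !== exceptId) {
        this.delivered.push({ socketId: id, event, payload });
      }
    });
  }
}

class ToySocket {
  constructor(readonly id: string, private server: ToyServer) {}

  // Models socket.to(room).emit(...): the sending socket itself is skipped.
  emitTo(room: string, event: string, payload: unknown): void {
    this.server.emitTo(room, event, payload, this.id);
  }
}

const server = new ToyServer();
const socket = new ToySocket("socket-1", server);
server.join(socket.id, "user-42");

socket.emitTo("user-42", "timers", [10, 20]); // delivered to nobody
server.emitTo("user-42", "timers", [10, 20]); // delivered to socket-1
```

After both calls, server.delivered contains exactly one entry, the one produced by the server-level emit.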

answered 2021-08-06T13:33:45.490