
We have built a front end with React and a back end with Django REST framework and Channels. We use Heroku Redis as our Redis provider. Our users connect via ReconnectingWebSocket.

We are using Python 3.6 and Channels 2.4.

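The problem is that our API calls try to pass information to the sockets, but the information does not always reach the consumer. I logged the steps of the call with prints: I printed the channel_name right before the send and confirmed that it matches the one returned to the user on connect. The prints in the consumer are never called, however, which means the message is never sent to the user.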
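One way to tell a channel-layer fault apart from a consumer fault is to round-trip a message through the layer directly, with no consumer involved. The sketch below is not from the original post: `round_trip` and `InMemoryStubLayer` are hypothetical names, and the stub only stands in for a real layer so the example runs without Redis. It relies on one assumption about the real layer, namely that it exposes `send(channel, message)` and `receive(channel)` coroutines.

```python
import asyncio


class InMemoryStubLayer:
    """Hypothetical stand-in exposing send/receive coroutines like a channel layer."""

    def __init__(self):
        self._queues = {}

    def _queue(self, channel):
        # One FIFO queue per channel name, created on first use.
        return self._queues.setdefault(channel, asyncio.Queue())

    async def send(self, channel, message):
        await self._queue(channel).put(message)

    async def receive(self, channel):
        return await self._queue(channel).get()


async def round_trip(layer, channel="test_channel", timeout=2.0):
    """Send one probe through the layer and report whether it comes back."""
    probe = {"type": "layer.probe"}
    await layer.send(channel, probe)
    try:
        received = await asyncio.wait_for(layer.receive(channel), timeout)
    except asyncio.TimeoutError:
        return False
    return received == probe


if __name__ == "__main__":
    print(asyncio.run(round_trip(InMemoryStubLayer())))
```

Against a real deployment, the same function could be given the object returned by `channels.layers.get_channel_layer()` instead of the stub. The probe deliberately uses a plain channel name rather than a consumer's channel_name, on the assumption that a consumer's own channel is already being read by the process that holds the connection. Note that `asyncio.run` needs Python 3.7 or later.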

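If I increase the number of dynos to roughly one per user connected to the socket, the problem seems to go away (or at least delivery becomes more reliable). As I understand it, one dyno should be able to handle many socket connections. Is there a reason my consumers are not receiving the signals? Is there a reason increasing the number of dynos fixes the problem?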

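On connect, I add the user to a group named "u_{their id}" so that signals can potentially be sent to several computers logged in as the same user. I have tried sending messages both directly to their channel_name and through that group; when a message fails to arrive one way, it fails the other way too. The prints confirm that the channel_names are correct, yet the consumer still does not receive the message. No errors appear to be raised. It may fail, then I refresh the recipient and it works, then I refresh the recipient again and it goes back to failing.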

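The socket connection is definitely alive: I wrote a simple function on the front end that pings the socket, and it responds even when the consumer is not receiving signals from the API calls.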

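I have also noticed that if I restart my dynos, once they load and the sockets reconnect, signals from API calls reach the first user for a short time and then stop getting through. Likewise, if I leave the sockets unused for a while and then refresh, they seem to work again briefly.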

Procfile

web: daphne doctalk.asgi:application --port $PORT --bind 0.0.0.0

consumers.py

import json
from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async

from channels.generic.websocket import AsyncWebsocketConsumer
from messages.models import Thread
from profile.models import OnlineStatus, DailyOnlineUserActivity
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import AnonymousUser
from .exceptions import ClientError
import datetime
from django.utils import timezone

class HeaderConsumer(AsyncWebsocketConsumer):
    async def connect(self):   
        await self.accept()
        await self.send("request_for_token")


    async def continue_connect(self):
        print(self.channel_name)
        print(self.channel_layer)
        await self.send(json.dumps({'your_channel_name': self.channel_name}))

        await self.get_user_from_token(self.scope['token'])

        await self.channel_layer.group_send(
            "online_users",
            {
                "type": "new_user_online",
                "user": self.user,
                "channel_layer": str(self.channel_layer),
                "channel_name": self.channel_name,
            }
        )

        await self.channel_layer.group_add(
            "online_users",
            self.channel_name,
        )

        print("adding to personal group u_%d" % self.user['id'])
        await self.channel_layer.group_add(
            "u_%d" % self.user['id'],
            self.channel_name,
        )


        self.message_threads = set()

        self.message_threads = await self.get_message_ids()

        for thread in self.message_threads:
            await self.monitor_thread(thread)

        self.doa = await self.check_for_or_establish_dailyonlineactivity()
        self.online_status = await self.establish_onlinestatus()
        await self.add_to_online_status_list()

        self.user_id_list = await self.get_online_user_list()
        await self.send_online_user_list()

    async def disconnect(self, code):
        # Leave all the rooms we are still in
        if hasattr(self, 'user'):
            await self.remove_from_dailyonlineactivity()

            try:
                await self.channel_layer.group_discard(
                    "u_%d" % self.user['id'],
                    self.channel_name,
                )
            except Exception as e:
                print("issue with self channel")
                print(e)

            try:
                await self.channel_layer.group_send(
                    "online_users",
                    {
                        "type": "user_went_offline",
                        "message": self.user['id'],
                    }
                )

            except Exception as e:
                print("issue with online_users")
                print(e)

            await self.channel_layer.group_discard(
                "online_users",
                self.channel_name,
            )
            try:
                for thread_id in list(self.message_threads):
                    print("leaving " + str(thread_id))
                    try:
                        self.message_threads.discard(thread_id)
                        await self.channel_layer.group_discard(
                            "m_%d" % thread_id,
                            self.channel_name,
                        )
                    except ClientError:
                        pass
            except Exception as e:
                print("issue with threads")
                print(e)

    async def receive(self, text_data):
        print(text_data)
        text_data_json = json.loads(text_data)
        if 'token' in text_data_json:
            self.scope['token'] = text_data_json['token']
            await self.continue_connect()

        #self.send(text_data=json.dumps({
        #    'message': message
        #}))

    async def new_message(self, event):
        # Send a message down to the client
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "thread": event['thread'],
                "message": event["message"],
            },
        ))

    async def user_went_offline(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def send_call_ring(self, event):
        print("SENDING CALL RING")
        print(event["message"])
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def rejoin_call(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def popup_notification(self, event):
        print("sending popup_notification")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def new_call_participant(self, event):
        print("new_call_participant received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def new_participants_invited(self, event):
        print("new_participants_invited received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def share_document_via_videocall(self, event):    
        print("share_document received")
        print(event)
        print(self.channel_name)
        print(self.user['id'])
        
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_share_link(self, event):   

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_hand_up(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_address_hand_up(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def you_are_dominant_speaker(self, event):

        # Send a message down to the client
        print("SENDING DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def you_are_no_longer_dominant_speaker(self, event):

        print("SENDING NO LONGER DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_screenshare(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_reaction(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_call_thread(self, event):

        print("sending video call thread")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def video_call_chat_message(self, event):

        print("sending video call chat message")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_chat_message(self, event):

        print("sending event chat message")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def to_next_agenda_item(self, event):

        print("sending to next agenda item")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def mute_all_event_participants(self, event):

        print("sending mute all participants")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_started(self, event):

        print("event started consumer")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_ended(self, event):

        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))
        
    async def video_call_reaction(self, event):

        print("sending video call reaction")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def new_user_online(self, event):

        print("user_online received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["user"],
                "channel_layer": event["channel_layer"],
                "channel_name": event["channel_name"],
            },
        ))

    @database_sync_to_async
    def get_message_ids(self):
        return set(Thread.objects.filter(participants__id=self.user['id'], subject="").values_list('id', flat=True))

    async def monitor_thread(self, thread_id):
        print("monitoring thread %d" % thread_id)
        print("on channel %s" % self.channel_name)
        await self.channel_layer.group_add(
            "m_%d" % thread_id,
            self.channel_name,
        )

    @database_sync_to_async
    def get_user_from_token(self, t):
        try:
            print("trying token" + t)
            token = Token.objects.get(key=t)
            self.user = token.user.get_profile.json()
        except Token.DoesNotExist:
            print("failed")
            self.user = AnonymousUser()

    @database_sync_to_async
    def check_for_or_establish_dailyonlineactivity(self):
        doa, created = DailyOnlineUserActivity.objects.get_or_create(date=datetime.date.today())
        if created:
            print("created DOA %d" %doa.id)
        else:
            print("found existing DOA %d" %doa.id)
        return doa

    @database_sync_to_async
    def establish_onlinestatus(self):
        old_os = OnlineStatus.objects.filter(user_id=self.user['id'], online_to=None)
        if old_os.exists():
            # Close every stale status; report the id of the one being closed.
            for stale in old_os:
                print("found unclosed OS %d" % stale.id)
                stale.online_to = timezone.now()
                stale.save()
        new_os = OnlineStatus(
            user_id=self.user['id'],
            channel_name=self.channel_name
        )
        new_os.save()
        return new_os

    @database_sync_to_async
    def add_to_online_status_list(self):
        self.doa.currently_active_users.add(self.user['id'])
        self.doa.all_daily_users.add(self.user['id'])
        self.doa.online_log.add(self.online_status)
        self.doa.save()

    @database_sync_to_async
    def remove_from_dailyonlineactivity(self):
        if hasattr(self, 'doa') and self.doa is not None:
            self.doa.currently_active_users.remove(self.user['id'])
        if hasattr(self, 'online_status') and self.online_status is not None:
            self.online_status.online_to = timezone.now()
            self.online_status.save()

    @database_sync_to_async
    def get_online_user_list(self):   
        user_id_list = list(self.doa.currently_active_users.all().values_list('id', flat=True))
        user_id_list.remove(self.user['id'])
        return user_id_list

    async def send_online_user_list(self):
        print("sending online_users")
        await self.send(text_data=json.dumps(
            {
                "type": "online_users",
                "message": self.user_id_list,
            },
        ))

    async def participant_ignored(self, event):
        print("ignored call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def participant_left(self, event):
        print("left call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def participant_joined(self, event):
        print("joined call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_screenshare(self, event):

        print("sending screenshare")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))
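
Most of the handlers above relay the same two keys, so they could share one helper. The sketch below is not part of the original consumer: `ForwardingMixin`, `forward_event` and `StubConsumer` are hypothetical names, and the stub replaces AsyncWebsocketConsumer only so the example runs without Channels installed.

```python
import asyncio
import json


class ForwardingMixin:
    """Hypothetical mixin: one helper that relays an event's type and message."""

    async def forward_event(self, event):
        await self.send(text_data=json.dumps(
            {"type": event["type"], "message": event["message"]}
        ))

    # Each handler name must still exist, because the event "type" selects it.
    send_call_ring = forward_event
    new_call_participant = forward_event
    popup_notification = forward_event


class StubConsumer(ForwardingMixin):
    """Stand-in for AsyncWebsocketConsumer that records what would be sent."""

    def __init__(self):
        self.sent = []

    async def send(self, text_data=None, bytes_data=None):
        self.sent.append(text_data)


async def demo():
    consumer = StubConsumer()
    await consumer.send_call_ring({"type": "send_call_ring", "message": {"id": 1}})
    return [json.loads(frame) for frame in consumer.sent]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the real consumer the mixin would presumably be listed before AsyncWebsocketConsumer in the class bases. Handlers that print extra debugging output or relay other keys, such as new_user_online, would stay as separate methods.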

Django signal triggered by adding a Profile to a VideoRoom:

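from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.db.models.signals import m2m_changed
from django.dispatch import receiver
from twilio.jwt.access_token import AccessToken
from twilio.jwt.access_token.grants import VideoGrant
# VideoRoom, Profile, OnlineStatus and VideoAccessToken are app models;
# their import paths are not shown in the post.

@receiver(m2m_changed, sender=VideoRoom.invitees.through)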
def invitee_added(sender, **kwargs):
    instance = kwargs.pop('instance', None)
    action = kwargs.pop('action', None)
    pk = kwargs.pop('pk_set', None)

    if action == 'post_add':    

        if len(pk) > 0:
            user = Profile.objects.get(id=list(pk)[0])
            if instance.initiator.id == user.id:
                return

            identity = "u_%d" % user.id

            # Create access token with credentials
            token = AccessToken(settings.TWILIO_ACCOUNT_SID, settings.TWILIO_API_KEY, settings.TWILIO_API_SECRET,
                                identity=identity, ttl=86399)

            # Create a Video grant and add to token
            video_grant = VideoGrant(room=instance.room_name)
            token.add_grant(video_grant)

            invitee_access_token = VideoAccessToken(user=user, token=token.to_jwt())
            invitee_access_token.save()

            instance.invitee_access_tokens.add(invitee_access_token)

            channel_layer = get_channel_layer()
            print(channel_layer)

            profiles = {"u_%d" % instance.initiator.id: instance.initiator.json()}

            for u in instance.current_participants.all():
                profiles["u_%d" % u.id] = u.json()
            print("instance.type")
            print(instance.type)
            if instance.type != 'event':
                print("sending to existing users")
                for key, value in profiles.items():
                    if value['id'] != user.id:
                        async_to_sync(channel_layer.group_send)(
                            key,
                            {'type': 'new_call_participant',
                             'message': {
                                 'key': "u_%d" % user.id,
                                 'value': user.json()
                             }
                             }
                        )

                ons = OnlineStatus.objects.get(user=user, online_to=None)
                print("in signal, sending to %s on channel %s" % (user.full_name, ons.channel_name))

                async_to_sync(channel_layer.send)(
                    ons.channel_name,
                    {'type': 'send_call_ring',
                     'message': {
                         'id': instance.id,
                         'room_name': instance.room_name,
                         'identity': "u_%d" % user.id,
                         'profiles': profiles,
                         'token': invitee_access_token.token.decode(),
                         'answered': False,
                         'initiated': False,
                         'caller': instance.initiator.json()
                     }
                     }
                )

Logs during an unsuccessful socket signal:

2021-03-11T15:16:14.489596+00:00 app[web.1]: pk
2021-03-11T15:16:14.489655+00:00 app[web.1]: {113}
2021-03-11T15:16:14.518051+00:00 app[web.1]: pk
2021-03-11T15:16:14.518058+00:00 app[web.1]: {68}
2021-03-11T15:16:14.786357+00:00 app[web.1]: sending to existing users
2021-03-11T15:16:14.786377+00:00 app[web.1]: u_113
2021-03-11T15:16:14.911441+00:00 app[web.1]: u_68
2021-03-11T15:16:14.915900+00:00 app[web.1]: in signal, sending to John Doe on channel u_68
2021-03-11T15:16:15.228644+00:00 app[web.1]: 10.63.249.212:12999 - - [11/Mar/2021:10:16:15] "POST /api/start-video-chat/" 200 3523
2021-03-11T15:16:15.231562+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=7ec75a21-c6bd-452b-9517-cd500064d7ee fwd="12.34.56.78" dyno=web.1 connect=3ms service=955ms status=200 bytes=3714 protocol=http

Successful call:

2021-03-11T15:20:50.253243+00:00 app[web.4]: pk
2021-03-11T15:20:50.253248+00:00 app[web.4]: {113}
2021-03-11T15:20:50.280925+00:00 app[web.4]: pk
2021-03-11T15:20:50.280926+00:00 app[web.4]: {68}
2021-03-11T15:20:50.614504+00:00 app[web.4]: sending to existing users
2021-03-11T15:20:50.614527+00:00 app[web.4]: u_113
2021-03-11T15:20:50.713880+00:00 app[web.4]: u_68
2021-03-11T15:20:50.718141+00:00 app[web.4]: in signal, sending to John Doe on channel u_68
2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING
2021-03-11T15:20:50.801670+00:00 app[web.2]: {'type': 'send_call_ring', 'message': "some payload data"}
2021-03-11T15:20:50.965602+00:00 app[web.4]: 10.11.225.205:25635 - - [11/Mar/2021:10:20:50] "POST /api/start-video-chat/" 200 3533
2021-03-11T15:20:50.964378+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=2da9918b-b587-4db9-a3c2-9d6dfd55ef42 fwd="12.34.56.78" dyno=web.4 connect=1ms service=888ms status=200 bytes=3724 protocol=http

1 Answer


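The problem turned out to be Redis. I switched from channels-redis to channels-rabbitmq and all my problems went away. I don't know whether it was my Redis provider or channels-redis, but simply changing the backend resolved every issue.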
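
For readers who want to try the same switch, a minimal sketch of the settings change might look like the following. It assumes the channels_rabbitmq package is installed and that the broker URL arrives through an environment variable; `RABBITMQ_URL` is a hypothetical name, and the fallback URL points at a local broker purely for illustration.

```python
import os

# Assumption: the broker URL is supplied through an environment variable.
# RABBITMQ_URL is a hypothetical name; use whatever your provider sets.
RABBITMQ_URL = os.environ.get("RABBITMQ_URL", "amqp://guest:guest@127.0.0.1/asgi")

CHANNEL_LAYERS = {
    "default": {
        # Previously: "channels_redis.core.RedisChannelLayer"
        "BACKEND": "channels_rabbitmq.core.RabbitmqChannelLayer",
        "CONFIG": {"host": RABBITMQ_URL},
    },
}
```

This block would go in the Django settings module in place of the existing CHANNEL_LAYERS entry; the consumer and signal code should not need to change, since both reach the layer through self.channel_layer and get_channel_layer().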

Answered 2021-03-18T20:10:26.277