0

我正在尝试使用使用 cogs 的单个 python 脚本运行两个不同的 Discord Bot。但是当我尝试运行第二个机器人时,它会抛出一个 ImportError,即使我没有使用那个特定的库。没有反垃圾邮件机器人,反应角色机器人可以正常工作。这是我的代码。仅供参考,我在虚拟环境中工作。

主文件

if __name__ == "__main__":
    try:
        reaction_role_bot = commands.Bot(command_prefix=config["reaction_role_bot"]["bot_prefix"], intents=discord.Intents.all())
        reaction_slash = SlashCommand(reaction_role_bot, sync_commands=True)
        reaction_role_bot.load_extension(f"cogs.{str(os.path.basename('cogs/reaction_roles.py')[:-3])}")
        
        anti_spam_bot = commands.Bot(command_prefix=config["anti_spam_bot"]["bot_prefix"], intents=discord.Intents.default())
        spam_slash = SlashCommand(anti_spam_bot, sync_commands=True)   
        anti_spam_bot.load_extension(f"cogs.{str(os.path.basename('cogs/anti_spam.py')[:-3])}")
        
        event_loop = asyncio.get_event_loop()
        event_loop.create_task(reaction_role_bot.run(config["reaction_role_bot"]["token"]))
        event_loop.create_task(anti_spam_bot.run(config["anti_spam_bot"]["token"]))
        event_loop.run_forever()
        
    except Exception as e:
        print(e)

反垃圾邮件.py

import platform
import os

import discord
from discord.ext import commands

from antispam import AntiSpamHandler
from antispam.plugins import AntiSpamTracker, Options

class AntiSpamBot(commands.Cog):
    def __init__(self, client):
        self.client = client
        
        # Initialize the AntiSpamHandler
        self.client.handler = AntiSpamHandler(self.client, options=Options(no_punish=True))
        # 3 Being how many 'punishment requests' before is_spamming returns True
        self.client.tracker = AntiSpamTracker(self.client.handler, 3) 
        self.client.handler.register_extension(self.client.tracker)
        
    @commands.Cog.listener()
    async def on_ready(self):
        print("---------------------------------")
        print(f"Logged in as {str(self.client.user)}")
        print(f"Discord.py API version: {discord.__version__}")
        print(f"Python version: {platform.python_version()}")
        print(f"Running on: {platform.system()} {platform.release()} ({os.name})")
        await self.client.change_presence(status=discord.Status.idle, activity=discord.Game(name="Head of Security"))
        print("---------------------------------\n")
        
    # The code in this event is executed every time a valid commands catches an error
    @commands.Cog.listener()
    async def on_command_error(context, error):
        raise error
    
    @commands.Cog.listener()
    async def on_message(self, message):
        await self.client.handler.propagate(message)
        
        if self.client.tracker.is_spamming(message):
            await message.delete()
            await message.channel.send(f"{message.author.mention} has been automatically kicked for spamming.")
            await message.author.kick()
        
        await self.client.process_commands(message)
    
        
def setup(client):
    client.add_cog(AntiSpamBot(client))

错误

Extension 'cogs.anti_spam' raised an error: ImportError: cannot import name 'AsyncMock' from 'unittest.mock' (/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/unittest/mock.py)

我没有使用齿轮的经验,这让我有点困惑。任何形式的帮助都会帮助我解决这个问题!提前致谢!

4

1 回答 1

0

我不认为这是一个 cog 注册问题。我相信这是您的 cog 文件中某些依赖项的导入错误。我搜索了您的错误并发现了类似的内容,我建议您在此处查看以获取更多信息。

作为一个笼统的声明,我会仔细检查您是否安装了 mock,并且您正在将它安装在您认为正在安装它的 Python 版本上。如果您安装了多个 python 版本,它可能会变得很不稳定。

另外,在不相关的说明中:最好避免在一个 python 文件中运行多个机器人实例,但我可以帮助您以最好的方式做到这一点。

对于初学者,您必须意识到这Client.run是对几个更底层概念的抽象。

Client.login哪些登录客户端,然后Client.connect哪些实际运行处理。这些是协程。

asyncio提供将事物放入事件循环的能力,以便它在有时间的时候工作。

像这样的东西,例如

loop = asyncio.get_event_loop()

async def foo():
  await asyncio.sleep(10)
  loop.close()

loop.create_task(foo())
loop.run_forever()

如果我们想等待另一个协程发生某些事情,asyncio 也通过 asyncio.Event 的同步方式为我们提供了这个功能。您可以将其视为您正在等待的布尔值:

e = asyncio.Event()
loop = asyncio.get_event_loop()

async def foo():
  await e.wait()
  print('we are done waiting...')
  loop.stop()

async def bar():
  await asyncio.sleep(20)
  e.set()

loop.create_task(bar())
loop.create_task(foo())
loop.run_forever() # foo will stop this event loop when 'e' is set to true
loop.close()

使用这个概念,我们可以将其应用于不和谐机器人本身。

import asyncio
import discord
from collections import namedtuple

# First, we must attach an event signalling when the bot has been
# closed to the client itself so we know when to fully close the event loop.

Entry = namedtuple('Entry', 'client event')
entries = [
  Entry(client=discord.Client(), event=asyncio.Event()),
  Entry(client=discord.Client(), event=asyncio.Event())
]

# Then, we should login to all our clients and wrap the connect call
# so it knows when to do the actual full closure

loop = asyncio.get_event_loop()

async def login():
  for e in entries:
    await e.client.login()

async def wrapped_connect(entry):
  try:
    await entry.client.connect()
  except Exception as e:
    await entry.client.close()
    print('We got an exception: ', e.__class__.__name__, e)
    entry.event.set()

# actually check if we should close the event loop:
async def check_close():
  futures = [e.event.wait() for e in entries]
  await asyncio.wait(futures)

# here is when we actually login
loop.run_until_complete(login())

# now we connect to every client
for entry in entries:
  loop.create_task(wrapped_connect(entry))

# now we're waiting for all the clients to close
loop.run_until_complete(check_close())

# finally, we close the event loop
loop.close()


于 2021-11-06T02:10:00.093 回答