0

我刚刚从pyTelegramBotAPIwith重写了我的电报机器人python-telegram-bot。有一个想法是有一个公开可用的监控 url,我们可以偶尔用一些应用程序 ping 一次,以查看机器人是否仍在运行(在内部,它会测试机器人的某些功能,如数据库等)。问题是,是否可以使用嵌入式创建这样的请求点HTTPServer.HTTPServer?到目前为止,我找不到这样做的方法。如果我重用通用example.com/botTOKEN方法,我需要注意 json 有效负载,并且在失败的情况下我无法发回 HTTP 错误响应代码。

谢谢你。

Update1:​​所以,我遵循了@Marat 提供的代码片段。这就是我获取处理程序对象的方式:

# since the webhook server is started in an extra thread, it's not available immediately
while updater.httpd is None:
    pass
handler = updater.httpd.RequestHandlerClass
4

1 回答 1

1

是的你可以。我希望这个例子会有所帮助:

import SimpleHTTPServer
import SocketServer


class myServer(SimpleHTTPServer.SimpleHTTPRequestHandler):

    def do_GET(self):
        """Serve a GET request."""
        # do something here
        self.finish(
            "Hello world! Request path: " + self.path
        )

    def finish(self, value, status=200, ctype="text/html"):
        try:
            self.send_response(status)
            self.send_header("Content-type", ctype)
            self.send_header("Content-Length", str(len(value)))
            self.end_headers()
            self.wfile.write(str(value))
        finally:
            self.wfile.close()


httpd = SocketServer.TCPServer(("", 80), myServer)

httpd.serve_forever()

有关更多信息,请查看源代码SimpleHTTPRequestHandler

UPD: WebhookHandler代码可能是一个更好的例子

如果你想重用现有的实例,你可以做猴子补丁:

# wrapper for original do_GET
def patch(get_func):
    def wrapper(self):
        if self.path == '/test_url':
            # do something
            message = "Successful" # or not :(
            self.send_response(200)
            self.send_header("Content-type", 'text/plain')
            self.send_header("Content-Length", str(len(message)))
            self.end_headers()
            self.wfile.write(message)
        else:
            return get_func(self)

    return wrapper

# assume `server` is an instance of WebhookHandler
server.do_GET = patch(server.do_GET)  # monkeypatching

UPD2:在仔细查看BaseServer的代码后,我发现它为每个请求启动了一个新的请求处理程序实例。这意味着修补实例不起作用,我们需要修补类本身。下面是它的工作原理:

# ... somewhere in the code far far away

from telegram.ext import Updater
from telegram.utils import webhookhandler as wh

# IMPORTANT: do it before making an instance of updater
# IMPORTANT #2: this is considered dirty hack. Don't repeat it at home!
if not wh.WebhookHandler._fuse:
    # note we're patching handler class itself, not an instance
    wh.WebhookHandler.do_GET = patch(wh.WebhookHandler.do_GET)  # use patch() from above
    wh.WebhookHandler._fuse = True

updater = Updater(token='TOKEN')
于 2016-11-14T17:05:24.337 回答