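After spending several days searching for information and finding nothing about this problem, I am posting a question here. None of the changes I have tried so far have gotten me anywhere.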

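I have a cron job that, every 30 minutes, starts a Docker container running a very simple python3 script (see the log below). The script collects, via API requests, the top 100 Twitch streams for each language. The API response is processed with Pandas and finally stored, using the Pandas to_sql() function, in a MariaDB database hosted on AWS RDS.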

Everything works: the data is stored, accessible, and correct. But at completely random times the script fails at the to_sql() step.

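Let me walk through my .log file (below): at 20:30:04 you can see the container start, make the API request, create the DataFrame, dump it to the RDS database, and the job ends normally a few seconds later. Then, surprise! The next capture, at 21:00:04, fails while dumping to the RDS database.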

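Then, after a few hours of not working, at 23:30:04 it captures the data and stores it correctly again, and it has kept working since. This happens once or twice a day, at completely random times.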

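More information: I am running the same Python script in parallel against a local MariaDB database, and there everything works fine. I have also checked the integrity of the data to make sure it contains no errors.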

Can anyone tell me what might be happening? Thanks!

Here is the log sequence:

twitch_capture_1  | starting capture for es at: 22-04-2021 20:30:04
twitch_capture_1  | creating API request for es
twitch_capture_1  | request ok
twitch_capture_1  | parsing response
twitch_capture_1  | creating Dataframe
twitch_capture_1  | Dumping on MYSQL RDS
twitch_capture_1  | Language es captured status: OK at 22-04-2021 20:30:04
twitch_capture_1  | JOB ENDED AT: 22-04-2021 20:30:08
twitch_capture_1  | starting capture for es at: 22-04-2021 21:00:04
twitch_capture_1  | creating API request for es
twitch_capture_1  | request ok
twitch_capture_1  | parsing response
twitch_capture_1  | creating Dataframe
twitch_capture_1  | Dumping on MYSQL RDS
twitch_capture_1  | Traceback (most recent call last):
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 732, in _read_bytes
twitch_capture_1  |     data = self._rfile.read(num_bytes)
twitch_capture_1  |   File "/usr/lib/python3.8/socket.py", line 669, in readinto
twitch_capture_1  |     return self._sock.recv_into(b)
twitch_capture_1  | ConnectionResetError: [Errno 104] Connection reset by peer
twitch_capture_1  |
twitch_capture_1  | During handling of the above exception, another exception occurred:
twitch_capture_1  |
twitch_capture_1  | Traceback (most recent call last):
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3141, in _wrap_pool_connect
twitch_capture_1  |     return fn()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 301, in connect
twitch_capture_1  |     return _ConnectionFairy._checkout(self)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 755, in _checkout
twitch_capture_1  |     fairy = _ConnectionRecord.checkout(pool)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 419, in checkout
twitch_capture_1  |     rec = pool._do_get()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
twitch_capture_1  |     self._dec_overflow()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
twitch_capture_1  |     compat.raise_(
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 211, in raise_
twitch_capture_1  |     raise exception
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/impl.py", line 142, in _do_get
twitch_capture_1  |     return self._create_connection()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
twitch_capture_1  |     return _ConnectionRecord(self)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 362, in __init__
twitch_capture_1  |     self.__connect(first_connect_check=True)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 605, in __connect
twitch_capture_1  |     pool.logger.debug("Error on connect(): %s", e)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
twitch_capture_1  |     compat.raise_(
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 211, in raise_
twitch_capture_1  |     raise exception
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 599, in __connect
twitch_capture_1  |     connection = pool._invoke_creator(self)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/create.py", line 578, in connect
twitch_capture_1  |     return dialect.connect(*cargs, **cparams)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/default.py", line 559, in connect
twitch_capture_1  |     return self.dbapi.connect(*cargs, **cparams)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 353, in __init__
twitch_capture_1  |     self.connect()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 632, in connect
twitch_capture_1  |     self._get_server_information()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 1055, in _get_server_information
twitch_capture_1  |     packet = self._read_packet()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 692, in _read_packet
twitch_capture_1  |     packet_header = self._read_bytes(4)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 738, in _read_bytes
twitch_capture_1  |     raise err.OperationalError(
twitch_capture_1  | pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query ([Errno 104] Connection reset by peer)')
twitch_capture_1  |
twitch_capture_1  | The above exception was the direct cause of the following exception:
twitch_capture_1  |
twitch_capture_1  | Traceback (most recent call last):
twitch_capture_1  |   File "home/app/app/get_ranking_es.py", line 91, in <module>
twitch_capture_1  |     get_language(lang)
twitch_capture_1  |   File "home/app/app/get_ranking_es.py", line 87, in get_language
twitch_capture_1  |     dataframe.to_sql(table, index=False, con=engine, if_exists='append', chunksize=50)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pandas/core/generic.py", line 2779, in to_sql
twitch_capture_1  |     sql.to_sql(
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pandas/io/sql.py", line 601, in to_sql
twitch_capture_1  |     pandas_sql.to_sql(
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pandas/io/sql.py", line 1406, in to_sql
twitch_capture_1  |     table.create()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pandas/io/sql.py", line 736, in create
twitch_capture_1  |     if self.exists():
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pandas/io/sql.py", line 720, in exists
twitch_capture_1  |     return self.pd_sql.has_table(self.name, self.schema)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pandas/io/sql.py", line 1454, in has_table
twitch_capture_1  |     insp = sa.inspect(self.connectable)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/inspection.py", line 64, in inspect
twitch_capture_1  |     ret = reg(subject)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/reflection.py", line 182, in _engine_insp
twitch_capture_1  |     return Inspector._construct(Inspector._init_engine, bind)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/reflection.py", line 117, in _construct
twitch_capture_1  |     init(self, bind)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/reflection.py", line 128, in _init_engine
twitch_capture_1  |     engine.connect().close()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3095, in connect
twitch_capture_1  |     return self._connection_cls(self, close_with_result=close_with_result)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 91, in __init__
twitch_capture_1  |     else engine.raw_connection()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3174, in raw_connection
twitch_capture_1  |     return self._wrap_pool_connect(self.pool.connect, _connection)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3144, in _wrap_pool_connect
twitch_capture_1  |     Connection._handle_dbapi_exception_noconnection(
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 2003, in _handle_dbapi_exception_noconnection
twitch_capture_1  |     util.raise_(
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 211, in raise_
twitch_capture_1  |     raise exception
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3141, in _wrap_pool_connect
twitch_capture_1  |     return fn()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 301, in connect
twitch_capture_1  |     return _ConnectionFairy._checkout(self)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 755, in _checkout
twitch_capture_1  |     fairy = _ConnectionRecord.checkout(pool)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 419, in checkout
twitch_capture_1  |     rec = pool._do_get()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
twitch_capture_1  |     self._dec_overflow()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
twitch_capture_1  |     compat.raise_(
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 211, in raise_
twitch_capture_1  |     raise exception
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/impl.py", line 142, in _do_get
twitch_capture_1  |     return self._create_connection()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
twitch_capture_1  |     return _ConnectionRecord(self)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 362, in __init__
twitch_capture_1  |     self.__connect(first_connect_check=True)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 605, in __connect
twitch_capture_1  |     pool.logger.debug("Error on connect(): %s", e)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
twitch_capture_1  |     compat.raise_(
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 211, in raise_
twitch_capture_1  |     raise exception
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 599, in __connect
twitch_capture_1  |     connection = pool._invoke_creator(self)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/create.py", line 578, in connect
twitch_capture_1  |     return dialect.connect(*cargs, **cparams)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/default.py", line 559, in connect
twitch_capture_1  |     return self.dbapi.connect(*cargs, **cparams)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 353, in __init__
twitch_capture_1  |     self.connect()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 632, in connect
twitch_capture_1  |     self._get_server_information()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 1055, in _get_server_information
twitch_capture_1  |     packet = self._read_packet()
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 692, in _read_packet
twitch_capture_1  |     packet_header = self._read_bytes(4)
twitch_capture_1  |   File "/usr/local/lib/python3.8/dist-packages/pymysql/connections.py", line 738, in _read_bytes
twitch_capture_1  |     raise err.OperationalError(
twitch_capture_1  | sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query ([Errno 104] Connection reset by peer)')
twitch_capture_1  | (Background on this error at: http://sqlalche.me/e/14/e3q8)
twitch_capture_1  | starting capture for es at: 22-04-2021 23:30:04
twitch_capture_1  | creating API request for es
twitch_capture_1  | request ok
twitch_capture_1  | parsing response
twitch_capture_1  | creating Dataframe
twitch_capture_1  | Dumping on MYSQL RDS
twitch_capture_1  | Language es captured status: OK at 22-04-2021 23:30:04
twitch_capture_1  | JOB ENDED AT: 22-04-2021 23:30:08
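
The traceback above is three exceptions chained together: a ConnectionResetError from the socket, the pymysql OperationalError raised while handling it, and the sqlalchemy OperationalError that wraps both. As a reading aid, here is a minimal sketch of how such a chain can be walked back to its innermost exception. `root_cause`, `DriverError` and `WrapperError` are hypothetical names used only for illustration; the two stand-in classes take the place of the pymysql and sqlalchemy errors so the sketch runs without a database.

```python
def root_cause(exc: BaseException) -> BaseException:
    """Follow __cause__ / __context__ links to the innermost exception."""
    seen = set()
    while id(exc) not in seen:
        seen.add(id(exc))
        # __cause__ is set by "raise X from Y"; __context__ is set implicitly
        # when an exception is raised while another one is being handled.
        inner = exc.__cause__ or exc.__context__
        if inner is None:
            break
        exc = inner
    return exc


class DriverError(Exception):
    """Stand-in for pymysql.err.OperationalError (illustration only)."""


class WrapperError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError (illustration only)."""


def demo() -> BaseException:
    """Rebuild the same chain shape as the traceback and return its root."""
    try:
        try:
            try:
                raise ConnectionResetError(104, "Connection reset by peer")
            except ConnectionResetError:
                # Implicit chaining: "During handling of the above exception..."
                raise DriverError(2013, "Lost connection to MySQL server during query")
        except DriverError as driver_error:
            # Explicit chaining: "The above exception was the direct cause..."
            raise WrapperError("wrapped driver error") from driver_error
    except WrapperError as error:
        return root_cause(error)
```

Read this way, the innermost error in the log is the TCP reset, and the frames around it (`has_table` → `engine.connect()` → `pymysql ... connect` → `_get_server_information`) show that it was raised while a new connection was being opened.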

Here is the Python script:

# Global Libraries
from twitchAPI.twitch import Twitch
from datetime import datetime, timedelta
import pandas as pd
import credentials as cr

# MYSQL Libraries
import pymysql
from sqlalchemy import create_engine

# CREATE DATABASE CONNECTION
host = "xxxxxxxxxxxxxx"
user = "xxx"
pw = "xxxxxx"
port = nnnnn
database = "xxxxxxxxxxx"
table = "xxxxxxxxxxx"

engine = create_engine(f"mysql+pymysql://{user}:{pw}@{host}:{port}/{database}?charset=utf8mb4")

# Normalize datetime
hour = datetime.now()
correct_hour = hour+timedelta(hours=2)
time = correct_hour.strftime("%d-%m-%Y %H:%M:%S")

# Create Login Twitch API
twitch = Twitch(cr.CLIENT, cr.SECRET)

# Languages to capture
langs = ["es"]

def get_language(lang):

    # ACTUAL TIME
    hour = datetime.now()
    correct_hour = hour+timedelta(hours=2)
    capture_time = correct_hour.strftime("%d-%m-%Y %H:%M:%S")
    operation_time = correct_hour

    # MAKE THE REQUEST
    print(f"starting capture for {lang} at: "+capture_time)
    print(f"creating API request for {lang}")
    twitch.authenticate_app([])
    response = twitch.get_streams(language=lang, first=100)
    print("request ok")
    # ADD ACTUAL TIME TO REQUEST RESPONSE
    response["captured_at"] = capture_time

    # GET ELEMENT INDEX IN RESPONSE
    elements_list = response["data"]
    print("parsing response")
    lang_dataframe = []

    for element in elements_list:
        position = (elements_list.index(element)) + 1
        element["position"] = position
        element["captured_at"] = capture_time
        element["capture_time_obj"] = operation_time

        df = pd.DataFrame({
            "captured_at_str": element["captured_at"],
            "captured_at_obj": element["capture_time_obj"],
            "position": element["position"],
            "language": element["language"],
            "channel_id": element["id"],
            "streamer_id": element["user_id"],
            "streamer_login": element["user_login"],
            "streamer_name": element["user_name"],
            "game_id": element["game_id"],
            "game_name": element["game_name"],
            "stream_type": element["type"],
            "stream_title": element["title"],
            "viewer_count": element["viewer_count"],
            "started_at": element["started_at"],
        }, index=[element["position"]])

        lang_dataframe.append(df)
    print("creating Dataframe")
    dataframe = pd.concat(lang_dataframe)
    print("Dumping on MYSQL RDS")
    dataframe.to_sql(table, index=False, con=engine, if_exists='append', chunksize=50)
    print(f"Language {lang} captured status: OK at "+ capture_time)

for lang in langs:
    get_language(lang)

hour = datetime.now()
correct_hour = hour+timedelta(hours=2)
time = correct_hour.strftime("%d-%m-%Y %H:%M:%S")

print("JOB ENDED AT: "+ time)
engine.dispose()
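
For reference, this is a minimal sketch of the kind of retry wrapper that could be put around the to_sql() call so that a single reset does not lose a whole capture. It does not explain why the reset happens. `call_with_retry` and all of its parameters are hypothetical names, and the delays are arbitrary assumptions; the helper uses only the standard library so it can be tried without a database.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    attempts: int = 3,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); if it raises one of retry_on, wait and try again.

    The wait doubles after every failed attempt. The exception from the
    last attempt is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on as exc:
            if attempt == attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"attempt {attempt} failed ({exc!r}); retrying in {delay:g}s")
            sleep(delay)
    raise AssertionError("unreachable")


# Hypothetical use in the script above (not run here):
#
#   from sqlalchemy.exc import OperationalError
#
#   call_with_retry(
#       lambda: dataframe.to_sql(table, index=False, con=engine,
#                                if_exists="append", chunksize=50),
#       retry_on=(OperationalError,),
#   )
```

In the traceback above the error is raised while the connection is being opened, before any rows are sent, so retrying at that point would not duplicate data. A failure in the middle of an insert is a different case and would need to be checked separately before retrying an `if_exists='append'` write.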