我正在使用 Python 中的雪花 PUT 命令将文件从本地系统移动到雪花暂存。
我有 400 个(每个 40 MB)文件,所以我使用的命令是 -> put file:///Path/file_name*
它正在工作并加载所有文件,但大约需要 30 分钟。
我想知道进度,以便我可以确定它的进度,是否有办法在每个文件加载后打印日志(文件 1 移至暂存,文件 2 移至暂存等)
我正在使用 Python 中的雪花 PUT 命令将文件从本地系统移动到雪花暂存。
我有 400 个(每个 40 MB)文件,所以我使用的命令是 -> put file:///Path/file_name*
它正在工作并加载所有文件,但大约需要 30 分钟。
我想知道进度,以便我可以确定它的进度,是否有办法在每个文件加载后打印日志(文件 1 移至暂存,文件 2 移至暂存等)
有没有办法在每个文件加载后打印日志?
虽然从库中使用语句执行是非交互式的,但 Snowflake python 连接器确实支持记录其执行工作。
这是一个简短的片段,其中包含上面链接中的示例:
# Assumes a 'con' object pre-exists and is connected to Snowflake already
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(logging.Formatter('%(asctime)s - %(funcName)s() - %(message)s'))
logger.addHandler(ch)
con.cursor().execute("put file:///Path/file_name* @stage_name")
# Optional, custom app log:
# logging.info("put command completed execution, exiting")
con.close()
在该程序运行时观察输出(到标准错误)将产生以下内容(仅针对上传消息进行过滤):
~> python3 your_logging_script.py 2>&1 | grep -F "upload_one_file()"
[…]
2020-06-24 04:57:06,495 - upload_one_file() - done: status=ResultStatus.UPLOADED, file=/Path/file_name1, (…)
2020-06-24 04:57:07,312 - upload_one_file() - done: status=ResultStatus.UPLOADED, file=/Path/file_name2, (…)
2020-06-24 04:57:09,121 - upload_one_file() - done: status=ResultStatus.UPLOADED, file=/Path/file_name3, (…)
[…]
您还可以将 python 记录器配置为使用文件,并尾随文件,而不是依赖于上面用于简单的stderr
(from logging.StreamHandler
) 。
如果您只需要过滤特定消息的日志记录,logging
python 模块支持附加您自己的过滤器,这些过滤器决定了发出的每条记录。以下过滤器仅用于upload_one_file()
函数调用消息(使用record.message
字段过滤日志消息而不是下面示例中使用的函数名称):
class UploadFilter(logging.Filter):
def filter(self, record):
# Only tests one condition, but you could chain conditions here
return "upload_one_file" in record.funcName.lower()
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(logging.Formatter('%(asctime)s - %(funcName)s() - %(message)s'))
ch.addFilter(UploadFilter())
# ch.addFilter(AnyOtherFilterClass())
logger.addHandler(ch)
注意:如果要更改处理程序(流到文件),请确保将过滤器也添加到实际的新处理程序,并将处理程序添加到记录器。您可以阅读Python 登录教程以更好地了解其机制。