I am using PyFlink and I want to unit test the UDFs I have written in Python.

To test the following simple UDF:

# tasks/helloworld/udf.py
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.INT(), DataTypes.INT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

I created a test file that should fail:

from tasks.helloworld.udf import add

def test_add():
    assert add(1,1) == 3

Sadly, the test passes when I run it with pytest:

> pytest
=========================================================================================== test session starts ============================================================================================
platform darwin -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/chenyisheng/source/yiksanchan/pytest-flink
collected 1 item

tests/test_helloworld.py .                                                                                                                                                                           [100%]

============================================================================================= warnings summary =============================================================================================
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    from collections import (

../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/udf.py:291
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/udf.py:291: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    if not isinstance(input_types, collections.Iterable) \

-- Docs: https://docs.pytest.org/en/stable/warnings.html
====================================================================================== 1 passed, 6 warnings in 0.98s =======================================================================================

However, if I remove the @udf(input_types=[...], result_type=...) annotation from udf.py, the test fails as expected.

# tasks/helloworld/udf.py
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Comment out the udf annotation
# @udf(input_types=[DataTypes.INT(), DataTypes.INT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

Result:

> pytest
=========================================================================================== test session starts ============================================================================================
platform darwin -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/chenyisheng/source/yiksanchan/pytest-flink
collected 1 item

tests/test_helloworld.py F                                                                                                                                                                           [100%]

================================================================================================= FAILURES =================================================================================================
_________________________________________________________________________________________________ test_add _________________________________________________________________________________________________

    def test_add():
>       assert add(1,1) == 3
E       assert 2 == 3
E        +  where 2 = add(1, 1)

tests/test_helloworld.py:4: AssertionError
============================================================================================= warnings summary =============================================================================================
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    from collections import (

-- Docs: https://docs.pytest.org/en/stable/warnings.html
========================================================================================= short test summary info ==========================================================================================
FAILED tests/test_helloworld.py::test_add - assert 2 == 3
====================================================================================== 1 failed, 5 warnings in 0.17s =======================================================================================

A complete example can be found at https://github.com/YikSanChan/how-to-pytest-flink.

1 Answer

I am porting Dian Fu's answer from the Apache Flink user mailing list, which solved my problem.

Since the UDF add is decorated with the @udf decorator, it is no longer a plain Python function when you reference it. If you execute print(type(add(1, 1))), you will see output like "<class 'pyflink.table.expression.Expression'>".
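To illustrate why the original assertion can pass, here is a minimal sketch that does not use PyFlink at all. FakeExpression, FakeUdfWrapper and fake_udf are hypothetical stand-ins written for this example, and the sketch assumes that the real Expression type overloads == to build another expression object instead of returning a bool. Under that assumption, the comparison yields an object that is truthy by default, so assert never fails.

```python
class FakeExpression:
    """Hypothetical stand-in for pyflink.table.expression.Expression."""

    def __init__(self, description):
        self.description = description

    def __eq__(self, other):
        # Assumption: comparing builds a new expression instead of a bool.
        return FakeExpression(f"({self.description} == {other!r})")


class FakeUdfWrapper:
    """Hypothetical stand-in for the object that @udf puts in place of add."""

    def __init__(self, func):
        self._func = func  # the original Python function is kept here

    def __call__(self, *args):
        # Calling the wrapper describes a call; it does not run the function.
        return FakeExpression(f"{self._func.__name__}{args}")


def fake_udf(func):
    """Hypothetical decorator mimicking the wrapping behaviour described above."""
    return FakeUdfWrapper(func)


@fake_udf
def add(i, j):
    return i + j


comparison = add(1, 1) == 3

# The comparison result is an object, and plain objects are truthy,
# so this assertion passes even though 1 + 1 is not 3.
assert comparison

# Calling the wrapped function directly gives the real arithmetic result.
assert add._func(1, 1) == 2
```

The same reasoning suggests checking the type of a value before asserting on it whenever a decorator may have replaced the function.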

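You could try the following code: assert add._func(1, 1) == 3

This assertion now fails as intended, because add._func(1, 1) evaluates to 2.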

add._func gives you the original Python function.
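If several tests need the underlying function, the _func lookup could be wrapped in a small helper. The sketch below is an assumption-laden illustration: unwrap_udf is a hypothetical name, not part of PyFlink, and _Wrapped is a stand-in used so that the example runs without PyFlink installed. Because _func is an underscore-prefixed attribute, it may not be stable across PyFlink versions, so the helper falls back to the object itself when the attribute is missing.

```python
def unwrap_udf(func):
    """Hypothetical helper: return the plain function behind a UDF wrapper.

    Falls back to the argument itself if it has no _func attribute,
    so the helper also works on undecorated functions.
    """
    return getattr(func, "_func", func)


class _Wrapped:
    """Stand-in for a decorated UDF object (not the real PyFlink class)."""

    def __init__(self, func):
        self._func = func


def plain_add(i, j):
    return i + j


wrapped_add = _Wrapped(plain_add)

# Both the wrapped and the plain version resolve to the same function.
assert unwrap_udf(wrapped_add)(1, 1) == 2
assert unwrap_udf(plain_add)(1, 1) == 2
```

In a real test file, the call would look like unwrap_udf(add)(1, 1), with add imported from tasks.helloworld.udf.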

Answered 2021-03-24T00:06:01.897