软件版本:
pyflink:1.14.2
python:3.8.10
pycaret:2.3.5
我将 Pycaret 集成到了 pyflink & flink 的 docker-compose 环境中,并且有一个 Pycaret 模型文件 `iforest.pkl`。当 Python 脚本在 `ProcessFunction` 中加载该 pkl 模型时会发生错误,总是提示 `module '__main__' has no attribute 'print'`,而我在本地调试时并没有找到 `__main__` 方法。
本地调试时,我直接使用 python 命令执行脚本;在 compose 环境中,我使用 `./bin/flink run --python xxx.py` 来运行 pyflink 脚本。
有谁知道这个错误的原因吗?错误详情如下:
at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
bundle_processor = self.bundle_processor_cache.get(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
processor = bundle_processor.BundleProcessor(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
return collections.OrderedDict([(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
transform_consumers = {
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
return transform_factory.create_operation(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 129, in create_data_stream_keyed_process_function
return _create_user_defined_function_operation(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 199, in _create_user_defined_function_operation
return beam_operation_cls(
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", line 80, in __init__
extract_stateless_function(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", line 158, in extract_stateless_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
File "/usr/local/lib/python3.8/site-packages/pycaret/anomaly.py", line 16, in <module>
import pycaret.internal.tabular
File "/usr/local/lib/python3.8/site-packages/pycaret/internal/tabular.py", line 48, in <module>
import pycaret.internal.preprocess
File "/usr/local/lib/python3.8/site-packages/pycaret/internal/preprocess.py", line 27, in <module>
from pyod.models.knn import KNN
File "/usr/local/lib/python3.8/site-packages/pyod/__init__.py", line 4, in <module>
from . import utils
File "/usr/local/lib/python3.8/site-packages/pyod/utils/__init__.py", line 12, in <module>
from .stat_models import pairwise_distances_no_broadcast
File "/usr/local/lib/python3.8/site-packages/pyod/utils/stat_models.py", line 11, in <module>
from numba import njit
File "/usr/local/lib/python3.8/site-packages/numba/__init__.py", line 39, in <module>
from numba.core.decorators import (cfunc, generated_jit, jit, njit, stencil,
File "/usr/local/lib/python3.8/site-packages/numba/core/decorators.py", line 12, in <module>
from numba.stencils.stencil import stencil
File "/usr/local/lib/python3.8/site-packages/numba/stencils/stencil.py", line 11, in <module>
from numba.core import types, typing, utils, ir, config, ir_utils, registry
File "/usr/local/lib/python3.8/site-packages/numba/core/registry.py", line 4, in <module>
from numba.core import utils, typing, dispatcher, cpu
File "/usr/local/lib/python3.8/site-packages/numba/core/dispatcher.py", line 14, in <module>
from numba.core import (
File "/usr/local/lib/python3.8/site-packages/numba/core/compiler.py", line 6, in <module>
from numba.core import (utils, errors, typing, interpreter, bytecode, postproc,
File "/usr/local/lib/python3.8/site-packages/numba/core/callconv.py", line 12, in <module>
from numba.core.base import PYOBJECT, GENERIC_POINTER
File "/usr/local/lib/python3.8/site-packages/numba/core/base.py", line 23, in <module>
from numba.cpython import builtins
File "/usr/local/lib/python3.8/site-packages/numba/cpython/builtins.py", line 519, in <module>
from numba.core.typing.builtins import IndexValue, IndexValueType
File "/usr/local/lib/python3.8/site-packages/numba/core/typing/builtins.py", line 22, in <module>
@infer_global(print)
File "/usr/local/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1262, in register_global
if getattr(mod, val.__name__) is not val:
AttributeError: module '__main__' has no attribute 'print'
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
... 3 more