Software versions:

pyflink:1.14.2
python:3.8.10
pycaret:2.3.5

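I integrated PyCaret into a PyFlink/Flink docker-compose environment, and I have a PyCaret model file, iforest.pkl. When the Python script loads the pkl model inside a ProcessFunction, an error occurs. It always fails with: module '__main__' has no attribute 'print'. I debugged locally and could not find any '__main__' method involved. For local debugging I run the script directly with the python command. In the compose environment I run the PyFlink script with ./bin/flink run --python xxx.py.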

Does anyone recognize this error? The details are as follows:

    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
    ... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
    processor = bundle_processor.BundleProcessor(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
    return collections.OrderedDict([(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
    transform_consumers = {
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
    return transform_factory.create_operation(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 129, in create_data_stream_keyed_process_function
    return _create_user_defined_function_operation(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 199, in _create_user_defined_function_operation
    return beam_operation_cls(
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", line 80, in __init__
    extract_stateless_function(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", line 158, in extract_stateless_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
  File "/usr/local/lib/python3.8/site-packages/pycaret/anomaly.py", line 16, in <module>
    import pycaret.internal.tabular
  File "/usr/local/lib/python3.8/site-packages/pycaret/internal/tabular.py", line 48, in <module>
    import pycaret.internal.preprocess
  File "/usr/local/lib/python3.8/site-packages/pycaret/internal/preprocess.py", line 27, in <module>
    from pyod.models.knn import KNN
  File "/usr/local/lib/python3.8/site-packages/pyod/__init__.py", line 4, in <module>
    from . import utils
  File "/usr/local/lib/python3.8/site-packages/pyod/utils/__init__.py", line 12, in <module>
    from .stat_models import pairwise_distances_no_broadcast
  File "/usr/local/lib/python3.8/site-packages/pyod/utils/stat_models.py", line 11, in <module>
    from numba import njit
  File "/usr/local/lib/python3.8/site-packages/numba/__init__.py", line 39, in <module>
    from numba.core.decorators import (cfunc, generated_jit, jit, njit, stencil,
  File "/usr/local/lib/python3.8/site-packages/numba/core/decorators.py", line 12, in <module>
    from numba.stencils.stencil import stencil
  File "/usr/local/lib/python3.8/site-packages/numba/stencils/stencil.py", line 11, in <module>
    from numba.core import types, typing, utils, ir, config, ir_utils, registry
  File "/usr/local/lib/python3.8/site-packages/numba/core/registry.py", line 4, in <module>
    from numba.core import utils, typing, dispatcher, cpu
  File "/usr/local/lib/python3.8/site-packages/numba/core/dispatcher.py", line 14, in <module>
    from numba.core import (
  File "/usr/local/lib/python3.8/site-packages/numba/core/compiler.py", line 6, in <module>
    from numba.core import (utils, errors, typing, interpreter, bytecode, postproc,
  File "/usr/local/lib/python3.8/site-packages/numba/core/callconv.py", line 12, in <module>
    from numba.core.base import PYOBJECT, GENERIC_POINTER
  File "/usr/local/lib/python3.8/site-packages/numba/core/base.py", line 23, in <module>
    from numba.cpython import builtins
  File "/usr/local/lib/python3.8/site-packages/numba/cpython/builtins.py", line 519, in <module>
    from numba.core.typing.builtins import IndexValue, IndexValueType
  File "/usr/local/lib/python3.8/site-packages/numba/core/typing/builtins.py", line 22, in <module>
    @infer_global(print)
  File "/usr/local/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1262, in register_global
    if getattr(mod, val.__name__) is not val:
AttributeError: module '__main__' has no attribute 'print'

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    ... 3 more
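
For reference, the last Python frame in the trace runs `getattr(mod, val.__name__) is not val` while handling `@infer_global(print)`. The block below is a minimal stdlib-only sketch of how that kind of lookup can produce a message of this shape. It is not numba's actual code. It assumes that `mod` is resolved from `val.__module__` (the trace does not show that step), and `resolves_by_name`, `make_stub_print`, and the module name `fake_main` are hypothetical names used only for illustration. Whether something like this is what happens inside the Flink worker is not confirmed by the trace.

```python
import sys
import types


def resolves_by_name(val):
    """Look a callable up by name in the module it claims to come from.

    Assumption: the module is taken from val.__module__; only the getattr
    call itself is visible in the traceback.
    """
    mod = sys.modules[val.__module__]
    # Raises AttributeError if the module has no attribute with that name.
    return getattr(mod, val.__name__) is val


def make_stub_print():
    """Build a function named 'print' that claims to live in a module
    which does not actually define an attribute called 'print'."""
    fake_main = types.ModuleType("fake_main")  # hypothetical stand-in module
    sys.modules["fake_main"] = fake_main

    def stub(*args, **kwargs):
        pass

    stub.__name__ = "print"
    stub.__module__ = "fake_main"
    return stub


def demo():
    """Return the lookup result for a real builtin and the error text for the stub."""
    builtin_ok = resolves_by_name(len)  # builtins.len is found by name
    try:
        resolves_by_name(make_stub_print())
    except AttributeError as exc:
        return builtin_ok, str(exc)
    return builtin_ok, None
```

Calling `demo()` shows the contrast: a real builtin is found by name in the `builtins` module, while the stub raises an `AttributeError` naming the module it claims to belong to. If the same thing happens in the compose environment, printing `print.__module__` inside the job before the PyCaret import may help show whether `print` there is still the builtin.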