0

我们开始为第一个 Apache Beam 测试实现一个 MongoDB IO 连接器,其中 Source 部分似乎工作正常;但 Sink 部分在执行时会报错。我们是按照以下指南实现的:https://beam.apache.org/documentation/sdks/python-custom-io/

我们的实现可以在这里找到:https://github.com/PEAT-AI/beam-extended

我尝试运行的示例如下:

"""
A simple example of how to use the MongoDB reader and writer.

If you like, you can test it out with these commands (requires Docker and
virtualenv for python2):

    $ virtualenv venv
    $ source venv/bin/activate
    $ pip install google-cloud-dataflow pymongo
    $ # The following line is optional if mongod is running already
    $ sudo service mongod stop
    $ docker run -p 27017:27017 --name dataflowtest --rm mongo:3.2
    $ docker exec -it dataflowtest mongo
    > use mydb
    > db.mycollection.insert({ _id: ObjectId() })
    > exit
    $ python -m simple
    $ # The following line is optional if mongod was shut down previously
    $ sudo service mongod start

"""

from __future__ import absolute_import

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_extended.io.mongodbio import ReadFromMongo, WriteToMongo


def transform_doc(document):
    """Echo *document* to stdout and reduce it to its stringified ``_id``.

    Args:
        document: A mapping that contains an ``'_id'`` entry.

    Returns:
        A new dict with the single key ``'_id'`` whose value is
        ``str(document['_id'])``; every other field is dropped.
    """
    # Debug aid: show each record as it flows through the pipeline.
    print(document)
    stringified_id = str(document['_id'])
    return {'_id': stringified_id}


def run(argv=None):
    """Main entry point; defines and runs the MongoDB example pipeline.

    The pipeline reads the ``_id`` of every document in
    ``mydb.mycollection``, maps each one through ``transform_doc`` and
    writes the results back to the same collection.

    Args:
        argv: Optional list of command-line flags forwarded to
            ``PipelineOptions``. Passing ``None`` (the default) is the
            same as calling ``PipelineOptions()`` with no arguments.
    """

    connection_string = 'mongodb://localhost:27017'
    # Can also fetch a connection string from a Google Cloud Storage file.
    # This might be preferable to avoid pickling the mongodb connection string.
    # E.g.
    # connection_string = 'gs://my-bucket/mongo_connection_string.txt'
    # where "mongo_connection_string.txt" contains a single line with the connection string.

    # Forward argv so flags supplied by the caller (e.g. --runner) are
    # honoured; previously the parameter was accepted but silently ignored.
    options = PipelineOptions(argv)
    # To force a specific runner instead, use e.g.:
    # with beam.Pipeline(runner='DirectRunner', options=options) as pipeline:
    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'read' >> ReadFromMongo(connection_string, 'mydb', 'mycollection', query={}, projection=['_id'])
         | 'transform' >> beam.Map(transform_doc)
         | 'save' >> WriteToMongo(connection_string, 'mydb', 'mycollection'))
        # Alternative sink for debugging without MongoDB writes:
        #  | 'save' >> beam.io.WriteToText('./simple.txt'))


if __name__ == '__main__':
    # Configure the root logger before the pipeline starts; switch the
    # level to logging.DEBUG for more verbose output.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    run()

我得到以下堆栈跟踪:

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:beam_extended.io.mongodbio:Raw query: {}
INFO:beam_extended.io.mongodbio:Cleaned query: {}
INFO:beam_extended.io.mongodbio:Starting MongoDB read from mydb.mycollection with query {}
T4: <class 'beam_extended.io.mongodbio._MongoSink'>
INFO:dill:T4: <class 'beam_extended.io.mongodbio._MongoSink'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fc9050>
INFO:dill:D2: <dict object at 0x7fdb82fc9050>
T4: <class 'pymongo.mongo_client.MongoClient'>
INFO:dill:T4: <class 'pymongo.mongo_client.MongoClient'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fc96e0>
INFO:dill:D2: <dict object at 0x7fdb82fc96e0>
T4: <class 'pymongo.monitoring._EventListeners'>
INFO:dill:T4: <class 'pymongo.monitoring._EventListeners'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fd5050>
INFO:dill:D2: <dict object at 0x7fdb82fd5050>
# D2
INFO:dill:# D2
D2: <dict object at 0x7fdb82f605c8>
INFO:dill:D2: <dict object at 0x7fdb82f605c8>
# D2
INFO:dill:# D2
Lo: <thread.lock object at 0x7fdb8488aef0>
INFO:dill:Lo: <thread.lock object at 0x7fdb8488aef0>
F2: <function _create_lock at 0x7fdb875d57d0>
INFO:dill:F2: <function _create_lock at 0x7fdb875d57d0>
# F2
INFO:dill:# F2
# Lo
INFO:dill:# Lo
T4: <class 'pymongo.read_concern.ReadConcern'>
INFO:dill:T4: <class 'pymongo.read_concern.ReadConcern'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fcae88>
INFO:dill:D2: <dict object at 0x7fdb82fcae88>
# D2
INFO:dill:# D2
T4: <class 'pymongo.read_preferences.Primary'>
INFO:dill:T4: <class 'pymongo.read_preferences.Primary'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f32910>
INFO:dill:D2: <dict object at 0x7fdb82f32910>
# D2
INFO:dill:# D2
T4: <class 'pymongo.topology.Topology'>
INFO:dill:T4: <class 'pymongo.topology.Topology'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fca7f8>
INFO:dill:D2: <dict object at 0x7fdb82fca7f8>
T4: <class 'pymongo.client_session._ServerSessionPool'>
INFO:dill:T4: <class 'pymongo.client_session._ServerSessionPool'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f43280>
INFO:dill:D2: <dict object at 0x7fdb82f43280>
# D2
INFO:dill:# D2
D2: <dict object at 0x7fdb82fc4e88>
INFO:dill:D2: <dict object at 0x7fdb82fc4e88>
T4: <class 'pymongo.server.Server'>
INFO:dill:T4: <class 'pymongo.server.Server'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fd05c8>
INFO:dill:D2: <dict object at 0x7fdb82fd05c8>
T4: <class 'pymongo.server_description.ServerDescription'>
INFO:dill:T4: <class 'pymongo.server_description.ServerDescription'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f327f8>
INFO:dill:D2: <dict object at 0x7fdb82f327f8>
D2: <dict object at 0x7fdb82fd3b40>
INFO:dill:D2: <dict object at 0x7fdb82fd3b40>
# D2
INFO:dill:# D2
T4: <type 'set'>
INFO:dill:T4: <type 'set'>
# T4
INFO:dill:# T4
# D2
INFO:dill:# D2
C2: pymongo.pool.Pool
INFO:dill:C2: pymongo.pool.Pool
# C2
INFO:dill:# C2
D2: <dict object at 0x7fdb82fcfa28>
INFO:dill:D2: <dict object at 0x7fdb82fcfa28>
Lo: <thread.lock object at 0x7fdb8488afb0>
INFO:dill:Lo: <thread.lock object at 0x7fdb8488afb0>
# Lo
INFO:dill:# Lo
C2: pymongo.thread_util.BoundedSemaphore
INFO:dill:C2: pymongo.thread_util.BoundedSemaphore
# C2
INFO:dill:# C2
D2: <dict object at 0x7fdb82f8c4b0>
INFO:dill:D2: <dict object at 0x7fdb82f8c4b0>
T4: <class 'threading._Condition'>
INFO:dill:T4: <class 'threading._Condition'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f8c398>
INFO:dill:D2: <dict object at 0x7fdb82f8c398>
Lo: <thread.lock object at 0x7fdb8488afd0>
INFO:dill:Lo: <thread.lock object at 0x7fdb8488afd0>
# Lo
INFO:dill:# Lo
B3: <built-in method acquire of thread.lock object at 0x7fdb8488afd0>
INFO:dill:B3: <built-in method acquire of thread.lock object at 0x7fdb8488afd0>
F2: <function _get_attr at 0x7fdb875d5f50>
INFO:dill:F2: <function _get_attr at 0x7fdb875d5f50>
# F2
INFO:dill:# F2
# B3
INFO:dill:# B3
B3: <built-in method release of thread.lock object at 0x7fdb8488afd0>
INFO:dill:B3: <built-in method release of thread.lock object at 0x7fdb8488afd0>
# B3
INFO:dill:# B3
# D2
INFO:dill:# D2
# D2
INFO:dill:# D2
T4: <class 'pymongo.network.SocketChecker'>
INFO:dill:T4: <class 'pymongo.network.SocketChecker'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f8c168>
INFO:dill:D2: <dict object at 0x7fdb82f8c168>
Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/beam_extended/examples/simple.py", line 62, in <module>
    run()
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/beam_extended/examples/simple.py", line 54, in run
    | 'save' >> WriteToMongo(connection_string, 'mydb', 'mycollection'))
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pvalue.py", line 111, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 458, in apply
    label or transform.label)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 468, in apply
    return self.apply(transform, pvalueish)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 504, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply
    return m(transform, input)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 199, in apply_PTransform
    return transform.expand(input)
  File "beam_extended/io/mongodbio.py", line 206, in expand
    return pcoll | iobase.Write(self._sink)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pvalue.py", line 111, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 504, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply
    return m(transform, input)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 199, in apply_PTransform
    return transform.expand(input)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 959, in expand
    return pcoll | WriteImpl(self.sink)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pvalue.py", line 111, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 504, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply
    return m(transform, input)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 199, in apply_PTransform
    return transform.expand(input)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 1007, in expand
    AsIter(write_result_coll))
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 998, in FlatMap
    pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 838, in __init__
    super(ParDo, self).__init__(fn, *args, **kwargs)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 639, in __init__
    self.args = pickler.loads(pickler.dumps(self.args))
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 216, in dumps
    s = dill.dumps(o)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 293, in dumps
    dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 286, in dump
    pik.dump(obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 692, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 731, in save_inst
    save(stuff)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 313, in save
    (t.__name__, obj))
pickle.PicklingError: Can't pickle 'poll' object: <select.poll object at 0x7fdb82f95ab0>
4

0 回答 0