我们开始为第一个 Apache Beam 测试实现一个 MongoDB IO 连接器。Source 部分似乎工作正常,但执行 Sink 部分时会报错(见下方堆栈跟踪)。我们参考以下指南进行实现:https://beam.apache.org/documentation/sdks/python-custom-io/
我们的实现可以在这里找到:https://github.com/PEAT-AI/beam-extended
我尝试运行的示例如下:
"""
A simple example of how to use the MongoDB reader and writer.
If you like, you can test it out with these commands (requires Docker and
virtualenv for python2):
$ virtualenv venv
$ source venv/bin/activate
$ pip install google-cloud-dataflow pymongo
$ # The following line is optional if mongod is running already
$ sudo service mongod stop
$ docker run -p 27017:27017 --name dataflowtest --rm mongo:3.2
$ docker exec -it dataflowtest mongo
> use mydb
> db.mycollection.insert({ _id: ObjectId() })
> exit
$ python -m simple
$ # The following line is optional if mongod was shut down previously
$ sudo service mongod start
"""
from __future__ import absolute_import
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_extended.io.mongodbio import ReadFromMongo, WriteToMongo
def transform_doc(document):
    """Echo *document* to stdout and keep only its ``_id``, stringified.

    Args:
      document: A mapping that contains an ``'_id'`` key.

    Returns:
      A new dict of the form ``{'_id': str(document['_id'])}``; every other
      field of *document* is dropped.
    """
    print(document)
    doc_id = document['_id']
    return dict(_id=str(doc_id))
def run(argv=None):
    """Main entry point; defines and runs the MongoDB read/transform/write pipeline.

    Args:
      argv: Optional list of command-line flags forwarded to
        ``PipelineOptions`` (e.g. ``['--runner=DirectRunner']``). When None,
        Beam falls back to parsing ``sys.argv`` itself, which matches the
        previous behavior.
    """
    connection_string = 'mongodb://localhost:27017'
    # Can also fetch a connection string from a Google Cloud Storage file,
    # e.g. 'gs://my-bucket/mongo_connection_string.txt', where the file
    # contains a single line with the connection string. This might be
    # preferable to avoid pickling the mongodb connection string.
    #
    # NOTE(review): the "Can't pickle 'poll' object" PicklingError raised
    # while building this pipeline comes from WriteToMongo. Its _MongoSink
    # holds a live pymongo.MongoClient, and Beam pickles the sink while
    # expanding the write transform. A MongoClient contains thread locks and
    # a select.poll object, neither of which can be pickled. The fix belongs
    # in beam_extended.io.mongodbio: keep only the connection string,
    # database, and collection names on the sink, and create the MongoClient
    # lazily on the worker (e.g. in the sink's open_writer()) rather than
    # when the sink object is constructed. Replacing the 'save' step with
    # beam.io.WriteToText('./simple.txt') exercises the read/transform steps
    # on their own.
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'read' >> ReadFromMongo(connection_string, 'mydb', 'mycollection',
                                   query={}, projection=['_id'])
         | 'transform' >> beam.Map(transform_doc)
         | 'save' >> WriteToMongo(connection_string, 'mydb', 'mycollection'))
if __name__ == '__main__':
    # Show INFO-level progress messages (e.g. the MongoDB read logs) when
    # run as a script; lower this to logging.DEBUG for more detail.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
我得到以下堆栈跟踪:
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:beam_extended.io.mongodbio:Raw query: {}
INFO:beam_extended.io.mongodbio:Cleaned query: {}
INFO:beam_extended.io.mongodbio:Starting MongoDB read from mydb.mycollection with query {}
T4: <class 'beam_extended.io.mongodbio._MongoSink'>
INFO:dill:T4: <class 'beam_extended.io.mongodbio._MongoSink'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fc9050>
INFO:dill:D2: <dict object at 0x7fdb82fc9050>
T4: <class 'pymongo.mongo_client.MongoClient'>
INFO:dill:T4: <class 'pymongo.mongo_client.MongoClient'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fc96e0>
INFO:dill:D2: <dict object at 0x7fdb82fc96e0>
T4: <class 'pymongo.monitoring._EventListeners'>
INFO:dill:T4: <class 'pymongo.monitoring._EventListeners'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fd5050>
INFO:dill:D2: <dict object at 0x7fdb82fd5050>
# D2
INFO:dill:# D2
D2: <dict object at 0x7fdb82f605c8>
INFO:dill:D2: <dict object at 0x7fdb82f605c8>
# D2
INFO:dill:# D2
Lo: <thread.lock object at 0x7fdb8488aef0>
INFO:dill:Lo: <thread.lock object at 0x7fdb8488aef0>
F2: <function _create_lock at 0x7fdb875d57d0>
INFO:dill:F2: <function _create_lock at 0x7fdb875d57d0>
# F2
INFO:dill:# F2
# Lo
INFO:dill:# Lo
T4: <class 'pymongo.read_concern.ReadConcern'>
INFO:dill:T4: <class 'pymongo.read_concern.ReadConcern'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fcae88>
INFO:dill:D2: <dict object at 0x7fdb82fcae88>
# D2
INFO:dill:# D2
T4: <class 'pymongo.read_preferences.Primary'>
INFO:dill:T4: <class 'pymongo.read_preferences.Primary'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f32910>
INFO:dill:D2: <dict object at 0x7fdb82f32910>
# D2
INFO:dill:# D2
T4: <class 'pymongo.topology.Topology'>
INFO:dill:T4: <class 'pymongo.topology.Topology'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fca7f8>
INFO:dill:D2: <dict object at 0x7fdb82fca7f8>
T4: <class 'pymongo.client_session._ServerSessionPool'>
INFO:dill:T4: <class 'pymongo.client_session._ServerSessionPool'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f43280>
INFO:dill:D2: <dict object at 0x7fdb82f43280>
# D2
INFO:dill:# D2
D2: <dict object at 0x7fdb82fc4e88>
INFO:dill:D2: <dict object at 0x7fdb82fc4e88>
T4: <class 'pymongo.server.Server'>
INFO:dill:T4: <class 'pymongo.server.Server'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82fd05c8>
INFO:dill:D2: <dict object at 0x7fdb82fd05c8>
T4: <class 'pymongo.server_description.ServerDescription'>
INFO:dill:T4: <class 'pymongo.server_description.ServerDescription'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f327f8>
INFO:dill:D2: <dict object at 0x7fdb82f327f8>
D2: <dict object at 0x7fdb82fd3b40>
INFO:dill:D2: <dict object at 0x7fdb82fd3b40>
# D2
INFO:dill:# D2
T4: <type 'set'>
INFO:dill:T4: <type 'set'>
# T4
INFO:dill:# T4
# D2
INFO:dill:# D2
C2: pymongo.pool.Pool
INFO:dill:C2: pymongo.pool.Pool
# C2
INFO:dill:# C2
D2: <dict object at 0x7fdb82fcfa28>
INFO:dill:D2: <dict object at 0x7fdb82fcfa28>
Lo: <thread.lock object at 0x7fdb8488afb0>
INFO:dill:Lo: <thread.lock object at 0x7fdb8488afb0>
# Lo
INFO:dill:# Lo
C2: pymongo.thread_util.BoundedSemaphore
INFO:dill:C2: pymongo.thread_util.BoundedSemaphore
# C2
INFO:dill:# C2
D2: <dict object at 0x7fdb82f8c4b0>
INFO:dill:D2: <dict object at 0x7fdb82f8c4b0>
T4: <class 'threading._Condition'>
INFO:dill:T4: <class 'threading._Condition'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f8c398>
INFO:dill:D2: <dict object at 0x7fdb82f8c398>
Lo: <thread.lock object at 0x7fdb8488afd0>
INFO:dill:Lo: <thread.lock object at 0x7fdb8488afd0>
# Lo
INFO:dill:# Lo
B3: <built-in method acquire of thread.lock object at 0x7fdb8488afd0>
INFO:dill:B3: <built-in method acquire of thread.lock object at 0x7fdb8488afd0>
F2: <function _get_attr at 0x7fdb875d5f50>
INFO:dill:F2: <function _get_attr at 0x7fdb875d5f50>
# F2
INFO:dill:# F2
# B3
INFO:dill:# B3
B3: <built-in method release of thread.lock object at 0x7fdb8488afd0>
INFO:dill:B3: <built-in method release of thread.lock object at 0x7fdb8488afd0>
# B3
INFO:dill:# B3
# D2
INFO:dill:# D2
# D2
INFO:dill:# D2
T4: <class 'pymongo.network.SocketChecker'>
INFO:dill:T4: <class 'pymongo.network.SocketChecker'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fdb82f8c168>
INFO:dill:D2: <dict object at 0x7fdb82f8c168>
Traceback (most recent call last):
File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
"__main__", fname, loader, pkg_name)
File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/beam_extended/examples/simple.py", line 62, in <module>
run()
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/beam_extended/examples/simple.py", line 54, in run
| 'save' >> WriteToMongo(connection_string, 'mydb', 'mycollection'))
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pvalue.py", line 111, in __or__
return self.pipeline.apply(ptransform, self)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 458, in apply
label or transform.label)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 468, in apply
return self.apply(transform, pvalueish)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 504, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply
return m(transform, input)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 199, in apply_PTransform
return transform.expand(input)
File "beam_extended/io/mongodbio.py", line 206, in expand
return pcoll | iobase.Write(self._sink)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pvalue.py", line 111, in __or__
return self.pipeline.apply(ptransform, self)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 504, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply
return m(transform, input)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 199, in apply_PTransform
return transform.expand(input)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 959, in expand
return pcoll | WriteImpl(self.sink)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pvalue.py", line 111, in __or__
return self.pipeline.apply(ptransform, self)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 504, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply
return m(transform, input)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 199, in apply_PTransform
return transform.expand(input)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 1007, in expand
AsIter(write_result_coll))
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 998, in FlatMap
pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 838, in __init__
super(ParDo, self).__init__(fn, *args, **kwargs)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 639, in __init__
self.args = pickler.loads(pickler.dumps(self.args))
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 216, in dumps
s = dill.dumps(o)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 293, in dumps
dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 286, in dump
pik.dump(obj)
File "/usr/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends
save(x)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
return old_save_module_dict(pickler, obj)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
return old_save_module_dict(pickler, obj)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
return old_save_module_dict(pickler, obj)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
return old_save_module_dict(pickler, obj)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 692, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
return old_save_module_dict(pickler, obj)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 731, in save_inst
save(stuff)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
return old_save_module_dict(pickler, obj)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 184, in new_save_module_dict
return old_save_module_dict(pickler, obj)
File "/home/pascal/Wks/GitHub/PEAT-AI/beam-extended/sdks/python/venv/local/lib/python2.7/site-packages/dill/_dill.py", line 893, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 313, in save
(t.__name__, obj))
pickle.PicklingError: Can't pickle 'poll' object: <select.poll object at 0x7fdb82f95ab0>