I'm trying to create ETL in GCP which will read part of data from PostgreSQL and put it in the suitable form to BigQuery. I was able to perform this task deploying Dataflow from my computer, but I failed to make it dynamic, so it will read last transferred record and transfer next 100. So I figured out, that I'll create Dataflows from Cloud Function. Everything was working OK, reading/writing to BigQuery works like a charm, but I'm stuck on PostgreSQL requited package: beam-nuggets.
In the function I'm creating pipe arguments:
pipe_arguments = [
'--project={0}'.format(PROJECT),
'--staging_location=gs://xxx.appspot.com/staging/',
'--temp_location=gs://xxx.appspot.com/temp/',
'--runner=DataflowRunner',
'--region=europe-west4',
'--setup_file=./setup.py'
]
pipeline_options = PipelineOptions(pipe_arguments)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
Then create pipeline:
pipeline = beam.Pipeline(argv = pipe_arguments)
and run it:
pipeline.run()
If I omit:
'--setup_file=./setup.py'
everything is fine except Dataflow cannot use PostgeQSL as import:
from beam_nuggets.io import relational_db
fails.
When I add
'--setup_file=./setup.py'
line, testing function from GCP Function web portal returns:
Error: function terminated. Recommended action: inspect logs for termination reason. Details:
Full trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/apache_beam/utils/processes.py", line 85, in check_output
out = subprocess.check_output(*args, **kwargs)
File "/opt/python3.7/lib/python3.7/subprocess.py", line 411, in check_output
**kwargs).stdout
File "/opt/python3.7/lib/python3.7/subprocess.py", line 512, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/env/bin/python3.7', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpxdvj0ulx']' returned non-zero exit status 1.
, output of the failed child process b'running sdist\nrunning egg_info\ncreating example.egg-info\n'
running
python setup.py sdist --dist-dir ./tmp/
from local computer works OK.
setup.py is deployed along with function code (main.py) and requirements.txt to the Cloud Function.
Requirements.txt is used during Function deploy and looks like this:
beam-nuggets==0.15.1
google-cloud-bigquery==1.17.1
apache-beam==2.19.0
google-cloud-dataflow==2.4.0
google-apitools==0.5.31
setup.py looks like this:
from setuptools import find_packages
from setuptools import setup
REQUIRED_PACKAGES = ['beam-nuggets>=0.15.1']
setup(
name='example',
version='0.1',
install_requires=REQUIRED_PACKAGES,
packages=find_packages(),
include_package_data=True,
description='example desc'
)
I'm stuck for couple days, tried different setup.py approaches, tried to use requirements.txt instead of setup.py - no luck.
log just says:
{
insertId: "000000-88232bc6-6122-4ec8-a4f3-90e9775e89f6"
labels: {
execution_id: "78ml14shfolv"
}
logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"
receiveTimestamp: "2020-07-13T12:08:35.898729649Z"
resource: {
labels: {
function_name: "xxx"
project_id: "xxx"
region: "europe-west6"
}
type: "cloud_function"
}
severity: "INFO"
textPayload: "Executing command: ['/env/bin/python3.7', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpxdvj0ulx']"
timestamp: "2020-07-13T12:08:31.639Z"
trace: "projects/xxx/traces/c9f1b1f68ed869f187e04ea672c487a4"
}
{
insertId: "000000-3dfb239a-4067-4f9d-bd5f-bae5174e9dc7"
labels: {
execution_id: "78ml14shfolv"
}
logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"
receiveTimestamp: "2020-07-13T12:08:35.898729649Z"
resource: {
labels: {
function_name: "xxx"
project_id: "xxx"
region: "europe-west6"
}
type: "cloud_function"
}
severity: "DEBUG"
textPayload: "Function execution took 7798 ms, finished with status: 'crash'"
timestamp: "2020-07-13T12:08:35.663674738Z"
trace: "projects/xxx/traces/c9f1b1f68ed869f187e04ea672c487a4"
}
Supplementary info:
if I'm using
'--requirements_file=./requirements.txt'
instead of
'--setup_file=./setup.py'
I'm getting:
Error: memory limit exceeded.
in GCP Functions web portal while running test function.
Afrer I increased memory to 2BG it says:
Error: function terminated. Recommended action: inspect logs for termination reason. Details:
Full traceback: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/apache_beam/utils/processes.py", line 85, in check_output
out = subprocess.check_output(*args, **kwargs)
File "/opt/python3.7/lib/python3.7/subprocess.py", line 411, in check_output
**kwargs).stdout
File "/opt/python3.7/lib/python3.7/subprocess.py", line 512, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/env/bin/python3.7', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
Pip install failed for package: -r
Output from execution of subprocess: b'Collecting beam-nuggets==0.15.1
Downloading beam-nuggets-0.15.1.tar.gz (17 kB)
Saved /tmp/dataflow-requirements-cache/beam-nuggets-0.15.1.tar.gz
Collecting google-cloud-bigquery==1.17.1
Downloading google-cloud-bigquery-1.17.1.tar.gz (228 kB)
Saved /tmp/dataflow-requirements-cache/google-cloud-bigquery-1.17.1.tar.gz
Collecting apache-beam==2.19.0
Downloading apache-beam-2.19.0.zip (1.9 MB)
Saved /tmp/dataflow-requirements-cache/apache-beam-2.19.0.zip
Collecting google-cloud-dataflow==2.4.0
Downloading google-cloud-dataflow-2.4.0.tar.gz (5.8 kB)
Saved /tmp/dataflow-requirements-cache/google-cloud-dataflow-2.4.0.tar.gz
Collecting google-apitools==0.5.31
Downloading google-apitools-0.5.31.tar.gz (173 kB)
Saved /tmp/dataflow-requirements-cache/google-apitools-0.5.31.tar.gz
Collecting SQLAlchemy<2.0.0,>=1.2.14
Downloading SQLAlchemy-1.3.18.tar.gz (6.0 MB)
Saved /tmp/dataflow-requirements-cache/SQLAlchemy-1.3.18.tar.gz
Collecting sqlalchemy-utils<0.34,>=0.33.11
Downloading SQLAlchemy-Utils-0.33.11.tar.gz (128 kB)
Saved /tmp/dataflow-requirements-cache/SQLAlchemy-Utils-0.33.11.tar.gz
Collecting pg8000<2.0.0,>=1.12.4
Downloading pg8000-1.16.0.tar.gz (75 kB)
Saved /tmp/dataflow-requirements-cache/pg8000-1.16.0.tar.gz
Collecting PyMySQL<2.0.0,>=0.9.3
Downloading PyMySQL-0.9.3.tar.gz (75 kB)
Saved /tmp/dataflow-requirements-cache/PyMySQL-0.9.3.tar.gz
Collecting kafka>===1.3.5
Downloading kafka-1.3.5.tar.gz (227 kB)
Saved /tmp/dataflow-requirements-cache/kafka-1.3.5.tar.gz
Collecting google-cloud-core<2.0dev,>=1.0.0
Downloading google-cloud-core-1.3.0.tar.gz (32 kB)
Saved /tmp/dataflow-requirements-cache/google-cloud-core-1.3.0.tar.gz
Collecting google-resumable-media<0.5.0dev,>=0.3.1
Downloading google-resumable-media-0.4.1.tar.gz (2.1 MB)
Saved /tmp/dataflow-requirements-cache/google-resumable-media-0.4.1.tar.gz
Collecting protobuf>=3.6.0
Downloading protobuf-3.12.2.tar.gz (265 kB)
Saved /tmp/dataflow-requirements-cache/protobuf-3.12.2.tar.gz
Collecting crcmod<2.0,>=1.7
Downloading crcmod-1.7.tar.gz (89 kB)
Saved /tmp/dataflow-requirements-cache/crcmod-1.7.tar.gz
Collecting dill<0.3.2,>=0.3.1.1
Downloading dill-0.3.1.1.tar.gz (151 kB)
Saved /tmp/dataflow-requirements-cache/dill-0.3.1.1.tar.gz
Collecting fastavro<0.22,>=0.21.4
Downloading fastavro-0.21.24.tar.gz (496 kB)
Saved /tmp/dataflow-requirements-cache/fastavro-0.21.24.tar.gz
Collecting future<1.0.0,>=0.16.0
Downloading future-0.18.2.tar.gz (829 kB)
Saved /tmp/dataflow-requirements-cache/future-0.18.2.tar.gz
Collecting grpcio<2,>=1.12.1
Downloading grpcio-1.30.0.tar.gz (19.7 MB)
ERROR: Command errored out with exit status 1:
command: /env/bin/python3.7 -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-download-yjpzrbur/grpcio/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-download-yjpzrbur/grpcio/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\
\'"\'"\', \'"\'"\'\
\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' egg_info --egg-base /tmp/pip-download-yjpzrbur/grpcio/pip-egg-info
cwd: /tmp/pip-download-yjpzrbur/grpcio/
Complete output (11 lines):
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-download-yjpzrbur/grpcio/setup.py", line 196, in <module>
if check_linker_need_libatomic():
File "/tmp/pip-download-yjpzrbur/grpcio/setup.py", line 156, in check_linker_need_libatomic
stderr=PIPE)
File "/opt/python3.7/lib/python3.7/subprocess.py", line 800, in __init__
restore_signals, start_new_session)
File "/opt/python3.7/lib/python3.7/subprocess.py", line 1551, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: \'cc\': \'cc\'
----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
WARNING: You are using pip version 20.0.2; however, version 20.1.1 is available.
You should consider upgrading via the \'/env/bin/python3.7 -m pip install --upgrade pip\' command.
'
Logs in this case:
{
insertId: "000000-5e4c10f4-d542-4631-8aaa-b9306d1390fd"
labels: {
execution_id: "15jww0sd8uyz"
}
logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"
receiveTimestamp: "2020-07-13T14:01:33.505683371Z"
resource: {
labels: {
function_name: xxx"
project_id: "xxx"
region: "europe-west6"
}
type: "cloud_function"
}
severity: "DEBUG"
textPayload: "Function execution took 18984 ms, finished with status: 'crash'"
timestamp: "2020-07-13T14:01:32.953194652Z"
trace: "projects/xxx/traces/262224a3d230cd9a66b1eebba3d7c3e0"
}
From local machine Dataflow deployment works OK.
Command from logs:
python -m pip download --dest ./tmp -r ./requirements.txt --exists-action i --no-binary :all:
also works OK although it seems like downloading half of the internet for couple of minutes, even if I reduce requirements.txt to beam-nuggets==0.15.1 only.
It stucks on
grpcio-1.30.0.tar.gz (19.7 MB)
exactly during setup from this package, function:
def check_linker_need_libatomic():
"""Test if linker on system needs libatomic."""
code_test = (b'#include <atomic>\n' +
b'int main() { return std::atomic<int64_t>{}; }')
cc_test = subprocess.Popen(['cc', '-x', 'c++', '-std=c++11', '-'],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE)
cc_test.communicate(input=code_test)
return cc_test.returncode != 0