8

我正在尝试对一个将数据写入 S3 的函数进行单元测试,然后从相同的 S3 位置读取相同的数据。我正在尝试使用 a motoand boto(2.x) 来实现[1]。问题是服务返回我被禁止访问密钥 [2]。moto github 存储库 [3] 中报告了类似的问题(即使错误消息有点不同),但尚未解决。

有没有人在 PySpark 中成功测试过模拟 s3 读/写以分享一些见解?

[1]

import boto
from boto.s3.key import Key
from moto import mock_s3

_test_bucket = 'test-bucket'
_test_key = 'data.csv'

@pytest.fixture(scope='function')
def spark_context(request):
    conf = SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")
    sc = SparkContext(conf=conf)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'test-access-key-id')
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'test-secret-access-key')
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j(sc)
    return sc

spark_test = pytest.mark.usefixtures("spark_context")

@spark_test
@mock_s3
def test_tsv_read_from_and_write_to_s3(spark_context):
    spark = SQLContext(spark_context)

    s3_conn = boto.connect_s3()
    s3_bucket = s3_conn.create_bucket(_test_bucket)
    k = Key(s3_bucket)
    k.key = _test_key 
    k.set_contents_from_string('')    

    s3_uri = 's3n://{}/{}'.format(_test_bucket, _test_key)
    df = (spark
          .read
          .csv(s3_uri))

[2]

(...)
E py4j.protocol.Py4JJavaError: An error occurred while calling o33.csv.
E : org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/data.csv' - ResponseCode=403, ResponseMessage=Forbidden
(...)

[3] https://github.com/spulec/moto/issues/1543

4

0 回答 0