3

我使用 MongoDB API 创建了一个 azure CosmosDB 帐户。我需要将 CosmosDB(MongoDB API) 连接到 Azure Databricks 集群,以便从 cosmos 读取和写入数据。

如何将 Azure Databricks 群集连接到 CosmosDB 帐户?

4

3 回答 3

1

这是我使用 Azure Databricks(5.2 ML Beta(包括 Apache Spark 2.4.0、Scala 2.11)和 MongoDB 连接器的 MongoDB API 连接到 CosmosDB 数据库的 pyspark 代码:org.mongodb.spark:mongo-spark-连接器_2.11:2.4.0):

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource") \
  .option("uri", CONNECTION_STRING) \
  .load()

使用如下所示的 CONNECTION_STRING:“mongodb://USERNAME:PASSWORD@testgp.documents.azure.com:10255/ DATABASE_NAME.COLLECTION_NAME ?ssl=true&replicaSet=globaldb”

我尝试了许多不同的其他选项(将数据库和集合名称添加为 SparkSession 的选项或配置),但均未成功。告诉我它是否适合你...

于 2019-01-22T13:08:18.030 回答
0

添加org.mongodb.spark:mongo-spark-connector_2.11:2.4.0软件包后,这对我有用:

import json

query = {
  '$limit': 100,
}

query_config = {
  'uri': 'myConnectionString'
  'database': 'myDatabase',
  'collection': 'myCollection',
  'pipeline': json.dumps(query),
}

df = spark.read.format("com.mongodb.spark.sql") \
  .options(**query_config) \
  .load()

但是,我确实在某些集合中遇到了这个错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.139.64.6, executor 0): com.mongodb.MongoInternalException: The reply message length 10168676 is less than the maximum message length 4194304
于 2019-02-17T13:59:40.273 回答
0

以我对自己的问题所做的相同方式回答。

使用MAVEN作为源,我使用路径将正确的库安装到我的集群

org.mongodb.spark:mongo-spark-connector_2.11:2.4.0

火花 2.4

我使用的代码示例如下(对于那些想尝试的人):

# Read Configuration
readConfig = {
    "URI": "<URI>",
    "Database": "<database>",
    "Collection": "<collection>",
  "ReadingBatchSize" : "<batchSize>"
  }


pipelineAccounts = "{'$sort' : {'account_contact': 1}}"

# Connect via azure-cosmosdb-spark to create Spark DataFrame 
accountsTest = (spark.read.
                 format("com.mongodb.spark.sql").
                 options(**readConfig).
                 option("pipeline", pipelineAccounts).
                 load())

accountsTest.select("account_id").show()
于 2020-07-01T17:48:35.227 回答