我使用 MongoDB API 创建了一个 azure CosmosDB 帐户。我需要将 CosmosDB(MongoDB API) 连接到 Azure Databricks 集群,以便从 cosmos 读取和写入数据。
如何将 Azure Databricks 群集连接到 CosmosDB 帐户?
我使用 MongoDB API 创建了一个 azure CosmosDB 帐户。我需要将 CosmosDB(MongoDB API) 连接到 Azure Databricks 集群,以便从 cosmos 读取和写入数据。
如何将 Azure Databricks 群集连接到 CosmosDB 帐户?
这是我使用 Azure Databricks(5.2 ML Beta(包括 Apache Spark 2.4.0、Scala 2.11)和 MongoDB 连接器的 MongoDB API 连接到 CosmosDB 数据库的 pyspark 代码:org.mongodb.spark:mongo-spark-连接器_2.11:2.4.0):
from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName("myApp") \
.getOrCreate()
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource") \
.option("uri", CONNECTION_STRING) \
.load()
使用如下所示的 CONNECTION_STRING:“mongodb://USERNAME:PASSWORD@testgp.documents.azure.com:10255/ DATABASE_NAME.COLLECTION_NAME ?ssl=true&replicaSet=globaldb”
我尝试了许多不同的其他选项(将数据库和集合名称添加为 SparkSession 的选项或配置),但均未成功。告诉我它是否适合你...
添加org.mongodb.spark:mongo-spark-connector_2.11:2.4.0
软件包后,这对我有用:
import json
query = {
'$limit': 100,
}
query_config = {
'uri': 'myConnectionString'
'database': 'myDatabase',
'collection': 'myCollection',
'pipeline': json.dumps(query),
}
df = spark.read.format("com.mongodb.spark.sql") \
.options(**query_config) \
.load()
但是,我确实在某些集合中遇到了这个错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.139.64.6, executor 0): com.mongodb.MongoInternalException: The reply message length 10168676 is less than the maximum message length 4194304
以我对自己的问题所做的相同方式回答。
使用MAVEN作为源,我使用路径将正确的库安装到我的集群
org.mongodb.spark:mongo-spark-connector_2.11:2.4.0
火花 2.4
我使用的代码示例如下(对于那些想尝试的人):
# Read Configuration
readConfig = {
"URI": "<URI>",
"Database": "<database>",
"Collection": "<collection>",
"ReadingBatchSize" : "<batchSize>"
}
pipelineAccounts = "{'$sort' : {'account_contact': 1}}"
# Connect via azure-cosmosdb-spark to create Spark DataFrame
accountsTest = (spark.read.
format("com.mongodb.spark.sql").
options(**readConfig).
option("pipeline", pipelineAccounts).
load())
accountsTest.select("account_id").show()