我正在尝试通过 spline 和 apacheatlas api 将 Databricks 笔记本谱系注册到 Azure Purview。代码有两个版本:1) 是使用 databricks 运行时版本 6.4 并且按预期工作的原始代码,但我们需要在至少 7.5 及更高版本的较新运行时版本中运行它,所以有 2) 第二个为运行时版本 7.5 重构的代码版本。具体来说,新代码需要新的 JSON 包,但输出(见下文)与原始代码的预期输出不匹配(如下所示)。需要重新编写代码才能正确执行,因为当前的新代码有错误。谢谢
原始旧代码:使用 Databricks Runtime version6.4 的原始代码如下
%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON
val splineConf: Configuration = StandardSplineConfigurationStack(spark)
spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
//override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
//val test = dbutils.notebook.getContext.notebookPath
val notebookInformationJson = dbutils.notebook.getContext.toJson
val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)
val notebookInfo = Map("notebookURL" -> notebookURL,
"user" -> user,
"name" -> name,
"mounts" -> dbutils.fs.ls("/mnt").map(_.path),
"timestamp" -> System.currentTimeMillis)
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
override protected def userExtraMetadataProvider: UserExtraMetadataProvider = new UserExtraMetadataProvider {
override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
}
})
ORIGINAL EXPECTED OUTPUT:这是原始代码的预期 JSON 输出
{
'id': '618c92e1-ae79-491e-b6fd-b5080dc7ef8d',
'operations': {
'write': {
'outputSource': 'dbfs:/mnt/test_data/parquet/Customers_new',
'append': False,
'id': 0,
'childIds': [
1
],
'params': {
'path': 'dbfs:/mnt/test_data/parquet/Customers_new'
},
'extra': {
'name': 'InsertIntoHadoopFsRelationCommand',
'destinationType': 'Parquet',
'foo': 'bar4'
}
},
'reads': [
{
'inputSources': [
'dbfs:/mnt/test_data/csv/Customers.csv'
],
'id': 1,
'schema': [
'b6112e12-9b90-46db-b919-bcc9c6280759',
'9f0671fe-813e-4608-a870-adae8386c46e',
'8dfcf4df-211c-48b0-8dec-1b1486dd0db4'
],
'params': {
'delimiter': ',',
'inferschema': 'true',
'header': 'true',
'path': 'dbfs:/mnt/test_data/csv/Customers.csv'
},
'extra': {
'name': 'LogicalRelation',
'sourceType': 'CSV',
'foo': 'bar3'
}
}
]
},
'systemInfo': {
'name': 'spark',
'version': '2.4.5'
},
'agentInfo': {
'name': 'spline',
'version': '0.5.3'
},
'extraInfo': {
'appName': 'Databricks Shell',
'dataTypes': [
{
'_typeHint': 'dt.Simple',
'id': 'df02093b-d529-4c8d-b422-9ac468baa765',
'name': 'integer',
'nullable': True
},
{
'_typeHint': 'dt.Simple',
'id': '88f773f8-982c-4d6c-bed3-1600a99c5943',
'name': 'string',
'nullable': True
}
],
'attributes': [
{
'id': 'b6112e12-9b90-46db-b919-bcc9c6280759',
'name': 'CustomerID',
'dataTypeId': 'df02093b-d529-4c8d-b422-9ac468baa765'
},
{
'id': '9f0671fe-813e-4608-a870-adae8386c46e',
'name': 'FirstName',
'dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
},
{
'id': '8dfcf4df-211c-48b0-8dec-1b1486dd0db4',
'name': 'LastName',
'dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
}
],
'notebookInfo': {
'obj': {
'name': 'initialize_spline_original',
'timestamp': 1623430575561,
'notebookURL': 'adb-2323242424.azuredatabricks.net/',
'mounts': [
'dbfs:/mnt/datalake/',
'dbfs:/mnt/landing_dde/',
'dbfs:/mnt/landing_newc/',
'dbfs:/mnt/landing_sourcedb/',
'dbfs:/mnt/testmount/',
'dbfs:/mnt/training/'
],
'user': 'user@test.com'
}
}
}
}
新代码:这是使用 Databricks 运行时版本 7.5 并升级和重构 JSON 包的新代码。
%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import play.api.libs.json._
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
val splineConf: Configuration = StandardSplineConfigurationStack(spark)
spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
val notebookInformationJson = dbutils.notebook.getContext.toJson
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val outerMap = mapper.readValue[Map[String, Object]](notebookInformationJson)
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)
val notebookInfo = Map("notebookURL" -> (notebookURL),
"user" -> (user),
"name" -> (name),
"mounts" -> (dbutils.fs.ls("/mnt").map(_.path)),
"timestamp" -> (System.currentTimeMillis))
val mapper1 = new ObjectMapper()
val notebookInfoJson = mapper1.writeValueAsString(notebookInfo)
override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
})
})
新输出:新代码的 JSON 对象输出如下所示:
{
'id': '12812131038144',
'operations': {
'write': {
'outputSource': 'dbfs:/mnt/test_data/delta/customers_join_orders_new',
'append': False,
'id': 0,
'childIds': [
1
],
'params': {
'path': 'dbfs:/mnt/test_data/delta/customers_join_orders_new'
},
'extra': {
'destinationType': 'tahoe',
'foo': 'bar4',
'name': 'SaveIntoDataSourceCommand'
}
},
'reads': [
{
'childIds': [
],
'inputSources': [
'dbfs:/mnt/test_data/csv/Orders.csv'
],
'id': 4,
'schema': [
'2018',
'2019',
'2020',
'2021',
'2022',
'2023'
],
'params': {
'inferschema': 'true',
'header': 'true',
'delimiter': ',',
'path': 'dbfs:/mnt/test_data/csv/Orders.csv'
},
'extra': {
'sourceType': 'csv',
'foo': 'bar3',
'name': 'LogicalRelation'
}
},
{
'childIds': [
],
'inputSources': [
'dbfs:/mnt/test_data/csv/Customers.csv'
],
'id': 6,
'schema': [
'2065',
'2066',
'2067'
],
'params': {
'inferschema': 'true',
'header': 'true',
'delimiter': ',',
'path': 'dbfs:/mnt/test_data/csv/Customers.csv'
},
'extra': {
'sourceType': 'csv',
'foo': 'bar3',
'name': 'LogicalRelation'
}
}
],
'other': [
{
'id': 3,
'childIds': [
4
],
'schema': [
'2018',
'2019',
'2020',
'2021',
'2022',
'2023'
],
'params': {
'identifier': 'orders'
},
'extra': {
'foo': 'bar5',
'name': 'SubqueryAlias'
}
},
{
'id': 5,
'childIds': [
6
],
'schema': [
'2065',
'2066',
'2067'
],
'params': {
'identifier': 'customers'
},
'extra': {
'foo': 'bar5',
'name': 'SubqueryAlias'
}
},
{
'id': 2,
'childIds': [
3,
5
],
'schema': [
'2018',
'2019',
'2020',
'2021',
'2022',
'2023',
'2065',
'2066',
'2067'
],
'params': {
'condition': {
'_typeHint': 'expr.Binary',
'symbol': '=',
'dataTypeId': '05ffb715-9781-4019-aee3-21c77f80d2a1',
'children': [
{
'_typeHint': 'expr.AttrRef',
'refId': '2019'
},
{
'_typeHint': 'expr.AttrRef',
'refId': '2065'
}
]
},
'hint': '',
'joinType': 'INNER'
},
'extra': {
'foo': 'bar5',
'name': 'Join'
}
},
{
'id': 1,
'childIds': [
2
],
'schema': [
'2065',
'2020',
'2022',
'2023'
],
'params': {
'projectList': [
{
'_typeHint': 'expr.AttrRef',
'refId': '2065'
},
{
'_typeHint': 'expr.AttrRef',
'refId': '2020'
},
{
'_typeHint': 'expr.AttrRef',
'refId': '2022'
},
{
'_typeHint': 'expr.AttrRef',
'refId': '2023'
}
]
},
'extra': {
'foo': 'bar5',
'name': 'Project'
}
}
]
},
'systemInfo': {
'name': 'spark',
'version': '3.0.1'
},
'agentInfo': {
'name': 'spline',
'version': '0.6.0'
},
'extraInfo': {
'appName': 'Databricks Shell',
'dataTypes': [
{
'_typeHint': 'dt.Simple',
'id': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca',
'name': 'integer',
'nullable': True
},
{
'_typeHint': 'dt.Simple',
'id': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be',
'name': 'double',
'nullable': True
},
{
'_typeHint': 'dt.Simple',
'id': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4',
'name': 'string',
'nullable': True
},
{
'_typeHint': 'dt.Simple',
'id': '05ffb715-9781-4019-aee3-21c77f80d2a1',
'name': 'boolean',
'nullable': True
}
],
'notebookInfo': '{"traversableAgain":true,"empty":false}',
'attributes': [
{
'id': '2018',
'name': 'SalesOrderID',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2019',
'name': 'CustomerID',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2020',
'name': 'OrderQty',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2021',
'name': 'ProductID',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2022',
'name': 'UnitPrice',
'dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
},
{
'id': '2023',
'name': 'LineTotal',
'dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
},
{
'id': '2065',
'name': 'CustomerID',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2066',
'name': 'FirstName',
'dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
},
{
'id': '2067',
'name': 'LastName',
'dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
}
]
}
}