1

我目前是 .NET for Spark 的新手,需要将 C# 列表附加到增量表。我假设我首先需要创建一个 Spark DataFrame 来执行此操作。在示例代码中,我将如何将“名称”附加到数据框“df”?

现在看来,这已被弃用(https://github.com/Microsoft/Mobius),使用 RDD 不适用于新版本(https://github.com/dotnet/spark

using System.Collections.Generic;
using Microsoft.Spark.Sql;

namespace HelloSpark
{
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();
            var df = spark.Read().Json("people.json");
            df.Show();

            var names = new List<string> { "john", "20" };

        }
    }
}

示例文件 people.json 如下所示:

{"name":"Michael"}
{"name":"Andy", "age":"30"}
{"name":"Justin", "age":"19"}
4

2 回答 2

0

您现在可以在 .NET 中为 Apache Spark 创建一个数据框(在编写此问题时您不能)。

为此,您传入一个 GenericRow 数组,该数组为每列获取一个对象数组。您还需要定义架构:


using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

namespace CreateDataFrame
{
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();
            
            var df = spark.Read().Json("people.json");
            df.Show();

            var names = new List<string> { "john", "20" };

            var newNamesDataFrame = spark.CreateDataFrame(
                new List<GenericRow>{new GenericRow(names.ToArray())},
                    new StructType(
                    new List<StructField>()
                    {
                        new StructField("name", new StringType()),
                        new StructField("age", new StringType())
                    }));
            
            newNamesDataFrame.Union(df).Show();
        }
    }
}

现在您有了数据框,您可以使用它来编写它DataFrameWriter.Write.Format("delta").Save("/Path/ToFile")

于 2020-09-09T12:02:57.827 回答
0

您需要使用列表创建另一个数据框并将其与原始数据框合并。完成后,您可以将其写入外部存储。您可以根据下面的 Psuedo 代码查找对应的 C# api

 var names = new List<string> { "john", "20" };
 // Create a Dataframe using this list
 // In scala you can do spark.createDataFrame using the list.
 var newdf = spark.createDataFrame(names,yourschemaclass)
 // union it with original df
 var joineddf = df.union(newdf)
 // write to external storage if you want
 joineddf.write()
于 2019-08-07T17:47:46.887 回答