3

如何DataFrame在 PySpark 中为流式传输设置模式。

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', '192.168.0.113')\
   .option('port', 5560)\
   .load()

例如我需要一个像这样的表:

Name,  lastName,   PhoneNumber    
Bob, Dylan, 123456    
Jack, Ma, 789456
....

如何将标头/模式设置为 ['Name','lastName','PhoneNumber'] 及其数据类型。

另外,是否可以连续显示此表,或者说DataFrame. 当我尝试它时,我得到了错误

"pyspark.sql.utils.AnalysisException: '当流式 DataFrames/Datasets 上没有流式聚合时,不支持完整输出模式;;\nProject"

4

1 回答 1

7

TextSocketSource不提供任何集成的解析选项。只能使用以下两种格式之一:

  • 时间戳和文本 ifincludeTimestamp设置为true使用以下模式:

    StructType([
        StructField("value", StringType()),
        StructField("timestamp", TimestampType())
    ])
    
  • text only ifincludeTimestamp设置为false使用如下所示的架构:

    StructType([StructField("value", StringType())]))
    

如果要更改此格式,则必须转换流以提取感兴趣的字段,例如使用正则表达式:

from pyspark.sql.functions import regexp_extract
from functools import partial

fields = partial(
    regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)

lines.select(
    fields(idx=1).alias("name"),
    fields(idx=2).alias("last_name"), 
    fields(idx=3).alias("phone_number")
)
于 2016-12-29T19:17:15.477 回答