1

如何在PySpark中设置流式传输DataFrame的模式。使用套接字进行火花传输,设置SCHEMA,在控制台中显示DATAFRAME

from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode 
from pyspark.sql.functions import split 
# Import data types 
from pyspark.sql.types import * 

spark = SparkSession\ 
    .builder\ 
    .appName("StructuredNetworkWordCount")\ 
    .getOrCreate() 

# Create DataFrame representing the stream of input lines from connection to localhost:5560 
lines = spark\ 
    .readStream\ 
    .format('socket')\ 
    .option('host', '192.168.0.113')\ 
    .option('port', 5560)\ 
    .load() 

比如我需要一个像一个表:

Name, lastName, PhoneNumber  
Bob, Dylan, 123456  
Jack, Ma, 789456 
.... 

如何设置页眉/模式为[ '姓名', 'lastName的', '******中国'] 与它们的数据类型。

此外,是否有可能持续显示此表,或说是DataFrame的前20行。当我尝试了,我得到了错误

“pyspark.sql.utils.AnalysisException:不支持“完全输出模式时,有上的流DataFrames /数据集;; \ nProject没有流聚合”

回答

4

TextSocketSource不提供任何集成解析选项。只有可以使用两种格式之一:

  • 时间戳和文本,如果includeTimestamp设置为true下面的模式:

    StructType([ 
        StructField("value", StringType()), 
        StructField("timestamp", TimestampType()) 
    ]) 
    
  • 文本只有includeTimestamp设置为false与如下所示的模式:

    StructType([StructField("value", StringType())])) 
    

如果要更改此格式,您必须将流转换为提取感兴趣区域,例如使用正则表达式:

from pyspark.sql.functions import regexp_extract 
from functools import partial 

fields = partial(
    regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$" 
) 

lines.select(
    fields(idx=1).alias("name"), 
    fields(idx=2).alias("last_name"), 
    fields(idx=3).alias("phone_number") 
) 
相关问题