
My student database has multiple records for each student in the Student table.

I am reading the data into a Spark DataFrame and then iterating over the DataFrame, isolating the records for each student and doing some processing on each student's records.

My code so far:

from pyspark.sql import SparkSession
spark_session = SparkSession \
    .builder \
    .appName("app") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.2") \
    .getOrCreate()

class_3A = spark_session.sql("SQL") 

for row in class_3A:
    #for each student
        #Print Name, Age and Subject Marks

How can I do this?


4 Answers


Another approach is to use SparkSQL:

>>> df = spark.createDataFrame([('Ankit',25),('Jalfaizy',22),('Suresh',20),('Bala',26)],['name','age'])
>>> df.show()
+--------+---+                                                                  
|    name|age|
+--------+---+
|   Ankit| 25|
|Jalfaizy| 22|
|  Suresh| 20|
|    Bala| 26|
+--------+---+

>>> df.where('age > 20').show()
+--------+---+
|    name|age|
+--------+---+
|   Ankit| 25|
|Jalfaizy| 22|
|    Bala| 26|
+--------+---+

>>> from pyspark.sql.functions import *
>>> df.select('name', col('age') + 100).show()
+--------+-----------+
|    name|(age + 100)|
+--------+-----------+
|   Ankit|        125|
|Jalfaizy|        122|
|  Suresh|        120|
|    Bala|        126|
+--------+-----------+
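
If you would rather write the query as SQL text, the same filtering can go through a temporary view. A minimal sketch, assuming the same df and spark session from the example above (the view name students is arbitrary):

>>> df.createOrReplaceTempView("students")
>>> spark.sql("SELECT name, age FROM students WHERE age > 20").show()
+--------+---+
|    name|age|
+--------+---+
|   Ankit| 25|
|Jalfaizy| 22|
|    Bala| 26|
+--------+---+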
answered 2019-10-23T05:46:29.923

An imperative approach (in addition to Bala's SQL approach):

class_3A = spark_session.sql("SQL") 

def process_student(student_row):
    # Do Something with student_row
    return processed_student_row

#"isolate records for each student"
# Each student record will be passed to process_student function for processing.
# Results will be accumulated to a new DF - result_df
result_df = class_3A.map(process_student)

# If you don't care about results and just want to do some processing:
class_3A.foreach(process_student)
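
As a concrete sketch of what process_student might look like (the column names name, age and marks below are assumptions, not the real Student schema - adapt them to your table):

from pyspark.sql import Row

def process_student(student_row):
    # Hypothetical processing: build a short summary per record
    # from the assumed columns name, age and marks.
    return Row(name=student_row["name"],
               summary="{} ({}): {}".format(student_row["name"],
                                            student_row["age"],
                                            student_row["marks"]))

# collect() brings a small result back to the driver for inspection.
processed_rows = class_3A.rdd.map(process_student).collect()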
answered 2019-10-23T11:12:06.950

I'm not sure I have understood the question correctly, but if you want to operate on rows based on any column, you can do that with DataFrame functions. Example:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from  pyspark.sql import Window



sc = SparkSession.builder.appName("example").\
    config("spark.driver.memory", "1g").\
    config("spark.executor.cores", 2).\
    config("spark.cores.max", 4).getOrCreate()

df1 = sc.read.format("csv").option("header","true").load("test.csv")

w = Window.partitionBy("student_id")  # window over all records of one student

# total marks per student
df2 = df1.groupBy("student_id").agg(f.sum(df1["marks"]).alias("total"))
# highest mark in any subject per student, then keep only the rows with that mark
df3 = df1.withColumn("max_marks_inanysub", f.max(df1["marks"]).over(w))
df3 = df3.filter(df3["marks"] == df3["max_marks_inanysub"])


df1.show()
df3.show()

Sample data:

student_id,subject,marks
1,maths,3
1,science,6
2,maths,4
2,science,7

Output:

+----------+-------+-----+
|student_id|subject|marks|
+----------+-------+-----+
|         1|  maths|    3|
|         1|science|    6|
|         2|  maths|    4|
|         2|science|    7|
+----------+-------+-----+

+----------+-------+-----+------------------+
|student_id|subject|marks|max_marks_inanysub|
+----------+-------+-----+------------------+
|         1|science|    6|                 6|
|         2|science|    7|                 7|
+----------+-------+-----+------------------+
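
If the goal is to pull every record for a student into a single row before processing it, collect_list over the same grouping is one option. A minimal sketch reusing df1 and the column names from the CSV above:

# gather each student's (subject, marks) pairs into one row per student
df4 = df1.groupBy("student_id").agg(
    f.collect_list(f.struct("subject", "marks")).alias("records"))
df4.show(truncate=False)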

answered 2019-10-23T14:41:45.463

You can iterate over each record in the DataFrame and access the fields by column name:

from pyspark.sql import Row
from pyspark.sql.functions import *
l = [('Ankit',25),('Jalfaizy',22),('Suresh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = spark.createDataFrame(people)

schemaPeople.show(10, False)

for row in schemaPeople.rdd.collect():
  print("Hi " + str(row.name) + " your age is : " + str(row.age) )

This will produce output like the following:

+---+--------+
|age|name    |
+---+--------+
|25 |Ankit   |
|22 |Jalfaizy|
|20 |Suresh  |
|26 |Bala    |
+---+--------+

Hi Ankit your age is : 25
Hi Jalfaizy your age is : 22
Hi Suresh your age is : 20
Hi Bala your age is : 26

So you can run whatever processing or logic you need to perform on each record of the DataFrame.
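
One caveat: collect() pulls every row onto the driver, so it only works when the result is small enough to fit in driver memory. For larger DataFrames, toLocalIterator() streams rows back one partition at a time; a minimal sketch reusing schemaPeople from above:

for row in schemaPeople.toLocalIterator():
    print("Hi " + str(row.name) + " your age is : " + str(row.age))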

answered 2019-10-23T05:05:52.387