情感测试
情感测试

您现在的位置: 情感测试简介_情感测试玩法 > 情感测试玩法 > PySpark入门级学习教程,框架思维

PySpark入门级学习教程,框架思维

发布时间:2021-7-16 15:35:12   点击数:

这周工作好忙,晚上陆陆续续写了好几波,周末来一次集合输出,不过这个PySpark原定是分上下两篇的,但是越学感觉越多,所以就分成了3Parts,今天这一part主要就是讲一下SparkSQL,这个实在好用!建议收藏学习哈哈。上一节的可点击回顾下哈。《PySpark入门级学习教程,框架思维(上)》

??SparkSQL使用

在讲SparkSQL前,先解释下这个模块。这个模块是Spark中用来处理结构化数据的,提供一个叫SparkDataFrame的东西并且自动解析为分布式SQL查询数据。我们之前用过Python的Pandas库,也大致了解了DataFrame,这个其实和它没有太大的区别,只是调用的API可能有些不同罢了。

我们通过使用SparkSQL来处理数据,会让我们更加地熟悉,比如可以用SQL语句、用SparkDataFrame的API或者DatasetsAPI,我们可以按照需求随心转换,通过SparkDataFrameAPI和SQL写的逻辑,会被Spark优化器Catalyst自动优化成RDD,即便写得不好也可能运行得很快(如果是直接写RDD可能就挂了哈哈)。

创建SparkDataFrame

开始讲SparkDataFrame,我们先学习下几种创建的方法,分别是使用RDD来创建、使用python的DataFrame来创建、使用List来创建、读取数据文件来创建、通过读取数据库来创建。

1.使用RDD来创建

主要使用RDD的toDF方法。

rdd=sc.parallelize([("Sam",28,88),("Flora",28,90),("Run",1,60)])df=rdd.toDF(["name","age","score"])df.show()df.printSchema()#+-----+---+-----+#

name

age

score

#+-----+---+-----+#

Sam

28

88

#

Flora

28

90

#

Run

1

60

#+-----+---+-----+#root#

--name:string(nullable=true)#

--age:long(nullable=true)#

--score:long(nullable=true)2.使用python的DataFrame来创建

df=pd.DataFrame([[Sam,28,88],[Flora,28,90],[Run,1,60]],columns=[name,age,score])print("打印DataFrame:")print(df)print("\n")Spark_df=spark.createDataFrame(df)print("打印SparkDataFrame:")Spark_df.show()#打印DataFrame:#nameagescore#0Sam#1Flora#2Run#打印SparkDataFrame:#+-----+---+-----+#

name

age

score

#+-----+---+-----+#

Sam

28

88

#

Flora

28

90

#

Run

1

60

#+-----+---+-----+3.使用List来创建

list_values=[[Sam,28,88],[Flora,28,90],[Run,1,60]]Spark_df=spark.createDataFrame(list_values,[name,age,score])Spark_df.show()#+-----+---+-----+#

name

age

score

#+-----+---+-----+#

Sam

28

88

#

Flora

28

90

#

Run

1

60

#+-----+---+-----+4.读取数据文件来创建

#4.1CSV文件df=spark.read.option("header","true")\.option("inferSchema","true")\.option("delimiter",",")\.csv("./test/data/titanic/train.csv")df.show(5)df.printSchema()#4.2json文件df=spark.read.json("./test/data/hello_samshare.json")df.show(5)df.printSchema()5.通过读取数据库来创建

#5.1读取hive数据spark.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)USINGhive")spark.sql("LOADDATALOCALINPATHdata/kv1.txtINTOTABLEsrc")df=spark.sql("SELECTkey,valueFROMsrcWHEREkey10ORDERBYkey")df.show(5)#5.2读取mysql数据url="jdbc:mysql://localhost:/test"df=spark.read.format("jdbc")\.option("url",url)\.option("dbtable","runoob_tbl")\.option("user","root")\.option("password","")\.load()\df.show()常用的SparkDataFrameAPI

这里我大概是分成了几部分来看这些APIs,分别是查看DataFrame的APIs、简单处理DataFrame的APIs、DataFrame的列操作APIs、DataFrame的一些思路变换操作APIs、DataFrame的一些统计操作APIs,这样子也有助于我们了解这些API的功能,以后遇见实际问题的时候可以解决。

首先我们这小节全局用到的数据集如下:

frompyspark.sqlimportfunctionsasFfrompyspark.sqlimportSparkSession#SparkSQL的许多功能封装在SparkSession的方法接口中,SparkContext则不行的。spark=SparkSession.builder\.appName("sam_SamShare")\.config("master","local[4]")\.enableHiveSupport()\.getOrCreate()sc=spark.sparkContext#创建一个SparkDataFramerdd=sc.parallelize([("Sam",28,88,"M"),("Flora",28,90,"F"),("Run",1,60,None),("Peter",55,,"M"),("Mei",54,95,"F")])df=rdd.toDF(["name","age","score","sex"])df.show()df.printSchema()#+-----+---+-----+----+#

name

age

score

sex

#+-----+---+-----+----+#

Sam

28

88

M

#

Flora

28

90

F

#

Run

1

60

null

#

Peter

55

M

#

Mei

54

95

F

#+-----+---+-----+----+#root#

--name:string(nullable=true)#

--age:long(nullable=true)#

--score:long(nullable=true)#

--sex:string(nullable=true)1、查看DataFrame的APIs

#DataFrame.collect#以列表形式返回行df.collect()#[Row(name=Sam,age=28,score=88,sex=M),#Row(name=Flora,age=28,score=90,sex=F),#Row(name=Run,age=1,score=60,sex=None),#Row(name=Peter,age=55,score=,sex=M),#Row(name=Mei,age=54,score=95,sex=F)]#DataFrame.countdf.count()#5#DataFrame.columnsdf.columns#[name,age,score,sex]#DataFrame.dtypesdf.dtypes#[(name,string),(age,bigint),(score,bigint),(sex,string)]#DataFrame.describe#返回列的基础统计信息df.describe([age]).show()#+-------+------------------+#

summary

age

#+-------+------------------+#

count

5

#

mean

33.2

#

stddev

22.

#

min

1

#

max

55

#+-------+------------------+df.describe().show()#+-------+-----+------------------+------------------+----+#

summary

name

age

score

sex

#+-------+-----+------------------+------------------+----+#

count

5

5

5

4

#

mean

null

33.2

86.6

null

#

stddev

null

22.

15.

null

#

min

Flora

1

60

F

#

max

Sam

55

M

#+-------+-----+------------------+------------------+----+#DataFrame.select#选定指定列并按照一定顺序呈现df.select("sex","score").show()#DataFrame.first#DataFrame.head#查看第1条数据df.first()#Row(name=Sam,age=28,score=88,sex=M)df.head(1)#[Row(name=Sam,age=28,score=88,sex=M)]#DataFrame.freqItems#查看指定列的枚举值df.freqItems(["age","sex"]).show()#+---------------+-------------+#

age_freqItems

sex_freqItems

#+---------------+-------------+#

[55,1,28,54]

[M,F,]

#+---------------+-------------+#DataFrame.summarydf.summary().show()#+-------+-----+------------------+------------------+----+#

summary

name

age

score

sex

#+-------+-----+------------------+------------------+----+#

count

5

5

5

4

#

mean

null

33.2

86.6

null

#

stddev

null

22.

15.

null

#

min

Flora

1

60

F

#

25%

null

28

88

null

#

50%

null

28

90

null

#

75%

null

54

95

null

#

max

Sam

55

M

#+-------+-----+------------------+------------------+----+#DataFrame.sample#按照一定规则从df随机抽样数据df.sample(0.5).show()#+-----+---+-----+----+#

name

age

score

sex

#+-----+---+-----+----+#

Sam

28

88

M

#

Run

1

60

null

#

Peter

55

M

#+-----+---+-----+----+2、简单处理DataFrame的APIs

#DataFrame.distinct#对数据集进行去重df.distinct().show()#DataFrame.dropDuplicates#对指定列去重df.dropDuplicates(["sex"]).show()#+-----+---+-----+----+#

name

age

score

sex

#+-----+---+-----+----+#

Flora

28

90

F

#

Run

1

60

null

#

Sam

28

88

M

#+-----+---+-----+----+#DataFrame.exceptAll#DataFrame.subtract#根据指定的df对df进行去重df1=spark.createDataFrame([("a",1),("a",1),("b",3),("c",4)],["C1","C2"])df2=spark.createDataFrame([("a",1),("b",3)],["C1","C2"])df3=df1.exceptAll(df2)#没有去重的功效df4=df1.subtract(df2)#有去重的奇效df1.show()df2.show()df3.show()df4.show()#+---+---+#

C1

C2

#+---+---+#

a

1

#

a

1

#

b

3

#

c

4

#+---+---+#+---+---+#

C1

C2

#+---+---+#

a

1

#

b

3

#+---+---+#+---+---+#

C1

C2

#+---+---+#

a

1

#

c

4

#+---+---+#+---+---+#

C1

C2

#+---+---+#

c

4

#+---+---+#DataFrame.intersectAll#返回两个DataFrame的交集df1=spark.createDataFrame([("a",1),("a",1),("b",3),("c",4)],["C1","C2"])df2=spark.createDataFrame([("a",1),("b",4)],["C1","C2"])df1.intersectAll(df2).show()#+---+---+#

C1

C2

#+---+---+#

a

1

#+---+---+#DataFrame.drop#丢弃指定列df.drop(age).show()#DataFrame.withColumn#新增列df1=df.withColumn("birth_year",-df.age)df1.show()#+-----+---+-----+----+----------+#

name

age

score

sex

birth_year

#+-----+---+-----+----+----------+#

Sam

28

88

M

#

Flora

28

90

F

#

Run

1

60

null

#

Peter

55

M

1

#

Mei

54

95

F

#+-----+---+-----+----+----------+#DataFrame.withColumnRenamed#重命名列名df1=df.withColumnRenamed("sex","gender")df1.show()#+-----+---+-----+------+#

name

age

score

gender

#+-----+---+-----+------+#

Sam

28

88

M

#

Flora

28

90

F

#

Run

1

60

null

#

Peter

55

M

#

Mei

54

95

F

#+-----+---+-----+------+#DataFrame.dropna#丢弃空值,DataFrame.dropna(how=any,thresh=None,subset=None)df.dropna(how=all,subset=[sex]).show()#+-----+---+-----+---+#

name

age

score

sex

#+-----+---+-----+---+#

Sam

28

88

M

#

Flora

28

90

F

#

Peter

55

M

#

Mei

54

95

F

#+-----+---+-----+---+#DataFrame.fillna#空值填充操作df1=spark.createDataFrame([("a",None),("a",1),(None,3),("c",4)],["C1","C2"])#df2=df1.na.fill({"C1":"d","C2":99})df2=df1.fillna({"C1":"d","C2":99})df1.show()df2.show()#DataFrame.filter#根据条件过滤df.filter(df.age50).show()#+-----+---+-----+---+#

name

age

score

sex

#+-----+---+-----+---+#

Peter

55

M

#

Mei

54

95

F

#+-----+---+-----+---+df.where(df.age==28).show()#+-----+---+-----+---+#

name

age

score

sex

#+-----+---+-----+---+#

Sam

28

88

M

#

Flora

28

90

F

#+-----+---+-----+---+df.filter("age18").show()#+----+---+-----+----+#

name

age

score

sex

#+----+---+-----+----+#

Run

1

60

null

#+----+---+-----+----+#DataFrame.join#这个不用多解释了,直接上案例来看看具体的语法即可,DataFrame.join(other,on=None,how=None)df1=spark.createDataFrame([("a",1),("d",1),("b",3),("c",4)],["id","num1"])df2=spark.createDataFrame([("a",1),("b",3)],["id","num2"])df1.join(df2,df1.id==df2.id,left).select(df1.id.alias("df1_id"),df1.num1.alias("df1_num"),df2.num2.alias("df2_num")).sort(["df1_id"],ascending=False)\.show()#DataFrame.agg(*exprs)#聚合数据,可以写多个聚合方法,如果不写groupBy的话就是对整个DF进行聚合#DataFrame.alias#设置列或者DataFrame别名#DataFrame.groupBy#根据某几列进行聚合,如有多列用列表写在一起,如df.groupBy(["sex","age"])df.groupBy("sex").agg(F.min(df.age).alias("最小年龄"),F.expr("avg(age)").alias("平均年龄"),F.expr("collect_list(name)").alias("姓名集合")).show()#+----+--------+--------+------------+#

sex

最小年龄

平均年龄

姓名集合

#+----+--------+--------+------------+#

F

28

41.0

[Flora,Mei]

#

null

1

1.0

[Run]

#

M

28

41.5

[Sam,Peter]

#+----+--------+--------+------------+#DataFrame.foreach#对每一行进行函数方法的应用deff(person):print(person.name)df.foreach(f)#Peter#Run#Sam#Flora#Mei#DataFrame.replace#修改df里的某些值df1=df.na.replace({"M":"Male","F":"Female"})df1.show()#DataFrame.union#相当于SQL里的unionall操作df1=spark.createDataFrame([("a",1),("d",1),("b",3),("c",4)],["id","num"])df2=spark.createDataFrame([("a",1),("b",3)],["id","num"])df1.union(df2).show()df1.unionAll(df2).show()#这里union没有去重,不知道为啥,有知道的朋友麻烦解释下,谢谢了。#+---+---+#

id

num

#+---+---+#

a

1

#

d

1

#

b

3

#

c

4

#

a

1

#

b

3

#+---+---+#DataFrame.unionByName#根据列名来进行合并数据集df1=spark.createDataFrame([[1,2,3]],["col0","col1","col2"])df2=spark.createDataFrame([[4,5,6]],["col1","col2","col0"])df1.unionByName(df2).show()#+----+----+----+#

col0

col1

col2

#+----+----+----+#

1

2

3

#

6

4

5

#+----+----+----+3、DataFrame的列操作APIs

这里主要针对的是列进行操作,比如说重命名、排序、空值判断、类型判断等,这里就不展开写demo了,看看语法应该大家都懂了。

Column.alias(*alias,**kwargs)#重命名列名Column.asc()#按照列进行升序排序Column.desc()#按照列进行降序排序Column.astype(dataType)#类型转换Column.cast(dataType)#强制转换类型Column.between(lowerBound,upperBound)#返回布尔值,是否在指定区间范围内Column.contains(other)#是否包含某个关键词Column.endswith(other)#以什么结束的值,如df.filter(df.name.endswith(ice)).collect()Column.isNotNull()#筛选非空的行Column.isNull()Column.isin(*cols)#返回包含某些值的行df[df.name.isin("Bob","Mike")].collect()Column.like(other)#返回含有关键词的行Column.when(condition,value)#给True的赋值Column.otherwise(value)#与when搭配使用,df.select(df.name,F.when(df.age3,1).otherwise(0)).show()Column.rlike(other)#可以使用正则的匹配df.filter(df.name.rlike(ice$)).collect()Column.startswith(other)#df.filter(df.name.startswith(Al)).collect()Column.substr(startPos,length)#df.select(df.name.substr(1,3).alias("col")).collect()4、DataFrame的一些思路变换操作APIs

#DataFrame.createOrReplaceGlobalTempView#DataFrame.dropGlobalTempView#创建全局的试图,注册后可以使用sql语句来进行操作,生命周期取决于Sparkapplication本身df.createOrReplaceGlobalTempView("people")spark.sql("select*fromglobal_temp.peoplewheresex=M").show()#+-----+---+-----+---+#

name

age

score

sex

#+-----+---+-----+---+#

Sam

28

88

M

#

Peter

55

M

#+-----+---+-----+---+#DataFrame.createOrReplaceTempView#DataFrame.dropTempView#创建本地临时试图,生命周期取决于用来创建此数据集的SparkSessiondf.createOrReplaceTempView("tmp_people")spark.sql("select*fromtmp_peoplewheresex=F").show()#+-----+---+-----+---+#

name

age

score

sex

#+-----+---+-----+---+#

Flora

28

90

F

#

Mei

54

95

F

#+-----+---+-----+---+#DataFrame.cache\DataFrame.persist#可以把一些数据放入缓存中,defaultstoragelevel(MEMORY_AND_DISK).df.cache()df.persist()df.unpersist()#DataFrame.crossJoin#返回两个DataFrame的笛卡尔积关联的DataFramedf1=df.select("name","sex")df2=df.select("name","sex")df3=df1.crossJoin(df2)print("表1的记录数",df1.count())print("表2的记录数",df2.count())print("笛卡尔积后的记录数",df3.count())#表1的记录数5#表2的记录数5#笛卡尔积后的记录数25#DataFrame.toPandas#把SparkDataFrame转为Pandas的DataFramedf.toPandas()#DataFrame.rdd#把SparkDataFrame转为rdd,这样子可以用rdd的语法来操作数据df.rdd5、DataFrame的一些统计操作APIs

#DataFrame.cov#计算指定两列的样本协方差df.cov("age","score")#.97#DataFrame.corr#计算指定两列的相关系数,DataFrame.corr(col1,col2,method=None),目前method只支持Pearson相关系数df.corr("age","score",method="pearson")#0.#DataFrame.cube#创建多维度聚合的结果,通常用于分析数据,比如我们指定两个列进行聚合,比如name和age,那么这个函数返回的聚合结果会#groupby("name","age")#groupby("name")#groupby("age")#groupby(all)#四个聚合结果的unionall的结果df1=df.filter(df.name!="Run")print(df1.show())df1.cube("name","sex").count().show()#+-----+---+-----+---+#

name

age

score

sex

#+-----+---+-----+---+#

Sam

28

88

M

#

Flora

28

90

F

#

Peter

55

M

#

Mei

54

95

F

#+-----+---+-----+---+#cube聚合之后的结果#+-----+----+-----+#

name

sex

count

#+-----+----+-----+#

null

F

2

#

null

null

4

#

Flora

null

1

#

Peter

null

1

#

null

M

2

#

Peter

M

1

#

Sam

M

1

#

Sam

null

1

#

Mei

F

1

#

Mei

null

1

#

Flora

F

1

#+-----+----+-----+

嘻嘻,恭喜你读完啦,奖励你一首歌,一起加油。

预览时标签不可点收录于话题#个上一篇下一篇

转载请注明:http://www.zmax-alibaba.com/qgwf/137634.html

网站简介 | 发布优势 | 服务条款 | 隐私保护 | 广告合作 | 合作伙伴 | 版权申明 | 网站地图

当前时间: