Work has been hectic this week, so I wrote this in several evening sittings and am pulling it all together this weekend. This PySpark guide was originally planned as two parts, but the more I learned the more there was to cover, so it has grown into three parts. Today's part is about Spark SQL, which is genuinely useful — well worth bookmarking. You can review the previous part here: 《PySpark入门级学习教程,框架思维(上)》.
Using Spark SQL

Before getting into Spark SQL, a quick word about the module. Spark SQL is Spark's module for working with structured data. It provides the Spark DataFrame abstraction and lets you run distributed SQL queries over it. If you have used pandas DataFrames in Python, the idea is very similar; mostly the APIs differ.

Working with Spark SQL also gives us a lot of flexibility: the same logic can be written as SQL statements, through the Spark DataFrame API, or through the Dataset API, and we can switch between them as needed. Logic written against the DataFrame API or in SQL is optimized by Spark's Catalyst optimizer before being executed as RDD operations, so even imperfect code often runs fast (whereas hand-written RDD code with the same mistakes might crawl).
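To make that concrete, here is a minimal sketch (assuming a SparkSession named spark and a DataFrame df with an age column, like the one built later in this post; the view name people_view is made up just for the example). The same filter written as SQL and as DataFrame API code goes through Catalyst, so the physical plans printed by explain() come out essentially the same:

# Minimal sketch: the SQL and DataFrame-API versions of the same query are both
# optimized by Catalyst, so their physical plans end up essentially identical.
# Assumes `spark` and a DataFrame `df` with an `age` column already exist;
# "people_view" is just an illustrative view name.
df.createOrReplaceTempView("people_view")
sql_way = spark.sql("SELECT * FROM people_view WHERE age > 20")
api_way = df.filter(df.age > 20)
sql_way.explain()   # print the physical plan of the SQL version
api_way.explain()   # print the physical plan of the DataFrame-API version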
Creating a Spark DataFrame

Let's start with a few ways to create a Spark DataFrame: from an RDD, from a pandas DataFrame, from a Python list, by reading data files, and by reading from a database.
1. Create from an RDD
This mainly uses the RDD's toDF method.
rdd = sc.parallelize([("Sam", 28, 88), ("Flora", 28, 90), ("Run", 1, 60)])
df = rdd.toDF(["name", "age", "score"])
df.show()
df.printSchema()
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# |  Sam| 28|   88|
# |Flora| 28|   90|
# |  Run|  1|   60|
# +-----+---+-----+
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)
#  |-- score: long (nullable = true)

2. Create from a pandas DataFrame
import pandas as pd

df = pd.DataFrame([["Sam", 28, 88], ["Flora", 28, 90], ["Run", 1, 60]],
                  columns=["name", "age", "score"])
print("pandas DataFrame:")
print(df)
print("\n")
Spark_df = spark.createDataFrame(df)
print("Spark DataFrame:")
Spark_df.show()
# pandas DataFrame:
#     name  age  score
# 0    Sam   28     88
# 1  Flora   28     90
# 2    Run    1     60
# Spark DataFrame:
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# |  Sam| 28|   88|
# |Flora| 28|   90|
# |  Run|  1|   60|
# +-----+---+-----+

3. Create from a list
list_values = [["Sam", 28, 88], ["Flora", 28, 90], ["Run", 1, 60]]
Spark_df = spark.createDataFrame(list_values, ["name", "age", "score"])
Spark_df.show()
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# |  Sam| 28|   88|
# |Flora| 28|   90|
# |  Run|  1|   60|
# +-----+---+-----+

4. Create by reading data files
# 4.1 CSV file
df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .csv("./test/data/titanic/train.csv")
df.show(5)
df.printSchema()

# 4.2 JSON file
df = spark.read.json("./test/data/hello_samshare.json")
df.show(5)
df.printSchema()

5. Create by reading from a database
# 5.1 Read Hive data
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
df.show(5)

# 5.2 Read MySQL data
url = "jdbc:mysql://localhost/test"
df = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", "runoob_tbl") \
    .option("user", "root") \
    .option("password", "") \
    .load()
df.show()

Commonly used Spark DataFrame APIs
I have roughly grouped these APIs into a few categories: APIs for inspecting a DataFrame, APIs for simple DataFrame processing, APIs for column operations, APIs that change how you work with a DataFrame (views, caching, conversions), and APIs for statistics. Grouping them this way makes it easier to remember what each one is for, so you can reach for the right tool when a real problem comes up.

First, here is the dataset used throughout this section:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# Much of Spark SQL's functionality is exposed through SparkSession methods;
# SparkContext alone does not provide it.
spark = SparkSession.builder \
    .appName("sam_SamShare") \
    .master("local[4]") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

# Create a Spark DataFrame
rdd = sc.parallelize([("Sam", 28, 88, "M"),
                      ("Flora", 28, 90, "F"),
                      ("Run", 1, 60, None),
                      ("Peter", 55, 100, "M"),
                      ("Mei", 54, 95, "F")])
df = rdd.toDF(["name", "age", "score", "sex"])
df.show()
df.printSchema()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# |  Sam| 28|   88|   M|
# |Flora| 28|   90|   F|
# |  Run|  1|   60|null|
# |Peter| 55|  100|   M|
# |  Mei| 54|   95|   F|
# +-----+---+-----+----+
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)
#  |-- score: long (nullable = true)
#  |-- sex: string (nullable = true)

1. APIs for inspecting a DataFrame
# DataFrame.collect
# Return the rows as a list
df.collect()
# [Row(name='Sam', age=28, score=88, sex='M'),
#  Row(name='Flora', age=28, score=90, sex='F'),
#  Row(name='Run', age=1, score=60, sex=None),
#  Row(name='Peter', age=55, score=100, sex='M'),
#  Row(name='Mei', age=54, score=95, sex='F')]

# DataFrame.count
df.count()
# 5

# DataFrame.columns
df.columns
# ['name', 'age', 'score', 'sex']

# DataFrame.dtypes
df.dtypes
# [('name', 'string'), ('age', 'bigint'), ('score', 'bigint'), ('sex', 'string')]

# DataFrame.describe
# Return basic statistics for the given columns
df.describe(["age"]).show()
# +-------+-----+
# |summary|  age|
# +-------+-----+
# |  count|    5|
# |   mean| 33.2|
# | stddev|22.35|
# |    min|    1|
# |    max|   55|
# +-------+-----+
df.describe().show()
# +-------+-----+-----+-----+----+
# |summary| name|  age|score| sex|
# +-------+-----+-----+-----+----+
# |  count|    5|    5|    5|   4|
# |   mean| null| 33.2| 86.6|null|
# | stddev| null|22.35|15.58|null|
# |    min|Flora|    1|   60|   F|
# |    max|  Sam|   55|  100|   M|
# +-------+-----+-----+-----+----+

# DataFrame.select
# Select the given columns, in the given order
df.select("sex", "score").show()

# DataFrame.first
# DataFrame.head
# Look at the first row(s)
df.first()
# Row(name='Sam', age=28, score=88, sex='M')
df.head(1)
# [Row(name='Sam', age=28, score=88, sex='M')]

# DataFrame.freqItems
# Find frequent items for the given columns
df.freqItems(["age", "sex"]).show()
# +---------------+-------------+
# |  age_freqItems|sex_freqItems|
# +---------------+-------------+
# |[55, 1, 28, 54]|      [M, F,]|
# +---------------+-------------+

# DataFrame.summary
df.summary().show()
# +-------+-----+-----+-----+----+
# |summary| name|  age|score| sex|
# +-------+-----+-----+-----+----+
# |  count|    5|    5|    5|   4|
# |   mean| null| 33.2| 86.6|null|
# | stddev| null|22.35|15.58|null|
# |    min|Flora|    1|   60|   F|
# |    25%| null|   28|   88|null|
# |    50%| null|   28|   90|null|
# |    75%| null|   54|   95|null|
# |    max|  Sam|   55|  100|   M|
# +-------+-----+-----+-----+----+

# DataFrame.sample
# Randomly sample a fraction of the rows
df.sample(0.5).show()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# |  Sam| 28|   88|   M|
# |  Run|  1|   60|null|
# |Peter| 55|  100|   M|
# +-----+---+-----+----+

2. APIs for simple DataFrame processing
# DataFrame.distinct
# Return a new DataFrame with duplicate rows removed
df.distinct().show()

# DataFrame.dropDuplicates
# Drop duplicates considering only the given columns
df.dropDuplicates(["sex"]).show()
# +-----+---+-----+----+
# | name|age|score| sex|
# +-----+---+-----+----+
# |Flora| 28|   90|   F|
# |  Run|  1|   60|null|
# |  Sam| 28|   88|   M|
# +-----+---+-----+----+

# DataFrame.exceptAll
# DataFrame.subtract
# Remove from one DataFrame the rows that appear in another
df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
df3 = df1.exceptAll(df2)  # keeps unmatched duplicates, result is not deduplicated
df4 = df1.subtract(df2)   # set difference, result is deduplicated
df1.show()
df2.show()
df3.show()
df4.show()
# df1:
# +---+---+
# | C1| C2|
# +---+---+
# |  a|  1|
# |  a|  1|
# |  b|  3|
# |  c|  4|
# +---+---+
# df2:
# +---+---+
# | C1| C2|
# +---+---+
# |  a|  1|
# |  b|  3|
# +---+---+
# df3:
# +---+---+
# | C1| C2|
# +---+---+
# |  a|  1|
# |  c|  4|
# +---+---+
# df4:
# +---+---+
# | C1| C2|
# +---+---+
# |  c|  4|
# +---+---+

# DataFrame.intersectAll
# Return the intersection of two DataFrames
df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 4)], ["C1", "C2"])
df1.intersectAll(df2).show()
# +---+---+
# | C1| C2|
# +---+---+
# |  a|  1|
# +---+---+
# DataFrame.drop
# Drop the given column
df.drop("age").show()

# DataFrame.withColumn
# Add a new column
df1 = df.withColumn("birth_year", 2021 - df.age)  # assuming 2021 as the current year
df1.show()
# +-----+---+-----+----+----------+
# | name|age|score| sex|birth_year|
# +-----+---+-----+----+----------+
# |  Sam| 28|   88|   M|      1993|
# |Flora| 28|   90|   F|      1993|
# |  Run|  1|   60|null|      2020|
# |Peter| 55|  100|   M|      1966|
# |  Mei| 54|   95|   F|      1967|
# +-----+---+-----+----+----------+

# DataFrame.withColumnRenamed
# Rename a column
df1 = df.withColumnRenamed("sex", "gender")
df1.show()
# +-----+---+-----+------+
# | name|age|score|gender|
# +-----+---+-----+------+
# |  Sam| 28|   88|     M|
# |Flora| 28|   90|     F|
# |  Run|  1|   60|  null|
# |Peter| 55|  100|     M|
# |  Mei| 54|   95|     F|
# +-----+---+-----+------+

# DataFrame.dropna
# Drop rows with nulls; DataFrame.dropna(how='any', thresh=None, subset=None)
df.dropna(how='all', subset=['sex']).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |  Sam| 28|   88|  M|
# |Flora| 28|   90|  F|
# |Peter| 55|  100|  M|
# |  Mei| 54|   95|  F|
# +-----+---+-----+---+

# DataFrame.fillna
# Fill null values
df1 = spark.createDataFrame([("a", None), ("a", 1), (None, 3), ("c", 4)], ["C1", "C2"])
# df2 = df1.na.fill({"C1": "d", "C2": 99})
df2 = df1.fillna({"C1": "d", "C2": 99})
df1.show()
df2.show()

# DataFrame.filter
# Filter rows by a condition
df.filter(df.age > 50).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |Peter| 55|  100|  M|
# |  Mei| 54|   95|  F|
# +-----+---+-----+---+
df.where(df.age == 28).show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |  Sam| 28|   88|  M|
# |Flora| 28|   90|  F|
# +-----+---+-----+---+
df.filter("age < 18").show()
# +----+---+-----+----+
# |name|age|score| sex|
# +----+---+-----+----+
# | Run|  1|   60|null|
# +----+---+-----+----+

# DataFrame.join
# Not much explanation needed; the example shows the syntax:
# DataFrame.join(other, on=None, how=None)
df1 = spark.createDataFrame([("a", 1), ("d", 1), ("b", 3), ("c", 4)], ["id", "num1"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num2"])
df1.join(df2, df1.id == df2.id, 'left') \
    .select(df1.id.alias("df1_id"),
            df1.num1.alias("df1_num"),
            df2.num2.alias("df2_num")) \
    .sort(["df1_id"], ascending=False) \
    .show()

# DataFrame.agg(*exprs)
# Aggregate the data; several aggregations can be given at once. Without a
# groupBy, the aggregation is applied to the whole DataFrame.
# DataFrame.alias
# Set an alias for a column or DataFrame
# DataFrame.groupBy
# Group by one or more columns; pass a list for multiple columns,
# e.g. df.groupBy(["sex", "age"])
df.groupBy("sex").agg(F.min(df.age).alias("最小年龄"),
                      F.expr("avg(age)").alias("平均年龄"),
                      F.expr("collect_list(name)").alias("姓名集合")).show()
# +----+--------+--------+------------+
# | sex|最小年龄|平均年龄|    姓名集合|
# +----+--------+--------+------------+
# |   F|      28|    41.0|[Flora, Mei]|
# |null|       1|     1.0|       [Run]|
# |   M|      28|    41.5|[Sam, Peter]|
# +----+--------+--------+------------+

# DataFrame.foreach
# Apply a function to every row
def f(person):
    print(person.name)
df.foreach(f)
# Peter
# Run
# Sam
# Flora
# Mei
# (output order is not guaranteed)

# DataFrame.replace
# Replace certain values in the DataFrame
df1 = df.na.replace({"M": "Male", "F": "Female"})
df1.show()

# DataFrame.union
# Equivalent to SQL's UNION ALL
df1 = spark.createDataFrame([("a", 1), ("d", 1), ("b", 3), ("c", 4)], ["id", "num"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num"])
df1.union(df2).show()
df1.unionAll(df2).show()
# Note: unlike SQL's UNION, DataFrame.union (and unionAll, which is just an
# alias for it) does NOT deduplicate; it behaves like UNION ALL. Chain
# .distinct() after the union if you want SQL UNION semantics
# (see the short demo after the unionByName example below).
# +---+---+
# | id|num|
# +---+---+
# |  a|  1|
# |  d|  1|
# |  b|  3|
# |  c|  4|
# |  a|  1|
# |  b|  3|
# +---+---+

# DataFrame.unionByName
# Merge datasets by column name rather than by position
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
# +----+----+----+
# |col0|col1|col2|
# +----+----+----+
# |   1|   2|   3|
# |   6|   4|   5|
# +----+----+----+
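To close the loop on the union note above, here is a minimal sketch (recreating the small df1/df2 from the union example) showing that chaining .distinct() after union gives SQL UNION semantics:

# Sketch: DataFrame.union keeps duplicates (UNION ALL semantics);
# adding .distinct() removes them, matching SQL's UNION.
df1 = spark.createDataFrame([("a", 1), ("d", 1), ("b", 3), ("c", 4)], ["id", "num"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num"])
df1.union(df2).distinct().show()
# 4 distinct rows remain: (a,1), (d,1), (b,3), (c,4); row order may vary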
3. APIs for column operations

This part is about operations on columns: renaming, sorting, null checks, type casts, and so on. I won't write out a demo for every one of them; the syntax below should be self-explanatory (a short combined demo follows after the list).

Column.alias(*alias, **kwargs)  # rename a column
Column.asc()  # sort ascending by this column
Column.desc()  # sort descending by this column
Column.astype(dataType)  # type conversion
Column.cast(dataType)  # cast to another type
Column.between(lowerBound, upperBound)  # boolean: is the value within the given range
Column.contains(other)  # whether the value contains the given substring
Column.endswith(other)  # values ending with something, e.g. df.filter(df.name.endswith('ice')).collect()
Column.isNotNull()  # keep rows where the column is not null
Column.isNull()
Column.isin(*cols)  # keep rows whose value is in the given set, e.g. df[df.name.isin("Bob", "Mike")].collect()
Column.like(other)  # rows matching a SQL LIKE pattern
Column.when(condition, value)  # value to use where the condition is True
Column.otherwise(value)  # used with when, e.g. df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
Column.rlike(other)  # regex matching, e.g. df.filter(df.name.rlike('ice$')).collect()
Column.startswith(other)  # e.g. df.filter(df.name.startswith('Al')).collect()
Column.substr(startPos, length)  # e.g. df.select(df.name.substr(1, 3).alias("col")).collect()
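Since I skipped individual demos above, here is one minimal combined sketch using the same df from this section; the thresholds, labels and alias names (age_dbl, sex_filled, name_prefix) are made up purely for illustration:

# A small combined demo of several Column operations on this section's df.
df.select(
    df.name,
    df.age.cast("double").alias("age_dbl"),      # cast to another type
    df.age.between(20, 60).alias("age_20_60"),   # boolean range check
    F.when(df.sex.isNull(), "unknown")           # when/otherwise with isNull
     .otherwise(df.sex).alias("sex_filled"),
    df.name.substr(1, 3).alias("name_prefix")    # substring of a column
).orderBy(df.age.desc()).show()                  # descending sort

df.filter(df.name.isin("Sam", "Flora")).show()   # keep rows whose name is in the set
df.filter(df.name.startswith("M")).show()        # names starting with "M" (Mei)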
4. APIs that change how you work with a DataFrame

# DataFrame.createOrReplaceGlobalTempView
# DataFrame.dropGlobalTempView
# Create a global temporary view; once registered it can be queried with SQL.
# Its lifetime is tied to the Spark application itself.
df.createOrReplaceGlobalTempView("people")
spark.sql("select * from global_temp.people where sex = 'M'").show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |  Sam| 28|   88|  M|
# |Peter| 55|  100|  M|
# +-----+---+-----+---+

# DataFrame.createOrReplaceTempView
# DataFrame.dropTempView
# Create a local temporary view; its lifetime is tied to the SparkSession
# that created the dataset.
df.createOrReplaceTempView("tmp_people")
spark.sql("select * from tmp_people where sex = 'F'").show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |Flora| 28|   90|  F|
# |  Mei| 54|   95|  F|
# +-----+---+-----+---+

# DataFrame.cache / DataFrame.persist
# Keep the data in cache; the default storage level is MEMORY_AND_DISK.
# (A short storage-level sketch follows at the end of this section.)
df.cache()
df.persist()
df.unpersist()

# DataFrame.crossJoin
# Return the Cartesian product of two DataFrames
df1 = df.select("name", "sex")
df2 = df.select("name", "sex")
df3 = df1.crossJoin(df2)
print("rows in df1:", df1.count())
print("rows in df2:", df2.count())
print("rows after the cross join:", df3.count())
# rows in df1: 5
# rows in df2: 5
# rows after the cross join: 25

# DataFrame.toPandas
# Convert a Spark DataFrame to a pandas DataFrame
df.toPandas()

# DataFrame.rdd
# Convert a Spark DataFrame to an RDD, so you can use RDD syntax on the data
df.rdd
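One small aside on persist: it also accepts an explicit storage level. A minimal sketch (MEMORY_ONLY here is just an example choice, not a recommendation):

# Sketch: persist with an explicit storage level instead of the default.
from pyspark.storagelevel import StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)  # keep partitions in memory only
df.count()                            # run an action so the cache actually materializes
df.unpersist()                        # release it when no longer needed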
5. APIs for statistics

# DataFrame.cov
# Sample covariance of the two given columns
df.cov("age", "score")
# ≈ 324.6 for the dataset above

# DataFrame.corr
# Correlation of the two given columns; DataFrame.corr(col1, col2, method=None).
# Only the Pearson correlation coefficient is currently supported.
df.corr("age", "score", method="pearson")
# ≈ 0.93 for the dataset above

# DataFrame.cube
# Create a multi-dimensional cube of aggregations, typically for analysis.
# For example, cubing on two columns, name and sex, returns the union of the
# results grouped by ("name", "sex"), by "name" only, by "sex" only,
# and with no grouping at all, i.e. four grouping sets unioned together.
df1 = df.filter(df.name != "Run")
df1.show()
df1.cube("name", "sex").count().show()
# +-----+---+-----+---+
# | name|age|score|sex|
# +-----+---+-----+---+
# |  Sam| 28|   88|  M|
# |Flora| 28|   90|  F|
# |Peter| 55|  100|  M|
# |  Mei| 54|   95|  F|
# +-----+---+-----+---+
# Result of the cube aggregation:
# +-----+----+-----+
# | name| sex|count|
# +-----+----+-----+
# | null|   F|    2|
# | null|null|    4|
# |Flora|null|    1|
# |Peter|null|    1|
# | null|   M|    2|
# |Peter|   M|    1|
# |  Sam|   M|    1|
# |  Sam|null|    1|
# |  Mei|   F|    1|
# |  Mei|null|    1|
# |Flora|   F|    1|
# +-----+----+-----+
Hehe, congratulations on making it all the way through. Go reward yourself with a song, and let's keep at it together.