Spark_05 SparkSQL进阶
说明
本章主要介绍df的高级用法,session对象,数据源和格式。
DataFrame的高级DSL方法
join关联方法
内连接:df1.join(df2,on='关联条件',how=‘inner’)
左外连接:df1.join(df2,on='关联条件',how=‘left’)
右外连接:df1.join(df2,on='关联条件',how=‘right’)
满外连接:df1.join(df2,on='关联条件',how=‘full’)
示例代码
from pyspark.sql import SparkSession
from pyspark.sql.types import *
ss=SparkSessio.buider.getOrCreate()
sc=ss.sparkContext()
rdd1 = sc.parallelize([
[1, 'zhangsan', 20],
[2, 'lisi', 20],
[3, 'wangwu', 22]
])
rdd2 = sc.parallelize([
[3, 'zhaoliu', 20],
[4, 'xiaoming', 21],
[5, 'itcast', 22]
])
schema_type = StructType(). \
add('id', IntegerType()). \
add('name', StringType()). \
add('age', IntegerType(), False)
# toDF 将二维rdd数据转为dataframe数据
df1 = rdd1.toDF(schema_type)
df2 = rdd2.toDF(schema_type)
df1.show()
df2.show()
# 内关联 找关联字段相同的数据 默认join方法时内关联
df_join = df1.join(df2,df1.id==df2.id)
df_join.show()
# 左外关联
df_join_left = df1.join(df2,df1.id==df2.id,'left')
df_join_left.show()
# 右外关联
df_join_right = df1.join(df2,df1.id==df2.id,'right')
df_join_right.show()
# 满外关联
df_join_full = df1.join(df2, df1.id == df2.id, 'full')
df_join_full.show()
缓存和checkpoint
既有缓存又有checkpoint时,优先读取缓存数据
缓存读取比checkpoint读取速度快
实例代码
# DF的缓存和checkpoint
from pyspark.sql import SparkSession
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 使用sc指定checkpoint位置
sc.setCheckpointDir('hdfs://node1:8020/dataframecheck')
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
#df数据缓存
df.persist()
df.checkpoint()
# 缓存后的操作
df_select6 = df.select(df.id.cast('int'), df.name, df.gender, df.birthday, df.age.cast('int'), df.major, df.hobby,df.create_time)
df_select6.show()
内置函数
字符串函数
-
F.concat(字段1, 字段2, ...): 字符串拼接
-
F.concat_ws(拼接符, 字段1, 字段2, ...): 根据指定拼接符进行字符串拼接
-
F.split(字段, 分割符): 根据指定分割符将字段进行分割
-
F.substring(字段, 索引值, 长度): 字符串截取
-
F.substring_index(字段, 截取符, 第n个截取符): 字符串截取, 截取第n个截取符左右的子串
-
n为正数,从1开始,截取截取符左侧子串
-
n为负数,从-1开始,截取截取符右侧子串
-
# DF的内置函数
# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致
# todo:导入内置函数模块,as命名别称F 使用模块时可以用F表示
from pyspark.sql import SparkSession, functions as F
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
#字符串拼接
df_res =df.select(F.concat_ws(':','name','gender').alias('concat_ws'))
#字符串切割
df_res =df.select(F.split('name','-').alias('concat_ws'))
#字符串截取
df_res =df.select(F.substring('birthday',0,4).show()
df_res =df.select(F.substring('birthday','-',2)).show()
df_res =df.select(F.substring('birthday','-',2)).show()
时间函数
-
F.current_date(): 获取当前日期 时间格式yyyy-MM-dd
-
F.current_timestamp(): 获取当前日期时间 时间格式yyyy-MM-dd HH:mm:ss.ms
-
F.unix_timestamp(): 获取时间戳
-
F.from_unixtime(uninx, 日期时间格式): uninx时间转化为日期时间
-
时间取值: F.year()、F.month()...
-
时间加减: F.date_add()、F.date_sub()、F.datediff()
-
日期时间格式化: F.date_format()
from pyspark.sql import SparkSession, functions as F
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
获取当前时间
# 获取当前日期 年月日
df.select(F.current_date()).show()
# 获取当前日期时间 年月日时分秒毫秒
df.select(F.current_timestamp()).show(truncate=False)
# 获取当前时间戳 秒
df.select(F.unix_timestamp()).show(truncate=False)
#日期时间格式化
# 将日期时间格式数据进行格式化
# yyyy->年
# MM->月
# dd->日
# HH->小时
# mm->分钟
# ss->秒
df.select(F.date_format('birthday', 'yyyy/MM/dd')).show()
df.select(F.date_format('birthday', 'yyyy-MM')).show()
#unix时间转化
# 将unix时间戳转换成日期时间格式
df.select(F.from_unixtime('create_time', 'yyyy-MM-dd HH:mm:ss')).show()
df.select(F.from_unixtime('create_time', 'yyyy-MM-dd')).show
#获取部分时间
# 获取年份
df.select(F.year('birthday')).show()
# 获取月份
df.select(F.month('birthday')).show()
# 获取日
df.select(F.dayofmonth('birthday')).show()
# 获取季度
df.select(F.quarter('birthday')).show()
#时间运算
# 加多少天
df.select(F.date_add('birthday', 10)).show()
# 减多少天
df.select(F.date_sub('birthday', 10)).show()
# 加多少个
df.select(F.add_months('birthday', 2)).show()
# 时间作差
df.select(F.datediff(F.current_date(), 'birthday')).show()
# 获取当前月的最后一天
df.select(F.last_day('birthday')).show()
# 下一个星期日期
df.select(F.next_day('birthday', 'Sun')).show()
聚合函数
sum->求和
max/min->最大/最小
mean/avg->求平均
count->计数
agg/aggregate->聚合操作, 结合聚合函数使用
# 对某列直接聚合, 返回一个值
df.select(F.sum('age'), F.count('id'), F.countDistinct('id')).show()
# 分组聚合
df_select = df.select(df.id.cast('int'), 'name', 'gender',df['age'].cast('int'),'birthday', 'major', 'hobby','create_time')
# 对一列数据进行分组聚合
df_select.groupby('gender').avg('age').show()
df_select.groupby('gender').agg(F.mean('age').alias('mean_age')).show()
# 对多列数据进行分组聚合
# round(): 保留几位小数
df_select.groupby('gender').agg(F.round(F.mean('age'),2).alias('mean_age'),F.count('id').alias('count')).show()
df_select.groupby('gender').agg({'age': 'mean', 'id': 'count'}).show()
print(df_select.count())
Sparksession操作
概念:sparksession是操作sparksql的入口类
sparksession基本使用
from pyspark.sql import SparkSession
#sparksession参数指定
# master 指定资源调度 不指定默认是local
# appName 指定任务名称
# config 指定配置信息
# builder --中间指定参数 --getOrCreate
ss.SparkSession.builder.master('yarn').appName('spark_sql').config('PYSPARK_PYTHON', '/export/server/anaconda3/bin/python3').getOrCreate()
df=ss.createDataFrame([[1, '张三', 20], [2, '李四', 20]], schema='id int,name string,age int')
df.show()
指定分区数量:
ss=SparkSession.builder.master('yarn').config('spark.sql.shuffle.partition'.'6').getOrCreate()
数据源和格式
数据读取
-
txt格式文件
-
ss.read.text(path=)
-
-
json格式文件
-
ss.read.json(path=)
-
-
csv格式文件
-
ss.read.csv(path=, sep=, schema=) -
csv文件由两个部分构成 头部数据 行数据
-
sep: 分隔符
-
schema: 字段数据
-
-
orc格式文件
-
ORC的全称是(Optimized Record Columnar),优化后的列式存储格式,默认大小256M
-
ss.read.orc(path=)
-
-
parquet格式文件
-
Parquet是一种列式存储格式
-
ss.read.parquet(path=)
-
示例代码
from pyspark.sql import SparkSession, functions as F
ss = SparkSession.builder.getOrCreate()
# read方法读取
#txt文件读取
df = ss.read.text('hdfs://node1:8020/data/words.txt')
df.show(truncate = false)
df2 =df.select(F.split(','))
df2.show(truncate = false)
#json文件读取
df_json = ss.read.json('hdfs://node1:8020/data/x0.json')
df_json.show(truncate=False)
#csv文件读取
df_csv = ss.read.csv('hdfs://node1:8020/data/stu.csv', sep=',',schema='name string,age int,gender string,phone string,email string,city string,address string')
df_csv.show()
#parquet文件读取
df_parquet = ss.read.parquet('file:///export/server/spark/examples/src/main/resources/users.parquet')
df_parquet.show()
#数据库读取
df_mysql =ss.read.jdbc(url='jdbc:mysql://node1:3306/itcast?characterEncoding=UTF-8',table='stu',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
options = {
'url': 'jdbc:mysql://node1:3306/shopnc_db?characterEncoding=UTF-8',
'user': 'root',
'password': '123456',
'driver': 'com.mysql.jdbc.Driver',
# 'dbtable': 'uc_store'
'query': 'select store_id, store_name from uc_store limit 10'
}
ss.read.options(**options).load(fromat='jdbc').show()
数据写入
因为数据是在df中存储,所以使用dataframe进行数据写入
使用dtaframe的write方法
写入文件有个模式参数,覆盖和追加两种方式,用mode参数指定
覆盖 overwrite
追加 append
-
txt格式文件
-
df.write.text(path=)
-
-
json格式文件
-
df.write.json(path=, mode=)
-
-
csv格式文件
-
df.write.csv(path=, sep=, mode=)
-
-
orc格式文件
-
df.write.orc(path=, mode=)
-
-
parquet格式文件
-
df.write.parquet(path=, mode=)
-
-
mysql数据
-
df.write.jdbc(url=, table=, mode=,properties=)
-
from pyspark.sql import SparkSession, functions as F
ss = SparkSession.builder.getOrCreate()
df = ss.createDataFrame([[1, '张三', 20, '男'], [2, '李四', 20, '男']], schema='id int,name string,age int,gender string')
# 将df数据写入
# 写入text
# 需要将所有字段数据拼接在一起形成一行字符串
df_text = df.select(F.concat_ws(',', df.id.cast('string'), df.name, df.age.cast('string'), df.gender))
df_text.write.text('hdfs://node1:8020/data_text')
# 写入数据时,有参数 mode
# mode='append' 追加写入
# mode='overwrite' 覆盖写入
df.write.json('hdfs://node1:8020/data_json', mode='overwrite')
#写入csv文件
df.write.csv('hdfs://node1:8020/data_csv', mode='overwrite', sep=':')
#写入orc文件
df.write.orc('hdfs://node1:8020/data_orc', mode='overwrite')
#写入parquet文件
df.write.parquet('hdfs://node1:8020/data_parquet', mode='overwrite')
#写入mysql文件
df.write.jdbc(url='jdbc:mysql://node1:3306/itcast?characterEncoding=UTF-8', table='stu', mode='overwrite',properties={'user': 'root', 'password': '123456', 'driver': 'com.mysql.jdbc.Driver'})