Python学习之PySpark案例实战

"""
演示获取PySpark的执行环境入库对象: SparkContext
并通过SparkContext对象获取当前PySpark的版本
"""

# 导包

from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象中
conf = SparkConf().setSparkHome("local[*]").setAppName("test_spqrk_app")
#基FSparkConf类对象创LSparkContext对象
sc = SparkContext(conf = conf)
#打印PySpark的运行版本
print(sc.version)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。PySpark的编程,主要分为如下三大步骤

通过SparkContext对象,完成数据输入

输入数据后得到RDD对象,对RDD对象进行选代计算

最终通过RDD对象的成员方法,完成数据输出工作

数据输入

只要数据输入到spark就一定是rdd

RDD对象

如图可见,PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)PySpark针对数据的处理,都是以RDD对象作为载体,即:

数据存储在RDD内

各类数据的计算方法,也都是RDD的成员方法

RDD的数据计算方法,返回值依旧是RDD对象

Python数据容器转RDD对象

PySpark支持通过SparkContext对象parallelize成员方法

List

Tuple

Set

Dic

tstr

转换为PySpark的RDD对象

注意:

字符串会被拆分出1个个的字符,存入RDD对象

字典仅有key会被存入RDD对象

"""
演示通过PySpark代码加载数据,即数据输入
"""
from pyspark import SparkContext,SparkConf

#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象
#构建实验数据
list_1 = [1,2,3,4,5]
turple_1 = (1,2,3,4)
str_1 = ("asdoac")
dict_1 = {"key1":"value1","key2":"value2"}
set_1 = {1,2,3,4,5}
#通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize(list_1)  #列表转spark的rdd
rdd2 = sc.parallelize(turple_1)  #元组转spark的rdd
rdd3 = sc.parallelize(str_1)  #字符串转spark的rdd
rdd4 = sc.parallelize(dict_1)  #字典转spark的rdd
rdd5 = sc.parallelize(set_1)  #集合转spark的rdd

print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

sc.stop()
读取文件转RDD对象

PySpark也支持通过SparkContext入口对象来读取文件来构建出RDD对象

"""
演示通过PySpark代码加载数据,即数据输入
"""
from pyspark import SparkContext,SparkConf

#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("D:/bill.txt")
print(rdd.collect())

sc.stop()
总结:
RDD对象是什么?为什么要使用它?

通过spark的contxt加载数据为rdd

RDD对象称之为分布式弹性数据集,是PySpark中数据计算的载体它可以:

提供数据存储

提供数据计算的各类方法

****数据计算的方法,返回值依旧是RDD(RDD迭代计算)后续对数据进行各类计算,****都是基于RDD对象进行

如何输入数据到Spark(即得到RDD对象)

通过SparkContext的parallelize成员方法,将Python数据容器转换为RDD对象

通过SparkContext的textFile成员方法读取文本文件得到RDD对象

数据计算

map方法

PySpark的数据计算,都是基于RDD对象来进行的那么如何进行呢?

自然是依赖RDD对象内置丰富的:成员方法(算子)

map算子

功能:map算子,是将RDD的数据一条条处理( 处理的逻基于ap算子中接收的处理函数 )返回新的RDD

语法:

(T)-> U :表示你传入一个参数T,有传出的东西,类型不限;

(T)-> T :表示的是传入参数T之后,传出的也是和T一样的数据类型

需要添加python.exe的位置

"""
演示map算子
"""
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#准备一个rdd
rdd = sc.parallelize([1,2,3,4,5,6])

#使用map方法使得每个都乘10
def func(data):
    return data * 10

rdd2 = rdd.map(func)   #调用这个func的函数去对参数进行操作

print(rdd2.collect())

因为def的函数有些简单就一行,所以可以使用lamba匿名函数来优化

"""
演示map算子
"""
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#准备一个rdd
rdd = sc.parallelize([1,2,3,4,5,6])

#使用map方法使得每个都乘10
# def func(data):
#     return data * 10

rdd2 = rdd.map(lambda x:x*10)   #调用这个func的函数去对参数进行操作

print(rdd2.collect())

Map调用之后,乘以10了,返回值依旧是rdd,那么如果还想对数据进行操作的话,那么就可以在后面继续加map+匿名函数(链式调用),但是匿名函数只限于函数语句少的,多的话还是def外部定义

"""
演示map算子
"""
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#准备一个rdd
rdd = sc.parallelize([1,2,3,4,5,6])

#使用map方法使得每个都乘10
# def func(data):
#     return data * 10

rdd2 = rdd.map(lambda x:x*10).map(lambda x:x+5)   #调用这个func的函数去对参数进行操作

print(rdd2.collect())
总结:
map算子(成员方法)

接受一个处理函数可用lambda表达式快速编写

对RDD内的元素逐个处理,并返回一个新的RDD

链式调用

对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子

flatMap方法

flatMap算子

功能:对rdd执行map操作,然后进行 解除嵌套操作

"""
演示flatmap算子
"""
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#准备一个rdd
rdd = sc.parallelize(["master,servant,fate","saber,archer,lacher","basker,rider"])

#需求,将RDD数据里面的一个个单词拿出来
rdd2 = rdd.map(lambda x:x.split(" "))
print(rdd2.collect())

"""
演示flatmap算子
"""
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#准备一个rdd
rdd = sc.parallelize(["master servant fate","saber archer lacher","basker rider"])

#需求,将RDD数据里面的一个个单词拿出来
rdd2 = rdd.flatMap(lambda x:x.split(" "))
print(rdd2.collect())
总结:
flatMap算子

计算逻辑和map一样

可以比map多出,解除一层嵌套的功能

reduceByKey方法

功能: 针对KV型 RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成****组内数据[value)****的聚合操作.

两个传入参数,和返回的类型需要是一致的。

聚合逻辑

注意: reduceByKey中接收的函数 只负责聚合,不理会分组,分组是自动 by key来分组的。

这里的a+b是指代的传入的两个key的value的实现,比如a的value有两个,就是这两个value相加

"""
演示reduceBYKey算子
"""
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#准备一个rdd
rdd = sc.parallelize([('a',1),('a',2),('b',3),('b',4)])
rdd2 = rdd.reduceByKey(lambda a,b : a+b)
print(rdd2.collect())
sc.stop()
总结:

练习案例1

读取文件,对文件内的单词进行计数

#1.构建执行环境入口对象
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象
#2.读取数据文件
rdd = sc.textFile("D:/hello.txt")
#3.取出全部单词
world_rdd = rdd.flatMap(lambda a : a.split(" ")).map(lambda x:x.strip())
# print(world_rdd.collect())
#4.将所有单词都转换成二元元组,单词为Key,value设置为1,有几个就有几个1,相加
word_with_one = world_rdd.map(lambda word:(word,1))
# print(word_with_one.collect())
#5.分组并求和
result_rdd = word_with_one.reduceByKey(lambda a,b :a+b )
#6.打印输出结果
print(result_rdd.collect())

filter方法

功能:过滤想要的数据进行保留

"""
演示RDD的filter成员方法的使用
"""
#1.构建执行环境入口对象
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#准备一个rdd
rdd = sc.parallelize([1,2,3,4,5,6,7,8])
#对rdd的数据进行过滤
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())

接受一个处理函数,可用lambda快速编写

函数对RDD数据逐个处理,得到True的保留至返回值的RDD中

distinct方法

功能:对RDD数据进行去重,返回新RDD

"""
演示RDD的distint成员方法的使用
"""
#1.构建执行环境入口对象
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

#准备一个rdd
rdd = sc.parallelize([1,1,2,3,4,4,4,4,3,4,6,6,6,7])
#对rdd的数据进行去重操作
rdd2 = rdd.distinct()
print(rdd2.collect())

sortBy方法

功能:对RDD数据进行排序基于你指定的排序依据

语法:

func: (T) - U: 告知按照rdd中的哪个数据进行排序,比如 Lambda x: x[1]表示rdd中的第二列进行排序

ascending =True升序 False 降序

numPartitions: 用多少分区排序

#1.构建执行环境入口对象
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象
#2.读取数据文件
rdd = sc.textFile("D:/hello.txt")
#3.取出全部单词
world_rdd = rdd.flatMap(lambda a : a.split(" ")).map(lambda x:x.strip())
# print(world_rdd.collect())
#4.将所有单词都转换成二元元组,单词为Key,value设置为1,有几个就有几个1,相加
word_with_one = world_rdd.map(lambda word:(word,1))
# print(word_with_one.collect())
#5.分组并求和
result_rdd = word_with_one.reduceByKey(lambda a,b :a+b )
#6.对结果进行排序
# print(result_rdd.collect())
final_rdd = result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(final_rdd.collect())

接收一个处理函数,可用lambda快速编写

函数表示用来决定排序的依据

可以控制升序或降序

全局排序需要设置分区数为1

练习案例2

多个Json数据串联在一起的;

"""
案例需求:
各个城市销售额排名,从大到小全部城市,
有哪些商品类别在售卖
北京市有哪些商品类别在售卖
"""

from pyspark import SparkContext,SparkConf
import os
import json
os.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"
#构建接口对象
conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")
sc = SparkContext(conf=conf) #构建接口对象

# TODO 而状1: 城市箭售额接名1.1/歌文件得到RDD
# 1.2 取出一个个JSON字符串
file_rdd = sc.textFile("D:/orders.txt")
json_str_rdd = file_rdd.flatMap(lambda x :x.split("|"))
"""
map可以将数据一条一条的取出来,因为文本内一行内有多条json数据以为|隔开
"""
# 1.3 一个个JSON字符审转换为字典
dict_rdd = json_str_rdd.map(lambda x :json.loads(x)) #将每条json数据转换为字典
# 1.4取出城市和销售额数据,通过lamad函数去让它成为二元元组
city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money']))) #取出城市和销售额数据
# 1.5 技城市分组按销售额聚合
city_result = city_with_money_rdd.reduceByKey(lambda a,b:a+b)
# 1.6 按销售额聚合结果进行排序
result_rdd = city_result.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(f"最后的结果是{result_rdd.collect()}")
# TODO 需求2: 全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别,对全部商品类别进行去重
category_rdd = dict_rdd.map(lambda x :x["category"]).distinct()
print(f"全部售卖的结果去重后是{category_rdd.collect()}")
# TODO 状3: 北京市有哪些商品类别在售灵
#3.1 过滤北京市的数据
beijing_rdd = dict_rdd.filter(lambda x :x['areaName'] == '北京')
# 3.2 取出全部商品类别
result3_rdd = beijing_rdd.map(lambda x :x['category']).distinct()
print(f"北京的类别有{result3_rdd.collect()}")

数据输出(rdd转换为python数据)

collect算子

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象

用法:

rdd.collect()

返回值是一个list

之前也使用过去print(rdd.collect())这是将rdd对象转变为一个python的list进行打印。

reduce算子

take算子

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Python工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Python开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

img

img

img

img

img

img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上前端开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以扫码获取!!!(备注:Python)

绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。**

深知大多数Python工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Python开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

[外链图片转存中…(img-JqTdrHpT-1713660936323)]

[外链图片转存中…(img-RuBR7tTj-1713660936324)]

[外链图片转存中…(img-RaqP2WAl-1713660936324)]

[外链图片转存中…(img-N25BAMNC-1713660936325)]

img

img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上前端开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以扫码获取!!!(备注:Python)