《Spark 编程基础(Scala 版)》第 6 章 Spark SQL 实验 5 Spark SQL 编程初级实践 (超级详细版)
一、实验目的
(1)通过实验掌握 Spark SQL 的基本编程方法;
(2)熟悉 RDD 到 DataFrame 的转化方法;
(3)熟悉利用 Spark SQL 管理来自不同数据源的数据。
二、实验平台 操作系统: Ubuntu16.04 Spark 版本:2.1.0 数据库:MySQL
三、实验内容和要求
1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。 { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name":"Jack","age":29 } { "id":4 , "name":"Jim","age":28 } { "id":4 , "name":"Jim","age":28 } { "id":5 , "name":"Damon" } { "id":5 , "name":"Damon" } 为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除 id 字段;
(4) 筛选出 age>30 的记录;
(5) 将数据按 age 分组;
(6) 将数据按 name 升序排列;
(7) 取出前 3 行数据;
(8) 查询所有记录的 name 列,并为其取别名为 username;
(9) 查询年龄 age 的平均值;
(10) 查询年龄 age 第 2 页 的最小值。
1.再MobaXter里
cd`/home/hadoop
创建
vim employee.json`
把数据粘上去


2.再IDEA上
创建
case object SparkSQLjibencaozuo

里面代码
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
case object SparkSQLjibencaozuo{
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("EmployeeData").getOrCreate()
import spark.implicits._
// 加载 JSON 数据到 DataFrame
//val employeeDF = spark.read.json("employee.json")
val employeeDF = spark.read.json("file:///home/hadoop/employee.json")
// (1) 查询所有数据
employeeDF.show()
// (2) 查询所有数据,并去除重复的数据
val distinctEmployeeDF = employeeDF.distinct()
distinctEmployeeDF.show()
// (3) 查询所有数据,打印时去除 id 字段
employeeDF.select("name", "age").show()
// (4) 筛选出 age>30 的记录
employeeDF.filter($"age" > 30).show()
// (5) 将数据按 age 分组
employeeDF.groupBy("age").count().show()
// (6) 将数据按 name 升序排列
employeeDF.orderBy($"name".asc).show()
// (7) 取出前 3 行数据
employeeDF.limit(3).show()
// (8) 查询所有记录的 name 列,并为其取别名为 username
employeeDF.select($"name".as("username")).show()
// (9) 查询年龄 age 的平均值
employeeDF.select(avg($"age")).show()
// (10) 查询年龄 age 的最小值
employeeDF.select(min($"age")).show()
// 停止 Spark 会话
spark.stop()
}
}
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
case object SparkSQLjibencaozuo{
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("EmployeeData").getOrCreate()
import spark.implicits._
// 加载 JSON 数据到 DataFrame
//val employeeDF = spark.read.json("employee.json")
val employeeDF = spark.read.json("file:///home/hadoop/employee.json")
// (1) 查询所有数据
employeeDF.show()
// (2) 查询所有数据,并去除重复的数据
val distinctEmployeeDF = employeeDF.distinct()
distinctEmployeeDF.show()
// (3) 查询所有数据,打印时去除 id 字段
employeeDF.select("name", "age").show()
// (4) 筛选出 age>30 的记录
employeeDF.filter($"age" > 30).show()
// (5) 将数据按 age 分组
employeeDF.groupBy("age").count().show()
// (6) 将数据按 name 升序排列
employeeDF.orderBy($"name".asc).show()
// (7) 取出前 3 行数据
employeeDF.limit(3).show()
// (8) 查询所有记录的 name 列,并为其取别名为 username
employeeDF.select($"name".as("username")).show()
// (9) 查询年龄 age 的平均值
employeeDF.select(avg($"age")).show()
// (10) 查询年龄 age 的最小值
employeeDF.select(min($"age")).show()
// 停止 Spark 会话
spark.stop()
}
}
注意一下
```
// 加载 JSON 数据到 DataFrame
//val employeeDF = spark.read.json("employee.json")
val employeeDF = spark.read.json("file:///home/hadoop/employee.json")
```
记好employee.json文件在哪个目录下
1.运行代码
有这个结果就行了,不需要在意红色
Process finished with exit code 1
2.打包项目 build->package->git add jar->commit->push,在虚拟机中 git pull origin master 上传的 jar 包

出现这个结果就正确的
3.以 spark-local 模式提交 spark 任务
先 cd /opt/module/spark-local/
再运行
bin/spark-submit --master local[*] --jars /opt/module/spark-local/jars/mysql-connector-java-5.1.27-bin.jar --class IDEA自己文件的地址 ~/gitdata/target/scala_demo-1.0-SNAPSHOT.jar
4.10结果就出来了

2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):
1,Ella,36 2,Bob,29 3,Jack,29
请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代 码。
employee.txt 如上面一样
再IDEA上建立RDDToDataFrameExample
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types._
case object RDDToDataFrameExample{
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("RDD to DataFrame Example")
.master("local[*]") // 使用本地模式,如果连接到集群请更改这里
.getOrCreate()
import spark.implicits._
// 指定employee.txt文件的位置
val inputFilePath = "file:///home/hadoop/employee.txt"
// 从文本文件读取数据创建RDD
val rdd = spark.sparkContext.textFile(inputFilePath)
// 定义DataFrame的schema
val schema = StructType(Array(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
// 将RDD转换为DataFrame
val dataFrame = spark.createDataFrame(rdd.map { line =>
val parts = line.split(",")
Row(parts(0).toInt, parts(1), parts(2).toInt)
}, schema)
// 显示DataFrame内容
dataFrame.show(false)
// 按照指定格式打印所有数据
dataFrame.collect().foreach { row =>
println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
}
// 停止SparkSession
spark.stop()
}
}
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types._
case object RDDToDataFrameExample{
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("RDD to DataFrame Example")
.master("local[*]") // 使用本地模式,如果连接到集群请更改这里
.getOrCreate()
import spark.implicits._
// 指定employee.txt文件的位置
val inputFilePath = "file:///home/hadoop/employee.txt"
// 从文本文件读取数据创建RDD
val rdd = spark.sparkContext.textFile(inputFilePath)
// 定义DataFrame的schema
val schema = StructType(Array(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
// 将RDD转换为DataFrame
val dataFrame = spark.createDataFrame(rdd.map { line =>
val parts = line.split(",")
Row(parts(0).toInt, parts(1), parts(2).toInt)
}, schema)
// 显示DataFrame内容
dataFrame.show(false)
// 按照指定格式打印所有数据
dataFrame.collect().foreach { row =>
println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
}
// 停止SparkSession
spark.stop()
}
}

1.建表
CREATE DATABASE sparktest;
USE sparktest;
CREATE TABLE employee (
id INT PRIMARY KEY,
name VARCHAR(50),
gender CHAR(1),
age INT
);
INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);
INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);
————————————————
.

再IDEA上建立MySQLDataFrameExample
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.sum
case object MySQLDataFrameExample{
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("MySQL DataFrame Example MySQL写入与读取")
.master("local[*]") // 使用本地模式,如果连接到集群请更改这里
.getOrCreate()
import spark.implicits._
// 配置MySQL JDBC连接
val jdbcProperties = new Properties()
jdbcProperties.setProperty("user", "root")
jdbcProperties.setProperty("password", "自己的密码")
jdbcProperties.setProperty("driver", "com.mysql.jdbc.Driver")
//jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
// 定义MySQL的JDBC连接URL
val jdbcUrl = "jdbc:mysql://自己的主机:3306/sparktest"
// 创建DataFrame以插入数据
val newEmployeeData = Seq(
(3, "Mary", "F", 26),
(4, "Tom", "M", 23)
).toDF("id", "name", "gender", "age")
// 将DataFrame数据插入到MySQL的employee表中
newEmployeeData.write
.mode("append") // 使用append模式来添加数据,而不是覆盖
.jdbc(jdbcUrl, "employee", jdbcProperties)
// 从MySQL读取employee表的数据
val employeeDF = spark.read
.jdbc(jdbcUrl, "employee", jdbcProperties)
// 打印age的最大值
val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
println(s"Max age: $maxAge")
// 打印age的总和
val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
println(s"Sum of ages: $sumAge")
// 停止SparkSession
spark.stop()
}
}
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.sum
case object MySQLDataFrameExample{
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("MySQL DataFrame Example MySQL写入与读取")
.master("local[*]") // 使用本地模式,如果连接到集群请更改这里
.getOrCreate()
import spark.implicits._
// 配置MySQL JDBC连接
val jdbcProperties = new Properties()
jdbcProperties.setProperty("user", "root")
jdbcProperties.setProperty("password", "自己的")
jdbcProperties.setProperty("driver", "com.mysql.jdbc.Driver")
//jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
// 定义MySQL的JDBC连接URL
val jdbcUrl = "jdbc:mysql://自己的主机:3306/sparktest"
// 创建DataFrame以插入数据
val newEmployeeData = Seq(
(3, "Mary", "F", 26),
(4, "Tom", "M", 23)
).toDF("id", "name", "gender", "age")
// 将DataFrame数据插入到MySQL的employee表中
newEmployeeData.write
.mode("append") // 使用append模式来添加数据,而不是覆盖
.jdbc(jdbcUrl, "employee", jdbcProperties)
// 从MySQL读取employee表的数据
val employeeDF = spark.read
.jdbc(jdbcUrl, "employee", jdbcProperties)
// 打印age的最大值
val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
println(s"Max age: $maxAge")
// 打印age的总和
val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
println(s"Sum of ages: $sumAge")
// 停止SparkSession
spark.stop()
}
}
再如实验一,上创到虚拟机上查看结果