本文共 2622 字,大约阅读时间需要 8 分钟。
// Word-count example: read a text file, split lines into words, and count
// occurrences twice — once with Spark SQL, once with the DataFrame DSL.
import org.apache.spark.sql.SparkSession

// 1. Create the SparkSession (the entry point for Spark SQL).
val spark = SparkSession.builder().master("local[*]").appName("wordCount").getOrCreate()
// 2. Obtain the SparkContext from the session.
val sc = spark.sparkContext
// 3. Read the file and split each line on spaces, yielding an RDD of words.
val wordRDD = sc.textFile("./data/words.txt").flatMap(_.split(" "))
// 4. Import implicits so the RDD can be converted to a DataFrame
//    (single column named "value").
import spark.implicits._
val dataFrame = wordRDD.toDF()
// 5. Register the DataFrame as a temporary view so SQL can query it (word count).
dataFrame.createOrReplaceTempView("word")

// ============= SQL =============
// 6. Word-count query. Fixed: was `var`, but the reference is never
//    reassigned, so `val` is correct.
val sql = "select value,count(value) as count from word group by value order by count desc"
// 7. Run the query and print the result.
spark.sql(sql).show()

// ============= DSL =============
// Same word count via the DataFrame API. Fixed: `.desc` method call instead
// of the discouraged postfix operator (`$"count" desc`).
dataFrame.groupBy($"value").count().orderBy($"count".desc).show()
Spark SQL 可以与多种数据源交互,如普通文本、JSON、Parquet、CSV 等
// Write a small Person dataset out as JSON, CSV, and Parquet files, and
// into a MySQL table via JDBC.
// Fixed: the original used `new Properties()` without importing
// `java.util.Properties`, which would not compile.
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

// Case class used to give the raw text rows a schema (id, name, age).
case class Person(id: Int, name: String, age: Int)

def main(args: Array[String]): Unit = {
  // 1. Create the SparkSession.
  val spark = SparkSession.builder().master("local[*]").appName("sql").getOrCreate()
  // 2. Obtain the SparkContext from the session.
  val sc = spark.sparkContext
  // 3. Read the file, split each line on spaces, and map each row to a Person.
  val personRDD = sc.textFile("./data/person.txt").map(_.split(" ")).map(x => Person(x(0).toInt, x(1), x(2).toInt))
  // 4. Import implicits for the RDD-to-DataFrame conversion.
  import spark.implicits._
  // 5. Convert the RDD to a DataFrame (columns derived from the case class).
  val personDF = personRDD.toDF()
  // 6. Write the data as JSON.
  personDF.write.json("./data/json")
  // 7. Write the data as CSV (original comment had a typo: "cav").
  personDF.write.csv("./data/csv")
  // 8. Write the data as Parquet.
  personDF.write.parquet("./data/parquet")
  // 9. Write the data into MySQL.
  val prop = new Properties()
  // JDBC user name
  prop.setProperty("user", "root")
  // JDBC password
  prop.setProperty("password", "root")
  // Overwrite the `person` table in the `bigdata0407` database.
  personDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "person", prop)
}
// Read back the datasets written earlier (JSON, CSV, Parquet) plus the
// MySQL `person` table, printing each with show().
// Fixed: the original used `new Properties()` without importing
// `java.util.Properties`, which would not compile.
import java.util.Properties

import org.apache.spark.sql.SparkSession

// 1. Create the SparkSession.
val spark = SparkSession.builder().master("local[*]").appName("sql").getOrCreate()
// 2. Obtain the SparkContext from the session.
val sc = spark.sparkContext
// 3. Read the JSON data (schema is inferred from the documents).
spark.read.json("./data/json").show()
// 4. Read the CSV data and attach column names, since plain CSV here
//    carries no schema.
spark.read.csv("./data/csv").toDF("id", "name", "age").show()
// 5. Read the Parquet data (schema is stored inside the files).
spark.read.parquet("./data/parquet").show()
// 6. Read the `person` table from MySQL via JDBC.
val prop = new Properties()
// JDBC user name
prop.setProperty("user", "root")
// JDBC password
prop.setProperty("password", "root")
// Connect and load the table, then print it.
spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "person", prop).show()
转载地址:http://dokzi.baihongyu.com/