A DataFrame is similar to a table in a relational database. Data in a DataFrame is queried by calling its API; so far, Spark provides that API in Scala, Java, R, and Python.
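To make the table analogy concrete, the following minimal Scala sketch queries a small DataFrame once through the DataFrame API and once through SQL on a temporary view. It assumes a local session (`local[*]`) rather than the YARN cluster used below; the object name and the view name `people` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object QueryDataFrame {
  // DataFrame API: filter rows, then project the name column.
  def olderThanApi(df: DataFrame, minAge: Int): Seq[String] =
    df.filter(df("age") > minAge).select("name").collect().map(_.getString(0)).toSeq.sorted

  // The same query in SQL; "people" is a hypothetical temporary view name.
  def olderThanSql(spark: SparkSession, df: DataFrame, minAge: Int): Seq[String] = {
    df.createOrReplaceTempView("people")
    spark.sql(s"SELECT name FROM people WHERE age > $minAge")
      .collect().map(_.getString(0)).toSeq.sorted
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying this outside the cluster.
    val spark = SparkSession.builder().master("local[*]").appName("query-sketch").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "tank", 25), (2, "zhang", 26)).toDF("id", "name", "age")
    println(olderThanApi(df, 25))
    println(olderThanSql(spark, df, 25))
    spark.stop()
  }
}
```

Both paths go through the same optimizer, so choosing between them is mostly a matter of style.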
I. Start spark-shell
spark-shell --master yarn
II. Create a SparkSession
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().appName("Spark SQL basic example").enableHiveSupport().getOrCreate()
2019-01-04 11:44:21 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2565b097
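The warning in the transcript appears because spark-shell already creates a session named `spark`, so `getOrCreate()` returns that existing session instead of building a new one. A compiled application has no pre-built session. A minimal sketch, assuming Spark is on the classpath and a local master for testing; `enableHiveSupport()` is omitted because it needs the spark-hive module, and on a cluster the master would normally be supplied by `spark-submit --master yarn` rather than hard-coded.

```scala
import org.apache.spark.sql.SparkSession

object CreateSession {
  // Builds a session, or returns the active one if it already exists.
  // master("local[*]") is an assumption for local testing only.
  def session(): SparkSession =
    SparkSession.builder()
      .appName("Spark SQL basic example")
      .master("local[*]")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = session()
    println(s"Spark version: ${spark.version}")
    // A second call does not create a new session; it returns the existing one.
    println(session() eq spark)
    spark.stop()
  }
}
```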
III. Create a DataFrame with toDF
1. Convert a Seq to a DataFrame
scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(
     |   (1, "tank", 25),
     |   (2, "zhang", 26)
     | ).toDF("id", "name", "age")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1| tank| 25|
|  2|zhang| 26|
+---+-----+---+
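`toDF` also accepts a Seq of case class instances, taking column names and types from the fields so no names have to be passed. A minimal sketch with a hypothetical `Person` case class and a local session; the case class is declared at top level because Spark cannot derive an encoder for one defined inside a method.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical record type, declared at top level so Spark can derive an encoder for it.
case class Person(id: Int, name: String, age: Int)

object CaseClassToDF {
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // No column names needed: they are taken from the case class fields.
    Seq(Person(1, "tank", 25), Person(2, "zhang", 26)).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("case-class-toDF").getOrCreate()
    val df = build(spark)
    df.printSchema()
    df.show()
    spark.stop()
  }
}
```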
2. Convert a List to a DataFrame
scala> val array = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29))
array: List[(Int, String, Int)] = List((1,tank1,25), (2,tank2,26), (3,tank3,29))

scala> val df1 = array.toDF("id", "name", "age")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df1.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|tank1| 25|
|  2|tank2| 26|
|  3|tank3| 29|
+---+-----+---+
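The same implicit conversion works on an RDD of tuples, which is useful when the data is already distributed. A minimal sketch, assuming a local session, that parallelizes the list from the transcript before calling `toDF`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RddToDF {
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val list = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29))
    // parallelize turns the list into an RDD[(Int, String, Int)]; toDF names the columns.
    spark.sparkContext.parallelize(list).toDF("id", "name", "age")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rdd-toDF").getOrCreate()
    build(spark).show()
    spark.stop()
  }
}
```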
IV. Create a DataFrame with createDataFrame
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = StructType(List(
     |   StructField("id", IntegerType, nullable = false),
     |   StructField("name", StringType, nullable = true),
     |   StructField("age", IntegerType, nullable = true),
     |   StructField("birthday", DateType, nullable = true)
     | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(birthday,DateType,true))

scala> val rdd = spark.sparkContext.parallelize(Seq(
     |   Row(1, "tank1", 25, java.sql.Date.valueOf("1982-07-07")),
     |   Row(2, "zhang", 26, java.sql.Date.valueOf("1983-02-19"))
     | ))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[107] at parallelize at <console>:39

scala> val df2 = spark.createDataFrame(rdd, schema)
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df2.show()
+---+-----+---+----------+
| id| name|age|  birthday|
+---+-----+---+----------+
|  1|tank1| 25|1982-07-06|
|  2|zhang| 26|1983-02-18|
+---+-----+---+----------+
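Below is a self-contained sketch of the same schema-first approach with its imports spelled out, assuming a local session. Instead of an RDD it passes a `java.util.List[Row]`, an overload `createDataFrame` also provides, which is enough for small driver-side data; each `Row` must match the schema's column types position by position.

```scala
import java.sql.Date
import java.util.Arrays

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

object SchemaFirst {
  // Explicit schema: id is declared non-nullable, the other columns are nullable.
  val schema: StructType = StructType(List(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true),
    StructField("birthday", DateType, nullable = true)
  ))

  def build(spark: SparkSession): DataFrame = {
    // Row values must line up with the schema's types, column by column.
    val rows = Arrays.asList(
      Row(1, "tank1", 25, Date.valueOf("1982-07-07")),
      Row(2, "zhang", 26, Date.valueOf("1983-02-19"))
    )
    // createDataFrame(java.util.List[Row], StructType): no RDD needed for small local data.
    spark.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("schema-first").getOrCreate()
    val df = build(spark)
    df.printSchema()
    df.show()
    spark.stop()
  }
}
```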
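The DataFrames created above, and the data in them, live only in memory; nothing has been persisted yet.

Note also that the birthday values shown for df2 are one day earlier than the dates passed to java.sql.Date.valueOf. This is most likely a time-zone mismatch, for example the driver (where the java.sql.Date objects are created) and the YARN executors (where the RDD rows are converted) running with different default time zones, rather than a problem with the data itself. The df4 example further down, built locally with toDF, does not show the shift.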
V. Store data on HDFS
1. Import SaveMode
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
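`SaveMode` controls what happens when the target path already exists: `ErrorIfExists` (the default) fails, `Append` adds new files, `Overwrite` replaces the existing data, and `Ignore` skips the write. A minimal sketch that exercises each mode, assuming a local temporary directory in place of the HDFS paths used below; `ModeResult` is a hypothetical helper type.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}

// Hypothetical result type: row counts observed after each write step.
case class ModeResult(defaultFailed: Boolean, afterIgnore: Long, afterAppend: Long, afterOverwrite: Long)

object SaveModes {
  def demo(spark: SparkSession): ModeResult = {
    import spark.implicits._
    val df = Seq((1, "tank", 25), (2, "zhang", 26)).toDF("id", "name", "age")
    // A local temp directory stands in for the HDFS path used in the article.
    val path = Files.createTempDirectory("savemode").resolve("tank").toString
    def count(): Long = spark.read.parquet(path).count()

    df.write.parquet(path) // the path does not exist yet, so the default mode succeeds

    // Default mode is ErrorIfExists: writing to an existing path throws.
    val defaultFailed =
      try { df.write.parquet(path); false }
      catch { case _: AnalysisException => true }

    df.write.mode(SaveMode.Ignore).parquet(path)    // path exists: silently does nothing
    val afterIgnore = count()
    df.write.mode(SaveMode.Append).parquet(path)    // adds the two rows again
    val afterAppend = count()
    df.write.mode(SaveMode.Overwrite).parquet(path) // replaces all existing data
    val afterOverwrite = count()

    ModeResult(defaultFailed, afterIgnore, afterAppend, afterOverwrite)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("save-modes").getOrCreate()
    println(demo(spark))
    spark.stop()
  }
}
```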
2. Store without partitioning
scala> df.write.parquet("hdfs://bigserver1:9000/test/spark/tank")

scala> val test = spark.read.load("hdfs://bigserver1:9000/test/spark/tank")
test: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  2|zhang| 26|
|  1| tank| 25|
+---+-----+---+
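`spark.read.load` works without naming a format because the default data source, set by `spark.sql.sources.default`, is Parquet. A minimal sketch of the same round trip with and without an explicit format, assuming a local temporary directory instead of `hdfs://bigserver1:9000/...`. Rows are sorted before being compared because the read order is not guaranteed, as the transcript's `2, 1` ordering shows.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object ParquetRoundTrip {
  // Writes two rows as Parquet, then reads them back with load() and with an explicit format.
  def roundTrip(spark: SparkSession): (Seq[(Int, String, Int)], Seq[(Int, String, Int)]) = {
    import spark.implicits._
    val df = Seq((1, "tank", 25), (2, "zhang", 26)).toDF("id", "name", "age")
    // Local temp directory assumed in place of hdfs://bigserver1:9000/test/spark/tank.
    val path = Files.createTempDirectory("roundtrip").resolve("tank").toString
    df.write.parquet(path)

    // load() without format() uses spark.sql.sources.default ("parquet" unless reconfigured).
    // as[tuple] maps columns by position; sortBy(_._1) fixes the nondeterministic read order.
    val viaLoad = spark.read.load(path).as[(Int, String, Int)].collect().toSeq.sortBy(_._1)
    val viaFormat = spark.read.format("parquet").load(path)
      .as[(Int, String, Int)].collect().toSeq.sortBy(_._1)
    (viaLoad, viaFormat)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("parquet-round-trip").getOrCreate()
    println(roundTrip(spark))
    spark.stop()
  }
}
```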
3. Store partitioned by id
scala> df2.write.partitionBy("id").mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1")

scala> val test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1")
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show()
+-----+---+----------+---+
| name|age|  birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18|  2|
|tank1| 25|1982-07-06|  1|
+-----+---+----------+---+
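`partitionBy("id")` writes one subdirectory per distinct value, named `id=<value>`, and leaves the column out of the data files; on read, Spark rediscovers it from the directory names and appends it after the data columns, which is why `id` comes last in the transcript. A minimal sketch, assuming a local temporary directory in place of HDFS and a hypothetical `PartitionResult` type, that checks the layout, the column order, and a read filtered on the partition column.

```scala
import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical result type: partition directory names, column order on read, names in id=2.
case class PartitionResult(dirs: Seq[String], columns: Seq[String], namesInId2: Seq[String])

object PartitionedWrite {
  def run(spark: SparkSession): PartitionResult = {
    import spark.implicits._
    val df = Seq((1, "tank1", 25), (2, "zhang", 26)).toDF("id", "name", "age")
    // Local temp directory assumed in place of hdfs://bigserver1:9000/test/spark/tank1.
    val root = Files.createTempDirectory("partitioned").resolve("tank1").toString
    df.write.partitionBy("id").mode(SaveMode.Overwrite).parquet(root)

    // One subdirectory per id value; plain files such as _SUCCESS are filtered out.
    val dirs = new File(root).listFiles().filter(_.isDirectory).map(_.getName).toSeq.sorted

    val back = spark.read.parquet(root)
    // A filter on the partition column lets Spark skip the other directories.
    val names = back.filter($"id" === 2).select("name").as[String].collect().toSeq
    PartitionResult(dirs, back.columns.toSeq, names)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("partitioned-write").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```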
4. Append data
scala> df1.write.mode(SaveMode.Append).parquet("hdfs://bigserver1:9000/test/spark/tank")

scala> val test2 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank")
test2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test2.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|tank1| 25|
|  2|zhang| 26|
|  1| tank| 25|
|  2|tank2| 26|
|  3|tank3| 29|
+---+-----+---+
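`SaveMode.Append` does not merge or deduplicate anything; it writes new part files next to the existing ones. A minimal sketch, assuming a local temporary directory, that repeats the transcript's write-then-append sequence and uses `input_file_name()` to show that the rows come from more than one file.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.input_file_name

object AppendFiles {
  // Returns (total rows, number of distinct data files) after an initial write plus one append.
  def run(spark: SparkSession): (Long, Long) = {
    import spark.implicits._
    // Local temp directory assumed in place of hdfs://bigserver1:9000/test/spark/tank.
    val path = Files.createTempDirectory("append").resolve("tank").toString
    val df = Seq((1, "tank", 25), (2, "zhang", 26)).toDF("id", "name", "age")
    val df1 = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29)).toDF("id", "name", "age")

    df.write.parquet(path)
    // Append adds new part files; the files from the first write are left untouched.
    df1.write.mode(SaveMode.Append).parquet(path)

    // Tag each row with the file it was read from.
    val all = spark.read.parquet(path).withColumn("file", input_file_name())
    (all.count(), all.select("file").distinct().count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("append-files").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```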
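5. Modify data (this only works for partitioned data: Parquet files cannot be updated in place, so a whole partition directory is rewritten)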
scala> test1.show()
+-----+---+----------+---+
| name|age|  birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18|  2|
|tank1| 25|1982-07-06|  1|
+-----+---+----------+---+

scala> val df4 = Seq(
     |   (2, "zhangying", 37, java.sql.Date.valueOf("1986-11-11"))
     | ).toDF("id", "name", "age", "birthday")
df4: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df4.write.mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1/id=2")

scala> val test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1")
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show()
+---------+---+----------+---+
|     name|age|  birthday| id|
+---------+---+----------+---+
|zhangying| 37|1986-11-11|  2|
|    tank1| 25|1982-07-06|  1|
+---------+---+----------+---+
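Writing directly into `.../tank1/id=2` works, but it depends on knowing the directory layout, and here it also stores a redundant `id` column inside that partition's files. On Spark 2.3 and later, an alternative is dynamic partition overwrite: with `spark.sql.sources.partitionOverwriteMode` set to `dynamic`, an `Overwrite` with `partitionBy` on the table root replaces only the partitions present in the new data. A minimal sketch, assuming a local temporary directory and leaving out the birthday column for brevity.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{SaveMode, SparkSession}

object DynamicPartitionOverwrite {
  // Returns the (id, name) pairs left after replacing only partition id=2.
  def run(spark: SparkSession): Seq[(Int, String)] = {
    import spark.implicits._
    // Local temp directory assumed in place of hdfs://bigserver1:9000/test/spark/tank1.
    val root = Files.createTempDirectory("dynamic").resolve("tank1").toString

    val original = Seq((1, "tank1", 25), (2, "zhang", 26)).toDF("id", "name", "age")
    original.write.partitionBy("id").mode(SaveMode.Overwrite).parquet(root)

    val key = "spark.sql.sources.partitionOverwriteMode"
    spark.conf.set(key, "dynamic") // requires Spark 2.3 or later
    try {
      // Overwrite now replaces only the partitions present in `update` (id=2); id=1 is kept.
      val update = Seq((2, "zhangying", 37)).toDF("id", "name", "age")
      update.write.partitionBy("id").mode(SaveMode.Overwrite).parquet(root)
    } finally {
      spark.conf.unset(key) // back to the default ("static") for the rest of the session
    }

    spark.read.parquet(root)
      .select("id", "name")
      .as[(Int, String)]
      .collect().toSeq.sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dynamic-overwrite").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```

With the default `static` mode, the same `Overwrite` on the root would delete every existing partition, including id=1.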
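There are many more ways to process data; this article only covers the basics. Storing and reading data as files on HDFS like this is batch-oriented and has high latency, so it is poorly suited to real-time workloads.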
Please credit the source when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2026.html