spark 创建 dataframe

张映 发表于 2019-01-04

分类目录: hadoop/spark

标签:, , ,

dataframe类似于关系型数据库的表,从dataframe中查询数据,需要调用api来实现,到目前为止spark支持的语言scala,java,r,python。

一,启动spark-shell

spark-shell --master yarn

二,创建SparkSession

scala> import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().appName("Spark SQL basic example").enableHiveSupport().getOrCreate()
2019-01-04 11:44:21 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2565b097

三,toDF方法创建dataframe

1,将seq序列转换成dataframe

scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(
 | (1, "tank", 25),
 | (2, "zhang", 26)
 | ).toDF("id", "name", "age")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1| tank| 25|
| 2|zhang| 26|
+---+-----+---+

2,列表转换成dataframe

scala> val array = List((1, "tank1", 25),(2, "tank2", 26),(3, "tank3", 29))
array: List[(Int, String, Int)] = List((1,tank1,25), (2,tank2,26), (3,tank3,29))

scala> val df1 = array.toDF("id", "name", "age")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df1.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|tank1| 25|
| 2|tank2| 26|
| 3|tank3| 29|
+---+-----+---+

四,通过createDataFrame创建dataframe

scala> val schema = StructType(List(
 | StructField("id", IntegerType, nullable = false),
 | StructField("name", StringType, nullable = true),
 | StructField("age", IntegerType, nullable = true),
 | StructField("birthday", DateType, nullable = true)
 | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(birthday,DateType,true))

scala> val rdd = spark.sparkContext.parallelize(Seq(
 | Row(1, "tank1", 25, java.sql.Date.valueOf("1982-07-07")),
 | Row(2, "zhang", 26, java.sql.Date.valueOf("1983-02-19"))
 | ))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[107] at parallelize at <console>:39

scala> val df2 = spark.createDataFrame(rdd, schema)
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df2.show()
+---+-----+---+----------+
| id| name|age| birthday|
+---+-----+---+----------+
| 1|tank1| 25|1982-07-06|
| 2|zhang| 26|1983-02-18|
+---+-----+---+----------+

创建的dataframe,以及里面的数据,是没有落地的

五,存储数据到hdfs

1,加载存储包

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

2,不分区存储

scala> df.write.parquet("hdfs://bigserver1:9000/test/spark/tank");

scala> var test = spark.read.load("hdfs://bigserver1:9000/test/spark/tank");
test: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test.show();
+---+-----+---+
| id| name|age|
+---+-----+---+
| 2|zhang| 26|
| 1| tank| 25|
+---+-----+---+
spark dataframe 不分区

spark dataframe 不分区

3,根据id分区存储

scala> df2.write.partitionBy("id").mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1");

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1");
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show();
+-----+---+----------+---+
| name|age| birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18| 2|
|tank1| 25|1982-07-06| 1|
+-----+---+----------+---+
spark dataframe 分区存储

spark dataframe 分区存储

4,数据的追加

scala> df1.write.mode(SaveMode.Append).parquet("hdfs://bigserver1:9000/test/spark/tank");

scala> var test2 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank");
test2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test2.show();
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|tank1| 25|
| 2|zhang| 26|
| 1| tank| 25|
| 2|tank2| 26|
| 3|tank3| 29|
+---+-----+---+

5,数据的修改,分区的dataframe才有效

scala> test1.show();
+-----+---+----------+---+
| name|age| birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18| 2|
|tank1| 25|1982-07-06| 1|
+-----+---+----------+---+

scala> val df4 = Seq(
 | (2, "zhangying", 37, java.sql.Date.valueOf("1986-11-11"))
 | ).toDF("id", "name", "age","birthday")
df4: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df4.write.mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1/id=2");

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1")
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show();
+---------+---+----------+---+
| name|age| birthday| id|
+---------+---+----------+---+
|zhangying| 37|1986-11-11| 2|
| tank1| 25|1982-07-06| 1|
+---------+---+----------+---+

对数据处理操作非常的多,这里只是简单介绍。这种数据存取的方式,实时性差



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2026.html

留下评论

留下评论
  • (必需)
  • (必需) (will not be published)
  • (必需)   5X1=?