Converting between RDD, DataFrame, and Dataset

Posted by 张映 on 2019-12-17

Category: hadoop/spark/scala


RDD, DataFrame, and Dataset are all distributed, resilient datasets on the Spark platform, designed to make processing very large data convenient. All three are lazily evaluated: creating or transforming them (for example with map) does not execute anything immediately; computation only starts when an action such as foreach is encountered. In the extreme case, if the code creates and transforms a dataset but never uses the result in any action, that work is simply skipped at execution time. DataFrame and Dataset support Spark SQL operations; RDD does not.
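The lazy-evaluation behavior described above can be illustrated by loose analogy with plain Scala collection views. This is a sketch of the concept only, using the standard library rather than the Spark API; the counter `mapCalls` is introduced here just to observe when the mapping function actually runs:

```scala
// Like a Spark transformation, mapping over a view only records the
// operation; the work happens when a terminal operation (the "action")
// forces the result.
var mapCalls = 0

val source  = (1 to 5).view                              // lazy, like an RDD/DataFrame
val doubled = source.map { x => mapCalls += 1; x * 2 }   // "transformation": nothing runs yet

val before = mapCalls          // still 0: map has not executed
val result = doubled.toList    // "action": triggers the computation
val after  = mapCalls          // now 5: one call per element
```

If `doubled` were never forced, the mapping function would never run at all, mirroring the Spark behavior where an unused transformation chain is skipped.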

1. DataFrame to RDD

import spark.implicits._  // needed for toDF/toDS in the examples below; pre-imported in spark-shell
val array = List((1, "tank1", 25),(2, "tank2", 26),(3, "tank3", 29))
val df = array.toDF("id", "name", "age")
val rdd1 = df.rdd

2. Dataset to RDD

val array = List((1, "tank1", 25),(2, "tank2", 26),(3, "tank3", 29))
val ds = array.toDS
val rdd2 = ds.rdd

3. DataFrame to Dataset

case class tanktest(id: Int, name: String, age: Int) extends Serializable  // define before use

val array = List((1, "tank1", 25),(2, "tank2", 26),(3, "tank3", 29))
val df = array.toDF("id", "name", "age")
val ds = df.as[tanktest]

4. Dataset to DataFrame

val array = List((1, "tank1", 25),(2, "tank2", 26),(3, "tank3", 29))
val ds = array.toDS
val ds2df = ds.toDF  // columns of a tuple Dataset are _1, _2, _3; use toDF("id", "name", "age") to name them

5. RDD to DataFrame

val spark = SparkSession.builder().master("local").appName("tanktest").getOrCreate()
import spark.implicits._  // the implicits come from the SparkSession instance

// Method 1: toDF on an RDD of tuples
val rdd = spark.sparkContext.parallelize(Seq(
  (1, "tank1", 25),
  (2, "zhang", 26)
))

val df = rdd.toDF("id", "name", "age")

// Method 2: explicit schema built from StructType/StructField
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

val schema = StructType(List(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val rdd = spark.sparkContext.parallelize(Seq(
  Row(1, "tank1", 25),  // each element is a Row matching the schema
  Row(2, "zhang", 26)
))

val df = spark.createDataFrame(rdd, schema)  // sqlContext is deprecated; call createDataFrame on the SparkSession

6. RDD to Dataset

val spark = SparkSession.builder().master("local").appName("tanktest").getOrCreate()
import spark.implicits._  // the implicits come from the SparkSession instance
val rdd = spark.sparkContext.parallelize(Seq(
  (1, "tank1", 25),  // plain tuples, no Row needed
  (2, "zhang", 26)
))
val ds = rdd.toDS()


Please credit when reposting
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2324.html