RDD, DataFrame, and Dataset are all distributed, resilient data abstractions on the Spark platform, designed to make processing very large datasets convenient. All three are lazily evaluated: creating or transforming them (e.g. with `map`) does not execute anything immediately; computation only begins when an action such as `foreach` is encountered. In the extreme case, if the code creates and transforms a dataset but never uses the result in any action, that work is simply skipped at execution time. DataFrame and Dataset both support Spark SQL operations; RDD does not.
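The laziness described above is easy to observe directly: the side effect inside `map` does not fire until an action runs. A minimal sketch (the app name and local master are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("lazydemo").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 5)

    // Transformation only: nothing executes yet, so no "mapping ..." is printed here.
    val mapped = rdd.map { x => println(s"mapping $x"); x * 2 }

    // The action triggers the whole pipeline; the map side effects appear only now.
    mapped.foreach(x => println(s"result $x"))

    spark.stop()
  }
}
```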
1. DataFrame to RDD

```scala
// Assumes a SparkSession named `spark` is in scope and
// `import spark.implicits._` has been executed (see step 5).
val array = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29))
val df = array.toDF("id", "name", "age")
val rdd1 = df.rdd   // RDD[org.apache.spark.sql.Row]
```
2. Dataset to RDD

```scala
val array = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29))
val ds = array.toDS()
val rdd2 = ds.rdd   // RDD[(Int, String, Int)] -- the element type is preserved
```
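Note the difference between the two conversions: `df.rdd` yields an untyped `RDD[Row]`, while `ds.rdd` keeps the Dataset's static element type. A short illustration, continuing from the `df` and `ds` defined above:

```scala
// RDD[Row]: fields must be extracted by position (or by name via getAs).
val firstNameFromDf = df.rdd.map(row => row.getString(1)).first()

// RDD[(Int, String, Int)]: fields are ordinary tuple members, checked at compile time.
val firstNameFromDs = ds.rdd.map(t => t._2).first()
```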
3. DataFrame to Dataset

```scala
// The case class must be defined before it is used, and its field names
// must match the DataFrame's column names for `.as[T]` to work.
case class tanktest(id: Int, name: String, age: Int) extends Serializable

val array = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29))
val df = array.toDF("id", "name", "age")
val ds = df.as[tanktest]
```
4. Dataset to DataFrame

```scala
val array = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29))
val ds = array.toDS()
val ds2df = ds.toDF()   // for a tuple Dataset, columns default to _1, _2, _3;
                        // use ds.toDF("id", "name", "age") to name them
```
5. RDD to DataFrame

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local").appName("tanktest").getOrCreate()
import spark.implicits._  // brings the toDF/toDS implicits into scope

// Method 1: toDF on an RDD of tuples.
val rdd = spark.sparkContext.parallelize(Seq(
  (1, "tank1", 25),
  (2, "zhang", 26)
))
val df = rdd.toDF("id", "name", "age")

// Method 2: createDataFrame with an explicit schema and an RDD of Rows.
val schema = StructType(List(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val rowRdd = spark.sparkContext.parallelize(Seq(
  Row(1, "tank1", 25),
  Row(2, "zhang", 26)
))
val df2 = spark.createDataFrame(rowRdd, schema)
```
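Method 2 is the one to reach for when the schema is only known at runtime, for example when parsing delimited text. A sketch under that assumption (the header string and sample lines are illustrative):

```scala
// Build a schema dynamically from a header line, then parse each record into a Row.
val header = "id,name,age"
val dynSchema = StructType(
  header.split(",").map(name => StructField(name, StringType, nullable = true))
)

val lines = spark.sparkContext.parallelize(Seq("1,tank1,25", "2,zhang,26"))
val rows = lines.map(line => Row.fromSeq(line.split(",").toSeq))
val dynDf = spark.createDataFrame(rows, dynSchema)
```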
6. RDD to Dataset

```scala
val spark = SparkSession.builder().master("local").appName("tanktest").getOrCreate()
import spark.implicits._  // brings the toDS implicit into scope

// An RDD of plain tuples (no Row objects needed here).
val rdd = spark.sparkContext.parallelize(Seq(
  (1, "tank1", 25),
  (2, "zhang", 26)
))
val ds = rdd.toDS()   // Dataset[(Int, String, Int)]
```
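As noted at the top, DataFrame and Dataset support Spark SQL while RDD does not. A minimal sketch of what that buys you, continuing from a DataFrame `df` with id/name/age columns (the view name `people` is illustrative):

```scala
// Register the DataFrame as a temporary view and query it with SQL.
df.createOrReplaceTempView("people")
val adults = spark.sql("SELECT name, age FROM people WHERE age >= 26")
adults.show()
```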
Please credit the original when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2324.html