parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发, Parquet的灵感来自于2010年Google发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能,在Dremel论文中还介绍了Google如何使用这种存储格式实现并行查询的。
csv,txt是行式存储,转换过后,在查询速度提高了不少,特别是存储空间,减少了90%多。
1,把一个目录下所有文本文件,转存为到另外一个目录的parquet文件
2,测试文件
package ex import org.apache.spark.{SparkConf, SparkContext, sql} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import com.demo.TxtChange object T2p { var toType, fromPath, toPath = "" //方法一 def main2(args: Array[String]): Unit = { //获取参数 for(i <- toType=args(i+1); case "--from" => fromPath=args(i+1); case "--to" => toPath=args(i+1); case _ => "error"; } } //读取文件csv val conf = new SparkConf().setAppName("txt2parquet").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext.read.format("com.databricks.spark.csv").option("delimiter","|").load(fromPath) .toDF("creative_id","category_name","ad_keywords","creative_type","inventory_type","gender","source","advanced_creative_title","first_industry_name","second_industry_name") //转换 TxtChange.RDDExtensions(df).Txt2Parquet(toPath,"tank",1) } //方法二,推荐方法 def main(args: Array[String]): Unit = { //获取参数 for(i <- toType=args(i+1); case "--from" => fromPath=args(i+1); case "--to" => toPath=args(i+1); case _ => "error"; } } //定义parquet结构,推荐这种方式,可以定义字段类型 val schema = StructType(Array( StructField("creative_id",DataTypes.LongType,false), StructField("category_name",DataTypes.StringType,true), StructField("ad_keywords",DataTypes.StringType,true), StructField("creative_type",DataTypes.StringType,false), StructField("inventory_type",DataTypes.StringType,true), StructField("gender",DataTypes.StringType,false), StructField("source",DataTypes.StringType,true), StructField("advanced_creative_title",DataTypes.StringType,true), StructField("first_industry_name",DataTypes.StringType,false), StructField("second_industry_name",DataTypes.StringType,true) )) //读取文件csv val spark = SparkSession.builder().master("local[2]").appName("txt3parquet").getOrCreate() val df = spark.read.schema(schema).option("delimiter","|").csv(fromPath) //转换 TxtChange.RDDExtensions(df).Txt2Parquet(toPath,"tank",3) } }
3,测试类
package com.demo import org.apache.spark.rdd.RDD import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.spark.sql object TxtChange { implicit class RDDExtensions(val df: sql.DataFrame) extends AnyVal { def Txt2Parquet(path: String,name: String, num: Int): Unit = { //df转rdd val rdd: RDD[String] = df.rdd.map(x=>x.toString()) //临时文件夹 val path1 = s"$path/$name"; //转换 val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration) if(num>0){ df.repartition(num).write.option("compression","snappy").parquet(path1) }else{ df.repartition(2).write.format("parquet").option("compression","snappy").save(path1) } //copy 文件并且临时文件夹 val pt = new Path(s"$path1/part*.parquet") val fileStatus = hdfs.globStatus(pt) val files = FileUtil.stat2Paths(fileStatus) for(i <- files){ FileUtil.copy( hdfs,i, hdfs,new Path(path), false,rdd.sparkContext.hadoopConfiguration ) hdfs.delete(new Path(s"$path/."+i.getName+".crc"), true) } hdfs.delete(new Path(path1), true) } } }
4,debug配置
这样就可以愉快的本地debug。
本人不太喜欢在博客里面贴代码,看起来费时间。因为有网友私下问了我,这些问题,才贴出来。
看到这儿,您肯定在想,转换后的.parquet文件能用吗?下篇来讲这个问题。
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2319.html