parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发, Parquet的灵感来自于2010年Google发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能,在Dremel论文中还介绍了Google如何使用这种存储格式实现并行查询的。
csv,txt是行式存储,转换过后,在查询速度提高了不少,特别是存储空间,减少了90%多。
1,把一个目录下所有文本文件,转存为到另外一个目录的parquet文件
2,测试文件
package ex
import org.apache.spark.{SparkConf, SparkContext, sql}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import com.demo.TxtChange
object T2p {
var toType, fromPath, toPath = ""
//方法一
def main2(args: Array[String]): Unit = {
//获取参数
for(i <- toType=args(i+1);
case "--from" => fromPath=args(i+1);
case "--to" => toPath=args(i+1);
case _ => "error";
}
}
//读取文件csv
val conf = new SparkConf().setAppName("txt2parquet").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("delimiter","|").load(fromPath)
.toDF("creative_id","category_name","ad_keywords","creative_type","inventory_type","gender","source","advanced_creative_title","first_industry_name","second_industry_name")
//转换
TxtChange.RDDExtensions(df).Txt2Parquet(toPath,"tank",1)
}
//方法二,推荐方法
def main(args: Array[String]): Unit = {
//获取参数
for(i <- toType=args(i+1);
case "--from" => fromPath=args(i+1);
case "--to" => toPath=args(i+1);
case _ => "error";
}
}
//定义parquet结构,推荐这种方式,可以定义字段类型
val schema = StructType(Array(
StructField("creative_id",DataTypes.LongType,false),
StructField("category_name",DataTypes.StringType,true),
StructField("ad_keywords",DataTypes.StringType,true),
StructField("creative_type",DataTypes.StringType,false),
StructField("inventory_type",DataTypes.StringType,true),
StructField("gender",DataTypes.StringType,false),
StructField("source",DataTypes.StringType,true),
StructField("advanced_creative_title",DataTypes.StringType,true),
StructField("first_industry_name",DataTypes.StringType,false),
StructField("second_industry_name",DataTypes.StringType,true)
))
//读取文件csv
val spark = SparkSession.builder().master("local[2]").appName("txt3parquet").getOrCreate()
val df = spark.read.schema(schema).option("delimiter","|").csv(fromPath)
//转换
TxtChange.RDDExtensions(df).Txt2Parquet(toPath,"tank",3)
}
}
3,测试类
package com.demo
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql
object TxtChange {
implicit class RDDExtensions(val df: sql.DataFrame) extends AnyVal {
def Txt2Parquet(path: String,name: String, num: Int): Unit = {
//df转rdd
val rdd: RDD[String] = df.rdd.map(x=>x.toString())
//临时文件夹
val path1 = s"$path/$name";
//转换
val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
if(num>0){
df.repartition(num).write.option("compression","snappy").parquet(path1)
}else{
df.repartition(2).write.format("parquet").option("compression","snappy").save(path1)
}
//copy 文件并且临时文件夹
val pt = new Path(s"$path1/part*.parquet")
val fileStatus = hdfs.globStatus(pt)
val files = FileUtil.stat2Paths(fileStatus)
for(i <- files){
FileUtil.copy(
hdfs,i,
hdfs,new Path(path),
false,rdd.sparkContext.hadoopConfiguration
)
hdfs.delete(new Path(s"$path/."+i.getName+".crc"), true)
}
hdfs.delete(new Path(path1), true)
}
}
}
4,debug配置
这样就可以愉快的本地debug。
本人不太喜欢在博客里面贴代码,看起来费时间。因为有网友私下问了我,这些问题,才贴出来。
看到这儿,您肯定在想,转换后的.parquet文件能用吗?下篇来讲这个问题。
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2319.html

