scala csv txt 转 parquet 的二种方法

张映 发表于 2019-12-13

分类目录: hadoop/spark/scala

标签:, , ,

parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发, Parquet的灵感来自于2010年Google发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能,在Dremel论文中还介绍了Google如何使用这种存储格式实现并行查询的。

csv,txt是行式存储,转换过后,在查询速度提高了不少,特别是存储空间,减少了90%多。

1,把一个目录下所有文本文件,转存为到另外一个目录的parquet文件

2,测试文件

package ex

import org.apache.spark.{SparkConf, SparkContext, sql}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import com.demo.TxtChange

object T2p {

    var  toType, fromPath, toPath = ""

    //方法一
    def main2(args: Array[String]): Unit = {

        //获取参数
        for(i <- toType=args(i+1);
                case "--from" => fromPath=args(i+1);
                case "--to" => toPath=args(i+1);
                case _ => "error";
            }
        }

        //读取文件csv
        val conf = new SparkConf().setAppName("txt2parquet").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val df = sqlContext.read.format("com.databricks.spark.csv").option("delimiter","|").load(fromPath)
            .toDF("creative_id","category_name","ad_keywords","creative_type","inventory_type","gender","source","advanced_creative_title","first_industry_name","second_industry_name")

        //转换
        TxtChange.RDDExtensions(df).Txt2Parquet(toPath,"tank",1)

    }

    //方法二,推荐方法
    def main(args: Array[String]): Unit = {

        //获取参数
        for(i <- toType=args(i+1);
                case "--from" => fromPath=args(i+1);
                case "--to" => toPath=args(i+1);
                case _ => "error";
            }
        }

        //定义parquet结构,推荐这种方式,可以定义字段类型
        val schema = StructType(Array(
            StructField("creative_id",DataTypes.LongType,false),
            StructField("category_name",DataTypes.StringType,true),
            StructField("ad_keywords",DataTypes.StringType,true),
            StructField("creative_type",DataTypes.StringType,false),
            StructField("inventory_type",DataTypes.StringType,true),
            StructField("gender",DataTypes.StringType,false),
            StructField("source",DataTypes.StringType,true),
            StructField("advanced_creative_title",DataTypes.StringType,true),
            StructField("first_industry_name",DataTypes.StringType,false),
            StructField("second_industry_name",DataTypes.StringType,true)
        ))

        //读取文件csv
        val spark = SparkSession.builder().master("local[2]").appName("txt3parquet").getOrCreate()
        val df = spark.read.schema(schema).option("delimiter","|").csv(fromPath)

        //转换
        TxtChange.RDDExtensions(df).Txt2Parquet(toPath,"tank",3)

    }
}

3,测试类

package com.demo

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql

object TxtChange {

    implicit class RDDExtensions(val df: sql.DataFrame) extends AnyVal {

        def Txt2Parquet(path: String,name: String, num: Int): Unit = {

            //df转rdd
            val rdd: RDD[String] = df.rdd.map(x=>x.toString())

            //临时文件夹
            val path1 = s"$path/$name";

            //转换
            val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
            if(num>0){
                df.repartition(num).write.option("compression","snappy").parquet(path1)
            }else{
                df.repartition(2).write.format("parquet").option("compression","snappy").save(path1)
            }

            //copy 文件并且临时文件夹
            val pt = new Path(s"$path1/part*.parquet")
            val fileStatus = hdfs.globStatus(pt)
            val files = FileUtil.stat2Paths(fileStatus)
            for(i <- files){
                FileUtil.copy(
                    hdfs,i,
                    hdfs,new Path(path),
                    false,rdd.sparkContext.hadoopConfiguration
                )
                hdfs.delete(new Path(s"$path/."+i.getName+".crc"), true)
            }

            hdfs.delete(new Path(path1), true)
        }
    }
}

4,debug配置

scala csv转parquet debug配置

scala csv转parquet debug配置

这样就可以愉快的本地debug。

csv转parquet后,空间占用减少很多

csv转parquet后,空间占用减少很多

本人不太喜欢在博客里面贴代码,看起来费时间。因为有网友私下问了我,这些问题,才贴出来。

看到这儿,您肯定在想,转换后的.parquet文件能用吗?下篇来讲这个问题。



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2319.html