
Reading and Writing Different File Types in Spark

Spark supports reading and writing data in several formats, including plain text, JSON, SequenceFile (MapFile), objectFile, and CSV.

1. Text files

Reading and writing text files:

sc.textFile(dir, 1)

rdd.saveAsTextFile(dir)
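A minimal round-trip sketch; the input and output paths below are placeholders, not values from the original article:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("TextFileTest")
val sc = new SparkContext(conf)

// textFile returns an RDD[String], one element per line; the second argument is the minimum number of partitions
val lines = sc.textFile("hdfs:///tmp/input", 1)

// saveAsTextFile writes each element as one line, producing one part-* file per partition
lines.map(_.trim).saveAsTextFile("hdfs:///tmp/output")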

2. JSON

Sample file a.json:

{"uid":1, "uname":"kyrie", "age":19}

{"uid":2, "uname":"jame", "age":25}

val conf = new SparkConf().setAppName("Jsontest")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val df1 = sqlContext.read.json("a.json")
df1.select("uid", "uname", "age").show(10, false)
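For completeness, the DataFrame can also be written back out as JSON. A minimal sketch, assuming Spark 1.4+ where DataFrame.write is available; the output path is a placeholder:

// Each output file contains newline-delimited JSON records, one per row
df1.write.json("hdfs:///tmp/json_out")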

3. SequenceFile

3.1. Reading a SequenceFile

Spark has a dedicated interface for reading SequenceFiles. On the SparkContext you can call sequenceFile(path, keyClass, valueClass, minPartitions). As mentioned earlier, SequenceFiles store Writable types, so keyClass and valueClass must be the correct Writable classes.

Example: reading a SequenceFile

val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable])

sc.sequenceFile("/datascience/data/data-writer/adt/dmp_click/data/2017/09/26/12/05/0", classOf[Text], classOf[BytesWritable]).map {
  case (k, v) =>
    // the BytesWritable buffer may be padded, so decode only the first getLength bytes
    val len = v.getLength
    val value = new String(v.getBytes, 0, len, "UTF-8")
    k.toString -> value
}.take(100).foreach(println)

3.2. Saving a SequenceFile

In Scala, you need a PairRDD whose key and value types can be written out to a SequenceFile. If the data consists of Scala native types with built-in Writable conversions, you can call saveAsSequenceFile(path) directly. If the keys and values cannot be converted to Writable automatically, or you want to use variable-length types, map over the data and convert the types before saving.
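A minimal sketch of both cases, with placeholder output paths:

import org.apache.hadoop.io.{IntWritable, Text}

// Native Scala types: the Writable conversion is applied implicitly (Spark 1.3+)
val counts = sc.parallelize(Seq(("kyrie", 19), ("jame", 25)))
counts.saveAsSequenceFile("hdfs:///tmp/seq_native")

// Explicit conversion: map to Writable key/value types before saving
counts.map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsSequenceFile("hdfs:///tmp/seq_writable")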

4. objectFile

The scaladoc of SparkContext.objectFile reads:

* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
* BytesWritable values that contain a serialized partition. This is still an experimental
* storage format and may not be supported exactly as is in future Spark releases. It will also
* be pretty slow if you use the default serializer (Java serialization),
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.

As the comment explains, the data is stored as a SequenceFile whose keys are NullWritable and whose values are BytesWritable holding serialized partitions.

Read: sc.objectFile[T](path), where T needs an implicit ClassTag.

Write: rdd.saveAsObjectFile(path: String), documented as "Save this RDD as a SequenceFile of serialized objects."
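A small round-trip sketch with a placeholder path:

// Each partition is serialized with the default Java serializer and stored as one BytesWritable value
val people = sc.parallelize(Seq(("kyrie", 19), ("jame", 25)))
people.saveAsObjectFile("hdfs:///tmp/obj_out")

// On read, the element type must be supplied explicitly (it provides the required ClassTag)
val restored = sc.objectFile[(String, Int)]("hdfs:///tmp/obj_out")
restored.collect().foreach(println)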

5. CSV

Reading CSV files, either plain text or compressed.

Add the spark-csv dependency:

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.4.0" withSources()

Option 1:

val data = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(s"$dataInput/impression")

Option 2:

import com.databricks.spark.csv._
sqlContext.csvFile(s"$dataInput/impression")
Option descriptions (a combined example follows this list):
  1. path: the directory of CSV files to parse; the path supports wildcards.
  2. header: defaults to false. The first row of a CSV file usually holds the column names; set this option to true to treat that row as a header instead of loading it as data.
  3. delimiter: by default fields are separated by a comma; set this option if the files use a different delimiter.
  4. quote: the default quote character is '"'; set this option to use a different one.
  5. mode: the parsing mode. The default is PERMISSIVE; the supported values are
    (1) PERMISSIVE: tries to parse all lines; nulls are inserted for missing tokens and extra tokens are ignored.
    (2) DROPMALFORMED: drops lines that have fewer or more tokens than expected.
    (3) FAILFAST: aborts with a RuntimeException if any malformed line is encountered.
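A hedged sketch combining several of these options with spark-csv; the tab delimiter and input path are illustrative assumptions, not values from the original article:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // treat the first row as column names
  .option("delimiter", "\t")       // tab-separated instead of the default comma
  .option("quote", "\"")           // the default quote character, shown explicitly
  .option("mode", "DROPMALFORMED") // drop rows with the wrong number of tokens
  .load(s"$dataInput/impression")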

6. Hadoop input and output formats

With the new Hadoop API, read files with newAPIHadoopFile and write with saveAsNewAPIHadoopFile.

With the old Hadoop API, read files with hadoopFile and write with saveAsHadoopFile.
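The sections below show reading with both interfaces and writing with the new one. For the old-API write path mentioned here, a minimal sketch with saveAsHadoopFile; the output path and key/value types are illustrative assumptions:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat

// saveAsHadoopFile takes the key class, the value class and an old-API OutputFormat
sc.parallelize(Seq(("kyrie", 19), ("jame", 25)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsHadoopFile("hdfs:///tmp/old_api_out", classOf[Text], classOf[IntWritable],
    classOf[TextOutputFormat[Text, IntWritable]])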

6.1. Reading files with the new interface

# Text file (note: this example uses the old-API hadoopFile)

sc.hadoopFile("/user/yu.guan/xueyuan/1005484_1_check.tar.gz", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
  // the Text buffer may be reused/padded, so decode only the first getLength bytes, here as GBK
  .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
  .take(10)
  .foreach(println)

# Key-value SequenceFile (new-API newAPIHadoopFile)

sc.newAPIHadoopFile[Text, BytesWritable, SequenceFileInputFormat[Text, BytesWritable]](path).flatMap {
  case (_key, value) =>
    val key = _key.toString
    if (filter(key)) {    // filter and BitmapUtil are helpers from the author's project
      try {
        val _bitmap = BitmapUtil.fromWritable(value)
        Some(key -> _bitmap)
      } catch {
        case e: Exception =>
          println(s"${e.getMessage}")
          println(s"$key\t$path")
          None
      }
    } else None
}

6.2. Writing files with the new interface

new UnionRDD(sc, filterBitmaps).reduceByKey(_ or _, partition_num).map {
  case (k, v) => SerializeText(k.toString).writable -> new BytesWritable(v.toBytes)
}.saveAsNewAPIHadoopFile(out, classOf[Text], classOf[BytesWritable], classOf[SequenceFileOutputFormat[Text, BytesWritable]])

rdd1.flatMap {
  case (key, bitmap) =>
    if (!br_topKeys.value.isEmpty) {
      if (bitmap.cardinality > 0) {
        val _key = if (br_topKeys.value.contains(key)) key else "others"
        Some(_key -> bitmap)
      } else None
    } else Some(key -> bitmap)
}.reduceByKey(_ or _).map {
  case (id, bitmap) => SerializeText(id.toString) -> bitmap.toBytes
}.sortByKey(true).map[(Text, BytesWritable)] {
  case (k, v) => k.writable -> new BytesWritable(v)
}.saveAsNewAPIHadoopFile(output, classOf[Text], classOf[BytesWritable], classOf[MapFileOutputFormat])
