Spark支持读取数据格式有文本, json ,SequenceFile(MapFile),objectFile, CSV 等格式。
1.文本文件
文本的读写操作:
sc.textFile( dir ,1)
rdd .saveAsTextFile(dir)
2.Json
a.json
{“uid”:1, “uname”:”kyrie”, “age”:19}
{“uid”:2, “uname”:”jame”, “age”:25}
val conf = new SparkConf().setAppName(“Jsontest”)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.read.json(“a.json”)
df1.select(“uid”,”uname”,”age”).show(10, false )
3.sequenceFile
3.1. 读取SequenceFile
Spark有专门用来读取SequenceFile的接口。在SparkContext中,可以调用sequenceFile(path, keyClass, valueClass, minpartit io ns),前面提及SequenceFile使用Writable类,因此keyClass和valueClass都必须使用正确的Writable类。
例:读取SequenceFile
val data=sc.sequenceFile(inFile,”org.apache. hadoop .io.Text”, “org.apache.hadoop.io.IntWritable”)
sc.sequenceFile(“/datascience/data/data-writer/adt/dmp_click/data/2017/09/26/12/05/0”,classOf[Text],classOf[BytesWritable]).map{
case (k, v) =>
val len = v.getLength
val value = new String(v.getBytes, 0, len, “UTF-8”)
k.toString -> value
}.take(100).foreach(println)
3.2. 保存SequenceFile
在 Scala 中,需要创建一个又可以写出到SequenceFile的类型构成的PairRDD,如果要保存的是Scala的原生类型,可以直接调用saveSequenceFile(path) 。如果键和值不能自动转为Writable类型,或想使用变长类型,可以对数据进行映射操作,在保存之前进行类型转换。
4.objectFile
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and * BytesWritable values that contain a serialized partition. This is still an experimental * storage format and may not be supported exactly as is in future Spark releases. It will also * be pretty slow if you use the default serializer (Java serialization), * though the nice thing about it is that there's very little effort required to save arbitrary * objects.
由注释可知,保存的也是sequenceFile,key为:NullWritable,value:为BytesWritable。
sc.objectFile[ClassTag](path)
* Save this RDD as a SequenceFile of serialized objects.rdd.saveAsObjectFile(path: String)
5.csv
读取csv压缩文件或文本文件。
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.4.0" withSources()方式一: val data =sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(s"$dataInput/impression") 方式二:importcom.databricks.spark.csv._sqlContext.csvFile(s"$dataInput/impression") option说明: 1、path:解析的CSV文件的目录,路径支持通配符; 2、header:默认值是false。我们知道,CSV文件第一行一般是解释各个列的含义的名称,如果我们不需要加载这一行,我们可以将这个选项设置为true; 3、delimiter:默认情况下,CSV是使用英文逗号分隔的,如果不是这个分隔,我们就可以设置这个选项。 4、quote:默认情况下的引号是'"',我们可以通过设置这个选项来支持别的引号。 5、mode:解析的模式。默认值是PERMISSIVE,支持的选项有 (1)、PERMISSIVE:尝试解析所有的行,nulls are inserted for missing tokens and extra tokens are ignored. (2)、DROPMALFORMED:drops lines which have fewer or more tokens than expected (3)、FAILFAST: aborts with a RuntimeException if encounters any malformed line
6.Hadoop输入输出格式
新版的Hadoop API读入文件,newAPIHadoopFile ,写入saveAsNewAPIHadoopFile。
旧版的Hadoop API读入文件,HadoopFile ,写入saveAsHadoopFile
6.1.新接口读取文件
#文本文件
val rdd = sc.hadoopFile(“/user/yu.guan/xueyuan/1005484_1_check.tar.gz”, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
.map(p => new String(p._2.getBytes, 0, p._2.getLength, “GBK”)
.take(10)
.foreach(println)
#k-v文件
sc.newAPIHadoopFile[Text,BytesWritable,SequenceFileInputFormat[Text,BytesWritable]](path).flatMap{
case (_key,value) =>
val key = _key. toString
if( filter(key) ) {
try{
val _bitmap = BitmapUtil.fromWritable(value)
Some(key -> _bitmap)
}catch{
case e : Exception =>
println( s"${e.getMessage}")
println( s"$key\t$path")
None
}
}else None
}
6.2.新接口写入文件
new UnionRDD(sc,filterBitmaps).reduceByKey(_ or _,partition_num).map{ case (k,v) => SerializeText(k.toString).writable -> new BytesWritable(v.toBytes) }.saveAsNewAPIHadoopFile(out , classOf[Text] , classOf[BytesWritable] , classOf[SequenceFileOutputFormat[Text,BytesWritable]])
rdd1.flatMap{ case (key,bitmap) => if(!br_topKeys.value.isEmpty){ if(bitmap.cardinality > 0){ val _key = if(br_topKeys.value.contains(key)) key else "others" Some(_key -> bitmap) }else None }else Some(key -> bitmap) }.reduceByKey(_ or _).map{ case (id,bitmap) => SerializeText(id.toString) -> bitmap.toBytes }.sortByKey(true).map[(Text,BytesWritable)]{ case (k,v) => k.writable -> new BytesWritable(v) }.saveAsNewAPIHadoopFile(output,classOf[Text], classOf[BytesWritable], classOf[MapFileOutputFormat])