您的位置 首页 java

Spark入门详解——2

RDD 的转换与操作

对于 RDD 可以有两种计算方式:转换(返回值还是一个 RDD)与操作(返回值不是一个

RDD)。

转换(Transformations) (如: map, filter, groupBy, join 等), Transformations 操作是 Lazy 的,也就是说从一个 RDD 转换生成另一个 RDD 的操作不是马上执行, Spark 在遇到 Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有 Actions 操作的时候才会真正启动计算过程进行计算。

操作(Actions) (如: count, collect, save 等), Actions 操作会返回结果或把 RDD 数据写到存储系统中。 Actions 是触发 Spark 启动计算的动因。

下面使用一个例子来示例说明 Transformations 与 Actions 在 spark 的使用。

val sc = new SparkContext(master, “Example”, System.getenv(“SPARK_HOME”),

Seq(System.getenv(“SPARK_TEST_JAR”)))

val rdd_A = sc.textFile( hdfs ://…..)

val rdd_B = rdd_A.flatMap((line => line.split(“\s+”))).map(word => (word, 1))

val rdd_C = sc.textFile(hdfs://…..)

val rdd_D = rdd_C.map(line => (line.substring(10), 1))

val rdd_E = rdd_D.reduceByKey((a, b) => a + b)

val rdd_F = rdd_B.jion(rdd_E)

rdd_F.saveAsSequenceFile(hdfs://….)

Lineage(血统)

利用内存加快数据加载,在众多的其它的 In-Memory 类数据库或 Cache 类系统中也有实现,

Spark 的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时

采用的方案。为了保证 RDD 中数据的鲁棒性, RDD 数据集通过所谓的血统关系(Lineage)记

住了它是如何从其它 RDD 中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的

备份或者 LOG 机制, RDD 的 Lineage 记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。 当这个 RDD 的部分分区数据丢失时,它可以通过 Lineage 获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了 Spark 的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

RDD 在 Lineage 依赖方面分为两种 Narrow Dependencies 与 Wide Dependencies 用来解决数据容错的高效性。 Narrow Dependencies 是指父 RDD 的每一个分区最多被一个子 RDD 的分区所用,表现为一个父 RDD 的分区对应于一个子 RDD 的分区或多个父 RDD 的分区对应于一个子 RDD 的分区,也就是说一个父 RDD 的一个分区不可能对应一个子 RDD 的多个分区。 Wide Dependencies 是指子 RDD 的分区依赖于父 RDD 的多个分区或所有分区,也就是说存在一个父 RDD 的一个分区对应一个子 RDD 的多个分区。对与 Wide Dependencies,这种计算的输入和输出在不同的节点上, lineage 方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是 lineage,血统的意思), Narrow Dependencies 对于数据的重算开销要远小于 Wide Dependencies 的数据重算开销。

容错

在 RDD 计算,通过 checkpint 进行容错,做 checkpoint 有两种方式,一个是 checkpoint data,一个是 logging the updates。用户可以控制采用哪种方式来实现容错,默认是 logging the updates方式,通过记录跟踪所有生成 RDD 的转换(transformations)也就是记录每个 RDD 的 lineage(血统)来重新计算生成丢失的分区数据。

资源管理与作业调度

Spark 对于资源管理与作业调度可以使用 Standalone(独立模式), Apache Mesos 及 HadoopYARN 来实现。 Spark on Yarn 在 Spark0.6 时引用,但真正可用是在现在的 branch-0.8 版本。Spark on Yarn 遵循 YARN 的官方规范实现,得益于 Spark 天生支持多种 Scheduler 和 Executor 的良好设计,对 YARN 的支持也就非常容易, Spark on Yarn 的大致框架图。

编程接口

Spark 通过与 编程语言 集成的方式暴露 RDD 的操作,类似于 DryadLINQ 和 Flume Java ,每个数据集都表示为 RDD 对象,对数据集的操作就表示成对 RDD 对象的操作。 Spark 主要的编程语言是 Scala ,选择 Scala 是因为它的简洁性(Scala 可以很方便在交互式下使用)和性能(JVM 上的静态强类型语言)。

Spark 和 Hadoop MapReduce 类似,由 Master(类似于 MapReduce 的 Jobtracker)和 Workers(Spark的 Slave 工作节点)组成。用户编写的 Spark 程序被称为 Driver 程序, Dirver 程序会连接 master并定义了对各 RDD 的转换与操作,而对 RDD 的转换与操作通过 Scala 闭包 (字面量函数)来表示, Scala 使用 Java 对象来表示闭包且都是可序列化的,以此把对 RDD 的闭包操作发送到各 Workers 节点。 Workers 存储着数据分块和享有集群内存,是运行在工作节点上的守护进程,当它收到对 RDD 的操作时,根据数据分片信息进行本地化数据操作,生成新的数据分片、返回结果或把 RDD 写入存储系统。

Scala

Spark使用 Scala开发,默认使用 Scala作为编程语言。编写 Spark程序比编写 Hadoop MapReduce程序要简单的多, SparK 提供了 Spark-Shell,可以在 Spark-Shell 测试程序。写 SparK 程序的一般步骤就是创建或使用(SparkContext)实例,使用 SparkContext 创建 RDD,然后就是对 RDD进行操作。如:

val sc = new SparkContext(master, appName, [sparkHome], [jars])

val textFile = sc.textFile(“hdfs://…..”)

textFile.map(….).filter(…..)…..

Java

Spark 支持 Java 编程,但对于使用 Java 就没有了 Spark-Shell 这样方便的工具,其它与 Scala 编程是一样的,因为都是 JVM 上的语言, Scala 与 Java 可以互操作, Java 编程接口其实就是对Scala 的封装。如:

JavaSparkContext sc = new JavaSparkContext(…);

JavaRDD lines = ctx.textFile(“hdfs://…”);

JavaRDD words = lines.flatMap(

new FlatMapFunction() {

public Iterable call(String s) {

return Arrays.asList(s.split(” “));

}

}

);

python

现在 Spark 也提供了 Python 编程接口, Spark 使用 py4j 来实现 python 与 java 的互操作,从而实现使用 python 编写 Spark 程序。 Spark 也同样提供了 pyspark,一个 Spark 的 python shell,可以以交互式的方式使用 Python 编写 Spark 程序。 如:

from pyspark import SparkContext

sc = SparkContext(“local”, “Job Name”, pyFiles=[‘MyFile.py’, ‘lib.zip’, ‘app.egg’])

words = sc.textFile(“/usr/share/dict/words”)

words.filter(lambda w: w.startswith(“spar”)).take(5)

使用示例 Standalone 模式

为方便 Spark 的推广使用, Spark 提供了 Standalone 模式, Spark 一开始就设计运行于 ApacheMesos 资源管理框架上,这是非常好的设计,但是却带了部署测试的复杂性。为了让 Spark能更方便的部署和尝试, Spark 因此提供了 Standalone 运行模式,它由一个 Spark Master 和多个 Spark worker 组成,与 Hadoop MapReduce1 很相似,就连集群启动方式都几乎是一样。

以 Standalone 模式运行 Spark 集群

下载 Scala2.9.3,并配置 SCALA_HOME

下载 Spark 代码(可以使用源码编译也可以下载编译好的版本)这里下载 编译好的版本

()

解压 spark-0.7.3-prebuilt-cdh4.tgz 安装包

修改配置(conf/*) slaves: 配置工作节点的主机名 spark-env.sh:配置环境变量。

SCALA_HOME=/home/spark/scala-2.9.3

JAVA_HOME=/home/spark/jdk1.6.0_45

SPARK_MASTER_IP=spark1

SPARK_MASTER_PORT=30111

SPARK_MASTER_WEBUI_PORT=30118

SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g

SPARK_WORKER_PORT=30333

SPARK_WORKER_WEBUI_PORT=30119

SPARK_WORKER_INSTANCES=1

把 Hadoop 配置 copy 到 conf 目录下

在 master 主机上对其它机器做 ssh 无密码登录

把配置好的 Spark 程序使用 scp copy 到其它机器

在 master 启动集群

$SPARK_HOME/start-all.sh

明天更新yarn 模式 + 编写 Driver 程序+案例展示:

文章来源:智云一二三科技

文章标题:Spark入门详解——2

文章地址:https://www.zhihuclub.com/189275.shtml

关于作者: 智云科技

热门文章

网站地图