spark python 连接 spark物理内存不够大怎么办

在数据挖掘中,Python和Scala语言都是极受欢迎的,本文总结两种语言在Spark环境各自特点。
本文翻译自& https://www.dezyre.com/article/Scala-vs-Python-for-apache-Spark/213
1.性能对比
由于Scala是基于JVM的数据分析和处理,Scala比Python快10倍。当编写Python代码用且调用Spark库时,性能是平庸的,但如果程序涉及到比Python编码还要多的处理时,则要比Scala等效代码慢得多。Python解释器PyPy内置一个JIT(及时)编译器,它很快,但它不提供各种Python C扩展支持。在这样的情况下,对库的C扩展CPython解释器优于PyPy解释器。
使用Python在Spark的性能开销超过Scala,但其重要性取决于您在做什么。当内核数量较少时,Scala比Python快。随着核数的增加,Scala的性能优势开始缩小。
当大量的处理其工作时,性能不是选择编程语言的主要驱动因素。然而,当有重要的处理逻辑时,性能是一个主要因素,Scala绝对比Python提供更好的性能,用于针对Spark程序。
2.学习曲线
在用Scala语言编写Spark程序时有几个语法糖,所以大数据专业人员在学习Spark时需要非常小心。程序员可能会发现Scala语法有时会让人发疯。Scala中的一些库很难定义随机的符号运算符,而这些代码可以由没有经验的程序员理解。在使用Scala时,开发人员需要关注代码的可读性。与Scala相比,Java或Python是一个灵活的语法复杂的语言。对Scala开发人员的需求越来越大,因为大数据公司重视能在Spark中掌握数据分析和处理的高效而健壮的开发人员。
Python是为Java程序员学习相对容易的因为它的语法和标准库。然而,Python是不是一个高度并行和可扩展的像SoundCloud或推特系统的理想选择。
学习Scala丰富了程序员对类型系统中各种新抽象的认识,新的函数编程特性和不可变数据。
大数据系统的复杂多样的基础结构需要一种编程语言,它有能力集成多个数据库和服务。在大数据的生态系统中,Scala胜在Play框架提供了许多异步库和容易集成的各种并发原语,比如Akka。Scala使开发人员编写高效的、可读性和可维护性的服务而不是。相反,Python不支持的重量级进程并行在用uWSGI时,但它不支持真正的多线程。
当使用Python写Spark程序时,不管进程有多少线程,每次只有一个CPU在Python进程中处于活动状态。这有助于每个CPU核心只处理一个进程,但糟糕的是,每当部署新代码时,需要重新启动更多的进程,还需要额外的内存开销。Scala在这些方面更高效,更容易共事。
4.类型安全
当用Spark编程时,开发人员需要根据变化的需求不断地重新编码代码。Scala是静态类型语言,尽管它看起来像一种动态类型语言,因为它具有优雅的类型推断机制。作为静态类型语言,Scala仍然提供编译器来捕获编译时错误。
重构像Scala这样的静态类型语言的程序代码比重构像Python这样的动态语言代码要容易得多且简单。开发人员在修改Python程序代码后常常会遇到困难,因为它造成的bug比修复程序原有的bug要多。所以最好是缓慢而安全地使用Scala,而不是快速的、死地使用Python。
对于小型的特殊实验,Python是一种有效的选择,但它并不像静态语言那样有效地扩展到大型软件工程中。
Scala和Python语言在Sparkcontext中有同样的表达,因此通过使用Scala或Python可以实现所需的功能。无论哪种方式,程序员都会创建一个Sparkcontext并调用函数。Python是一种比Scala更便于用户使用的语言。Python不那么冗长,开发人员很容易用Python编写脚本来调用Spark。易用性是一个主观因素,因为它取决于程序员的个人偏好。
6.高级特性
Scala编程语言有几个存在类型、宏和隐式。Scala的晦涩难懂的语法可能很难对开发人员可能无法理解的高级特性进行实验。然而,Scala的优势在于在重要的框架和库中使用这些强大的特性。
话虽如此,Scala没有足够的数据科学工具和库,如Python用于机器学习和自然语言处理。Sparkmlib–机器学习库只有较少的ML算法但他们是理想的大数据处理。Scala缺乏良好的可视化和本地数据转换。Scala无疑是Spark streaming特性的最佳选择,因为Python 通过pySpark 调用Spark.streaming不像Scala那样先进和成熟。
“Scala速度更快,使用方便 但上手难,而Python则较慢,但很容易使用。”
Spark框架是用Scala编写的,所以了解Scala编程语言有助于大数据开发人员轻松地挖掘源代码,如果某些功能不能像预期的那样发挥作用。使用Python增加了更多问题和bug的可能性,因为2种不同语言之间的转换是困难的。为Spark使用Scala提供对Spark框架的最新特性的访问,因为它们首先在Scala中可用,然后移植到Python中。
根据Spark决定Scala和Python取决于最适合项目需要的特性,因为每种语言都有自己的优点和缺点。在使用Apache Spark编程语言之前,开发者必须学习Scala和Python来熟悉它们的特性。学习了Python和Scala之后,决定何时使用Scala来Spark以及何时使用Python来调用Spark是相当容易的。Apache Spark编程语言的选择完全取决于要解决的问题。
阅读(...) 评论()Spark入门 (针对Python)【python吧】_百度贴吧
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&签到排名:今日本吧第个签到,本吧因你更精彩,明天继续来努力!
本吧签到人数:0成为超级会员,使用一键签到本月漏签0次!成为超级会员,赠送8张补签卡连续签到:天&&累计签到:天超级会员单次开通12个月以上,赠送连续签到卡3张
关注:187,547贴子:
Spark入门 (针对Python)收藏
概述经过多年来开拓性的工作,UC Berkeley AMP Lab开发了Spark。它使用分布式内存数据结构,提高了数据处理的速度,在大多数工作上优于Haddop。本文用一个真实的数据集,展示Spark的结构,以及基本的转换(transformations)与行动(actions)。如果你想尝试编写和运行自己的Spark代码,可以到Dataquest试试本教程的(英文)互动版本。弹性分布式数据集(RDD)Spark的核心结构是RDD,全称“弹性分布式数据集”(resilient distributed dataset)。从名字即可看出,RDD是Spark里的数据集,分布于RAM或内存,或许多机器中。 一个RDD对象本质是多个元素的组合。它可以是包含多个元素(元组、列表、字典等)的列表。你可以把数据集载入为RDD,然后运行此RDD对象可用的方法(methods),就像Pandas里的数据框(DataFrames)。PySparkSpark是用Scala语言写成的,Scala把要编译的东西编译为Java虚拟机(JVM)的字节码(bytecode)。Spark的开源社区开发了一个叫PySpark的工具库。它允许使用者用Python处理RDD。这多亏了一个叫Py4J的库,它让Python可以使用JVM的对象(比如这里的RDD)。开始操作之前,先把一个包含《每日秀》(the Daily Show)所有来宾的数据集载入为RDD。这里用的数据集是FiveThirtyEight’s dataset的tsv版本。TSV文件是由 “\t” 分隔的数据文件,不同于像CSV文件用逗号 “,” 分隔。raw_data = sc.textFile(&daily_show.tsv&)raw_data.take(5)['YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List', '1999\tactor\t1/11/99\tActing\tMichael J. Fox', '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard', '1999\ttelevision actress\t1/13/99\tActing\tTracey Ullman', '1999\tfilm actress\t1/14/99\tActing\tGillian Anderson']SparkContextSparkContext 是管理Spark里的集群(cluster)和协调集群运行进程的对象。SparkContext与集群的manager相连。Manager负责管理运行具体运算的执行者。下面一幅图来自Spark官方文档,能更好地展示这个结构:SparkContext对象通常以变量sc的形式被引用。运行:raw_data = sc.textFile(&daily_show.tsv&)把TSV数据集载入为 RDD对象raw_data 。这个RDD对象类似一个包含许多字符串对象(string objects)的列表,数据集中每一行是一个字符串对象。之后,使用 take() 方法打印出前五个元素:raw_data.take(5)take(n) 返回RDD的前n个元素。想了解更多RDD可用的方法,可查阅PySpark的官方文档。惰性计算(Lazy Evaluation)你可能会问:如果RDD与Python列表相似,那为什么不使用括号直接获取RDD里的元素?这是因为RDD对象分布于很多个部分,我们无法对其进行列表的标准操作,而且RDD本身就是为了处理分布式数据开发的。RDD抽象方式的优势是可以让Spark在本地计算机运行。在本地运行时,Spark把本地计算机的内存划分为很多部分,以模拟在许多机器上进行计算的情境,所以在本地运行时也无需改动代码。Spark RDD的另一个优点是代码的惰性计算(lazily evaluate)。Spark把一个计算拖延到不得不运行的时候。在上面的代码中,直到运行 raw_data.take(5) ,Spark才把TSV文件载入RDD。当raw_data = sc.textFile(“dail_show.tsv”) 被调用时,创建了一个指向此文件的指针。但只有当raw_data.take(5) 需要此文件时,文件才真正被读取进raw_data。本教程以及未来的讲解中会出现更多惰性计算的例子。流水线(Pipelines)Spark大量借用了Hadoop的Map-Reduce模式,但许多地方与Hadoop不同。如果你有使用Hadoop和传统Map-Reduce的经验,Cloudera有一篇很棒的文章探讨这些差异。如果你从没使用过Map-Reduce 或 Hadoop也不用担心,本教程会介绍需要了解的概念。使用Spark时,需要理解的核心概念是数据流水线(data pipelining)。Spark里的每个运算/计算本质是都是一系列步骤(step)。这些步骤能被连在一起,按顺序运行,形成一个流水线。流水线中的每个步骤返回一个Python值(例如Integer),一个Python数据结构(例如字典),或者一个RDD对象。首先,我们来看map() 函数。Map()map(f) 把函数f应用于RDD的每个元素。因为RDD是可迭代的(像多数Python对象一样),Spark每次迭代都运行f,之后返回一个新RDD。为了便于理解,这里示范一个使用map 的例子。如果你观察仔细,就会发现 raw_data 目前的格式并不利于后续工作。现在每个元素都是一个字符串。为了便于管理,我们要把每个元素都转换成一个列表。Python的传统做法是:使用for循环在集合中迭代把每个字符串根据分隔符断开把结果储存为一个列表下面展示在Spark中使用map实现这个任务的方法。在稍后的一段代码中,我们要:调用RDD的map()函数,把括号里的内容应用于数据集的每一行。写一个lambda函数,根据分隔符”\t”把字符串分开,把结果存储为叫做daily_show的RDD。在daily_show 上,调用RDD的take()函数,显示前五个元素(行)。map(f) 函数是一个用于转换的步骤。需要提供给它一个命名过的函数或lambda函数。代码及输出如下:daily_show = raw_data.map(lambda line: line.split('\t'))daily_show.take(5)[['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'], ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'], ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'], ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'], ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']]Python与Scala,永远的好朋友我们习惯了用Python写出任务的逻辑。PySpark众多的优点之一,是可以把逻辑和具体的数据转换分开。在上面的代码中,我们用Python写了一个lambda函数:raw_data.map(lambda: line(line.split('\t')))而当这段代码运行于RDD时,又利用了Scala的优势。这就是PySpark的力量。尽管没有学习任何关于Scala的知识,我们还是利用了Spark的Scala结构在数据处理上的优异表现。更棒的是,当我们运行:daily_show.take(5)返回的结果还是对Python友好的格式。转换与行动Spark里有两类方法:转换(Transformations) - map(), reduceByKey()行动(Actions) - take(), reduce(), saveAsTextFile(), collect()转换是惰性运算,总是返回对一个RDD对象的引用。不过,直到某个行动需要使用转换过的RDD,转换才会运行。任何返回RDD的函数都是转换,任何返回某个值的函数都是行动。在你实现本教程并尝试写PySpark代码的过程中,这些概念会变得更加清晰。不可变你可能会觉得奇怪:为什么不直接拆分每个字符串,而是要新建一个叫做daily_show的新对象?在Python中,可以直接逐个对集合里的元素进行修改,而不必返回或指派新对象。RDD对象是不可变的。一旦对象被创建,它们的值就无法再变化。在Python里,列表和字典是可变的,这意味着我们可以改变这些对象的值,而元组是不可变的。在Python中修改一个元组对象,唯一方法就是创造一个包含所需改动的新元组。Spark利用RDD不可变的性质来提升速度,具体的原理超出本教程的讨论范围。ReduceByKey()我们想要对《每日秀》每年的来宾数目进行统计。在Python中,如果daily_show 是一个列表,其中包含多个列表,下面的一段代码可以实现我们的目的:tally = dict()for line in daily_show:
year = line[0]
if year in tally.keys():
tally[year] = tally[year] + 1
tally[year] = 1tally 的每个键(key)会是唯一的,而值(value)是数据集中包含key的行数。如果想用Spark获得相同结果,需要使用Map 步骤,接ReduceByKey步骤。tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)print(tally)PythonRDD[156] at RDD at PythonRDD.scala:43解释你可能注意到了,打印tally 并没有像我们希望的那样返回统计数值。这是由于惰性计算的缘故。PySpark推迟map 和reduceByKey 的执行,直到需要使用它们(结果)的时候。在使用take() 预览tally 的前几个元素之前,先来过一遍上面的代码:daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x+y)在map 步骤,我们使用了一个lambda函数,用来创建一个元组其中包含:键: x[0], 列表的第一个值值: 1, 整数这里的策略是创建一个元组,其中包含年份作键,取值为1。在运行map 之后,Spark 会在内存中保留一个类似下列形式的,由多个元组构成的列表:('YEAR', 1)('1991', 1)('1991', 1)('1991', 1)('1991', 1)...而我们想把这些化简为:('YEAR', 1)('1991', 4)...reduceByKey(f) 允许我们用函数f,将键相同的元组合并。使用take 命令查看上面两个步骤的结果。take的作用是强迫惰性代码立即执行。由于 tally 是RDD,我们无法使用Python的len 函数来计算数目,而是要用RDD的 count() 函数。tally.take(tally.count())[('YEAR', 1), ('2013', 166), ('2001', 157), ('2004', 164), ('2000', 169), ('2015', 100), ('2010', 165), ('2006', 161), ('2014', 163), ('2003', 166), ('2002', 159), ('2011', 163), ('2012', 164), ('2008', 164), ('2007', 141), ('2005', 162), ('1999', 166), ('2009', 163)]Filter与Pandas不同的是,Spark无法识别首行是标题,也没有拿掉这些标题。我们需要想个办法从集合中去掉这个元素:('YEAR', 1)你可能会试着从RDD里直接去掉这个元素,但请注意RDD是不可变的对象,一旦被创建就无法更改。去掉这个元组的唯一方法,就是创建一个不包含此元组的RDD对象。Spark有一个filter(f) 函数。此函数允许我们根据现存的RDD创建一个新的RDD,新RDD中只包含符合要求的元素。定义一个只返回二元值True 或 False函数 f 。只有True 对应的项目会出现在最终的RDD中。更多有关filter函数的内容可见Spark官方文档。def filter_year(line):
if line[0] == 'YEAR':
return False
return Truefiltered_daily_show = daily_show.filter(lambda line: filter_year(line))大家一起来为了展示Spark的强大,这一节示范如何把一系列数据转换连成一个流水线,并观察Spark在后台处理一切。Spark在编写时就意图为这个目的服务,而且为处理连续任务进行了高度优化。以前用 Hadoop连续处理大量任务非常耗时。这是因为实时产生的结果都需要被写入硬盘,而且Hadoop 根本没意识到完整流水线的重要性(如果你对此感到好奇,可以从这个网址了解更多)。感谢Spark嚣张的内存使用方式(只把硬盘用作备份和特殊任务)以及建构合理的内核。与Hadoop相比,Spark可以大大节省周转时间。在下面一段代码中,我们进行一系列操作:筛掉没有职业的来宾,把每个职业名称变为小写,统计各个职业,并输出统计结果的前五项。filtered_daily_show.filter(lambda line: line[1] != '') \
.map(lambda line: (line[1].lower(), 1)) \
.reduceByKey(lambda x,y: x+y) \
.take(5)[('radio personality', 3), ('television writer', 1), ('american political figure', 2), ('former united states secretary of state', 6), ('mathematician', 1)]后续希望本教程激发了你对Spark的兴趣,并掌握了如何用PySpark编写我们熟悉的Python代码,同时利用分布式处理的优势。在涉及更大数据集的工作中,PySpark会大放光芒,因为它模糊了数据科学在“本地计算机”与“大型在线分布式计算(也被称作云)”中的界限。如果你喜欢本教程,可以去Dataquest 阅读下一章节(英文版),下一章会进一步讲解Spark的转换与行动。在学习数据科学的过程中我发现了一个在线学习的好网站 ,里面课程免费哒,课程按知识点编排的,很新颖,推荐给大家。
登录百度帐号喜爱机器学习
spark机器学习笔记:(一)Spark Python初探
声明:版权所有,转载请联系作者并注明出处
博主简介:风雪夜归子(英文名:Allen),机器学习算法攻城狮,喜爱钻研Meachine Learning的黑科技,对Deep Learning和Artificial Intelligence充满兴趣,经常关注Kaggle数据挖掘竞赛平台,对数据、Machine Learning和Artificial Intelligence有兴趣的童鞋可以一起探讨哦,个人CSDN博客:
本系列的博文主要来源于《用spark机器学习》这本书的读书笔记,如果对scala熟悉,建议看原书,原书绝大多数代码都是采用scala书写的,如果对Python比较熟悉,也许本系列笔记对你有所帮助,为了各位童鞋看原书的方便,本系列的博文基本上保持了变量名与原书一致。
Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。该框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操作都进行了抽象。它提供了一个更高级别的API用于处理分布式数据。从这方面说,它与Apache Hadoop等分布式处理框架类似。但在底层架构上,Spark与它们有所不同。
Spark起源于加利福利亚大学伯克利分校的一个研究项目。学校当时关注分布式机器学习算法的应用情况。因此,Spark从一开始便为应对迭代式应用的高性能需求而设计。在这类应用中,相同的数据会被多次访问。该设计主要靠利用数据集内存缓存以及启动任务时的低延迟和低系统开销来实现高性能。再加上其容错性、灵活的分布式数据结构和强大的函数式编程接口,Spark在各类基于机器学习和迭代分析的大规模数据处理任务上有广泛的应用,这也表明了其实用性。
本系列的spark笔记主要采用Python语言。
Spark支持四种运行模式。
? 本地单机模式:所有Spark进程都运行在同一个Java虚拟机(Java Vitural Machine,JVM)中。
? 集群单机模式:使用Spark自己内置的任务调度框架。
? 基于Mesos:Mesos是一个流行的开源集群计算框架。
? 基于YARN:即Hadoop 2,它是一个与Hadoop关联的集群计算和资源调度框架。
1. Spark 编程模型
在对Spark的设计进行更全面的介绍前,我们先介绍SparkContext对象以及Spark shell。后面将通过它们来了解Spark编程模型的基础知识。
1.1 SparkContext类与SparkConf类
任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如主节点的URL)。初始化后,我们便可用SparkContext对象所包含的各种方法来创建和操作分布式数据集和共享变量。Spark shell(在Scala和Python下可以,但不支持Java)能自动完成上述初始化。若要用Scala代码来实现的话,可参照下面的代码:
val conf = new SparkConf().setAppName("Test Spark App").setMaster("local[4]")
val sc = new SparkContext(conf)
这段代码会创建一个4线程的SparkContext对象,并将其相应的任务命名为Test SparkAPP。我们也可通过如下方式调SparkContext的简单构造函数,以默认的参数值来创建相应的对象。其效果和上述的完全相同:
val sc = new SparkContext("local[4]", "Test Spark App")
1.2 Spark shell
Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算,shell能在输入代码时给出实时反馈。在Scala shell里,命令执行结果的值与类型在代码执行完后也会显示出来。要想通过Scala来使用Spark shell,只需从Spark的主目录执行./bin/spark-shell。它会启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个对象。
要想在Python shell中使用Spark,直接运行./bin/pyspark命令即可,如果配置了pyspark的环境变量,则直接运行pyspark命令即可。与Scala shell类似, Python下的SparkContext对象可以通过Python变量sc来调用。上述命令的终端输出应该如下图所示:
1.3 弹性分布式数据集
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。Spark中的RDD具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。
1.3.1 创建RDD
从现有集合创建
collection = list(["a", "b", "c", "d", "e"])
rddFromCollection = sc.parallelize(collection)
RDD也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。基于Hadoop的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文件、其他Hadoop标准格式、HBase、Cassandra等。以下举例说明如何用一个本地文件系统里的文件创建RDD:
rddFromTextFile = sc.textFile("LICENSE")
上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个表示文本文件中某一行文字的String(字符串)对象。
创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。Spark的操作通常采用函数式风格。对于那些熟悉用Scala或Python进行函数式编程的程序员来说,这不难掌握。但Spark
API其实容易上手,所以那些没有函数式编程经验的程序员也不用担心。
Spark程序中最常用的转换操作便是map操作。该操作对一个RDD里的每一条记录都执行某个函数,从而将输入映射成为新的输出。比如,下面这段代码便对一个从本地文本文件创建的RDD进行操作。它对该RDD中的每一条记录都执行size函数。之前我们曾创建过一个这样的由若干String构成的RDD对象。通过map函数,我们将每一个字符串都转换为一个整数,从而返回一个由若干Int构成的RDD对象。
intsFromStringsRDD = rddFromTextFile.map(lambda line: line.size)
Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中,count返回一个Long,sum返回一个Double)。这就意味着多个操作可以很自然地前后连接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子相同的结果:
aveLengthOfRecordChained = rddFromTextFile.map(lambda line: line.size).sum() /rddFromTextFile.count()
值得注意的一点是,Spark中的转换操作是延后的。也就是说,在RDD上调用一个转换操作并不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,从而提高了Spark的效率。
这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:
transformedRDD = rddFromTextFile.map(lambda line: line.size).filter(lambda size : size & 10).map(lambda size : size * 2)
注意,这里实际上没有触发任何计算,也没有结果被返回。如果我们现在在新的RDD上调用一个执行操作,比如sum,该计算将会被触发:
computation = transformedRDD.sum()
2. RDD缓存策略
Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache函数来实现:
rddFromTextFile.cache
调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。
Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序创建后发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即可:
&&& broadcastAList = sc.broadcast(list(["a", "b", "c", "d", "e"]))
16/06/26 21:04:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 296.0 B, free 296.0 B)
16/06/26 21:04:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 110.0 B, free 406.0 B)
16/06/26 21:04:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:36878 (size: 110.0 B, free: 517.4 MB)
16/06/26 21:04:50 INFO SparkContext: Created broadcast 0 from broadcast at PythonRDD.scala:430
终端的输出表明,广播变量存储在内存中,占用的空间大概是110字节,仍余下517MB可用空间。
广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的value方法:
sc.parallelize(list(["1", "2", "3"])).map(lambda x: broadcastAList.value).collect()
上述一行代码会从{"1", "2", "3"}这个集合(一个Scala List)里,新建一个带有三条记录的RDD。map函数里的代码会返回一个新的List对象。这个对象里的记录由之前创建的那个broadcastAList里的记录与新建的RDD里的三条记录分别拼接而成。
注意,上述代码使用了collect函数。这个函数是一个Spark执行函数,它将整个RDD以Scala(Python或Java)集合的形式返回驱动程序。collect函数一般仅在的确需要将整个结果集返回驱动程序并进行后续处理时才有必要调用。如果在一个非常大的数据集上调用该函数,可能耗尽驱动程序的可用内存,进而导致程序崩溃。高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便属于这类情况。
累加器(accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不同,是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算以及返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只允许驱动程序访问。累加器同样可以在Spark代码中通过value访问。
3. Spark Python编程入门
Spark的Python API几乎覆盖了所有Scala API所能提供的功能,只有极少数的一些特性和个别的API方法,暂时还不支持。但通常不影响我们使用Spark Python进行编程。下面看一个简单的实例:
"""用Python编写的一个简单Spark应用"""
#filename pythonapp.py
from pyspark import SparkContext
sc = SparkContext("local[2]", "First Spark App")
# 将CSV格式的原始数据转化为(user,product,price)格式的记录集
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# 求总购买次数
numPurchases = data.count()
# 求有多少不同客户购买过商品
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# 求和得出总收入
totalRevenue = data.map(lambda record: float(record[2])).sum()
# 求最畅销的产品是什么
products = data.map(lambda record: (record[1], 1.0)).reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])
spark提交脚本命令如下:
tanyouwei@tanyouwei:~$
spark-submit pythonapp.py
没有更多推荐了,
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!}

我要回帖

更多关于 spark python api 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信