GPU+分布式计算,能把数据cpu性能提升升100倍吗

&>&&>&正文
GPU+分布式计算,能把数据性能提升100倍吗?
项目要点:
&&Zilliz定位于基于GPU硬件加速的新一代OLAP(联机分析处理)数据库系统,专注于研发基于GPU的智能数据处理平台,是一家分布式数据库公司。
&&Zilliz的应用领域包括了金融、游戏、电商、物联网、零售、电信等领域。Zilliz的产品还处于内测阶段,产品预计2018年年底正式发布公测版本,未来将在银行、政府、电信等行业进行重点布局。
&&目前,Zilliz现在不超过20人,大部分为技术人员,主要来自于甲骨文等公司。
&&科技发展至今,人类巨大数据量的产生以指数级的速度增长中。在此基础上,云计算、大数据、以及需要大数据支撑的AI技术也在不断蓬勃发展,并在不同垂直领域陆续实现商业化落地。
&&近几年来,中国大数据行业遍地开花,大数据创业公司也在短期内如雨后春笋般出现。大数据领域创业公司也在抓紧赛道窗口期跑马圈地中,寻找中国创客(ID:xjbmaker)曾经报道过行业大数据(数澜科技、云英数据)、人力大数据(E成科技)、零售大数据(超盟数据)、移动游戏大数据(热云数据)、再到营销大数据(ZMT众盟)。
&&在竞争加剧的同时,大数据公司在使用场景、目标客户上更加细分化,形成一定差异化竞争。定位银行、政府等大型客户,Zilliz是一家专注于研发基于GPU硬件加速的新一代OLAP的分布式数据库公司。
&&创业契机:数据的爆发性增长带来机遇
&&“我天生对数据敏感,整个工作生涯似乎都在与数据和计算机打交道。”在美国威斯康星大学计算机专业硕士毕业后,Zilliz的创始人星爵加入甲骨文(Oracle)公司总部。后来在Oracle工作多年,当时他主要负责多租户数据库(OracleMultitenant)的核心研发工作,是一个典型的技术研发工程师。
&&在当时,数据的产生速度每两年发生一次迭代,基本上是两年之前的一倍。在星爵看来,各行各业都存在数据产能过剩,数据不能够得以利用的问题。这是由于现有大数据处理的速度不能够赶上数据增加迭代的速度,导致大量数据没有被分析利用。
&&研究报告表明,人类数据的生产量和存储量呈指数级增长。过去5年里数据量已经从TB(1024GB=1TB)级别跃升到PB(1024TB=1PB)、EB(1024PB=1EB)乃至于ZB (1024EB=1ZB)级别。
&&而在当时,尽管市面上大多数大数据解决方案能处理海量数据,但并不能完全满足瞬时、海量的数据处理需求。在数据行业工作数年的星爵发现,GPU性能改进的速度曲线,跟爆炸式数据增长的曲线非常吻合。
&&尽管海量数据处理的需求已经存在,“但在数据库软件的发展长期受到硬件成本、处理速度等方面的种种约束,在当时并不适合投入商业化使用。”星爵说,直至近期硬件厂商能够提供更加高速的芯片,帮开发者把门槛降低,为分布式数据库的技术开发提供基础。
&&看到创业的时机到来,2016年星爵离开Oracle创办了Zilliz,Zilliz的名字来源于英文zillion of zillions,直译为无穷的无穷。Zilliz现在不超过20人,大部分为技术人员,主要来自于甲骨文等公司。
Zilliz创始人兼CEO:星爵
&&基于GPU的分布式数据库
&&Zilliz是中国首家将GPU的技术应用在分布式数据库中的数据处理公司,据星爵透露,Zilliz的处理性能比普通数据库的性能提高100倍,并且能够在此基础上,将硬件成本降低10倍。
&&一直以来,CPU在计算机上负责“计算”,CPU的核数越大,运算能力越强。相较于CPU的十几核来说,GPU上可以承载数千个处理单元。在过去,GPU技术主要被应用于图像渲染和真实场景模拟。
&&现在,GPU计算已经在深度学习、高性能计算(HPC)中广泛应用,越来越像更高性能的CPU。GPU的这种“大规模并行计算”的能力已经开始被挖掘,定位也从之前协处理器向主流处理器做转移。
&&“如何运用GPU加速数据处理速度,在2006年的时候就是学术热点,”星爵说,他表示为了简单理解GPU分布式数据库,可以想象为当CPU处理数据时,是一个人在抄写课文;当GPU处理数据时,是多个分散在各个地方不同的人,同时在抄录课文,所以效率会高很多。
&&这就是GPU分布式数据库,利用GPU处理器上成千上万个处理单元进行大规模并行数据处理,加速数据库操作。百度百科将分布式数据库定义为,利用高速计算机网络将物理上分散的多个数据存储单元连接起来组成一个逻辑上统一的数据库。
&&当数据量的高速增长,瞬时处理数据的需求得以体现,分布式数据库技术也得到了快速的发展。传统的关系型数据库开始从集中式模型向分布式架构发展,基于关系型的分布式数据库在保留了传统数据库的数据模型和基本特征下,从集中式存储、计算走向分布式存储、计算。Zilliz的技术优势也在于此。
面向银行政府等布局产品
&&目前,Zilliz还在测试阶段,产品预计2018年年底正式上线,产品应用领域包括金融、游戏、电商、物联网、零售、电信等,主要将在银行、政府、互联网行业进行重点布局。
&&值得一提的是,近期火爆的区块链技术跟分布式数据库技术有相似之处,也是去中心化分布式存储和计算。区块链可以被看做是一种特殊的分布式数据库,以一个区块为单位,可以分布式、去中心化地存储数据,不可篡改是它的特点。以往的分布式数据库往往是有中心的,而区块链彻底没有中心,用来防止被篡改。
&&竞品方面,Zilliz对标美国的Kinetica和美国的MapD,二者都是GPU分布式数据库,前者已经于2017年6月完成5千万美元融资,后者于2017年完成2500万B轮融资。而Zilliz于2017年8月完成由云启资本领投,靖亚资本、华岩资本跟投的数千万元天使轮融资。
&&在国内,分布式数据库创业公司还有柏睿数据和PinCAP,其中PinCAP和Zilliz都还处于研发阶段。而柏睿数据定位运营商、公安局等政企大客户已经投入商业化落地,据了解柏睿数据去年签单总金额约为1亿元人民币。分布式数据库也属于大数据公司的一种,区别在于能够在瞬时处理更大量的数据,所以目标企业往往定位于是银行、政府、运营商等每秒运算需求到TB级别的大型政企客户。
&&记者 | 刘娜
作者:刘娜
编辑:新语
来源:中国新闻网  10:07
来源:中国新闻网  09:07
来源:中新网  09:07
来源:中新网  08:06
来源:中新网  22:27
来源:中新网  22:07
中央新闻网站&&专注青少年领域
版权所有:共青团中央网络影视中心信息网络传播视听节目许可证0105108号 京ICP备号-1白话CAPI+FPGA新看点, 为大数据加速(一)--恒扬数据--为用户提供高性能的网络通信平台、数据采集分流、业务加速设备
白话CAPI+FPGA新看点, 为大数据加速(一)
NSA-120加速卡
话说多年以来,作为有着十几年开发经历的元老级FPGA玩家,恒扬科技一直玩着阳春白雪式的东东,什么FPGA加速啦,异构计算啦,多核平台啦,对涉世未深的科技小白来说,即便听了N遍,依旧还是云里雾里,不知所云。
9月16日,第二代分布式计算联盟成立。&作为和IBM、Xilinx携手创办的联盟成员之一,恒扬正好借此机会,放下身段,接接地气,用一个外行人的视角给大家讲讲高大上的CAPI+FPGA为大数据加速碰撞出来的新火花儿。
当当当当~~,科技范小白及FPGA张大侠闪亮登场!
科技范小白:&好学指数&5颗星★★★★★&&&&&&&&&&&科技指数@#%…¥^
FPGA张大侠: FPGA技术指标&爆表&&&&&&&&&&&&&&&&&&&&&颜值。。。。居然也爆表
科技范小白:在下范小白,近日在江湖上听说各路帮主聚首,召开了英雄大会,共商二代分布式计算之事宜,据说席间众英雄群情激荡,脑洞大开,勾画出FPGA+CAPI应用大蓝图。敢问大侠,能否先行给做做脑补,一代分布式计算是啥概念?
FPGA张大侠:对,要明白二代分布式计算,我们先得了解什么是一代分布式计算。举个简单的例子,比如咱们班的数学老师给大家出了一道很难的数学题,看样子非常复杂,计算量超大,一个同学在下课前是完不成了,那么咱们成立一个几个童鞋组成的解题小组,然后由组长分配任务,每人完成其中的一部分计算,通过大家的努力和结果汇总,居然在下课前把一个人如何也完成不了的题做完了。这就是第一代分布式计算的概念——它是相对于集中计算来讲的,即把一个需要庞大计算量的工作,分配到多台计算机上来完成,从而达到快速实现的目的。
科技范小白:原来如此,原来就是从单兵作战改到小组作战啊!那二代分布式计算是咋回事?它和一代计算有啥区别呢?
FPGA张大侠:第二代分布式计算呢,是对第一代分布式计算的一个改进和升华。咱们还是以上面这个例子继续说吧,数学老师看到大家的能力强了,于是第二天又出了一道更难、计算量更大的数学题,我估计这次得是宇宙级复杂程度了&O(∩_∩)O哈哈,但老师说还是你们这个小组来解,但是呢,允许大家找各种各样的工具。于是乎大家有的用算盘算,有人用计算器算,有人用电脑算,八仙过海各显神通。然后组长充分了解到大家工具的能力,再次进行了工作的分配,最后大家在工具的协助下再次完成了难题。可以看到这次大家和上次最大的区别在于,不光用每个人的大脑自己算了,而且寻找工具来帮忙完成计算工作,这些工具在计算方面比人的大脑效率更高。那么这就是第二代分布式计算的概念——即在分布式计算的基础上,再采用专用的协处理器来对CPU(大脑)进行加速,这种CPU+协处理器的模式我们叫做——异构计算,而且要求这里的协处理也能进行编程和升级,以达到通用化目的。
科技范小白:哦,CPU+协处理器=异构计算,我现在终于知道异构计算是啥了!妈妈再也不用担心我的学习了!
FPGA张大侠:上面我们已经提到了异构计算,假如上面的人的大脑是我们认为的通用计算处理器的话,算盘、计算器、计算机就是异构处理器。异构处理器一般相较通用处理器在某些方面的计算能力更强,配合通用处理器处理特定的计算任务从而达到加速的目的。目前主流的异构处理器主要有CPU+GPU和CPU+FPGA两种。CPU+GPU的组合最擅长浮点运算,也就是计算机里面的小数加减乘除运算,这也是一代分布式计算的最典型代表啦。CPU+FPGA呢,最擅长的是整数运算,现在物联网、大数据、人工智能、机器学习这些新兴的大规模数据处理需求不断涌现,CPU+FPGA最擅长干这事,它的机会终于来了!
这次由IBM、赛灵思和恒扬科技共同倡导的二代分布式计算,主要是FPGA+CAPI技术的结合,这个CAPI技术是IBM开发出来的在Power&CPU上的一种新技术,叫加速处理器接口技术,正是因为有了CAPI技术,CAPI接口为CPU和FPGA提供了缓存一致性和对等的内存访问能力,这使得FPGA异构加速计算在CPU侧的编程更具友好性,能达到更快的速度。举个栗子,就好比你要去访问你的邻居,你得先出门,走到外面的马路上,再敲你邻居的门,经过邻居的允许你才能到达邻居家里;如果你和你的邻居关系好,是铁哥们,经常去他家做客,于是你俩商量在你们两家之间的围墙上打个大窟窿,还不装门,通过这样一个“快速通道”你去他家是不是很快。
科技范小白:在下明白了!是不是可以这么理解,这就好比咱们常见的公交快速路,大站快车,直达目的地。
FPGA张大侠:是的,小白你很厉害哟,直接进化成大白了(●—●)。CAPI就是这样一个快速通道,省去了中间多个环节,建立了CPU和FPGA直接的快速一致性通道,方便FPGA加速时通过这个“快速通道”搬运数据。
科技范小白:噢耶,被大侠表扬了,好开森哦!大侠,我还想知道,在CAPI+FPGA异构计算里,CAPI是个快速通道,那FPGA是个什么角色呢,它是不是就是大量数据处理的主力呢?
FPGA张大侠:小白,这次又让你说对了。如果把CPU理解成大脑的话,FPGA加速卡就可以理解成专门用来处理大量计算的计算器或者计算机了,FPGA芯片就是用来进行计算的主力。FPGA(Field Programm-able&Gate&Array) 芯片是一种可编程的IC芯片,就是我们刚才说的协助大脑CPU处理特定计算任务的协处理器,我们可以给它指定任务,让它快速、重复地处理一些CPU不爱干的脏活累活,这样就把CPU给解放出来了,系统整体的处理效率也就大大提高了。
那么结合CAPI技术和FPGA技术的加速卡有什么优势呢?基本可以总结出四大优势:
* & FPGA芯片和CPU更通畅便捷的“对话” &&CAPI技术具备缓存一致性和对等的内存访问能力,也就是说CPU可以分享他的内存空间直接给FPGA使用和访问,通过CAPI给CPU和FPGA搭起一条快速通道,使得他们的“对话和交流”变得轻而易举。
* & 逆天高性能,真正实现以一当百 &&根据赛迪顾问发布的《中国OpenPower产业生态发展白皮书》介绍,使用CAPI+FPGA加速卡对大数据Hadoop算法中对性能要求最高的擦除码进行处理,经测试,服务器性能提升20-100倍!真正实现逆天的以一当百啊!
*&少吃多干,环保低功耗&啥是吃的是草,挤出来的是奶?看看CAPI+FPGA加速卡就知道了!一块FPGA加速卡功耗约20Watt~75Watt,安装在服务器机箱内,不占用额外机房空间。光看这个,你没什么概念是吧!没关系,对比一下就知道了!&1个CPU单元功耗约为&145Watt~190Watt,&1个GPU卡单元&功耗约为&235Watt~300W,这下明白了吧!
* & 灵活可编程,全方位实现指哪打哪 &&FPGA芯片是可以编程的,所以我们可以用它来实现多种不同算法的在线升级。我们可以在不同的场合对FPGA加速单元进行升级,灵活配置更换不同的算法,此法具备通用性,可应用于不同场景加速——可以说,FPGA芯片是“十八般武艺样样精通”呢!
科技范小白:艾玛,CAPI+FPGA加速卡这么厉害啊,我要直接给跪了!那大侠,这么棒的技术,世面上有没有成熟的产品呢?
FPGA张大侠:当然有啊!我们恒扬科技就在不久前推出了国内首款CAPI+FPGA加速卡NSA-120加速卡,先给你目睹一下它的真容啦!上图片!
怎么样?NSA-120加速卡的颜值还是棒棒的吧!除了具备上面说的几点特性外,NSA-120加速卡还具有几大独门秘籍,坐稳听好了啊!
* & 好马配好鞍,硬件可靠是基础& & 基于CAPI+FPGA架构的NSA-120加速卡,可是提前完成了大量的硬件和信号完整性测试工作和产品化工作的哦!这样就确保了产品硬件的可靠性。(基础工作那真是做得杠杠的!)&加速卡自带的NPL基础平台开发包包含了丰富的监控模块和统计模块,可以实时读取FPGA内部温度啦!板卡温度啦!内部模块运行状态啦!统计信息、电源电流动态变化等等这些信息,这就让使用者对加速卡的性能状态一目了然哩!
* &应用轻松上手,开发难度降至最小&&NSA-120加速卡自带的NPL加速卡平台层套件开发包,还有一个更厉害无敌的特性呢!这就是大幅度降低了IDH(FPGA应用加速器开发商)的技术门槛和研发工作量。它简单好用,所有FPGA开发流程通通软件化,让IDH小伙伴们再也不用去了解复杂的底层硬件,只需要关注实现核心算法模块就OK啦!
* & 性能有保障,安全来护航 &&性能有保障,安全当然也不能少。NSA-120加速卡板载两颗加密芯片,一颗是为用户&NPL平台层套件开发包进行加密的,另一颗是为IDH(FPGA应用加速器开发商)&AFU进行加密的,双管齐下,确保IP安全有保证!
* & 一次投资,灵活升级,确保ROI最大化& &具备在线可升级能力的NSA-120可支持不同AFU算法模块在线更换,FPGA配置文件可通过在线从CPU升级到外挂FLASH中,现场无须重新启动服务器,在不影响客户业务运行的情况下完成现场升级。
一口气说了这么多,不知道到小白你听懂了没有!
科技范小白:听懂了听懂了,这NSA-120加速卡真是太牛了!大侠,我还想知道它具体是怎么给大数据云计算应用加速的,有啥实际应用案例?不过今天讲的内容好像挺多的,要不这些我们下次继续,我先好好消化消化今天的内容咯!
FPGA张大侠:没问题,我们下期继续!记得来学哦!
上一篇:下一篇:社会化媒体
了解更多>>
桂ICP备 号
阅读下一篇
自媒体运营攻略
行业经验交流
Hi,在你登录以后,就可以永久免费的收藏任何您感兴趣的内容,关注感兴趣的作者!
手机注册或邮箱注册
点击按钮进行验证
请输入正确的邮箱
已有帐号请点击
帐号创建成功!
我们刚刚给你发送了一封验证邮件
请在48小时内查收邮件,并按照提示验证邮箱
感谢你对微口网的信任与支持
你输入的邮箱还未注册
还没有帐号请点击
点击按钮进行验证
你输入的邮箱还未注册
又想起来了?
你已成功重置密码,请妥善保管,以后使用新密码登录
邮件发送成功!
我们刚刚给你发送了一封邮件
请在5分钟内查收邮件,并按照提示重置密码
感谢你对微口网的信任与支持
对不起,你的帐号尚未验证
如果你没有收到邮件,请留意垃圾箱 或
意见与建议
请留下您的联系方式
* 留下您正确的联系方式,以便工作人员尽快与你取得联系
转藏至我的藏点五种基于 MapReduce 的并行计算框架介绍及性能测试
并行计算模型和框架目前开源社区有许多并行计算模型和框架可供选择,按照实现方式、运行机制、依附的产品生态圈等可以被划分为几个类型,每个类型各有优缺点,如果能够对各类型的并行计算框架都进行深入研究及适当的缺点修复,就可以为不同硬件环境下的海量数据分析需求提供不同的软件层面的解决方案。 并行计算框架并行计算或称平行计算是相对于串行计算来说的。它是一种一次可执行多个指令的算法,目的是提高计算速度,以及通过扩大问题求解规模,解决大型而复杂的计算问题。所谓并行计算可分为时间上的并行和空间上的并行。时间上的并行就是指流水线技术,而空间上的并行则是指用多个处理器并发的执行计算。并行计算(Parallel
Computing)是指同时使用多种计算资源解决计算问题的过程,是提高计算机系统计算速度和处理能力的一种有效手段。它的基本思想是用多个处理器来协同求解同一问题,即将被求解的问题分解成若干个部分,各部分均由一个独立的处理机来并行计算。并行计算系统既可以是专门设计的、含有多个处理器的超级计算机,也可以是以某种方式互连的若干台的独立计算机构成的集群。通过并行计算集群完成数据的处理,再将处理的结果返回给用户。 国内外研究欧美发达国家对于并行计算技术的研究要远远早于我国,从最初的并行计算逐渐过渡到网格计算,随着 Internet
网络资源的迅速膨胀,因特网容纳了海量的各种类型的数据和信息。海量数据的处理对服务器 CPU、IO
的吞吐都是严峻的考验,不论是处理速度、存储空间、容错性,还是在访问速度等方面,传统的技术架构和仅靠单台计算机基于串行的方式越来越不适应当前海量数据处理的要求。国内外学者提出很多海量数据处理方法,以改善海量数据处理存在的诸多问题。目前已有的海量数据处理方法在概念上较容易理解,然而由于数据量巨大,要在可接受的时间内完成相应的处理,只有将这些计算进行并行化处理,通过提取出处理过程中存在的可并行工作的分量,用分布式模型来实现这些并行分量的并行执行过程。随着技术的发展,单机的性能有了突飞猛进的发展变化,尤其是内存和处理器等硬件技术,但是硬件技术的发展在理论上总是有限度的,如果说硬件的发展在纵向上提高了系统的性能,那么并行技术的发展就是从横向上拓展了处理的方式。2003 年美国 Google 公司对外发布了 MapReduce、GFS、BigData 三篇论文,至此正式将并行计算框架落地为 MapReduce 框架。我国的并行和分布式计算技术研究起源于 60 年代末,按照国防科技大学周兴铭院士提出的观点,到目前为止已经三个阶段了。第一阶段,自 60 年代末至 70
年代末,主要从事大型机内的并行处理技术研究;第二阶段,自 70 年代末至 90 年代初,主要从事向量机和并行多处理器系统研究;第三阶段,自 80 年代末至今,主要从事
MPP(Massively Parallel Processor) 系统研究。尽管我国在并行计算方面开展的研究和应用较早,目前也拥有很多的并行计算资源,但研究和应用的成效相对美国还存在较大的差距,有待进一步的提高和发展。MapReduceMapReduce 是由谷歌推出的一个编程模型,是一个能处理和生成超大数据集的算法模型,该架构能够在大量普通配置的计算机上实现并行化处理。MapReduce
编程模型结合用户实现的 Map 和 Reduce 函数。用户自定义的 Map 函数处理一个输入的基于 key/value pair 的集合,输出中间基于
key/value pair 的集合,MapReduce 库把中间所有具有相同 key 值的 value 值集合在一起后传递给 Reduce 函数,用户自定义的
Reduce 函数合并所有具有相同 key 值的 value 值,形成一个较小 value 值的集合。一般地,一个典型的 MapReduce 程序的执行流程如图 1
所示。图 1 .MapReduce
程序执行流程图MapReduce 执行过程主要包括: 将输入的海量数据切片分给不同的机器处理; 执行 Map 任务的 Worker 将输入数据解析成 key/value pair,用户定义的 Map 函数把输入的 key/value pair
转成中间形式的 key/value pair; 按照 key 值对中间形式的 key/value 进行排序、聚合; 把不同的 key 值和相应的 value 集分配给不同的机器,完成 Reduce 运算; 输出 Reduce 结果。任务成功完成后,MapReduce 的输出存放在 R 个输出文件中,一般情况下,这 R 个输出文件不需要合并成一个文件,而是作为另外一个 MapReduce
的输入,或者在另一个可处理多个分割文件的分布式应用中使用。受 Google MapReduce 启发,许多研究者在不同的实验平台上实现了 MapReduce 框架,本文将对 Apache Hadoop
MapReduce、Apache、Spark、斯坦福大学的 Phoenix,Nokia 研发的 Disco,以及香港科技大学的 Mars 等 5 个 MapReduce
实现框架进行逐一介绍和各方面对比。 Hadoop MapReduceHadoop 的设计思路来源于 Google 的 GFS 和
MapReduce。它是一个开源软件框架,通过在集群计算机中使用简单的编程模型,可编写和运行分布式应用程序处理大规模数据。Hadoop 包含三个子项目:Hadoop
Common、Hadoop Distributed File System(HDFS) 和 Hadoop MapReduce。第一代 Hadoop MapReduce 是一个在计算机集群上分布式处理海量数据集的软件框架,包括一个 JobTracker 和一定数量的
TaskTracker。运行流程图如图 2 所示。图 2 .Hadoop
MapReduce 系统架构图在最上层有 4 个独立的实体,即客户端、JobTracker、TaskTracker 和分布式文件系统。客户端提交 MapReduce 作业;JobTracker
协调作业的运行;JobTracker 是一个 Java 应用程序,它的主类是 JobTracker;TaskTracker 运行作业划分后的任务,TaskTracker
也是一个 Java 应用程序,它的主类是 TaskTracker。Hadoop 运行 MapReduce
作业的步骤主要包括提交作业、初始化作业、分配任务、执行任务、更新进度和状态、完成作业等 6 个步骤。 Spark MapReduceSpark 是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速。Spark 非常小巧玲珑,由加州伯克利大学 AMP 实验室的 Matei
为主的小团队所开发。使用的语言是 Scala,项目的核心部分的代码只有 63 个 Scala 文件,非常短小精悍。Spark
启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark
提供了基于内存的计算集群,在分析数据时将数据导入内存以实现快速查询,“速度比”基于磁盘的系统,如比 Hadoop 快很多。Spark
最初是为了处理迭代算法,如机器学习、图挖掘算法等,以及交互式数据挖掘算法而开发的。在这两种场景下,Spark 的运行速度可以达到 Hadoop 的几百倍。 DiscoDisco 是由 Nokia 研究中心开发的,基于 MapReduce 的分布式数据处理框架,核心部分由 Erlang 语言开发,外部编程接口为 Python
语言。Disco 是一个开放源代码的大规模数据分析平台,支持大数据集的并行计算,能运行在不可靠的集群计算机上。Disco 可部署在集群和多核计算机上,还可部署在
Amazon EC2 上。Disco 基于主/从架构 (Master/Slave),图 3 总体设计架构图展示了通过一台主节点 (Master) 服务器控制多台从节点
(Slave) 服务器的总体设计架构。图 3 .Disco
总体架构图Disco 运行 MapReduce 步骤如下: Disco 用户使用 Python 脚本开始 Disco 作业; 作业请求通过 HTTP 发送到主机; 主机是一个 Erlang 进程,通过 HTTP 接收作业请求; 主机通过 SSH 启动每个节点处的从机; 从机在 Worker 进程中运行 Disco 任务。 PhoenixPhoenix 作为斯坦福大学 EE382a 课程的一类项目,由斯坦福大学计算机系统实验室开发。Phoenix 对 MapReduce 的实现原则和最初由 Google
实现的 MapReduce 基本相同。不同的是,它在集群中以实现共享内存系统为目的,共享内存能最小化由任务派生和数据间的通信所造成的间接成本。Phoenix
可编程多核芯片或共享内存多核处理器 (SMPs 和 ccNUMAs),用于数据密集型任务处理。 MarsMars 是香港科技大学与微软、新浪合作开发的基于 GPU 的 MapReduce
框架。目前已经包含字符串匹配、矩阵乘法、倒排索引、字词统计、网页访问排名、网页访问计数、相似性评估和 K 均值等 8 项应用,能够在 32 位与 64 位的 Linux
平台上运行。Mars 框架实现方式和基于 CPU 的 MapReduce 框架非常类似,也由 Map 和 Reduce 两个阶段组成,它的基本工作流程图如图 4
所示。图 4 .Mars
基本工作流程图在开始每个阶段之前,Mars 初始化线程配置,包括 GPU 上线程组的数量和每个线程组中线程的数量。Mars 在 GPU
内使用大量的线程,在运行时阶段会均匀分配任务给线程,每个线程负责一个 Map 或 Reduce 任务,以小数量的 key/value
对作为输入,并通过一种无锁的方案来管理 MapReduce 框架中的并发写入。Mars 的工作流程主要有 7 个操作步骤: 在主存储器中输入 key/value 对,并将它们存储到数组; 初始化运行时的配置参数; 复制主存储器中的输入数组到 GPU 设备内存; 启动 GPU 上的 Map 阶段,并将中间的 key/value 对存储到数组; 如果 mSort 选择 F,即需要排序阶段,则对中间结果进行排序; 如果 noReduce 是 F,即需要 Reduce 阶段,则启动 GPU 上的 Reduce 阶段,并输出最终结果,否则中间结果就是最终结果; 复制 GPU 设备存储器中的结果到主存储器。上述步骤的 1,2,3,7 这四个步骤的操作由调度器来完成,调度器负责准备数据输入,在 GPU 上调用 Map 和 Reduce 阶段,并将结果返回给用户。五种框架的优缺点比较表 1. 五种框架优缺点比较 Hadoop MapReduce
需要首先架构基于 Hadoop 的集群系统,通过 HDFS 完成运算的数据存储工作
可以的单独运行,也可以与 Hadoop 框架完整结合
独立运行,不需要提前部署集群,运行时系统的实现是建立在 PThread 之上的,也可方便地移植到其他共享内存线程库上
整个 Disco 平台由分布式存储系统 DDFS 和 MapReduce 框架组成,DDFS
与计算框架高度耦合,通过监控各个节点上的磁盘使用情况进行负载均衡
运行时为 Map 或 Reduce 任务初始化大量的 GPU 线程,并为每个线程自动分配少量的 key/value 对来运行任务
计算能力非常强,适合超大数据集的应用程序,但是由于系统开销等原因,处理小规模数据的速度不一定比串行程序快,并且本身集群的稳定性不高
在保证容错的前提下,用内存来承载工作集,内存的存取速度快于磁盘多个数量级,从而可以极大提升性能
利用共享内存缓冲区实现通信,从而避免了因数据复制产生的开销,但 Phoenix 也存在不能自动执行迭代计算、没有高效的错误发现机制等不足
由一个 Master 服务器和一系列 Worker 节点组成,Master 和 Worker 之间采用基于轮询的通信机制,通过 HTTP
的方式传输数据。轮询的时间间隔不好确定,若时间间隔设置不当,会显著降低程序的执行性能
由于 GPU 线程不支持运行时动态调度,所以给每个 GPU 线程分配的任务是固定的,若输入数据划分布均匀,将导致 Map 或 Reduce
阶段的负载不均衡,使得整个系统性能急剧降低。同时由于 GPU
不支持运行时在设备内存中分配空间,需要预先在设备内存中分配好输入数据和输出数据的存放空间,但是 Map 和 Reduce
阶段输出数据大小是未知的,并且当多个 GPU 线程同时向共享输出区域中写数据时,易造成写冲突 WordCount 实验 基本原理单词计数 (WordCount) 是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版"Hello
World"。单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数。 本次实验步骤本次实验的硬件资源基于 x86 服务器 1 台,配置为内存 32GB DDR3、E5 CPU/12 核、GPU,实验数据样本为
10M/50M/100M/500M/1000M 的文本文件五个,我们使用 Hadoop MapReduce、Spark、Phoenix、Disco、Mars 等
MapReduce 框架分别运行文本分析程序,基于结果一致的前提下统计出运行时间、运行时 CPU 占有率、运行时内存占有率等数据,并采用这些数据绘制成柱状图。Hadoop
MapReduce首先需要将文件拆分成 splits,由于测试用的文件较小,所以每个文件为一个 split,并将文件按行分割形成&key,value&对,图 12
分割过程图所示。这一步由 MapReduce 框架自动完成,其中偏移量(即 key 值)包括了回车所占的字符数(Windows 和 Linux 环境会不同)。图 5 . 分割过程图将分割好的&key,value&对交给用户定义的 map 方法进行处理,生成新的&key,value&对,图 6 执行 map 方法所示。图 6 . 执行 Map
方法过程图得到 map 方法输出的&key,value&对后,Mapper 会将它们按照 key 值进行排序,并执行 Combine 过程,将 key 相同的
value 值累加,得到 Mapper 的最终输出结果。图 7Map 端排序及 Combine 过程所示。图 7 . Map 端排序及
Combine 过程Reducer 先对从 Mapper 接收的数据进行排序,再交由用户自定义的 reduce 方法进行处理,得到新的&key,value&对,并作为
WordCount 的输出结果,图 15Reduce 端排序及输出结果所示。图 8 . Reduce
端排序及输出结果流程图清单 1 . 第一代
Hadoop MapReduce WordCount 示例代码import java.io.IOE
import java.util.StringT
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.util.GenericOptionsP
public class WordCount {
public static class TokenizerMapper
extends Mapper&Object, Text, Text, IntWritable&{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// 开始 Map 过程
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
//遍历 Map 里面的字符串
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
public static class IntSumReducer
extends Reducer&Text,IntWritable,Text,IntWritable& {
private IntWritable result = new IntWritable();
//开始 Reduce 过程
public void reduce(Text key, Iterable&IntWritable& values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
result.set(sum);
context.write(key, result);
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount &in& &out&");
System.exit(2);
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}Spark WordCount
实验Spark 与 Hadoop MapReduce 的最大区别是它把所有数据保存在内存中,Hadoop MapReduce 需要从外部存储介质里把数据读入到内存,Spark
不需要这一步骤。它的实现原理与 Hadoop MapReduce 没有太大区别,这里不再重复原理,完整的运行代码如清单 2 所示。清单 2 . Spark
WordCount 示例代码SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD&String& lines = ctx.textFile(args[0], Integer.parseInt(args[1]));
JavaRDD&String& words = lines.flatMap(new FlatMapFunction&String, String&() {
public Iterable&String& call(String s) {
return Arrays.asList(SPACE.split(s));
//定义 RDD ones
JavaPairRDD&String, Integer& ones = words.mapToPair(new PairFunction&String, String, Integer&() {
public Tuple2&String, Integer& call(String s) {
return new Tuple2&String, Integer&(s, 1);
//ones.reduceByKey(func, numPartitions)
JavaPairRDD&String, Integer& counts = ones.reduceByKey(new Function2&Integer, Integer, Integer&() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
//输出 List
List&Tuple2&String, Integer&& output = counts.collect();
Collections.sort(output, new Comparator&Tuple2&String, Integer&&() {
public int compare(Tuple2&String, Integer& t1,
Tuple2&String, Integer& t2) {
if(t1._2 & t2._2) {
return -1;
} else if(t1._2 & t2._2) {
});Disco WordCount
实验MapReduce 框架由于 Disco 有分布式文件系统存在,所以一般情况下都不会单独使用,都是从分布式文件系统内取数据后读入内存,然后再切分数据、进入
MapReduce 阶段。首先需要调用 ddfs 的 chunk 命令把文件上传到 DDFS,然后开始编写 MapReduce 程序,Disco 外层应用程序采用
Python 编写。Map 程序实例如清单 3 所示,Reduce 程序示例如清单 4 所示。清单 3 . Map 程序段def fun_map(line, params):
for word in line.split():
yield word, 1清单 4 . Reduce
程序段def fun_reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)清单 5 . Map/Reduce
任务from disco.core import Job, result_iterator
def map(line, params):
for word in line.split():
yield word, 1
def reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)
if __name__ == '__main__':
job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
reduce=reduce)
for word, count in result_iterator(job.wait(show=True)):
print(word, count)
NotePhoenix
WordCount 实验Phoenix 是基于 CPU 的 MapReduce 框架,所以它也是采用将数据分割后读入内存,然后开始 MapReduce 处理阶段这样的传统方式。Phoenix
并不由用户决定切分每个 Map 分配到的数据块的大小,它是根据集群系统的实际 Cache 大小来切分的,这样可以避免出现分配到 Map
的数据块过大或者过小的情况出现。过大的数据快会导致 Map 执行较慢,过小的数据快会导致 Map 资源浪费,因为每次启动 Map 线程都需要消耗一定的系统资源。Map
阶段切分好的文本被多个 Map 并行执行,Phoenix 支持 100 个左右的 Map 并行执行,一个工作节点下可以有若干个 Map
并行执行。只有当一个工作节点上所有的 Map 任务都结束后才开始 Reduce 阶段。Reduce
阶段继续沿用了动态任务调度机制,同时允许用户自定义数据分区规则。清单 6 .
Phoenix 的 wordCount 程序段#include &stdio.h&
#include &strings.h&
#include &string.h&
#include &stddef.h&
#include &stdlib.h&
#include &unistd.h&
#include &assert.h&
#include &sys/mman.h&
#include &sys/stat.h&
#include &sys/time.h&
#include &fcntl.h&
#include &ctype.h&
#include &inttypes.h&
#include "map_reduce.h"
#include "stddefines.h"
#include "sort.h"
#define DEFAULT_DISP_NUM 10
typedef struct {
} wc_data_t;
NOT_IN_WORD
struct timeval begin,
#ifdef TIMING
unsigned int library_time = 0;
/** mystrcmp()
* Comparison function to compare 2 words
int mystrcmp(const void *s1, const void *s2)
return strcmp((const char *)s1, (const char *) s2);
/** mykeyvalcmp()
* Comparison function to compare 2 ints
int mykeyvalcmp(const void *v1, const void *v2)
keyval_t* kv1 = (keyval_t*)v1;
keyval_t* kv2 = (keyval_t*)v2;
intptr_t *i1 = kv1-&
intptr_t *i2 = kv2-&
if (i1 & i2) return 1;
else if (i1 & i2) return -1;
return strcmp((char *)kv1-&key, (char *)kv2-&key);
//return 0;
/** wordcount_分割器 ()
* 内存里面进行 Map 计算
int wordcount_splitter(void *data_in, int req_units, map_args_t *out)
wc_data_t * data = (wc_data_t *)data_
assert(data_in);
assert(out);
assert(data-&flen &= 0);
assert(data-&fdata);
assert(req_units);
assert(data-&fpos &= 0);
// End of file reached, return FALSE for no more data
if (data-&fpos &= data-&flen) return 0;
// Set the start of the next data
out-&data = (void *)&data-&fdata[data-&fpos];
// Determine the nominal length
out-&length = req_units * data-&unit_
if (data-&fpos + out-&length & data-&flen)
out-&length = data-&flen - data-&
// Set the length to end at a space
for (data-&fpos += (long)out-&
data-&fpos & data-&flen &&
data-&fdata[data-&fpos] != ' ' && data-&fdata[data-&fpos] != '\t' &&
data-&fdata[data-&fpos] != '\r' && data-&fdata[data-&fpos] != '\n';
data-&fpos++, out-&length++);
/** wordcount_locator()
* Return the memory address where this map task would heavily access.
void *wordcount_locator (map_args_t *task)
assert (task);
return task-&
/** wordcount_map()
* 对文本进行计数
void wordcount_map(map_args_t *args)
char *curr_start, curr_
int state = NOT_IN_WORD;
assert(args);
char *data = (char *)args-&
assert(data);
curr_start =
for (i = 0; i & args-& i++)
curr_ltr = toupper(data[i]);
switch (state)
case IN_WORD:
data[i] = curr_
if ((curr_ltr & 'A' || curr_ltr & 'Z') && curr_ltr != '\'')
data[i] = 0;
emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1);
state = NOT_IN_WORD;
case NOT_IN_WORD:
if (curr_ltr &= 'A' && curr_ltr &= 'Z')
curr_start = &data[i];
data[i] = curr_
state = IN_WORD;
// Add the last word
if (state == IN_WORD)
data[args-&length] = 0;
emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1);
/** wordcount_reduce()
* 计算字符
void wordcount_reduce(void *key_in, iterator_t *itr)
char *key = (char *)key_
intptr_t sum = 0;
assert(key);
assert(itr);
while (iter_next (itr, &val))
sum += (intptr_t)
emit(key, (void *)sum);
void *wordcount_combiner (iterator_t *itr)
intptr_t sum = 0;
assert(itr);
while (iter_next (itr, &val))
sum += (intptr_t)
return (void *)
int main(int argc, char *argv[])
final_data_t wc_
char * fname, * disp_num_
struct timeval starttime,
get_time (&begin);
// 确保文件名
if (argv[1] == NULL)
printf("USAGE: %s &filename& [Top # of results to display]\n", argv[0]);
fname = argv[1];
disp_num_str = argv[2];
printf("Wordcount: Running...\n");
// 读取文件
CHECK_ERROR((fd = open(fname, O_RDONLY)) & 0);
// Get the file info (for file length)
CHECK_ERROR(fstat(fd, &finfo) & 0);
#ifndef NO_MMAP
// 内存里面开始调用 map
CHECK_ERROR((fdata = mmap(0, finfo.st_size + 1,
PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0)) == NULL);
fdata = (char *)malloc (finfo.st_size);
CHECK_ERROR (fdata == NULL);
ret = read (fd, fdata, finfo.st_size);
CHECK_ERROR (ret != finfo.st_size);
CHECK_ERROR((disp_num = (disp_num_str == NULL) ?
DEFAULT_DISP_NUM : atoi(disp_num_str)) &= 0);
wc_data_t wc_
wc_data.unit_size = 5; // approx 5 bytes per word
wc_data.fpos = 0;
wc_data.flen = finfo.st_
wc_data.fdata =
CHECK_ERROR (map_reduce_init ());
map_reduce_args_t map_reduce_
memset(&map_reduce_args, 0, sizeof(map_reduce_args_t));
map_reduce_args.task_data = &wc_
map_reduce_args.map = wordcount_
map_reduce_args.reduce = wordcount_
map_reduce_args.combiner = wordcount_
map_reduce_args.splitter = wordcount_
map_reduce_args.locator = wordcount_
map_reduce_args.key_cmp =
map_reduce_args.unit_size = wc_data.unit_
map_reduce_args.partition = NULL; // use default
map_reduce_args.result = &wc_
map_reduce_args.data_size = finfo.st_
map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 1024 * 2;
map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16;
map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16;
map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2;
printf("Wordcount: Calling MapReduce Scheduler Wordcount\n");
gettimeofday(&starttime,0);
get_time (&end);
#ifdef TIMING
fprintf (stderr, "initialize: %u\n", time_diff (&end, &begin));
get_time (&begin);
CHECK_ERROR(map_reduce (&map_reduce_args) & 0);
get_time (&end);
#ifdef TIMING
library_time += time_diff (&end, &begin);
get_time (&begin);
gettimeofday(&endtime,0);
printf("Wordcount: Completed %ld\n",(endtime.tv_sec - starttime.tv_sec));
printf("Wordcount: MapReduce Completed\n");
printf("Wordcount: Calling MapReduce Scheduler Sort\n");
mapreduce_sort(wc_vals.data, wc_vals.length, sizeof(keyval_t), mykeyvalcmp);
CHECK_ERROR (map_reduce_finalize ());
printf("Wordcount: MapReduce Completed\n");
dprintf("\nWordcount: Results (TOP %d):\n", disp_num);
for (i = 0; i & disp_num && i & wc_vals. i++)
keyval_t * curr = &((keyval_t *)wc_vals.data)[i];
dprintf("%15s - %" PRIdPTR "\n", (char *)curr-&key, (intptr_t)curr-&val);
free(wc_vals.data);
#ifndef NO_MMAP
CHECK_ERROR(munmap(fdata, finfo.st_size + 1) & 0);
free (fdata);
CHECK_ERROR(close(fd) & 0);
get_time (&end);
#ifdef TIMING
fprintf (stderr, "finalize: %u\n", time_diff (&end, &begin));
MapReduceMars 框架中,Map 和 Reduce 的处理阶段都在 GPU 内进行,Map 和 Reduce 的分割数据阶段都在 CPU 内进行,这是与其他基于 CPU 的
MapReduce 框架的最大不同。Mars 更多的是利用 CPU、GPU 缓存来替代内存,执行数据分割、处理过程。具体的 Word count 的流程如下所示: 准备 key/value 键值对,将这些键值对存储在数组里面; 初始化 MapReduce 上下文,设置参数 (根据不同的 GPU 需要根据 CUDA 核心数目设置并发线程数); 数据预处理,首先打开文件,将文件所有内容读入内存,然后申请一块同文件大小的显存,将文件内容中小写字符转为大写 (这样的影响 word,Word
算通一个单词)。 开始 MapReduce 阶段。根据并发线程数和文件大写切换内存中的文件,每块切分后的任务记录下该任务在内存中的偏移位置和长度视为 value,
显存的指针地址视为 key,将任务添加的任务池。将处理后的内存内容复制到刚刚申请的显存中。接着开始 Map 流程,将内存中的任务池复制到显存,申请显存块用于存放
Map 产生的数据,开启多线程并发执行用户定义的 map 流程 MAP_COUNT_FUNC,这个是 Mars 由于 GPU 程序的特殊性而设计的,用于记录
map 产生的 key 和 value 的长度 (sizeof)。调用 MAP_FUNC 方法,输入任务记录,输出单词以及单词所在的位置; 如果 noSort 是 F,对结果排序; 如果 noReduce 是 F,GPU 开始 reduce 阶段,生成最终的结果集。否则,立即输出最后的结果集; 结果输出,从 GPU 设备拷贝最终的结果集到内存,然后输出到屏幕。通过上述的 7 个步骤,WordCount 的计算过程全部完成并且输出结果集。清单 7 . Mars 的 Map
程序段#ifndef __MAP_CU__
#define __MAP_CU__
#include "MarsInc.h"
#include "global.h"
__device__ int hash_func(char* str, int len)
for (i = 0, hash= i & i++)
hash = (hash&&4)^(hash&&28)^str[i];
__device__ void MAP_COUNT_FUNC//(void *key, void *val, size_t keySize, size_t valSize)
WC_KEY_T* pKey = (WC_KEY_T*)
WC_VAL_T* pVal = (WC_VAL_T*)
char* ptrBuf = pKey-&file + pVal-&line_
int line_size = pVal-&line_
char* p = ptrB
int lsize = 0;
int wsize = 0;
char* start = ptrB
for (; *p &= 'A' && *p &= 'Z'; p++, lsize++);
*p = '\0';
wsize = (int)(p - start);
if (wsize & 6)
//printf("%s, wsize:%d\n", start, wsize);
EMIT_INTER_COUNT_FUNC(wsize, sizeof(int));
for (; (lsize & line_size) && (*p & 'A' || *p & 'Z'); p++, lsize++);
if (lsize &= line_size)
__device__ void MAP_FUNC//(void *key, void val, size_t keySize, size_t valSize)
WC_KEY_T* pKey = (WC_KEY_T*)
WC_VAL_T* pVal = (WC_VAL_T*)
char* filebuf = pKey-&
char* ptrBuf = filebuf + pVal-&line_
int line_size = pVal-&line_
char* p = ptrB
char* start = ptrB
int lsize = 0;
int wsize = 0;
for (; *p &= 'A' && *p &= 'Z'; p++, lsize++);
*p = '\0';
wsize = (int)(p - start);
int* o_val = (int*)GET_OUTPUT_BUF(0);
if (wsize & 6)
//printf("%s, %d\n", start, wsize);
EMIT_INTERMEDIATE_FUNC(start, o_val, wsize, sizeof(int));
for (; (lsize & line_size) && (*p & 'A' || *p & 'Z'); p++, lsize++);
if (lsize &= line_size)
#endif //__MAP_CU__清单 8 . Mars 的
Reduce 程序段#ifndef __REDUCE_CU__
#define __REDUCE_CU__
#include "MarsInc.h"
__device__ void REDUCE_COUNT_FUNC//(void* key, void* vals, size_t keySize, size_t valCount)
__device__ void REDUCE_FUNC//(void* key, void* vals, size_t keySize, size_t valCount)
#endif //__REDUCE_CU__五种框架 WordCount
实验性能对比图 9 . 实验运行时间比较图图 9 实验运行时间比较图是分析不同大小的文本文件所消耗的时间对比图。从上图可以看出,Hadoop MapReduce 的运行时间最长,原因是 Hadoop
生态环境包含内容过多,所以每次任务启动时首先需要加载所需资源包,然后缓慢地发起任务,并且由于本身是用性能较差的 Java
语言编写的,所以导致整体计算时间长、性能差。Phoenix 由于采用汇编和 C 语言编写,内核很小,运行时所用资源很少,所以整个测试过程耗时也较少。Spark 框架在
WordCount 实验中消耗的时长较 Disco 稍少,但是比 Phoenix、Mars 耗时太多。耗时最短的两个框架是 Mars 和
Phoenix。需要时长从高到低分别是 Hadoop MapReduce、Disco、Spark、Phoenix、Mars。图 10 .CPU 使用率对比图图 10-CPU 使用率比较图是分析任务执行过程当中 CPU 使用率情况图。从上图可以看出,Hadoop MapReduce、Disco 这两个框架需要占用的 CPU
资源在 1000M 文本处理时基本到达最大饱和度 (大于 90%),Apache Spark 的 CPU 使用率没有完全伴随着文本文件增大而大幅上涨,Phoenix 和
Mars 基本控制在对 CPU 使用率较低的范围内。图 11 . 内存使用率对比图图 11 内存使用率比较图是分析任务执行过程中内存使用情况对比。从图中可以看出,Mars 和 Phoenix
这两款框架所使用的内存在文本数据较小时是最少的,随着文本数据的增大,Apache Spark 随着数据量增大而内存大幅增加,Mars 和 Phoenix
有一定幅度的内存使用增加趋势。当数据量达到本次测试最大的 1000M 文本时,Spark 框架对内存的消耗是最小的,Hadoop MapReduce 和 Disco
需要占用较多的内存。从上面的测试结果我们得出,如果用户只需要处理海量的文本文件,不需要考虑存储、二次数据挖掘等,采用 Phoenix 或者 Mars 是最大性价比的选择,但是由于 Mars
必须在 GPU 上运行,本身 GPU 由于价格因素,导致不太可能在实际应用场景里推广,所以综合来看 Phoenix
是性价比最高的框架。如果应用程序需要处理的数据量非常大,并且客户希望计算出的数据可以被存储和二次计算或数据挖掘,那 Hadoop MapReduce 较好,因为整个
Hadoop 生态圈庞大,支持性很好。Apache Spark 由于架构层面设计不同,所以对于
CPU、内存的使用率一直保持较低状态,它未来可以用于海量数据分析用途。结束语现实世界很多实例都可用 MapReduce 编程模型来表示,MapReduce
作为一个通用可扩展的、高容错性的并行处理模型,可有效地处理海量数据,不断地从中分析挖掘出有价值的信息。MapReduce
封装了并行处理、负载均衡、容错、数据本地化等技术难点细节。通过本文测试用例可以证明 MapReduce
适用于海量文本分析的应用场景,可以为处理大数据提供技术支撑。
相关主题参考书籍 云计算的关键技术与应用实例 人民邮电出版社 王鹏 2010 参考论文 基于 Hadoop 平台的任务调度方案分析周口师范学院计算机科学与技术学院 周航 2013 参考论文 MapReduce 原理及其主要实现平台分析 现代图书情报技术 亢丽芸 2012 参考论文 近年来 Hadoop 国外研究综述 计算机系统应用 王彦明 2013 在 ,了解关于大数据的更多信息,获取技术文档、how-to 文章、培训、下载、产品信息以及其他资源。 在 ,了解关于信息管理的更多信息,获取技术文档、how-to 文章、培训、下载、产品信息以及其他资源。
添加或订阅评论,请先或。
有新评论时提醒我
static.content.url=http://www.ibm.com/developerworks/js/artrating/SITE_ID=10Zone=Big data and analyticsArticleID=1010384ArticleTitle=五种基于 MapReduce 的并行计算框架介绍及性能测试publish-date=}

我要回帖

更多关于 cpu性能提升 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信