运用Aggregator模式实现MapReduce

MapReduce是更好地利用并行计算资源来提升数据处理能力的重要算法,如今已被主流的大数据分析平台实现,成为了大数据批量处理的主力军。利用前面介绍的Actor特性,其实我们也可以实现一个简易的MapReduce。

利用AKKA Actor来实现MapReduce,天生就支持并行计算(利用远程Actor)与异步操作。为了简便起见,本例使用了本地的Actor实现了大数据世界的Hello World,即WordCounter。

在编写字数统计器的MapReduce之前,我们需要先分辨职责,包括:

  • 给定网页地址,获取指定网页的内容
  • 对网页内容进行分词
  • 为每个单词统计字数

考虑到本文的中心主题是介绍响应式编程与Actor模型,所以我们降低了案例难度,读取的网页内容均为英文,并简单地以空格作为分词的标志。由于我们需要接受客户端的字数统计分析请求,那么要完成前面提到的职责,至少需要四个Actor:

  • WordCounterClient:发送数据分析请求
  • WordCounterServer:模拟服务端,接收数据分析请求,并最终将统计后的结果返回给WordCounterClient
  • PageContentFetcher:获取网页内容
  • ContentWordCounter:网页内容的字数统计器

为了尽可能地提升性能,对于获取网页内容以及统计内容字数的统计工作,我们都需要多个Actor同时执行。然而,由于每个Actor处理消息都是以异步形式进行,我们该怎样才能知道并发处理的请求都得到了处理?针对字数统计器的案例而言,我们还需要将每个Actor统计获得的字数再进行reduce,同样也需要知道是否每条消息都已经处理完毕,并获得处理的结果。

AKKA通过Aggregator特性实现了Aggregator模式,可以很好地解决刚才提到的问题。它通过引入一个单独的聚合器Actor,用以聚合多个Actor产生的数据,并根据这些Actor对消息的Response*更新状态*。

假定ContentWordCounter分析后的结果如下代码所示:

case class AnalysisResult(wordToCount: Seq[(String, Long)])

那么,Aggregator就可以通过在其内部维持一个分析结果集(即前面所谓的状态,代码中的analysisResults),每收到一个Actor的Response,就将结果塞入到这个结果集(更新状态)中,并判断结果集的长度是否等于要处理的网页数,以此作为消息是否处理完毕的条件。整个Aggregator的实现如下:

class WordCounterAggregator extends Actor with Aggregator {  expectOnce {
    case StartAggregation(target, urls) =>
      new Handler(target, urls, sender)
    case _ =>
      sender ! BadCommand
      context stop self
  }
  class Handler(target: ActorRef, urls: Seq[String], originalSender: ActorRef) {
    var analysisResults = Set.empty[AnalysisResult]
    context.system.scheduler.scheduleOnce(10.seconds, self, Timeout)
    expect {
      case Timeout =>
        respondIfDone(respondAnyway = true)
    }
    urls.foreach { uri =>
      target ! FetchPageContent(uri)
      expectOnce {
        case result: AnalysisResult =>
          analysisResults += result
          respondIfDone()
      }
    }
    def respondIfDone(respondAnyway: Boolean = false) = {
      import MapSeqImplicits._
      if (respondAnyway || analysisResults.size == urls.size) {
        val wordToCounts = analysisResults.flatMap(_.wordToCount).reduceByKey(_ + _)
        originalSender ! AggregatedAnalysisResult(wordToCounts)
        context stop self
      }
    }
  }
}

WordCounterAggregator继承了Aggregator特性,这个特性已经对Actor的receive进行了处理,使得继承该特性的Actor不需要重写receive方法。Aggregator特性提供了expectexpectOnceunexpect,用以接收期待处理的消息。

在Aggregator内部,其实维持了一个expectList,用以存放expect等函数所接收的偏函数。expectexpectOnce都是将偏函数放入到这个列表中,只是后者只留存一次(通过permanent标志来判定),一旦匹配了,就会将该偏函数移除,而expect则不会;至于unexpect,就是expect的反操作,用于将偏函数从列表中移除。

自定义的respondIfDone方法会在满足聚合条件时,对分析结果进行reduce运算。Scala的集合库自身并没有提供reduceByKey()函数,是我模仿Spark的RDD自行编写的隐式转换方法:

object MapSeqImplicits {
  implicit class MapSeqWrapper(wordToCount: Iterable[(String, Long)]) {
    def reduceByKey(f: (Long, Long) => Long): Seq[(String, Long)] = {
      wordToCount.groupBy(_._1).map {
        case (word, counts) => (word, counts.map(_._2).foldLeft(0L)(f))      
      }.toSeq
    }
 }
}

因为引入了一个Aggregator,消息的处理以及Actor之间的协作就变得相对复杂。要进行响应式编程,其中一个关键就是要理清楚数据(或消息)的流动方向,并分辨每个数据处理器的职责。我们可以借助类似状态图之类的可视化工具帮助我们分析数据流动模型。下图是本例的一个消息处理模型,它同时还表达了Actor之间的协作关系。

执行字数统计的流程如下所示:

  • 首先,WordCounterClient接收StartAnalysisWebPages消息,准备分析网页;
  • 由于Client没有这个“能力”完成分析任务,于是求助于WordCounterServer,并发起FetchWebPages消息,要求获取网页内容;
  • WordCounterServer同样是个惫懒货色,什么都不干,转手就将这件事情转交给别的Actor了,所以他其实就是一个前台接待员。如果不需要聚合,它收到的FetchWebPages其实应该交给PageContentFetcher,但现在须得经由WordCounterAggregator来分配请求;所以从另外一个角度来看,这个Aggregator相当于是一个Mediator;
  • 由于Aggregator是一个Mediator,因此它会协调多个PageContentFetcher与ContentWordCounter来并行完成任务;因而Aggregator和这两个Actor之间是一对多关系,而PageContentFetcher与ContentWordCounter则属于一对一关系。当PageContentFetcher获得了网页内容后,就通过CountPageContent消息,将统计字数的职责交给了ContentWordCounter;
  • ContentWordCounter在计算完当前网页的字数后,会将分析结果AnalysisResult返回给Aggregator,并由其完成分析结果的reduce运算,并返回AggregatedAnalysisResult结果给Server;
  • 最后,Server再将Client需要的最终结果返回给Client。

由于Aggregator需要协调多个Fetcher与Counter的Actor,以支持异步并行计算(本例实则是并发计算)的需要,我为其引入了AKKA提供的Router Actor。通过Router可以创建一个容器Actor,内部管理多个worker rootees,并提供了RoundRobin、Random、Boardcast等多种路由形式,用户可以根据Actor的负载情况选择不同的路由方式。

这里,我选择使用RoundRobin以硬编码的形式创建了Router Actor:

val analyst: ActorRef = context.actorOf(Props(new ContentWordCounter(aggregator)), "PageContentAnalyst") 
val fetchers = context.actorOf(RoundRobinPool(4).props(Props(new PageContentFetcher(analyst))), "fetchers")

整体来看,PageContentFetcher与ContentWordCounter其实扮演的是map角色,并通过Router Actor来实现map工作的异步并发处理;而WordCounterAggregator则扮演了reduce角色,它负责将收到的多个分析结果进行reduce运算。

由于缺乏对MapReduce算法必要的封装,用AKKA Actor实现的MapReduce显得比较复杂,但却较好地体现了响应式编程的异步数据流本质。

当我们在使用Actor来处理异步消息传递时,当业务渐趋复杂后,我们常常会迷失在复杂的消息传递网中而无法自拔。为了保持清醒的头脑,需要时刻谨记Actor的职责。以我的经验,我们应该考虑:

  • 从Actor扮演的角色来思考它应该接收什么样的消息;
  • Actor对消息的处理一定要满足单一职责原则,正确地履行职责,也当在合适时候正确地转移职责;
  • 运用状态图帮助思考Actor与其他Actor之间的协作关系;
  • 正确理解AKKA Actor的消息发送机制,当在Actor内部再次发送消息时,是由sender发送,还是通过消息传递过来的actorRef对象发送消息。

要完成多个网页的字数统计功能,除了使用稍显复杂的Actor模式之外,我们也可以直接使用scala提供的并行集合来完成,代码更为精简:

val words = for {
 url <- urls.par
 line <- scala.io.Source.fromURL(url).getLines()
 word <- line.split(" ")
} yield (word)
val analysisResult = words.map(w => (w, 1L)).reduceByKey(_ + _)

在业务相对简单,并不需要非阻塞消息处理,也没有可伸缩性需求的时候,若能恰当运用scala自身提供的par集合会是好的选择。

事实上,为了实现字数统计的功能,采用AKKA提供的Aggregator确乎有些过度。它更擅长于通过将职责分治与合理运用基于消息的Actor模式来完成更为复杂的响应式系统。WordCounter的例子不外乎是我为了更好地解释Aggregator模式而给出的一个Demo罢了。


本文以及《利用Actor实现管道过滤器模式》两篇文章给出的源代码,可以在我的github上获得:https://github.com/agiledon/reactiveprogramming.git

2017-02-20 21:0189AKKAScala