`
guibin
  • 浏览: 363480 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

统一了线程和事件的Actors(Actors That Unify Threads and Events)(第三节)

阅读更多
统一了线程和事件的Actors(Actors That Unify Threads and Events)(第三节)
guibin.beijing@gmail.com

本文翻译自http://lamp.epfl.ch/~phaller/doc/haller07actorsunify.pdf,原作者 Philipp Haller 和 Martin Odersky.

第三节. 实例
在这一节中我们用一个大的例子来讨论actor模型给我们带来的好处。在这个过程中,我们仔细分析三种不同的实现:一个事件驱动的版本,一个基于线程的版本,一个使用Scala的actor的版本。

Fig. 2. Producers that generate all values in a tree in in-order 写道
class InOrder(n: IntTree)
    extends Producer[int] {
    def produceValues = traverse(n)
    def traverse(n: IntTree) {
        if (n != null) {
            traverse(n.left)
            produce(n.elem)
            traverse(n.right)
        }
    }
}

class InOrder(n: IntTree)
    extends Producer[int] {
    def produceValues = traverse(n, {})
    def traverse(n: Tree, c: => unit) {
        if (n != null) {
            traverse(n.left, produce(n.elem,
            traverse(n.right, c)))
        } else c
    }
}


我们将要写一个producer的抽象,这个producer提供标准的iterator接口以获取一系列生成出来的值。Pruducer需要实现一个抽象的produceValues方法,这个produceValues方法调用produce以生成单独的值。这两个方法都继承自类Producer。作为一个列子,figure 2中的第一个例子展示了producer的定义,这个producer通过中序遍历一个预先存了值的二叉树生成需要的值。

Fig. 3. Event-driven and threaded producers 写道
abstract class CPSProducer[T] {
    var next: Option[T] = None
    var savedCont: () => unit =
        () => produceValues
    def produce(x: T,
                        cont: => unit) {
        next = Some(x)
        savedCont = () => {
            next = None; cont
        }
    }
...
}


abstract class ThreadedProducer[T] {
    val produced = new Queue[Option[T]]
    def next: Option[T] = synchronized {
        while (produced.isEmpty) {wait()}
        produced.dequeue
    }
    new Thread(new Runnable() {
        def run() {
            produceValues
            produced += None
        }
    }).start()

    def produce(x: T) = synchronized {
        produced += Some(x)
        if (produced.length == 1) notify()
    }
...
}


在纯的事件驱动的版本中,基本上有两种方法遍历,也就是通过连续传值方式遍历(continuation-passing style -CPS),或者是显示的FSM编程。在figure 3的第一段程序展示了用事件驱动的方法实现了producers,在这种实现方法中运用了CPS方式的遍历。思路是produce方法是一个被传递中的持续闭包,当下一个值要被产生时,这个方法将会被调用。比如之前提到的使用中序遍历的二叉树实现的方法,即Figure 2的第二段程序,通过使用producer的实例变量来改变生成的值。

figure 3的第二段程序展示了一个使用线程实现的producer的抽象。在基于线程实现的版本中,迭代的状态在运行produceValues方法的堆栈中隐式的维护着。生成的值被放进一个能够同其他迭代器通信的队列中。在向一个空队列请求值时,此时会阻塞运行迭代器的线程(guibin注,在此例中迭代器是指运行next方法)。同基于事件驱动的实现版本相比,基于线程的版本简化了对迭代策略的指定(guibin注:原文 simplifies the specification of iteration strategies)。为了定义一个具体的迭代器,具体的迭代器中必须实现能够按照指定的顺序遍历二叉树的produceValues方法。

Fig. 4. Implementation of the producer and coordinator actors 写道
abstract class ActorProducer[T] {
    def produce(x: T) {
        coordinator ! Some(x)
    }
    private val producer = actor {
        produceValues
        coordinator ! None
    }
    ...
}


    private val coordinator = actor {
        loop { receive {
            case ’next => receive {
            case x: Option[_] => reply(x)
            }
        }}
}


Figure 4展示了使用两个actor实现的producer,一个producer actor,一个coordinator actor。producer运行produceValues方法进而能够给coordinator产生一系列包裹在Some中的值,这些序列以None结尾。coordinator用来同步来自客户端的请求和来自producer的值。在基于线程的版本中,produce方法没有接受一个持续性的参数。

在基于actor版本的实现中,改进了在基于事件的版本中不向CPS中指定的遍历(guibin注:原文The actor-based version improves over the event-driven version by not requiring to specify the traversal in CPS.)。此外基于actor版本的实现支持迭代器的并发,由于基于邮箱的通信是非竞争的。基于同样的原因,在基于线程的实现版本中,没有必要使用显示的阻塞队列,因为这个功能已经被actor的邮箱实现了。我们相信使用阻塞队列通信是非常的常见,以至于很有必要把这些队列都做成支持并发的actor的邮箱的形式。


Guibin
2011-03-20



0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics