`
guibin
  • 浏览: 363698 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

统一了线程和事件的Actors(Actors That Unify Threads and Events)(第二节)

阅读更多
统一了线程和事件的Actors(Actors That Unify Threads and Events)(第二节)
guibin.beijing@gmail.com

本文翻译自http://lamp.epfl.ch/~phaller/doc/haller07actorsunify.pdf,原作者 Philipp Haller 和 Martin Odersky.

第二节. 使用Actors编程
Actor是一个能通过消息传递的方式与其他actor通信的进程。总共有两种主要的通信抽象,分别叫做“send”和“receive”。表达式 a ! msg 表示把消息msg发送到actor a上。Send是一种异步操作,比如这个操作执行完毕后会立即返回。消息会在actor的邮箱中缓冲着。receive操作有如下的形式:
引用
receive {
case msgpat1 => action1
...
case msgpatN => actionN
}


匹配了模式msgpati的第一个消息从邮箱中被清除掉,与此同时actioni被执行,如果没有匹配上任何一个模式,那么这个actor就被挂起。

代码 1:
Fig. 1. Example: orders and cancellations 写道

// base version
val orderMngr = actor {
      while (true)
          receive {
              case Order(sender, item) =>
                  val o =
                      handleOrder(sender, item)
                  sender ! Ack(o)
             
              case Cancel(sender, o) =>
                  if (o.pending) {
                      cancelOrder(o)
                      sender ! Ack(o)
                  } else sender ! NoAck

              case x => junk += x
          }
}

val customer = actor {
    orderMngr ! Order(self, myItem)
    receive {
        case Ack(o) => ...
    }

// version with reply and !?
val orderMngr = actor {
    while (true)
        receive {
            case Order(item) =>
                val o =
                    handleOrder(sender, item)
                reply(Ack(o))

            case Cancel(o) =>
                if (o.pending) {
                    cancelOrder(o)
                    reply(Ack(o))
                } else reply(NoAck)

            case x => junk += x
        }
}

val customer = actor {
    orderMngr !? Order(myItem) match {
        case Ack(o) => ...
    }
}

比如代码1中的示例代码,表达式actor { body } 创建了一个新的actor,这个actor能够运行body中的代码。表达式self用来引用当前在执行的actor。每一个java线程也叫做一个actor,因此即便是主线程也能执行receive

代码1中的例子展示了目前介绍的actor的所有用法。首先,我们定义一个actor orderMngr在内部的一个无限循环中接受消息。receive操作一直在等待两种类型的消息。 Order(sender, item)处理器处理item的一个订单。当订单(Order)的对象创建时,同时也创建了发送订单的actor的引用,并且当订单对象处理完毕后,确认信息送还给发送者。 Cancel(sender, o)表示如果订单o还处于等待状态的话就取消订单,取消完毕后向发送者发送确认信息。如果订单不是处于等待状态,则向发送者发送NoAck消息,表示要求取消的是非等待订单。

orderMngrreceive中的最后的模式是一个可变模式,这个模式可以匹配任何消息。可变模式从邮箱中删除无法匹配上的没用的消息(“junk”)。我们有定义了一个customer actor,这个actor在发送了一个订单消息,在继续向下执行之前等待来自orderMngr的确认消息。既然使用actor方法生成的actor是异步的,因此我们定义的actors都是可以并行运行的。

注意,在上面的实例代码中我们做了许多重复性的工作用来实现请求/应答风格的通信。尤其是sender被显示的包含在了每一条消息中。这样的使用方式经常出现,因此我们的库函数对这种使用方式给予特殊的支持,即消息经常携带sender的信息,可以使用下面的语法:
引用
a !? msg    向a发送msg消息,等到响应之后返回。
sender       用来引用发送了最近一条被self接受了的消息的actor
reply(msg) 将消息回复给sender
a forward msg 使用当前的sender将消息发送给a,而不是使用self

使用了这些附加的便利工具,第一个例子就可以被重写成第二个例子。

看看上面展示的例子,可以看出Scala语言似乎是专门为actor的并发而设计的。事实上不是这样的,Scala仅仅假定了底层主机的基本线程模型。在例子中展示的所有上层的操作都是在Scala库中的class及class的方法定义的。在余下的章节中,我们将打开盖子看看下面是如何组成和实现的。并发处理的实现将在第四节中讨论。

发送消息的操作 ! 是用于给actor发送一条消息的。语法 a ! msg 是方法调用 a.!(msg) 的简写形式,就像 x + y 在Scala中是 x.+(y) 的简写一样。因此我们在Actor trait中定义了 ! 方法。
引用
trait Actor {
    private val mailbox = new Queue[Any]
    def !(msg: Any): unit = ...
    ...
}


这个方法做了两件事情,首先将从参数传入的消息入队进入Actor的邮箱中,这个邮箱现在是用一个私有的 Queue[Any] 类型的成员表示。其次,如果此时receiving actor在receive语句中处于挂起的状态,那么此时actor的执行语句就被触发,其中这个receive语句是能够处理发送过来的消息的。

receive { ... }结构很有意思,在Scala中,在花括号中的模式匹配表达式被当作第一类对象,这个对象能想receive方法传入参数,并且传入参数的类型是PartialFunction,这个PartialFunction是Function1的子类,Function1是一元函数相对应的基类。这两个方法的定义如下:
引用
abstract class Function1[-a,+b] {
    def apply(x: a): b
}
abstract class PartialFunction[-a,+b] extends Function1[a,b] {
    def isDefinedAt(x: a): boolean
}


Functions就是具有apply方法的object,而partial functions是除了有apply方法之外,还有一个附加的isDefinedAt(x: A)方法的object,这个isDefinedAt方法是用来测试是否定义了相应给定了参数的函数(guibin注:即检查参数x是否在该PartialFunction函数的作用域中)。这两种类型的class都是(类型)参数化的(gubin注:原文parameterized);首先,第一个类型参数a表示函数的输入参数类型,第二个类型参数b表示该函数的结果类型。

模式匹配表达式:
引用
{case p=> e; ...; case p=> e}

因此是一个定义了如下方法的partial function:
  • 如果其中一个模式Pi匹配上了参数,则isDefinedAt方法返回true,否则返回false。
  • 如果第一匹配上了输入参数的模式Pi,则apply方法返回相应的值ei。如果没有任何一个模式能够匹配上输入参数,则会抛出MatchError异常来。


这两个方法在receive中的实现如下。首先,在邮箱中的消息按照其出现的顺序被扫描,如果某个消息的定义正好是receive的输入参数f(guibin注:第一点,receive的实际定义是def receive[R](f: PartialFunction[Any, R]): R = {......},即receive的输入参数是PartialFunction类型的,第二点,如果扫描到的邮箱中的消息正好在PartialFunction中有定义,且匹配上了这个PartialFunction中定义的某个模式),则这条消息从邮箱中被删除,并且f被应用到这个消息上(guibin注:也就是说针对这个消息调用receive中f对应的相应的执行操作),另外一方面,如果对邮箱中的每条消息调用 f.isDefinedAt(m) 都返回 false,则这个receiving actor被挂起。

actorself均是定义在Actor object中的方法。Object特点在于在运行时情况下,确保只有一个实例,跟java中的static 方法类似。比如:
引用
object Actor {
    def self: Actor ...
    def actor(body: => unit): Actor ...
    ...
}


注意,Scala对类型和术语(types and terms)有不同的命名空间。比如,Actor这个名字既用于上面的object的名字(这里是术语,term),也用于trait的名字(trait Actor),即self和actor方法的返回类型的名字(这里是类型,type)。在actor方法的定义中,参数body定义了新生成的actor的行为。这是一个返回类型为unit的闭包(guibin注:原文closure),开始的箭头 => 表示这是一个还未被求值计算的表达式(thunk,形实转换程序)。

在Scala的actor库中,还有其他的函数我们没有涉及到,比如有一个方法receiveWithin这个方法能够指定一个时间段,在此时间段内消息应该能被收到,如果超过该时间段仍然没有收到,则允许actor 超时。当超时时,与TIMEOUT模式相匹配的行为将会被触发。超时能够用来挂起一个actor,完全清除邮箱中的消息,或者实现有优先级的消息。

Guibin
2011-03-19

1
0
分享到:
评论
1 楼 lantian_123 2012-03-25  
赞,收藏。。。

相关推荐

Global site tag (gtag.js) - Google Analytics