Active Object模式简介
Active Object模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active Object模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于 System.gc()这个方法:客户端代码调用完gc()后,一个进行垃圾回收的任务被提交,但此时JVM并不一定进行了垃圾回收,而可能是在gc() 方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc()的调用方代码是运行在自己的线程上(通常是main线程派生的子 线程),而JVM的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc()这个方法所代表的动作(其所定义的功能)的 调用方和执行方是运行在不同的线程中的,从而提高了并发性。
再进一步介绍Active Object模式,我们可先简单地将其核心理解为一个名为ActiveObject的类,该类对外暴露了一些异步方法,如图1所示。
图 1. ActiveObject对象示例
doSomething方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething方法会被多个线程调用。这时所需的线程安 全控制封装在doSomething方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个Active Object对象的方法与调用普通Java对象的方法并无太大差别。如清单1所示。
清单 1. Active Object方法调用示例
ActiveObject ao=...;
Future future = ao.doSomething("data");
//执行其它操作
String result = future.get();
System.out.println(result);
Active Object模式的架构
当Active Object模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会 被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入Active Object模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用Active Object模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。
Active Object模式的主要参与者有以下几种。其类图如图2所示。
图 2. Active Object模式的类图
(点击图像放大)
Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法doSomething时,该方法会生成一个相 应的MethodRequest实例并将其存储到Scheduler所维护的缓冲区中。doSomething方法的返回值是一个表示其执行结果的外包装 对象:Future参与者的实例。异步方法doSomething运行在调用方代码所在的线程中。
MethodRequest:负责将调用方代码对Proxy实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将Proxy的异步方法的调用和执行分离成为可能。其call方法会根据其所包含上下文信息调用Servant实例的相应方法。
ActivationQueue:负责临时存储由Proxy的异步方法被调用时所创建的MethodRequest实例的缓冲区。
Scheduler:负责将Proxy的异步方法所创建的MethodRequest实例存入其维护的缓冲区中。并根据一定的调 度策略,对其维护的缓冲区中的MethodRequest实例进行执行。其调度策略可以根据实际需要来定,如FIFO、LIFO和根据 MethodRequest中包含的信息所定的优先级等。
Servant:负责对Proxy所暴露的异步方法的具体实现。
Future:负责存储和返回Active Object异步方法的执行结果。
Active Object模式的序列图如图3所示。
图 3. Active Object模式的序列图
(点击图像放大)
第1步:调用方代码调用Proxy的异步方法doSomething。
第2~7步:doSomething方法创建Future实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest 对象。然后以所创建的MethodRequest对象作为参数调用Scheduler的enqueue方法,以将MethodRequest对象存入缓冲 区。Scheduler的enqueue方法会调用Scheduler所维护的ActivationQueue实例的enqueue方法,将 MethodRequest对象存入缓冲区。
第8步:doSomething返回其所创建的Future实例。
第9步:Scheduler实例采用专门的工作线程运行dispatch方法。
第10~12步:dispatch方法调用ActivationQueue实例的dequeue方法,获取一个MethodRequest对象。然后调用MethodRequest对象的call方法
第13~16步:MethodRequest对象的call方法调用与其关联的Servant实例的相应方法doSomething。并将Servant.doSomething方法的返回值设置到Future实例上。
第17步:MethodRequest对象的call方法返回。
上述步骤中,第1~8步是运行在Active Object的调用者线程中的,这几个步骤实现了将调用方代码对Active Object所提供的异步方法的调用封装成对象(Method Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17步是运行在Active Object的工作线程中,这些步骤实现从缓冲区中读取Method Request,并对其进行执行,实现了任务的执行。从而实现了Active Object对外暴露的异步方法的调用与执行的分离。
如果调用方代码关心Active Object的异步方法的返回值,则可以在其需要时,调用Future实例的get方法来获得异步方法的真正执行结果。
Active Object模式实战案例
某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户13612345678给其同事13787654321发送彩信时,可以将接收方号码填写为对方的短号,如776,而非其真实的号码。
该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整 个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到 数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过Java的对象序列化API,将表示下发彩信的对象序列化到磁盘文件中从而实 现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及Active Object模式如何帮助我们满足这些考虑。
首先,请求消息缓存到磁盘中涉及文件I/O这种慢的操作,我们不希望它在请求处理的主线程(即Web服务器的工作线程)中执行。因为这样会使该模块 的响应延时增大,降低系统的响应性。并使得Web服务器的工作线程因等待文件I/O而降低了系统的吞吐量。这时,异步处理就派上用场了。Active Object模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在Web服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘 文件中等操作)则是在Active Object工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响 应。而此时,当前请求消息可能正在被Active Object线程缓存到文件中。如图4所示。
图 4 .异步实现缓存
其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存 储到多个子目录中。每个子目录最多可以存储指定个数(如2000个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也 存满,依此类推。当这些子目录的个数到达指定数量(如100个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是 固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而Active Object模式中的Proxy参与者可以帮助我们封装并发访问控制。
下面,我们看该案例的相关代码通过应用Active Object模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的Active Object模式的客调用方代码。如清单2所示。
清单 2. 彩信下发请求处理的入口类
public class MMSDeliveryServlet extends HttpServlet {
private static final long serialVersionUID = 5886933373599895099L;
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
//将请求中的数据解析为内部对象
MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
Recipient originalNumberRecipient = null;
try {
// 将接收方短号转换为长号
originalNumberRecipient = convertShortNumber(shortNumberRecipient);
} catch (SQLException e) {
// 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存
AsyncRequestPersistence.getInstance().store(mmsDeliverReq);
// 继续对当前请求的其它处理,如给客户端响应
resp.setStatus(202);
}
}
private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
//省略其它代码
return mmsDeliverReq;
}
private Recipient convertShortNumber(Recipient shortNumberRecipient)
throws SQLException {
Recipient recipent = null;
//省略其它代码
return recipent;
}
}