9.2 回调
为了展示非阻塞式I/O的原则,我们将运行一个极其简单的聊天应用,没有那些花里胡哨的功能。当用户第一次连接应用时,需要设定用户名,随后便可通过应用收发信息。
我们将使用Vert.x框架实现该应用,并且在实施过程中根据需要,引入其他一些必需的技术。让我们先来写一段接收TCP连接的代码,如例9-1所示。
例9-1 接收TCP连接
public class ChatVerticle extends Verticle {public void start() {vertx.createNetServer().connectHandler(socket -> {container.logger().info("socket connected");socket.dataHandler(new User(socket, this));}).listen(10_000);container.logger().info("ChatVerticle started");}}
读者可将Verticle想成Servlet——它是Vert.x框架中部署的原子单元。上述代码的入口是start方法,它和普通Java程序中的main方法类似。在聊天应用中,我们用它建立一个接收TCP连接的服务器。
然后向connectHandler方法输入一个Lambda表达式,每当有用户连接到聊天应用时,都会调用该Lambda表达式。这就是一个回调,与在第1章中介绍的Swing中的回调类似。这种方式的好处是,应用不必控制线程模型——Vert.x框架为我们管理线程,打理好了一切相关复杂性,程序员只需考虑事件和回调就够了。
我们的应用还通过dataHandler方法注册了另外一个回调,每当从网络套接字读取数据时,该回调就会被调用。在本例中,我们希望提供更复杂的功能,因此没有使用Lambda表达式,而是传入一个常规的User类,该类实现了相关的函数接口。User类的定义如例9-2所示。
例9-2 处理用户连接
public class User implements Handler<Buffer> {private static final Pattern newline = Pattern.compile("\\n");private final NetSocket socket;private final Set<String> names;private final EventBus eventBus;private Optional<String> name;public User(NetSocket socket, Verticle verticle) {Vertx vertx = verticle.getVertx();this.socket = socket;names = vertx.sharedData().getSet("names");eventBus = vertx.eventBus();name = Optional.empty();}@Overridepublic void handle(Buffer buffer) {newline.splitAsStream(buffer.toString()).forEach(line -> {if (!name.isPresent())setName(line);elsehandleMessage(line);});}// Class continues...
变量buffer包含了网络连接写入的数据,我们使用的是一个分行的文本协议,因此需要先将其转换成一个字符串,然后依换行符分割。
这里使用了正则表达式java.util.regex.Pattern的一个实例newline来匹配换行符。尤为方便的是,Java 8为Pattern类新增了一个splitAsStream方法,该方法使用正则表达式将字符串分割好后,生成一个包含分割结果的流对象。
用户连上聊天服务器后,首先要做的事是设置用户名。如果用户名未知,则执行设置用户名的逻辑;否则正常处理聊天消息。
还需要接收来自其他用户的消息,并且将它们传递给聊天程序客户端,让接收者能够读取消息。为了实现该功能,在设置当前用户用户名的同时,我们注册了另外一个回调,用来写入消息(例9-3)。
例9-3 注册聊天消息
eventBus.registerHandler(name, (Message<String> msg) -> {sendClient(msg.body());});
上述代码使用了Vert.x的事件总线,它允许在verticle对象之间以非阻塞式I/O的方式传递消息(如图9-1所示)。registerHandler方法将一个处理程序和一个地址关联,有消息发送给该地址时,就将之作为参数传递给处理程序,并且自动调用处理程序。这里使用用户名作为地址。

图9-1:使用事件总线传递消息
通过为地址注册处理程序并发消息的方式,可以构建非常复杂和解耦的服务,它们之间完全以非阻塞式I/O方式响应。需要注意的是,在我们的设计中没有共享状态。
Vert.x的事件总线允许发送多种类型的消息,但是它们都要使用Message对象进行封装。点对点的消息传递由Message对象本身完成,它们可能持有消息发送方的应答处理程序。在这种情况下,我们想要的是消息体,也就是文字本身,则只需调用body方法。我们通过将消息写入TCP连接,把消息发送给了用户聊天客户端。
当应用想要把消息从一个用户发送给另一个用户时,就使用代表另一个用户的地址(如例9-4所示),这里使用了用户的用户名。
例9-4 发送聊天信息
eventBus.send(user, name.get() + '>' + message);
让我们扩展这个基础聊天服务器,向关注你的用户群发消息,为此,需要实现两个新命令。
- 代表群发命令的感叹号,它能将信息群发给关注你的用户。如果Bob键入“!hello followers”,则所有关注Bob的用户都会收到该条信息:“Bob>hello followers”。
- 关注命令,用来关注一个用户,比如“follow Bob”。
一旦解析了命令,就可以着手实现broadcastMessage和followUser方法,它们分别代表了这两个命令。
这里的通信模式略有不同,除了给单个用户发消息,现在还拥有了群发信息的能力。幸好,Vert.x的事件总线允许我们将一条信息发布给多个处理程序(见图9-2),让我们得以沿用一种类似的方式。

图9-2:使用消息总线发布
代码的唯一变化是使用了事件总线的publish方法,而不是先前的send方法。为了避免用户使用!命令时和已有的地址冲突,在用户名后紧跟.followers。比如Bob发布一条消息时,所有注册到bob.followers的处理程序都会收到消息(如例9-5所示)。
例9-5 向关注者群发消息
private void broadcastMessage(String message) {String name = this.name.get();eventBus.publish(name + ".followers", name + '>' + message);}
在处理程序里,我们希望和早先的操作一样:将消息传递给客户(如例9-6所示)。
例9-6 接收群发的消息
private void followUser(String user) {eventBus.registerHandler(user + ".followers", (Message<String> message) -> {sendClient(message.body());});}
如果将消息发送到有多个处理程序监听的地址,则会轮询决定哪个处理程序会接收到消息。这意味着在注册地址时要多加小心。
