10.4 异步I/O

新异步功能的关键组成部分是一些实现 Channel 接口的类,这些类可以处理需要交给后台线程完成的 I/O 操作。这种功能还可以应用于长期运行的大型操作和其他几种场合。

这一节专门介绍处理文件 I/O 的 AsynchronousFileChannel 类,除此之外还要了解一些其他异步通道。本章末尾会介绍异步套接字。这一节介绍的内容包括:

  • 使用 AsynchronousFileChannel 类处理文件 I/O

  • 使用 AsynchronousSocketChannel 类处理客户端套接字 I/O

  • 使用 AsynchronousServerSocketChannel 类处理能接受连入连接的异步套接字

和异步通道交互有两种不同的方式:使用 Future 接口的方式和回调方式。

10.4.1 基于Future接口的方式

第 11 章会详细介绍 Future 接口,现在你只需知道这个接口表示进行中的任务,可能已经完成,也可能还未完成。这个接口有两个关键的方法。

  • isDone()

返回布尔值,表示任务是否已经完成。

  • get()

返回结果。如果已经结束,立即返回;如果还未结束,在完成前一直阻塞。

下面看个示例程序。这个程序异步读取一个大型文件(可能有 100 Mb):

  1. try (AsynchronousFileChannel channel =
  2. AsynchronousFileChannel.open(Paths.get("input.txt"))) {
  3. ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024 * 100);
  4. Future<Integer> result = channel.read(buffer, 0);
  5. while(!result.isDone()) {
  6. // 做些其他有用的操作……
  7. }
  8. System.out.println("Bytes read: " + result.get());
  9. }

10.4.2 基于回调的方式

处理异步 I/O 的回调方式基于 CompletionHandler 接口实现,这个接口定义了两个方法,completed()failed(),分别在操作成功和失败时调用。

处理异步 I/O 时,如果想立即收到事件提醒,可以使用这种方式。例如,有大量 I/O 操作要执行,但其中某次操作失败不会导致重大错误,这种情况就可以使用回调方式:

  1. byte[] data = {2, 3, 5, 7, 11, 13, 17, 19, 23};
  2. ByteBuffer buffy = ByteBuffer.wrap(data);
  3. CompletionHandler<Integer,Object> h =
  4. new CompletionHandler() {
  5. public void completed(Integer written, Object o) {
  6. System.out.println("Bytes written: " + written);
  7. }
  8. public void failed(Throwable x, Object o) {
  9. System.out.println("Asynch write failed: "+ x.getMessage());
  10. }
  11. };
  12. try (AsynchronousFileChannel channel =
  13. AsynchronousFileChannel.open(Paths.get("primes.txt"),
  14. StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
  15. channel.write(buffy, 0, null, h);
  16. Thread.sleep(1000); // 必须这么做,防止退出太快
  17. }

AsynchronousFileChannel 对象关联一个后台线程池,所以处理 I/O 操作时,原线程可以继续处理其他任务。

默认情况下,这个线程池由运行时提供并管理。如果需要,线程池也可以由应用创建和管理(通过 AsynchronousFileChannel.open() 方法的某个重载形式),不过一般不需要这么做。

最后,为了完整性,我们还要简单介绍 NIO 对多路复用 I/O 的支持。在多路复用 I/O 中,单个线程能管理多个通道,而且会检测哪个通道做好了读或写的准备。支持多路复用 I/O 的类在 java.nio.channels 包中,包括 SelectableChannelSelector

编写需要高伸缩性的高级应用时,这种非阻塞式多路复用技术特别有用,不过对这个话题的完整讨论超出了本书的范畴。

10.4.3 监视服务和目录搜索

我们要介绍的最后一种异步服务是监视目录,或访问目录(或树状结构)。监视服务会观察目录中发生的所有事情,例如创建或修改文件:

  1. try {
  2. WatchService watcher = FileSystems.getDefault().newWatchService();
  3. Path dir = FileSystems.getDefault().getPath("/home/ben");
  4. WatchKey key = dir.register(watcher,
  5. StandardWatchEventKinds.ENTRY_CREATE,
  6. StandardWatchEventKinds.ENTRY_MODIFY,
  7. StandardWatchEventKinds.ENTRY_DELETE);
  8. while(!shutdown) {
  9. key = watcher.take();
  10. for (WatchEvent<?> event: key.pollEvents()) {
  11. Object o = event.context();
  12. if (o instanceof Path) {
  13. System.out.println("Path altered: "+ o);
  14. }
  15. }
  16. key.reset();
  17. }
  18. }

相比之下,目录流提供的是单个目录中当前所有文件的情况。例如,若想列出所有 Java 源码文件及其大小(以字节为单位),可以使用如下代码:

  1. try(DirectoryStream<Path> stream =
  2. Files.newDirectoryStream(Paths.get("/opt/projects"), "*.java")) {
  3. for (Path p : stream) {
  4. System.out.println(p +": "+ Files.size(p));
  5. }
  6. }

这个 API 有个缺点,即只能返回匹配通配模式的元素,这有时不够灵活。我们可以更进一步,使用新方法 Files.find()Files.walk(),递归遍历目录,找出每个元素:

  1. final Pattern isJava = Pattern.compile(".*\\.java$");
  2. final Path homeDir = Paths.get("/Users/ben/projects/");
  3. Files.find(homeDir, 255,
  4. (p, attrs) -> isJava.matcher(p.toString()).find())
  5. .forEach(q -> {System.out.println(q.normalize());});

我们还可以更进一步,使用 java.nio.file 包中的 FileVisitor 接口编写高级的解决方案,不过,此时需要开发者实现 FileVisitor 接口中的全部四个方法,不能像上述代码那样只使用一个 lambda 表达式。

本章的最后一节要讨论 Java 对网络的支持以及 JDK 中相应的核心类。