11.1 并发数据结构

每个计算机程序都要用到数据。它们从数据库、文件或者其他来源获取数据,对数据进行转换,然后将转换后的数据再写回到某个数据库、文件或者其他目标。程序对存放在内存中的数据进行操作,并且采用数据结构将数据存放在内存中。

实现一个并发应用程序时,必须注意数据结构的使用。如果不同的线程可以修改存放在某个唯一数据结构中的数据,就必须使用同步机制保护在该数据结构之上的修改操作。如果不这样做,就会出现数据竞争条件。应用程序可能有时可以正确工作,但是下一次可能就会遇到某个随机性的异常,进而陷入死循环,或者毫无声息地给出一个不正确的结果。究竟会出现何种结局,取决于执行的顺序。

为了避免数据竞争条件,可以进行如下操作。

  • 使用一种非同步的数据结构,并且自己为其加入同步机制。
  • 使用由Java并发API提供的某种数据结构,这种数据结构在内部实现了同步机制,并且针对并发应用程序做了优化。

第二种供选方案是最推荐的。本节将回顾最重要的并发数据结构。

11.1.1 阻塞型数据结构和非阻塞型数据结构

Java并发API中提供了两种并发数据结构。

  • 阻塞型数据结构:这种类型的数据结构提供了插入数据和删除数据的方法,当操作无法立即执行时(例如,如果你要选取某个元素但数据结构为空),执行调用的线程就会被阻塞,直到可以执行该操作为止。
  • 非阻塞型数据结构:这种类型的数据结构提供了插入数据和删除数据的方法,当无法立即执行操作时,返回一个特定值或者抛出一个异常。

有时,非阻塞型数据结构会有一个与之等效的阻塞型数据结构。例如,ConcurrentLinkedDeque类是一个非阻塞型数据结构,而LinkedBlockingDeque类则是一个与之等效的阻塞型数据结构。阻塞型数据结构的一些方法具有非阻塞型数据结构的行为。例如,Deque接口定义了pollFirst()方法,如果双端队列为空,该方法并不会阻塞,而是返回null值。另一方面,getFirst()方法在这种情况下会抛出异常。每个阻塞型队列的实现都实现了该方法。

11.1.2 并发数据结构

Java集合框架(Java collections framework,JCF)提供了一个包含多种可用于串行编程的数据结构集合。Java并发API对这些数据结构进行了扩展,提供了另外一些可用于并发应用程序的数据结构,包括如下两项。

  • 接口:扩展了JCF提供的接口,添加了一些可用于并发应用程序的方法。
  • 类:实现了前面的接口,提供了可以用于应用程序的具体实现。

下面将介绍你会在并发应用程序中用到的接口和类。

  • 接口

首先,介绍一下由并发数据结构实现的最重要的接口。

  • BlockingQueue

队列是一种线性数据结构,允许在队列的末尾插入元素且从队列的起始位置获取元素。它是一个先入先出(FIFO)型数据结构,第一个进入队列的元素将是第一个被处理的元素。

JCF定义了Queue接口,该接口定义了在队列中执行的基本操作。该接口提供了实现如下操作的方法。

  1. - 在队列的末尾插入一个元素。
  2. - 从队列的首部开始检索并删除一个元素。
  3. - 从队列的首部开始检索一个元素但不删除。

对于这些方法,该接口定义了两个版本。它们在方法执行时具有不同的表现(例如,如果你要检索某个空队列中的元素)。

  1. - 可以抛出异常的方法。
  2. - 可以返回某一特定值的方法,例如<code>false</code>或<code>null</code>。

下表包含了每个操作所对应的方法名称。

操作抛出异常返回特殊值 插入add()offer() 检索并删除remove()poll() 检索但不删除element()peek()

BlockingQueue接口扩展了Queue接口,添加了当操作不可执行时阻塞调用线程的方法。这些方法有如下几种。

操作阻塞 插入put() 检索并删除take() 检索但不删除N/A

  • BlockingDeque

与队列一样,双端队列也是一种线性数据结构,但是允许从该数据结构的两端插入和删除元素。JCF定义了Deque接口,该接口扩展了Queue接口。除了Queue接口提供的方法之外,它还提供了从两端执行插入、检索且删除、检索但不删除等操作的方法。

操作抛出异常返回特定值 插入addFirst()addLast()offerFirst()offerLast() 检索并删除removeFirst()removeLast()pollFirst()pollLast() 检索但不删除getFirst()getLast()peekFirst()peekLast()

BlockingDeque接口扩展了Deque接口,添加了当操作无法执行时阻塞调用线程的方法。

操作阻塞 插入putFirst()putLast() 检索并删除takeFirst()takeLast() 检索但不删除N/A

  • ConcurrentMap

map(有时也叫关联数组)是一种允许存储(键,值)对的数据结构。JCF提供了Map接口,它定义了使用map的基本操作。这些方法包括如下几个。

  1. - <code>put()</code>:向map插入一个(键,值)对。
  2. - <code>get()</code>:返回与某个键相关联的值。
  3. - <code>remove()</code>:删除与特定键相关联的(键,值)对。
  4. - <code>containsKey()</code>和<code>containsValue()</code>:如果map中包含值的特定键,则返回<code>true</code>。

该接口在Java 8中做了修改,包含了下述新方法。本章接下来的内容将讲到如何使用这些方法。

  1. - <code>forEach()</code>:该方法针对map的所有元素执行给定函数。 <code>\* compute()</code>、<code>computeIfAbsent()</code>和<code>computeIfPresent()</code>:这些方法允许指定一个函数,该函数用于计算与某个键相关的新值。
  2. - <code>merge()</code>:该方法允许你指定将某个(键,值)对合并到某个已有的map中。如果map中没有该键,则直接插入,否则,执行指定的函数。

ConcurrentMap扩展了Map接口,为并发应用程序提供了相同的方法。请注意,在Java 8和Java 9中(与Java 7不同),ConcurrentMap接口并未在Map接口的基础上增加新方法。

  • TransferQueue

该接口扩展了BlockingQueue接口,并且增加了将元素从生产者传输到消费者的方法。在这些方法中,生产者可以一直等到消费者取走其元素为止。该接口添加的新方法有如下几项。

  1. - <code>transfer()</code>:将一个元素传输给一个消费者,并且等待(阻塞调用线程)该元素被使用。
  2. - <code>tryTransfer()</code>:如果有消费者等待,则传输一个元素。否则,该方法返回<code>false</code>值,并且不将该元素插入队列。

Java并发API为之前描述的接口提供了多种实现,其中一些实现并没有增加任何新特征,而另一些实现则增加了新颖有用的功能。

  • LinkedBlockingQueue

该类实现了BlockingQueue接口,提供了一个带有阻塞型方法的队列,该方法可以有任意有限数量的元素。该类还实现了QueueCollectionIterable接口。

  • ConcurrentLinkedQueue

该类实现了Queue接口,提供了一个线程安全的无限队列。从内部来看,该类使用一种非阻塞型算法保证应用程序中不会出现数据竞争。

  • LinkedBlockingDeque

该类实现了BlockingDeque接口,提供了一个带有阻塞型方法的双端队列,它可以有任意有限数量的元素。LinkedBlockingDeque具有比LinkedBlockingQueue更多的功能,但是其开销更大。因此,应在双端队列特性不必要的场合使用LinkedBlockingQueue类。

  • ConcurrentLinkedDeque

该类实现了Deque接口,提供了一个线程安全的无限双端队列,它允许在双端队列的两端添加和删除元素。它具有比ConcurrentLinkedQueue更多的功能,但与LinkedBlockingDeque相同,该类开销更大。

  • ArrayBlockingQueue

该类实现了BlockingQueue接口,基于一个数组提供了阻塞型队列的一个实现,可以有有限个元素。它还实现了QueueCollectionIterable接口。与基于数组的非并发数据结构(ArrayListArrayDeque)不同,ArrayBlockingQueue按照构造函数中所指定的固定大小为数组分配空间,而且不可再调整其大小。

  • DelayQueue

该类实现了BlockingDeque接口,提供了一个带有阻塞型方法和无限数目元素的队列实现。该队列的元素必须实现Delayed接口,因此它们必须实现getDelay()方法。如果该方法返回一个负值或0,那么延时已过期,可以取出队列的元素。位于队列首部的是延时负数值最小的元素。

  • LinkedTransferQueue

该类提供了一个TransferQueue接口的实现。它提供了一个元素数量无限的阻塞型队列。这些元素有可能被用作生产者和消费者之间的通信信道。在那里,生产者可以等待消费者处理它们的元素。

  • PriorityBlockingQueue

该类提供了BlockingQueue接口的一个实现,在该类中可以按照元素的自然顺序选择元素,也可以通过该类构造函数中指定的比较器选择元素。该队列的首部由元素的排列顺序决定。

  • ConcurrentHashMap

该类提供了ConcurrentMap接口的一个实现。它提供了一个线程安全的哈希表。除了Java 8中Map接口新增加的方法之外,该类还增加了其他一些方法。

  1. - <code>search()</code>、<code>searchEntries()</code>、<code>searchKeys()</code>和<code>searchValues()</code>:这些方法允许对 (键,值)对、键或者值应用搜索函数。这些搜索功能可以是一个lambda表达式。搜索函数返回一个非空值时,该方法结束。这也是该方法的执行结果。
  2. - <code>reduce()</code>、<code>reduceEntries()</code>、<code>reduceKeys()</code>和<code>reduceValues()</code>:这些方法允许应用一个<code>reduce()</code>操作转换(键,值)对、键,或者将其整个哈希表作为流处理(参考第9章,获取有关<code>reduce()</code>方法的详细内容)。

ConcurrentHashMap针对那些依赖其线程安全性而非同步细节的程序。调整map的大小是一项比较慢的操作。该类还增加了其他一些方法,例如forEachValueforEachKey等,但是此处不再赘述。

11.1.3 使用新特性

本节,你将学会如何使用在Java 8和Java 9中为并发数据结构引入的新特性。

  • ConcurrentHashMap的第一个例子

第9章实现了一个应用程序,可以对一个由20 000个亚马逊商品构成的数据集进行搜索。我们从亚马逊商品联合采购网络元数据中获取了这些信息,该元数据中包含有关548 552件商品的信息,包括商品的名称、销售排名和相似商品。可以通过搜索SNAP网站的“Amazon product co-purchasing network metadata”下载该数据集。在该例子中,我们采用了一个名为productsByBuyerConcurrentHashMap>存放用户所购买商品的信息。该map的键是用户的标识符,而其值为用户购买商品的列表。本节将采用该map学习如何使用ConcurrentHashMap类的新方法。

  • forEach()方法

该方法允许你指定对ConcurrentHashMap的每个(键,值)对都要执行的函数。该方法有很多版本,但是最基本的版本只有一个可以以lambda表达式表示的BiConsumer函数。例如,你可以使用该方法打印每个用户购买了多少商品,其代码如下:

  1. productsByBuyer.forEach( (id, list) -> System.out.println(id+":
  2. "+list.size()));

这个基本版的forEach()方法是常规Map接口的一部分,通常以顺序方式执行。在这段代码中我们使用了一个lambda表达式,其中id是元素的键,而list是元素的值。

在另一个例子中,使用了forEach()方法来计算用户的平均评级。

  1. productsByBuyer.forEach( (id, list) -> {
  2. double average=list.stream().mapToDouble(item -> item.getValue())
  3. .average().getAsDouble();
  4. System.out.println(id+": "+average);
  5. });

在这段代码中,也使用了一个lambda表达式,其中id是元素的键,list是元素的值。我们将一个流应用到该商品列表,计算了平均评级。

该方法还有如下其他版本。

  1. - <code>forEach(parallelismThreshold, action)</code>:这是要在并发应用程序中使用的版本。如果map的元素多于第一个参数指定的数目,该方法将以并行方式执行。
  2. - <code>forEachEntry(parallelismThreshold, action)</code>:该版本与上一版本相似,只不过在该版本中Action是<code>Consumer</code>接口的一个实现,它接收一个<code>Map.Entry</code>对象作为参数,其中含有元素的键和值。这种情况下也可以使用一个lambda表达式。
  3. - <code>forEachKey(parallelismThreshold, action)</code>:该版本与前一版本相似,只不过在这种情况下Action仅应用于<code>ConcurrentHashMap</code>的键。
  4. - <code>forEachValue(parallelismThreshold, action)</code>:该版本与前一版本相似,只不过在这种情况下Action仅应用于<code>ConcurrentHashMap</code>的值。

当前的实现采用公共的ForkJoinPool实例执行并行任务。

  • search()方法

该方法对ConcurrentHashMap的所有元素均应用一个搜索函数。该搜索函数可以返回一个空值或者一个不同于null的值。search()方法将返回搜索函数所返回的第一个非空值。该方法接收两个参数。

  1. - <code>parallelismThreshold</code>:如果map的元素比该参数指定的数目多,该方法将以并行方式执行。
  2. - <code>searchFunction</code>:这是<code>BiFunction</code>接口的一个实现,可以表示为一个lambda表达式。该函数接收每个元素的键和值作为参数,而且如前所述,如果找到了要找的结果,该函数就必须返回一个非空值,否则返回一个空值。

例如,你可以采用该函数查找第一本含有某个单词的书。

  1. ExtendedProduct firstProduct=productsByBuyer.search(100,
  2. (id, products) -> {
  3. for (ExtendedProduct product: products) {
  4. if (product.getTitle().toLowerCase().contains("java")) {
  5. return product;
  6. }
  7. }
  8. return null;
  9. });
  10. if (firstProduct!=null) {
  11. System.out.println(firstProduct.getBuyer()+":"+
  12. firstProduct.getTitle());
  13. }

本例使用100作为parallelismThreshold,使用一个lambda表达式实现搜索函数。在该函数中,对于每个元素而言,我们将会处理该列表中的所有商品。如果找到了一个含有单词java的商品,则返回该商品。这是由search()方法返回的值。最后,在控制台打印该商品的购买者和商品名称。

该方法的其他版本还有如下几种。

  1. - <code>searchEntries(parallelismThreshold, searchFunction)</code>:在这种情况下,搜索函数是<code>Function</code>接口的一个实现,接收一个<code>Map.Entry</code>对象作为参数。
  2. - <code>searchKeys(parallelismThreshold, searchFunction)</code>:在这种情况下,搜索函数仅应用于<code>ConcurrentHashMap</code>的键。
  3. - <code>searchValues(parallelismThreshold, searchFunction)</code>:在这种情况下,搜索函数仅应用于<code>ConcurrentHashMap</code>的值。
  • reduce()方法

该方法和Stream框架提供的reduce()方法相似,但是在这种情况下,你将直接对ConcurrentHashMap的元素进行操作。该方法接收以下三个参数。

  1. - <code>parallelismThreshold</code>:如果<code>ConcurrentHashMap</code>的元素数多于该参数所指定的数目,该方法将以并行方式执行。
  2. - <code>transformer</code>:该参数是<code>BiFunction</code>接口的一个实现,可以表示为一个lambda函数。它接收一个键和一个值作为参数,并且返回这些元素的转换结果。
  3. - <code>reducer</code>:该参数是<code>BiFunction</code>接口的一个实现,也可以表示为一个lambda函数。它接收由转换器函数返回的两个对象作为参数。该函数的目标是将这两个对象组合成一个对象。

作为该方法的例子之一,我们将获取一个评论取值为1(最坏情况)的商品列表。本例用到了两个辅助变量。第一个是transformer。它是一个BiFunction接口,用作reduce()方法的tramsformer元素。

  1. BiFunction<String, List<ExtendedProduct>, List<ExtendedProduct>>
  2. transformer = (key, value) ->value.stream().filter(product ->
  3. product.getValue() == 1).collect(Collectors.toList());

该函数接收键(即用户的id)和一个ExtendedProduct对象列表(含有该用户购买的商品)作为参数。我们处理该列表中的所有商品,并且返回评级为1的商品。

第二个变量是约简器BinaryOperator,作为reduce()方法的约简器函数。

  1. BinaryOperator<List<ExtendedProduct>> reducer = (list1, list2) ->{
  2. list1.addAll(list2);
  3. return list1;
  4. };

该约简器接收两个ExtendedProduct列表作为参数,并且使用addAll()方法将它们连接成一个列表。

现在,只需要实现对reduce()方法的调用。

  1. List<ExtendedProduct> badReviews=productsByBuyer.reduce(10,
  2. transformer, reducer);
  3. badReviews.forEach(product -> {
  4. System.out.println(product.getTitle()+":"+
  5. product.getBuyer()+":"+product.getValue());
  6. });

还有其他一些版本的reduce()方法。

  1. - <code>reduceEntries()</code>、<code>reduceEntriesToDouble()</code>、<code>reduceEntriesToInt()</code>和<code>reduceEntriesToLong()</code>:对于这些情况,转换器函数和约简器函数都针对<code>Map.Entry</code>对象进行处理。后三个版本的方法分别返回一个<code>double</code>、一个<code>int</code>和一个<code>long</code>值。
  2. - <code>reduceKeys()</code>、<code>reduceKeysToDouble()</code>、<code>reduceKeysToInt()</code>和<code>reduceKeysToLong()</code>:对于这些情况,转换器函数和约简器函数都针对map的键进行处理。后三个版本的方法分别返回一个<code>double</code>、一个<code>int</code>和一个<code>long</code>值。
  3. - <code>reduceToInt()</code>、<code>reduceToDouble()</code>和<code>reduceToLong()</code>:对于这些情况,转换器函数针对键和值进行处理,而约简器方法分别针对<code>int</code>、<code>double</code>和<code>long</code>数值进行处理。这些方法分别返回一个<code>int</code>、一个<code>double</code>和一个<code>long</code>值。
  4. - <code>reduceValues()</code>、<code>reduceValuesToDouble()</code>、<code>reduceValuesToInt()</code>和<code>reduceValuesToLong()</code>:对于这些情况,转换器函数和约简器函数都针对map的值进行处理。后三个版本的方法分别返回一个<code>double</code>、一个<code>int</code>和一个<code>long</code>值。
  • compute()方法

该方法(在Map接口中定义)接收一个元素的键和BiFunction接口的一个实现(可以用lambda表达式表示)作为参数。如果元素的键存在于ConcurrentHashMap中,则该函数将接收元素的键和值作为参数,否则将接收空值作为参数。如果该函数返回的值存在,该方法将用该函数返回的值来替换与该键相关的值;如果该函数返回的值不存在,则将该值插入到ConcurrentHashMap;如果返回值为null,则说明当前项已存在,那么就删除当前项。请注意,在BiFunction执行期间,将锁闭一个或几个map记录。因此,BiFunction的执行时间不应过长,而且不应该尝试更新同一map中的任何其他记录,否则可能会出现死锁。

例如,我们在使用该方法时,可以采用Java 8中引入的名为LongAdder新型原子变量,以计算和每个商品相关的差评数量。我们创建了一个新的ConcurrentHashMap,名为counter。它的键是商品的名称,值为LongAdder类的一个对象,用于计算每个商品有多少差评。

  1. ConcurrentHashMap<String, LongAdder> counter=new ConcurrentHashMap<>();

我们处理前面计算得到的所有badReviewsConcurrentLinkedDeque元素,并且使用compute()方法来创建和更新与每个商品相关的LongAdder

  1. badReviews.forEach(product -> {
  2. counter.computeIfAbsent(product.getTitle(), title -> new
  3. LongAdder()).increment();
  4. });
  5. counter.forEach((title, count) -> {
  6. System.out.println(title+":"+count);
  7. });

最后,将结果输出到控制台。

  • ConcurrentHashMap的另一个例子

ConcurrentHashMap类中还增加了另一个方法,它也是Map接口中定义的方法。这就是merge()方法,它可以将一个(键,值)对合并到map。如果ConcurrentHashMap中不存在该键,则直接插入该键。如果ConcurrentHashMap中存在该键,则需要定义新旧两个键中究竟哪一个应该与新值相关联。该方法接收三个参数。

  • 要合并的键。
  • 要合并的值。
  • 可表示为一个lambda表达式的BiFunction的实现。该函数接收与该键相关的旧值和新值作为参数。该方法将该函数返回的值与该键关联。BiFunction执行时对map进行部分锁定,这样可以保证同一个键不会被并发执行。
    例如,我们按照评论的年份字段对前面用到的亚马逊的20 000个商品进行了划分。对于每一年,均加载ConcurrentHashMap,其键为商品,而其值为评论列表。这样,便可以通过下述代码加载1995年和1996年的评论。
  1. Path path=Paths.get("data\\amazon\\1995.txt");
  2. ConcurrentHashMap<BasicProduct, ConcurrentLinkedDeque<BasicReview>>
  3. products1995=BasicProductLoader.load(path);
  4. showData(products1995);
  5. path=Paths.get("data\\amazon\\1996.txt");
  6. ConcurrentHashMap<BasicProduct,ConcurrentLinkedDeque<BasicReview>>
  7. products1996=BasicProductLoader.load(path);
  8. System.out.println(products1996.size());
  9. showData(products1996);

如果想将两个ConcurrentHashMap合并为一个,则可以使用下面的代码。

  1. products1996.forEach(10,(product, reviews) -> {
  2. products1995.merge(product, reviews, (reviews1, reviews2) -> {
  3. System.out.println("Merge for: "+product.getAsin());
  4. reviews1.addAll(reviews2);
  5. return reviews1;
  6. });
  7. });

我们处理Products1996 ConcurrentHashMap的所有元素,并且对每个(键,值)对都调用Products1995 ConcurrentHashMapmerge()方法。merge函数将接收两个评论列表,这样我们只需将它们连接成一个列表即可。

  • 一个采用ConcurrentLinkedDeque类的例子

Collection接口也引入了Java 8中的一些新方法。大多数并发数据结构都实现了该接口,因此可以通过它们使用这些新特性。其中两种方法是stream()parallelStream(),它们在第8章和第9章中都已经用到。下面看看如何使用另外两种方法,这要用到前面章节已提到的含有20 000个商品的ConcurrentLinkedDeque

  • removeIf()方法

该方法在Collection接口中有一个默认实现,它是非并发的而且并没有被ConcurrentLinkedDeque类重载。该方法接收一个Predicate接口的实现作为参数,这样就会接收Collection中的一个元素作为参数,而且应该返回一个truefalse值。该方法将处理Collection中的所有元素,而且当谓词取值为true时将删除这些元素。

例如,如果要删除所有销售排名高于1000的商品,可以使用下面的代码。

  1. System.out.println("Products: "+productList.size());
  2. productList.removeIf(product -> product.getSalesrank() > 1000);
  3. System.out.println("Products; "+productList.size());
  4. productList.forEach(product -> {
  5. System.out.println(product.getTitle()+": "+
  6. product.getSalesrank());
  7. });
  • spliterator()方法

该方法返回Spliterator接口的一个实现。一个spliterator定义了可被Stream API使用的数据源。需要直接使用spliterator的情况很少,但是有时可能希望创建自己的spliterator来为流产生一个定制的源(例如,如果实现了自己的数据结构)。如果有自己的spliterator实现,可以使用StreamSupport.stream(mySpliterator, isParallel)在其之上创建一个流。其中,isParallel是一个布尔值,决定了要创建的流是否为并行流。spliterator在某种意义上很像迭代器,可用来遍历集合中的所有元素,但你可以对元素进行划分,从而以并发的方式进行遍历操作。

一个spliterator具有8个定义其行为的不同特征。

  1. - <code>CONCURRENT</code>:可以安全地以并发方式对spliterator源进行修改。
  2. - <code>DISTINCT</code>:spliterator所返回的所有元素均不相同。
  3. - <code>IMMUTABLE</code>:spliterator源无法被修改。
  4. - <code>NONNULL</code>:spliterator不返回<code>null</code>值。
  5. - <code>ORDERED</code>:spliterator所返回的元素是经过排序的(这意味着它们的顺序很重要)。
  6. - <code>SIZED</code>:spliterator可以使用<code>estimateSize()</code>方法返回确定数目的元素。
  7. - <code>SORTED</code>:spliterator源经过了排序。
  8. - <code>SUBSIZED</code>:如果使用<code>trySplit()</code>方法分割该spliterator,产生的spliterator将是<code>SIZED</code>和<code>SUBSIZED</code>的。

该接口最有用的方法是如下几种。

  1. - <code>estimatedSize()</code>:该方法将返回spliterator中元素数的估计值。
  2. - <code>forEachRemaining()</code>:该方法允许你将一个<code>Consumer</code>接口的实现(可以表示为一个lambda函数)应用到spliterator尚未进行处理的元素。
  3. - <code>tryAdvance()</code>:该方法接收一个<code>Consumer</code>接口的实现(可以表示为一个lambda函数)作为参数。它选取spliterator中的下一个元素,使用<code>Consumer</code>实现进行处理并返回<code>true</code>值。如果spliterator再没有要处理的元素,则它返回<code>false</code>值。
  4. - <code>trySplit()</code>:该方法尝试将spliterator分割成两个部分。作为调用方的spliterator将处理其中的一些元素,而返回的spliterator将处理另一些元素。如果该spliterator是<code>ORDERED</code>,则返回的spliterator必须按照严格排序处理元素,而且调用方也必须按该严格排序处理。
  5. - <code>hasCharacteristics()</code>:该方法允许你检查spliterator的属性。

下面看一个关于该方法的例子,这里要用到含有20 000个商品的ArrayList数据结构。

首先,我们需要一个辅助任务,它将对一个商品集合进行处理,将它们的名称转换成小写形式。该任务将采用一个Spliterator作为属性。

  1. public class SpliteratorTask implements Runnable {
  2. private Spliterator<Product> spliterator;
  3. public SpliteratorTask (Spliterator<Product> spliterator) {
  4. this.spliterator=spliterator;
  5. }
  6. @Override
  7. public void run() {
  8. int counter=0;
  9. while (spliterator.tryAdvance(product -> {
  10. product.setTitle(product.getTitle().toLowerCase());
  11. })) {
  12. counter++;
  13. };
  14. System.out.println(Thread.currentThread().getName()
  15. +":"+counter);
  16. }
  17. }

正如你所看到的,当该任务完成执行时,它将输出已处理的商品数量。

在主方法中,一旦将20 000个商品加载到ConcurrentLinkedQueue,就可以得到一个spliterator,检查它的一些属性,并且查看其估计规模。

  1. Spliterator<Product> split1=productList.spliterator();
  2. System.out.println(split1.hasCharacteristics(Spliterator.CONCURRENT));
  3. System.out.println(split1.hasCharacteristics(Spliterator.SUBSIZED));
  4. System.out.println(split1.estimateSize());

然后,使用trySplit()方法来分割该spliterator,并且查看两个spliterator的大小。

  1. Spliterator<Product> split2=split1.trySplit();
  2. System.out.println(split1.estimateSize());
  3. System.out.println(split2.estimateSize());

最后,可以在一个执行器中执行两个任务,其中一个针对spliterator,用于查看每个spliterator是否确实将预期数量的元素处理完毕。

  1. ThreadPoolExecutor executor=(ThreadPoolExecutor)
  2. Executors.newCachedThreadPool();
  3. executor.execute(new SpliteratorTask(split1));
  4. executor.execute(new SpliteratorTask(split2));

在下面的屏幕截图中,你可以看到本例的执行结果。

11.1 并发数据结构 - 图1

可以发现,在分割spliterator之前,estimatedSize()方法如何返回20 000个元素。在执行trySplit()方法之后,每个spliterator都有10 000个元素。这些就是每个任务所处理的元素。

11.1.4 原子变量

原子变量是在Java 1.5中引入的,用于提供针对integerlongbooleanreferenceArray对象的原子操作。它们提供了一些方法来递增值、递减值、确定值、返回值,或者在其当前值等于预定义值时确定值。原子变量提供了与volatile关键字相似的保障。

Java 8中增加了四个新类,即DoubleAccumulatorDoubleAdderLongAccumulatorLongAdder。在前一节中,我们使用LongAdder类计算了商品的差评数。该类提供了与AtomicLong相似的功能,但是当经常更新来自不同线程的累加操作并且只需要在操作的末端给出结果时,该类具有更好的性能。DoubleAdder函数与之类似,只不过针对double值。这两个类的主要目标都是为了给出一个不同的线程可以以一致的方式对其更新的计数器。这些类当中最重要的方法包括如下几种。

  • add():为计数器增加参数中指定的值。
  • increment():相当于add(1)
  • decrement():相当于add(-1)
  • sum():该方法返回计数器的当前值。

请注意,DoubleAdder类并没有increment()decrement()方法。

LongAccumulator类和LongAdder类很类似,但是它们也有一个非常明显的区别。它们都有一个可以指定如下两个参数的构造函数。

  • 内部计数器的标识值。
  • 一个将新值累加到累加器的函数。

要注意的是,该函数并不依赖于累加的顺序。在这种情况下,最重要的方法就是如下两种。

  • accumulate():该方法接收一个long值作为参数。它应用函数对计数器进行递增或递减操作,使之成为当前值和参数指定值。
  • get():返回计数器的当前值。

例如,下面的代码执行完毕后会将362 880输出到控制台。

  1. LongAccumulator accumulator=new LongAccumulator((x,y) -> x*y, 1);
  2. IntStream.range(1, 10).parallel().forEach(x -> accumulator
  3. .accumulate(x));
  4. System.out.println(accumulator.get());

在累加器中使用交换运算,这样对于任意输入顺序,其输出结果均相同。

11.1.5 变量句柄

变量句柄(variable handle)是一种对变量、静态域或数组元素的动态型引用,使你可以多种不同的模式访问该变量。例如,可以在并发应用程序中对变量进行访问保护,实现对该变量的原子访问。在此之前,你只能通过原子变量获得这样的行为,但是现在可以使用变量句柄获得同样的功能,而不需要采用任何同步机制。

这是Java 9中引入的一种新特性,由VarHandle类提供。变量句柄有如下几种访问方法。

  • 读取访问模式:根据不同方法,该模式允许按照不同的内存排序规则读取变量的值。你可以使用get()getVolatile()getAcquire()getOpaque()方法读取变量的值。第一种方法将变量视为非易失性变量读取。第二种方法将变量作为易失性变量来读取。第三种方法确保对该变量的其他访问在该语句之前不会因为优化方面的原因而重新排序。而最后一种方法与第三种类似,但是它仅对当前线程有影响。
  • 写入访问模式:根据方法不同,该模式允许你按照不同的内存排序规则写入变量的值。可以使用set()setVolatile()setRelease()setOpaque()方法。它们与前面读取访问模式中的方法相对应,只不过是针对写入访问的。
  • 原子更新访问模式:这种模式获得与原子变量类似的功能和操作,例如比较变量的值。你可以使用下述方法。
    • compareAndSet():如果作为参数传递的预期值和变量的当前值相等,那么改变变量的值,就像变量是被声明为易失性变量一样。
    • weakCompareAndSet()weakCompareAndSetPlain():如果作为参数传递的预期值与变量的当前值相等,那么自动将变量的当前值替换为新值。第一种法将变量视为一个易失性变量,而第二种法将变量视为一个非易失性变量。
  • 数值型原子更新访问模式:这种模式以原子方式修改数值。你可以使用下面的方法。
    • getAndAdd():增加变量的值并且返回之前的值,因为该变量被原子自动声明为一个易失性变量。
  • 位原子更新访问模式:这种模式以原子方式按位修改值。你可以使用getAndBitwiseOr()或者getAndBitwiseAnd()方法。

例如,可用一个名为VarHandleData的类,它有名为safeValueunsafeValue的两个属性。

  1. public class VarHandleData {
  2. public double safeValue;
  3. public double unsafeValue;
  4. }

下面实现一个含有10个线程的例子,并发更新这两个属性的值。我们将使用VarHandle直接更新safeValue属性和unsafeValue属性的值。

创建一个对象某个域的VarHandle对象的最简单方式是使用MethodHandles类中的静态方法lookup()。该方法会返回一个MethodHandles.Lookup工厂对象,它用于创建MethodHandles。然后,使用in()方法获得一个面向当前类(这里是VarHandleData)的MethodHandles。最后,使用findVarHandle()方法获取对象VarHandle,以访问对象的域。

例如,如果想要使用VarHandle访问VarHandleData对象的safeValue属性,可以采用下述指令。

  1. handler = MethodHandles.lookup().in(VarHandleData.class)
  2. .findVarHandle(VarHandleData.class,
  3. "safeValue", double.class);

因此,我们实现一个名为VarHandleTask的类,该类实现了Runnable接口,它可以增加和减少VarHandleData对象的两个属性的值。如前所述,我们使用VarHandle对象访问safeValue属性(通过getAndAdd()方法),并且直接修改unsafeValue属性。

  1. public class VarHandleTask implements Runnable {
  2. private VarHandleData data;
  3. public VarHandleTask(VarHandleData data) {
  4. this.data = data;
  5. }
  6. @Override
  7. public void run() {
  8. VarHandle handler;
  9. try {
  10. handler = MethodHandles.lookup().in(VarHandleData.class)
  11. .findVarHandle(VarHandleData.class,
  12. "safeValue", double.class);
  13. for (int i = 0; i < 10000; i++) {
  14. handler.getAndAdd(data, +100);
  15. data.unsafeValue += 100;
  16. handler.getAndAdd(data, -100);
  17. data.unsafeValue -= 100;
  18. }
  19. } catch (NoSuchFieldException | IllegalAccessException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }

最后,实现VarHandleMain类,该类创建一个VarHandleData对象和10个并发更新同一对象的VarHandleTasks

  1. public class VarHandleMain {
  2. public static void main(String[] args) {
  3. VarHandleData data = new VarHandleData();
  4. for (int i=0; i<10; i++) {
  5. VarHandleTask task=new VarHandleTask(data);
  6. ForkJoinPool.commonPool().execute(task);
  7. }
  8. ForkJoinPool.commonPool().shutdown();
  9. try {
  10. ForkJoinPool.commonPool().awaitTermination(1, TimeUnit.DAYS);
  11. } catch (InterruptedException e) {
  12. // 自动生成的catch代码块
  13. e.printStackTrace();
  14. }
  15. System.out.println("Safe Value: "+data.safeValue);
  16. System.out.println("Unsafe Value: "+data.unsafeValue);
  17. }
  18. }

执行本例时,将看到如何使safeValue属性的值总如预期一样为0,但是unsafeValue属性的值每次执行时都不同,因为会遇到数据竞争条件。