9.4 第三个例子:社交网络中的共同联系人

社交网络正在改变着社会,也改变着人们相互之间的联系方式。Facebook、Linkedin、Twitter以及Instagram都拥有数百万用户,他们使用这些网络与朋友分享生活中的每个瞬间,建立新的职业联系,提升专业品牌,与新人会见,或者只是了解一下世界的最新发展趋势。

可以将社交网络视为一个图,其中用户是节点,而用户之间的关系是边。和图一样,在像Facebook这样的社交网络中,用户之间的关系既可以是无向的,也可以是双向的。如果用户A与用户B相关联,那么用户B也就与用户A相关联。与之相反,在像Twitter这样的社交网络中,用户之间的关系是有向的。在这种情况下,我们称用户A关注用户B,但是反过来就不一定为真了。

本节将实现一个算法来计算社交网络中每一对用户之间的共同联系人,且该社交网络中用户之间为双向关系。我们将实现Steve Krenzel在“MapReduce: Finding Friends”中讲述的算法。该算法的主要步骤如下。

  • 数据源是一个存放有每个用户及其联系人的文件。
  1. A-B,C,D,
  2. B-A,C,D,E,
  3. C-A,B,D,E,
  4. D-A,B,C,E,
  5. E-B,C,D,
  • 这就意味着用户A的联系人是用户B、C和D。考虑到他们之间的关系是双向的,因此如果B是A的联系人,那么A也是B的联系人,而且在文件中这两个关系都要描述。这样,我们的元素就有下述两个部分。

    • 一个用户标识符。
    • 该用户的联系人列表。
  • 下一步,生成一个元素集合,其中每个元素都有三个部分。这三个部分如下所示。
    • 一个用户标识符。
    • 一个朋友的用户标识。
    • 该用户的联系人列表。
  • 因此,对于用户A,将生成下述元素。
  1. A-B-B,C,D
  2. A-C-B,C,D
  3. A-D,B,C,D
  • 对所有元素都执行相同的处理过程。我们将存储两个用户标识符并按照字母表顺序排序。这样,对用户B,就可以生成下述元素。
  1. A-B-A,C,D,E
  2. B-C-A,C,D,E
  3. B-D-A,C,D,E
  4. B-E-A,C,D,E
  • 一旦生成所有的新元素后,就按照两个用户标识符对它们进行分组。例如,对于元组A-B,将生成下面的分组。
  1. A-B-(B,C,D),(A,C,D,E)
  • 最后,计算两个列表的交集。得到的结果列表就是两个用户之间的共同联系人。例如,用户A和B的共同联系人是C和D。

为了测试该算法,使用了两个数据集。

  • 前面给出的测试样例。
  • 社交圈:可通过网址https://snap.stanford.edu/data/egonets-Facebook.html下载Facebook数据集,其中含有4039个Facebook用户的联系人信息。我们已经将原始数据转换成为本例中要用到的数据格式。

9.4.1 基本类

与本书中的其他例子一样,我们也实现了本例的串行版本和并发版本,以此来验证并发流对应用程序性能的改进情况。这两个版本的程序有一些共同的类。

  • Person

Person类存储了关于社交网络中每个人的信息,它包括如下要素。

  • 它的用户ID,存放在ID属性中。
  • 该用户的联系人列表,以一个String对象列表的形式存放在联系人属性中。
    该类声明了上述两个属性,以及与之对应的getXXX()方法和setXXX()方法。此外,还需要一个构造函数以创建该联系人列表,还有一个名为addContact()的方法,该方法用于将单个联系人添加到联系人列表。该类的源码非常简单,在此不再给出。
  • PersonPair

PersonPair类扩展了Person类,增加了存放第二个用户标识符的属性。将该属性称作otherId。该类声明了该属性以及相应的getXXX()方法和setXXX()方法。还需要一个名为getFullId()的方法,该方法返回一个含有两个用户标识符的字符串,它们之间采用字符,分隔。该类的源代码非常简单,因此这里不再给出。

  • DataLoader

DataLoader类加载带有用户信息及其联系人的文件,并且将其转换成一个Person对象列表。该类仅实现了一个名为load()的静态方法,该方法接收以String对象出现的文件路径作为参数,并且返回Person对象列表。

如前所示,该文件具有如下格式。

  1. User-C1,C2,C3...CN

其中,User是用户的标识符,而C1C2C3…CN都是该用户联系人的标识符。

该类的源代码非常简单,在此不再给出。

9.4.2 并发版本

首先,分析一下该算法的并发版本。

  • CommonPersonMapper

CommonPersonMapper类是稍后将要用到的一个辅助类。它将生成所有的PersonPair对象,这些对象可以从Person对象生成。该类实现了Function接口,而该接口采用Person类和List类参数化。

该类实现了Fuction接口中定义的apply()方法。首先,初始化将要返回的List对象,并且对联系人列表进行排序。

  1. public class CommonPersonMapper implements Function<Person,
  2. List<PersonPair>> {
  3. @Override
  4. public List<PersonPair> apply(Person person) {
  5. List<PersonPair> ret=new ArrayList<>();
  6. List<String> contacts=person.getContacts();
  7. Collections.sort(contacts);

然后,处理整个联系人列表,为每个联系人创建PersonPair对象。如前所述,按照字母表顺序存放两个联系人。按字母表排序靠前的存放在ID字段中,而另一个则存放在otherId字段中。

  1. for (String contact : contacts) {
  2. PersonPair personExt=new PersonPair();
  3. if (person.getId().compareTo(contact) < 0) {
  4. personExt.setId(person.getId());
  5. personExt.setOtherId(contact);
  6. } else {
  7. personExt.setId(contact);
  8. personExt.setOtherId(person.getId());
  9. }

最后,将联系人列表添加到新对象,并且将该对象添加到结果列表。处理完所有的联系人后,返回结果列表。

  1. personExt.setContacts(contacts);
  2. ret.add(personExt);
  3. }
  4. return ret;
  5. }
  6. }
  • ConcurrentSocialNetwork

ConcurrentSocialNetwork类是本例的主类。它仅仅实现了一个名为bidirectionalCommonContacts()的静态方法。该方法接收社交网络上的人员列表(含有联系人),并且返回一个PersonPair对象列表,这些PersonPair对象中含有每一对互为联系人的用户之间的共同联系人。

从内部来看,我们使用不同的流来实现自己的算法。我们使用第一个流将Person对象的输入列表转换成一个Map。该Map的键为每一对用户的两个标识符,而其值为一个含有两个用户联系人的PersonPair对象列表。这样,这些列表总是有两个元素。代码如下:

  1. public class ConcurrentSocialNetwork {
  2. public static List<PersonPair> bidirectionalCommonContacts
  3. (List<Person> people) { Map<String,
  4. List<PersonPair>> group = people.parallelStream()
  5. .map(new CommonPersonMapper())
  6. .flatMap(Collection::stream)
  7. .collect(Collectors.groupingByConcurrent
  8. (PersonPair::getFullId));

该流有如下组件。

(1) 使用输入列表的parallelStream()方法创建流。

(2) 然后,使用map()方法和前面提到的CommonPersonMapper类将每个Person对象都转换到一个PersonPair对象列表中,这其中考虑到了该对象的所有可能结果。

(3) 此时,有了一个List对象流。我们使用flatMap()方法将该流转换成一个PersonPair对象流。

(4) 最后,使用collect()方法生成该Map,这要用到groupingByConcurrent()方法返回的收集器,而采用getFullId()方法返回的值作为该Map的键。

然后,使用Collectors类的of()方法创建一个新的收集器。该收集器将接收一个字符串Collection作为输入,使用AtomicReference>接口作为中间数据结构,并且返回一个字符串Collection作为返回类型。

  1. Collector<Collection<String>, AtomicReference<Collection<String>>,
  2. Collection<String>> intersecting = Collector.of(() ->
  3. new AtomicReference<>(null), (acc, list) -> {
  4. (acc, list) -> {
  5. if (acc.get() == null) {
  6. acc.updateAndGet(value -> new ConcurrentLinkedQueue<>(list));
  7. } else {
  8. acc.get().retainAll(list);
  9. }
  10. }, (acc1, acc2) -> {
  11. if (acc1.get() == null) return acc2;
  12. if (acc2.get() == null)
  13. return acc1;
  14. acc1.get().retainAll(acc2.get());
  15. return acc1;
  16. }, (acc) -> acc.get() == null ? Collections.emptySet() :
  17. acc.get(), Collector.Characteristics.CONCURRENT,
  18. Collector.Characteristics.UNORDERED);

of()方法的第一个参数是Supplier函数。需要创建一个中间数据结构时,总是要调用该Supplier。在串行流中,该方法仅被调用一次,但是在并发流中,每个线程都会调用该方法。

  1. () -> new AtomicReference<>(null),

在我们的例子中,会直接创建一个新的AtomicReference来存放Collection对象。

of()方法的第二个参数是Accumulator函数。该函数接收中间数据结构和一个输入值作为参数。

  1. (acc, list) -> {
  2. if (acc.get() == null) {
  3. acc.updateAndGet(value -> new ConcurrentLinkedQueue<>(list));
  4. } else {
  5. acc.get().retainAll(list);
  6. }
  7. }

在我们的例子中,acc参数是AtomicReference,而list参数是ConcurrentLinkedDeque。如果acc参数存储的是空值,那么使用AtomicReferenceupdateAndGet()方法。该方法更新当前值并且返回新值。如果AtomicReferencenull,本例创建一个含有该列表元素的新ConcurrentLinkedDeque。如果AtomicReference不为空,那么使用retainAll()方法添加该列表的所有元素。

of()方法的第三个参数是Combiner函数。该函数只在并行流中调用,它接收两个中间数据结构作为参数,并且仅生成一个数据结构。

  1. (acc1, acc2) -> {
  2. if (acc1.get() == null)
  3. return acc2;
  4. if (acc2.get() == null)
  5. return acc1;
  6. acc1.get().retainAll(acc2.get());
  7. return acc1;
  8. },

在我们的例子中,如果其中一个参数为null,则返回另一个数据结构。否则,使用acc1参数的retainAll()方法并且返回结果。

of()方法的第四个参数是Finisher函数。该函数将最后的中间数据结构转换成我们希望返回的数据结构。在我们的例子中,中间数据结构和最终数据结构相同,因此不需要转换。

  1. (acc) -> acc.get() == null ? Collections.emptySet() : acc.get(),

最后,使用最后一个参数指明该收集器是并发的。这就意味着,同一个结果容器可以从多个不同线程并发调用该Accumulator函数;该收集器是无序的,这就意味着,该操作不会保留元素的原始顺序。

定义了收集器后,还要将第一个流生成的Map转换成一个PersonPair对象列表,其中含有每一对用户的共同联系人。我们采用下述代码:

  1. List<PersonPair> peopleCommonContacts = group
  2. .entrySet().parallelStream().map((entry) -> {
  3. Collection<String> commonContacts = entry
  4. .getValue().parallelStream().map(p -> p
  5. .getContacts()).collect(intersecting);
  6. PersonPair person = new PersonPair();
  7. person.setId(entry.getKey().split(",")[0]);
  8. person.setOtherId(entry.getKey().split (",")[1]);
  9. person.setContacts(new ArrayList<String> (commonContacts));
  10. return person;
  11. }).collect(Collectors.toList());
  12. return peopleCommonContacts;
  13. }
  14. }

使用entySet()方法处理该Map的所有元素。创建parallelStream()方法来处理所有的Entry对象,然后使用map()方法将每个PersonPair对象列表转换为一个含有共同联系人的唯一PersonPair对象。

对每条记录来说,其键是一对用户的标识符(以逗号作为分隔符),而其值是由两个PersonPair对象组成的列表。第一个PersonPair对象中含有一个用户的联系人,而另一个PersonPair对象中含有另一个用户的联系人。

我们为该列表创建一个流来生成两个用户的共同联系人,其中含有如下元素。

(1) 使用该列表的parallelStream()方法创建该流。

(2) 使用map()方法来将每个PersonPair()对象替换为存放在该对象中的联系人列表。

(3) 最后,使用收集器生成含有共同联系人的ConcurrentLinkedDeque

最后,创建一个新的PersonPair对象,其中含有两个用户的标识符及其共同联系人列表。将该对象添加到结果列表。该Map中的所有元素处理完毕后,可以返回该结果列表。

  • ConcurrentMain

ConcurrentMain类实现了main()方法,用于测试算法。如前所述,使用下面两个数据集测试该算法。

  • 一个非常简单的用于测试该算法正确性的数据集。
  • 基于Facebook真实数据的数据集。
    该类的源代码如下:
  1. public class ConcurrentMain {
  2. public static void main(String[] args) {
  3. Date start, end;
  4. System.out.println("Concurrent Main Bidirectional - Test");
  5. List<Person> people=DataLoader.load("data","test.txt");
  6. start=new Date();
  7. List<PersonPair> peopleCommonContacts= ConcurrentSocialNetwork
  8. .bidirectionalCommonContacts (people);
  9. end=new Date();
  10. peopleCommonContacts.forEach(p -> System.out.println
  11. (p.getFullId()+": "+getContacts(p.getContacts())));
  12. System.out.println("Execution Time: "+(end.getTime()-
  13. start.getTime()));
  14. System.out.println("Concurrent Main Bidirectional -
  15. Facebook");
  16. people=DataLoader.load("data","facebook_contacts.txt");
  17. start=new Date();
  18. peopleCommonContacts= ConcurrentSocialNetwork
  19. .bidirectionalCommonContacts (people);
  20. end=new Date();
  21. peopleCommonContacts.forEach(p -> System.out.println
  22. (p.getFullId()+": "+getContacts(p.getContacts())));
  23. System.out.println("Execution Time: "+(end.getTime()-
  24. start.getTime()));
  25. }
  26. private static String formatContacts(List<String> contacts) {
  27. StringBuffer buffer=new StringBuffer();
  28. for (String contact: contacts) {
  29. buffer.append(contact+",");
  30. }
  31. return buffer.toString();
  32. }
  33. }

9.4.3 串行版本

和本书的其他例子一样,我们也为本例实现了串行版。该版本相当于对并发版做如下更改。

  • stream()方法替换parallelStream()方法。
  • ArrayList数据结构替换ConcurrentLinkedDeque数据结构。
  • groupingBy()方法替换groupingByConcurrent()方法。
  • 不使用of()方法中最后的参数。

9.4.4 对比两个版本

我们采用JMH框架执行这些示例,该框架允许在Java中实现微型基准测试。使用面向基准测试的框架是比较好的解决方案,它直接用currentTimeMillis()方法或者nanoTime()方法度量时间。在两种不同的架构上分别执行这些示例10次。

  • 一台计算机配置了Intel Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核,且每个核可以执行两个线程,这样就有四个并行线程。
  • 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。

结果如下(单位:毫秒)。

示例数据集Facebook
Intel架构
串行版0.5623193.83
并发版2.0371778.239
AMD架构
串行版3.3258953.173
并发版2.9763447.576

可以得出如下结论。

  • 对于示例数据集,在Intel架构上串行版的执行时间结果更好,而在AMD架构上也有类似表现。原因在于示例数据集中的元素比较少。
  • 对于Facebook数据集,并发版在两种架构上的执行时间结果均更好。

针对Facebook数据集比较并发版和串行版,就会得到如下结果。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{8953.173}{3447.576}=2.60\\&S_{{\rm Intel}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{3193.83}{1778.239}=1.80\end{aligned}