6.2 第一个例子:关键字抽取算法
在本节,你将使用分段器实现关键字抽取算法。这类算法的主要用途是从文本文档或者文档集合(内部对每个文档做了更好的定义)中抽取单词,这些术语可用于文档综述,文档的聚类分析,或者信息检索过程的提升。
从文档集合的文档中抽取关键字最基本的算法是基于TF-IDF方法(而且该方法目前仍然被广泛使用),其中有如下两项。
- 术语频次(TF)是指一个单词在某个文档中出现的次数。
- 文档频次(DF)是含有某个单词的文档的数量。逆文档频次(IDF)用于度量单词所提供的使某个文档区别于其他文档的信息。如果一个单词很常用,那么它的IDF值会很低,但是如果该单词仅在少数几个文档中出现,那么它的IDF值会很高。
在文档d中单词t的TF-IDF值可以通过下述公式计算。

上述公式用到的属性其解释如下。
- Ft,d 是单词t在文档d中出现的次数。
- N 是集合中文档的数目。
- nt 是含有单词t的文档的数目。
为获取文档的关键字,可以选用具有较高TF-IDF值的单词。
要实现的算法将通过执行下述阶段计算文档集合中的最佳关键字。
- 阶段1:解析所有文档并且抽取所有单词的DF值。请注意,只有解析了所有文档才可以获得准确值。
- 阶段2:计算所有文档中单词的TF-IDF值。为每个文档选择10个关键字(TF-IDF值评价最高的10个单词)。
- 阶段3:获得一个最佳关键字列表。这个列表中的单词应该能够代表大多数文档的关键字。
为了测试算法,将使用有关电影信息的维基百科页面作为文档集合。该集合与第5章中用过的集合相同,由100 673个文档组成。我们将每个维基百科页面转换成一个文本文件,可以随本书配套的资源下载该文档集合。
你将实现本算法的两个版本:基础串行版本和使用Phaser类的并发版本。在此之后,我们将比较两个版本的执行时间,以验证并发处理能够带来更好的性能。
6.2.1 公共类
该算法的两个版本具有一些通用功能,用于解析文档以及存储有关文档、关键字和单词的信息。这样的公共类有如下几项。
Document类:用于存放含有文档以及构成文档的单词的文件名。Word类:用于存放单词字符串和度量该单词的指标(TF、DF和TF-IDF)。Keyword类:用于存放单词字符串以及将该单词作为关键字的文档数量。DocumentParser类:用于抽取某个文档的单词。
下面详细介绍一下这些类。
Word类
Word类存放了有关某个单词的信息。这些信息包括整个单词以及影响它的措施,也就是它在某个文档中的TF值,全局DF值,以及其最终的TF-IDF值。
该类实现了Comparable接口,因为要对单词数组进行排序,以获得具有较高TF-IDF值的单词。相关代码如下。
public class Word implements Comparable<Word> {
然后,声明该类的属性并且实现获取和设置这些属性的方法(在此未给出)。
private String word;private int tf;private int df;private double tfIdf;
我们还实现了其他一些有用的方法,如下所示。
- 该类的构造函数,对word(接收作为参数的单词)和
df属性(取值为1)进行了初始化。 addTf()方法,用于增加tf属性的值。merge()方法,接收一个Word对象作为参数,对来自两个不同文档的同一单词进行合并。将两个Word对象的tf属性值和df属性值相加。
然后,实现了setDf()方法的一个特殊版本。该方法接收df属性值作为参数,接收集合中文档的总数,然后计算得出tfIdf属性的值。
public void setDf(int df, int N) {this.df = df;tfIdf = tf * Math.log(Double.valueOf(N) / df);}
最后,实现了compareTo()方法,并希望按照tfIdf属性值从高到底的顺序对单词进行排序。
@Overridepublic int compareTo(Word o) {return Double.compare(o.getTfIdf(), this.getTfIdf());}}
Keyword类
Keyword类存放了关于关键字的信息。该信息包括完整的单词以及将该单词作为关键字的文档数。
与Word类一样,之所以该类也实现了Comparable接口,是因为将对一个关键字数组进行排序以获取最佳关键字。
public class Keyword implements Comparable<Keyword> {
然后,声明该类的属性并且实现相应的方法设定和返回属性值(这些在此未给出)。
private String word;private int df;
最后,实现compareTo()方法,希望能够按照文件数量由多到小的顺序排列关键字。
@Overridepublic int compareTo(Keyword o) {return Integer.compare(o.getDf(), this.getDf());}}
Document类
Document类存放文档集合(请记住集合中有100 673个文档)中某个文档的相关信息,其中包括文件名和构成该文档的单词集合。该单词集合通常也被称作该文档的词汇表,采用HashMap实现,它将整个单词视为一个字符串并作为键,将一个Word对象作为值。
public class Document {private String fileName;private HashMap <String, Word> voc;
我们实现了一个构造函数创建该HashMap,实现了用于获取和设置文件名的方法,以及返回文档词汇表的方法(这些方法在此未给出)。我们还实现了一个向词汇表添加单词的方法。如果单词不在词汇表中,则将其加入词汇表。
如果单词在词汇表中,则增加该单词的tf属性值。我们使用了voc对象的computeIfAbsent()方法。如果单词不在词汇表中,则该方法会将其插入到HashMap当中,然后用addTf()方法来增加tf值。
public void addWord(String string) {voc.computeIfAbsent(string, k -> new Word(k)).addTf();}}
HashMap类并不是同步的,但是仍然可以在并发应用程序中使用,因为不同任务并不会共享该类。一个Document对象只能由一个任务生成,因此使用HashMap类时并不会导致并发版应用程序中的竞争条件。
DocumentParser类
DocumentParser类读取一个文本文件的内容并且将其转换成一个Document对象。该类将文本分割成若干单词并且将它们存放在Document对象中,进而生成词汇表。该类有两个静态方法:第一个是parse()方法,接收文件路径字符串,返回一个Document对象。该方法打开文件并且逐行读取,使用parseLine()方法将每行转换成一个单词序列,并且将它们存放在Document类。
public class DocumentParser {public static Document parse(String path) {Document ret = new Document();Path file = Paths.get(path);ret.setFileName(file.toString());try (BufferedReader reader =Files.newBufferedReader(file)) {for(String line : Files.readAllLines(file)) {parseLine(line, ret);}} catch (IOException x) {x.printStackTrace();}return ret;}
parseLine()方法接收待解析的行和用于存放单词的Document对象作为参数。
首先,该方法使用Normalizer类删除每一行的重音符号,并将其转换成小写形式。
private static void parseLine(String line, Document ret) {line = Normalizer.normalize(line, Normalizer.Form.NFKD);line = line.replaceAll("[^\\p{ASCII}]", "");line = line.toLowerCase();
然后,使用StringTokenizer类将该行分割成多个单词,并且将这些单词添加到Document对象。
private static void parseLine(String line, Document ret) {// 清理字符串line = Normalizer.normalize(line, Normalizer.Form.NFKD);line = line.replaceAll("[^\\p{ASCII}]", "");line = line.toLowerCase();// 分词程序for(String w: line.split("\\W+")) {ret.addWord(w);}}}
6.2.2 串行版本
我们已在SerialKeywordExtraction类中实现了关键字算法的串行版本。该类定义了执行测试该算法的main()方法。
第一步是声明以下这些执行算法必需的内部变量。
- 两个用于度量执行时间的
Date对象。 - 一个存放含有文档集合的目录名称的字符串。
- 一个用于存放有关文档集合文件的
File对象数组。 - 一个用于存放文档集合全局词汇表的
HashMap。 - 一个用于存放关键字的
HashMap。 - 两个用于度量有关执行情况统计数据的
int值。
下面给出了这些变量的声明。
public class SerialKeywordExtraction {public static void main(String[] args) {Date start, end;File source = new File("data");File[] files = source.listFiles();HashMap<String, Word> globalVoc = new HashMap<>();HashMap<String, Integer> globalKeywords = new HashMap<>();int totalCalls = 0;int numDocuments = 0;start = new Date();
然后,介绍该算法的第一阶段。我们使用DocumentParser类的parse()方法解析所有文档。该方法返回一个含有文档词汇表的Document对象。我们使用HashMap类的merge()方法将文档词汇表添加到全局词汇表。如果单词不存在,则将它插入HashMap。如果该单词已存在,则将两个单词对象合并到一起,并且对Tf属性和Df属性求和。
if(files == null) {System.err.println("Unable to read the 'data' folder");return;}for (File file : files) {if (file.getName().endsWith(".txt")) {Document doc = DocumentParser.parse (file.getAbsolutePath());for (Word word : doc.getVoc().values()) {globalVoc.merge(word.getWord(), word, Word::merge);}numDocuments++;}}System.out.println("Corpus: " + numDocuments + " documents.");
在此阶段之后,globalVocHashMap类包含了文档集合的所有单词,以及单词的全局TF(单词在文档集合中出现的总次数)和DF值。
然后,引入该算法的第二阶段。如前所述,我们将使用TF-IDF指标计算每个文档的关键字。必须再次解析每个文档以生成其词汇表,因为内存不能存放构成文档集合的100 673份文档的词汇表。如果处理的文档集合规模较小,可以尝试只解析这些文档一次,并且将全部文档的词汇表存放在内存中。不过在我们的例子中,这是不可能的。因此,再次解析全部文档,并且使用globalVoc中存放的值更新每个单词的df属性。我们还构造了一个含有文档中所有单词的数组。
for (File file : files) {if (file.getName().endsWith(".txt")) {Document doc = DocumentParser.parse(file.getAbsolutePath());List<Word> keywords = new ArrayList<>( doc.getVoc().values());int index = 0;for (Word word : keywords) {Word globalWord = globalVoc.get(word.getWord());word.setDf(globalWord.getDf(), numDocuments);}
现在,有关键字列表,其中含有文档中所有单词以及计算得出的TF-IDF值。使用Collections类的sort()方法对该列表排序,具有较高TF-IDF值的单词排在前面。然后,我们获取该列表中的前10个单词,并且使用addKeyword()方法将其存放在globalKeywordsHashMap中。
选择排名前10的单词并没有特殊原因。其他供选方案也可以尝试,例如某一比例的一组单词或者TF-IDF指标最小值等,看看它们的表现情况。
Collections.sort(keywords);int counter = 0;for (Word word : keywords) {addKeyword(globalKeywords, word.getWord());totalCalls++;}}}
最后,引入该算法的第三阶段。将globalKeywordsHashMap转换成一个Keyword对象列表,使用Collections类的sort()方法对该数组进行排序。将DF值较高的关键字排在列表的前面,并且在控制台输出前100个单词。
相关代码如下所示:
List<Keyword> orderedGlobalKeywords = new ArrayList<>();for (Entry<String, Integer> entry : globalKeywords.entrySet()) {Keyword keyword = new Keyword();keyword.setWord(entry.getKey());keyword.setDf(entry.getValue());orderedGlobalKeywords.add(keyword);}Collections.sort(orderedGlobalKeywords);if (orderedGlobalKeywords.size() > 100) {orderedGlobalKeywords = orderedGlobalKeywords.subList(0, 100);}for (Keyword keyword : orderedGlobalKeywords) {System.out.println(keyword.getWord() + ": " + keyword.getDf());}
与第二阶段相同,选择前100个单词没有特殊的理由。也可以尝试其他供选方案。
为了结束main()方法,在控制台输出执行时间和其他统计数据。
end = new Date();System.out.println("Execution Time: " + (end.getTime() -start.getTime()));System.out.println("Vocabulary Size: " + globalVoc.size());System.out.println("Keyword Size: " + globalKeywords.size());System.out.println("Number of Documents: " + numDocuments);System.out.println("Total calls: " + totalCalls);}
SerialKeywordExtraction类还包括addKeyword()方法,它用于更新globalKeywordsHashMap类中某个关键字的信息。如果该单词存在,则该类更新其DF值;如果不存在,则将其插入。相关代码如下:
private static void addKeyword(Map<String, Integer>globalKeywords, String word) {globalKeywords.merge(word, 1, Integer::sum);}}
6.2.3 并发版本
为了实现本例的并发版本,我们用到了如下两个不同的类。
KeywordExtractionTasks类:该类以并发方式实现准备计算关键字的任务。这些任务将作为Thread对象执行,因此该类实现了Runnable接口。ConcurrentKeywordExtraction类:该类提供main()方法执行算法,创建、启动任务,并且等待任务完成。
下面仔细看看这些类。
KeywordExtractionTask类
如前所述,该类实现了计算最终单词列表的任务。它实现了Runnable接口,因此可以将其作为一个Thread线程执行,而且其内部用到了一些属性,大多数属性所有任务共享。
- 用于存放全局词汇表和全局关键字的两个
ConcurrentHashMap对象:之所以使用ConcurrentHashMap是因为这些对象将被所有任务更新,这样就必须采用并发数据结构避免竞争条件。 - 用于存放文档集合文件列表的两个文件对象
ConcurrentLinkedDeque:之所以使用ConcurrentLinkedDeque类是因为所有任务都将同时抽取(获取或删除)该列表的元素,因此必须使用并发数据结构以避免竞争条件。如果使用常规List,那么同一File对象会被不同的任务解析两次。之所以采用两个ConcurrentLinkedDeque是因为必须要对整个文档集合解析两次。如前所述,通过从数据结构中抽取File对象解析文档集合。因此,解析该集合时,该数据结构将为空。 - 用于控制任务执行的
Phaser对象:如前所述,关键字抽取算法按照三个阶段执行。在所有任务都完成上一阶段之前,任何任务都不能进入下一阶段。使用Phaser类对此加以控制。否则,将会得到不一致的结果。 - 最后阶段必须由唯一的线程执行:将使用布尔值区分主任务与其他任务。这些主任务将执行最后阶段。
- 集合中的文档总数:需要该值计算TF-IDF指标。
我们引入了一个构造函数以初始化所有属性。
public class KeywordExtractionTask implements Runnable {private ConcurrentHashMap<String, Word> globalVoc;private ConcurrentHashMap<String, Integer> globalKeywords;private ConcurrentLinkedDeque<File> concurrentFileListPhase1;private ConcurrentLinkedDeque<File> concurrentFileListPhase2;private Phaser phaser;private String name;private boolean main;private int parsedDocuments;private int numDocuments;public KeywordExtractionTask(ConcurrentLinkedDeque<File> concurrentFileListPhase1,ConcurrentLinkedDeque<File> concurrentFileListPhase2,Phaser phaser, ConcurrentHashMap<String, Word>globalVoc,ConcurrentHashMap<String, Integer> globalKeywords,int numDocuments, String name, boolean main) {this.concurrentFileListPhase1 = concurrentFileListPhase1;this.concurrentFileListPhase2 = concurrentFileListPhase2;this.globalVoc = globalVoc;this.globalKeywords = globalKeywords;this.phaser = phaser;this.main = main;this.name = name;this.numDocuments = numDocuments;}
使用run()方法实现该算法分为三个阶段。首先,调用分段器的arriveAndAwaitAdvance()方法等待其他任务的创建。所有任务都会同时开始执行。然后,正如在该算法的串行版本中提到的,解析所有文档并且构建globalVocConcurrentHashMap类,其中含有所有单词及其全局TF值和DF值。为了完成第一阶段,再次调用arriveAndAwaitAdvance()方法,在第二阶段开始之前等待其他任务结束。
@Overridepublic void run() {File file;// 第一阶段phaser.arriveAndAwaitAdvance();System.out.println(name + ": Phase 1");while ((file = concurrentFileListPhase1.poll()) != null) {Document doc = DocumentParser.parse(file.getAbsolutePath());for (Word word : doc.getVoc().values()) {globalVoc.merge(word.getWord(), word, Word::merge);}parsedDocuments++;}System.out.println(name + ": " + parsedDocuments +" parsed.");phaser.arriveAndAwaitAdvance();
正如你看到的,为获取待处理的File对象,使用了ConcurrentLinkedDeque类的poll()方法。该方法检索并且删除Deque的第一个元素,这样下一个任务将获取不同的文件进行解析,并且没有文件会被解析两次。
正如在该算法的串行版中提到的,第二阶段计算了globalKeywords结构。首先,计算每个文档最优的10个关键字,然后将其插入ConcurrentHashMap类。该代码和串行版中的相同,只是将串行数据结构替换为并发数据结构。
// 第二阶段System.out.println(name + ": Phase 2");while ((file = concurrentFileListPhase2.poll()) != null) {Document doc = DocumentParser.parse(file.getAbsolutePath());List<Word> keywords = new ArrayList<>(doc.getVoc().values());for (Word word : keywords) {Word globalWord = globalVoc.get(word.getWord());word.setDf(globalWord.getDf(), numDocuments);}Collections.sort(keywords);if(keywords.size() > 10) keywords = keywords.subList(0, 10);for (Word word : keywords) {addKeyword(globalKeywords, word.getWord());}}System.out.println(name + ": " + parsedDocuments +" parsed.");
对于主任务和其他任务而言最后阶段将有所不同。在将整个文档集合中的100个最佳关键字输出到控制台之前,主任务使用Phaser类的arriveAndAwaitAdvance()方法等待所有任务的第二阶段结束。最后,使用arriveAndDeregister()方法从分段器中注销。
剩下的任务使用arriveAndDeregister()方法标记第二阶段的结束、从分段器注销以及完成其执行。
当所有的任务完成工作后,都将从分段器中注销。最后分段器将有0个参与方,并且将进入终止状态。
if (main) {phaser.arriveAndAwaitAdvance();Iterator<Entry<String, Integer>> iterator =globalKeywords.entrySet().iterator(); KeywordorderedGlobalKeywords[] = newKeyword[globalKeywords.size()];int index = 0;while (iterator.hasNext()) {Entry<String, AtomicInteger> entry = iterator.next();Keyword keyword = new Keyword();keyword.setWord(entry.getKey());keyword.setDf(entry.getValue().get());orderedGlobalKeywords[index] = keyword;index++;}System.out.println("Keyword Size: " +orderedGlobalKeywords.length);Arrays.parallelSort(orderedGlobalKeywords);int counter = 0;for (int i = 0; i < orderedGlobalKeywords.length; i++){Keyword keyword = orderedGlobalKeywords[i];System.out.println(keyword.getWord() + ": " +keyword.getDf());counter++;if (counter == 100) {break;}}}phaser.arriveAndDeregister();System.out.println("Thread " + name + " has finished.");}
ConcurrentKeywordExtraction类
ConcurrentKeywordExtraction类初始化共享对象、创建任务、执行任务并且等待任务结束。它实现的main()方法可以接收可选参数。默认情况下,执行的任务数由Runtime类的availableProcessors()方法确定,该方法返回可供Java虚拟机(Java virtual machine,JVM)使用的硬件线程数。如果接收到一个参数,那么就将其转换成一个整型值,并且将其用作可用处理器数量的乘数,以确定将创建的任务数。
首先,初始化所有必要的数据结构和参数。为了填充这两个ConcurrentLinkedDeque结构,我们使用File类的listFiles()方法获取一个File对象数组,其中含有txt后缀的文件。
还可以使用不带参数的构造函数创建Phaser对象,这样所有的任务必须在分段器中进行显式注册。相关代码如下:
public class ConcurrentKeywordExtraction {public static void main(String[] args) {Date start, end;ConcurrentHashMap<String, Word> globalVoc = newConcurrentHashMap<>();ConcurrentHashMap<String, Integer> globalKeywords = newConcurrentHashMap<>();start = new Date();File source = new File("data");File[] files = source.listFiles(f ->f.getName().endsWith(".txt"));if (files == null) {System.err.println("The 'data' folder not found!");return;}ConcurrentLinkedDeque<File> concurrentFileListPhase1 = newConcurrentLinkedDeque<>(Arrays.asList(files));ConcurrentLinkedDeque<File> concurrentFileListPhase2 = newConcurrentLinkedDeque<>(Arrays.asList(files));int numDocuments = files.length();int factor = 1;if (args.length > 0) {factor = Integer.valueOf(args[0]);}int numTasks = factor *Runtime.getRuntime().availableProcessors();Phaser phaser = new Phaser();Thread[] threads = new Thread[numTasks];KeywordExtractionTask[] tasks = newKeywordExtractionTask[numTasks];
然后,将创建的第一个任务其主参数置为true,其他任务的主参数置为false。每个任务创建完毕后,我们使用Phaser类的register()方法在分段器中注册一个新的参与方,如下所示:
for (int i = 0; i < numTasks; i++) {tasks[i] = new KeywordExtractionTask(concurrentFileListPhase1,concurrentFileListPhase2, phaser, globalVoc,globalKeywords, concurrentFileListPhase1.size(),"Task" + i, i==0);phaser.register();System.out.println(phaser.getRegisteredParties() + "tasks arrived to the Phaser.");}
然后,创建并启动运行该任务的线程对象,并且等待其结束。
for (int i = 0; i < numTasks; i++) {threads[i] = new Thread(tasks[i]);threads[i].start();}for (int i = 0; i < numTasks; i++) {try {threads[i].join();} catch (InterruptedException e) {e.printStackTrace();}}
最后,在控制台输出有关执行情况的统计结果,包括执行时间。
System.out.println("Is Terminated: " + phaser.isTerminated());end = new Date();System.out.println("Execution Time: " + (end.getTime() -start.getTime()));System.out.println("Vocabulary Size: " + globalVoc.size());System.out.println("Number of Documents: " + numDocuments);}}
6.2.4 对比两种解决方案
比较一下关键字抽取算法的串行版和并发版。为测试该算法,采用了含有100 673份文档的文档集合。
我们使用JMH框架执行算例,它允许在Java中实现微型基准测试。使用面向基准测试的框架是很好的解决方案,因为可以直接使用currentTimeMillis()或nanoTime()这样的方法度量时间。在两种不同的架构上分别执行这些算例10次。
- 一台计算机配置了Intel Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核且每个核可以执行两个线程,这样就有四个并行线程。
- 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。
| 算法 | Intel | AMD | ||
|---|---|---|---|---|
| 因子 | 执行时间(秒) | 因子 | 执行时间(秒) | |
| 串行版 | N/A | 76.252 | N/A | 168.816 |
| 并发版 | 1 | 35.092 | 1 | 60.740 |
| 2 | 34.495 | 2 | 60.806 | |
| 3 | 34.518 | 3 | 58.752 |
可以得出如下结论。
- 在两种架构中,相对于串行版而言,并发版算法的性能有所提升。
- 如果使用的任务数多于可用硬件线程数,并不会得到更好的结果。存在些许差别,但是并不明显。
通过计算加速比对比该算法的并发版本和串行版本,如下所示:

