场景为,给到一个硬盘上文件或文件夹,(当然文件夹时,多线程的优势才能越发体现出来),得到该文件或文件夹的大小和计算该结果所需要的时间。
首先是单线程下的例子,这个可难不倒大家,代码如下:
public class TotalFileSizeSequential { private long getTotalSizeOfFilesInDir(final File file) { if (file.isFile()) return file.length(); final File[] children = file.listFiles(); long total = 0; if (children != null) for(final File child : children) total += getTotalSizeOfFilesInDir(child); return total; } public static void main(final String[] args) { final long start = System.nanoTime(); final long total = new TotalFileSizeSequential() .getTotalSizeOfFilesInDir(new File("D:/idea_ws")); final long end = System.nanoTime(); System.out.println("Total Size: " + total); System.out.println("Time taken: " + (end - start)/1.0e9); }}
上述代码在我的机器上(win7,8g,i5)多次运行后的均值为:文件夹大小为276590351字节,即263M;耗时大概 0.25s
Total Size: 276589881Time taken: 0.258706999
至此,我们当然希望通过多线程扫描计算的方式,来更快的得到这个文件夹的大小,于是有个这个看似设计不错的,native版的代码:
public class NaivelyConcurrentTotalFileSize { private long getTotalSizeOfFilesInDir( final ExecutorService service, final File file) throws InterruptedException, ExecutionException, TimeoutException { if (file.isFile()) return file.length(); long total = 0; final File[] children = file.listFiles(); if (children != null) { final List> partialTotalFutures = new ArrayList >(); for(final File child : children) { partialTotalFutures.add(service.submit(new Callable () { public Long call() throws InterruptedException, ExecutionException, TimeoutException { return getTotalSizeOfFilesInDir(service, child); } })); } for(final Future partialTotalFuture : partialTotalFutures) total += partialTotalFuture.get(100, TimeUnit.SECONDS); } return total; } private long getTotalSizeOfFile(final String fileName) throws InterruptedException, ExecutionException, TimeoutException { final ExecutorService service = Executors.newFixedThreadPool(100); try { return getTotalSizeOfFilesInDir(service, new File(fileName)); } finally { service.shutdown(); } } public static void main(final String[] args) throws InterruptedException, ExecutionException, TimeoutException { final long start = System.nanoTime(); final long total = new NaivelyConcurrentTotalFileSize() .getTotalSizeOfFile("D:/idea_ws"); final long end = System.nanoTime(); System.out.println("Total Size: " + total); System.out.println("Time taken: " + (end - start)/1.0e9); }}
额,运行的结果出乎意料,多次运行该多线程代码,去扫描计算这个263M的文件夹时,仅有几次能运行成功,大部分时候,都是死锁,导致程序崩溃,根本无法计算文件夹大小。
这段程序在getTotalSizeOfFilesInDir(final ExecutorService service, final File file) 方法中,有阻塞线程池的操作。每当扫描到一个子目录的时候,它就将该任务调度给其他的线程。这个逻辑看上去没有错,一旦它调度完了所有任务,该函数就等待任何一个任务的响应。即是代码中的 total += partialTotalFuture.get(100, TimeUnit.SECONDS);这一行。
当然,当给到扫描的目录不是很深,文件不很多时,这样做确实能很快的得到结果,否则,就会在该行卡主,一直等待响应,若是没有我们设置的100的超时操作,这将演变成一种潜在的“线程诱发型死锁(Pool Include DeadLock)”
此时,我们就要考虑如何改善这段代码了,目标是:计算各子目录大小的任务分配给不同线程,当我们等待其他任务响应时,又不会占住当前的主调线程(避免死锁)。 下面,是第一种解决方案:每个任务都返回给定目录的直接子目录列表,而不是去返回计算的大小。这样的好处是,使线程被堵住的时间不会超过扫描给定目录的直接子目录的时间。在返回直接子目录的同时,计算出目录中的文件的大小一并返回。那么,第一个修复版本的代码如下:
public class ConcurrentTotalFileSize { class SubDirectoriesAndSize { final public long size; final public List以上代码多次运行后的结果为:subDirectories; public SubDirectoriesAndSize( final long totalSize, final List theSubDirs) { size = totalSize; subDirectories = Collections.unmodifiableList(theSubDirs); } } private SubDirectoriesAndSize getTotalAndSubDirs(final File file) { long total = 0; /* list的作用: 1.先存放入参file,根据file,得到它下面的SubDirectoriesAndSize对象,即总的文件大小和总的文件夹的ls。根据getTotalSubDirs(final File file)获取 2.清空上一步的入参file,用于存放SubDirectoriesAndSize.dirs */ final List subDirectories = new ArrayList (); if(file.isDirectory()) { final File[] children = file.listFiles(); if (children != null) for(final File child : children) { if (child.isFile()) total += child.length(); else subDirectories.add(child); } } return new SubDirectoriesAndSize(total, subDirectories); } private long getTotalSizeOfFilesInDir(final File file) throws InterruptedException, ExecutionException, TimeoutException { final ExecutorService service = Executors.newFixedThreadPool(100); try { long total = 0; final List directories = new ArrayList (); directories.add(file); while(!directories.isEmpty()) { final List > partialResults = new ArrayList >(); for(final File directory : directories) { partialResults.add( service.submit(new Callable () { public SubDirectoriesAndSize call() { return getTotalAndSubDirs(directory); } })); } directories.clear(); for(final Future partialResultFuture : partialResults) { final SubDirectoriesAndSize subDirectoriesAndSize = partialResultFuture.get(100, TimeUnit.SECONDS); directories.addAll(subDirectoriesAndSize.subDirectories); total += subDirectoriesAndSize.size; } } return total; } finally { service.shutdown(); } } public static void main(final String[] args) throws InterruptedException, ExecutionException, TimeoutException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSize() .getTotalSizeOfFilesInDir(new File("D:/idea_ws")); final long end = System.nanoTime(); System.out.println("Total Size: " + total); System.out.println("Time taken: " + (end - start)/1.0e9); }}
Total Size: 276592101Time taken: 0.124100452
比起单线程版本,速度基本提升了一倍的样子。到此,我们这个任务就完成了么?不,难道你不觉得上面这个设计,虽然说起来简单,但是实现起来却并不那么容易么?
我们不仅需要设计一个用于保存计算任务的返回结果的不变的子类,而且还要花心思去设计如何不断的分派任务以及协调处理任务的返回结果。所以,即使我们这个设计提高了性能,却引入了相当的复杂性。
所以,我们下面将会引入CountDownLatch辅助实现线程的设计和协作。上面的设计中,我们引入了Future,我们通过它获取任务的执行结果,同时,Future也隐含的帮我们完成了一些任务/线程之间的协调工作,我们因此不用考虑线程切换和并行中可能发现的一些问题。但是,大家知道,Future是需要返回值的,假使我们的任务没有返回值的话,Future就不太适合出现在我们的代码中,并且作为线程间的协作工具了。此时,我们会考虑到CountDownLatch作为替代。
下面,我们将使用到CountDownLatch去做协调,在扫描到一个文件时,线程不在能够通过Future去接受一个计算的返回结果,而是去更新一个AtomicLong类型的共享变量totalSize。AtomicLong提供了更新并取回一个简单long类型的变量的线程安全的方法。此外,我们还会有个AtomicLong类型的变量pendingFileVisits,用于保存当前等待访问的文件或子目录的数量。而当该值为0时,我们就通过countDown()来释放线程闩。
public class ConcurrentTotalFileSizeWLatch { private ExecutorService service; //用于计算待扫描的文件/子文件夹的个数 final private AtomicLong pendingFileVisits = new AtomicLong(); final private AtomicLong totalSize = new AtomicLong(); //定义一个锁存器,给定计数初始化的 CountDownLatch的个数是1个 final private CountDownLatch latch = new CountDownLatch(1); private void updateTotalSizeOfFilesInDir(final File file) { long fileSize = 0; if (file.isFile()) fileSize = file.length(); else { final File[] children = file.listFiles(); if (children != null) { for(final File child : children) { if (child.isFile()) fileSize += child.length(); else { pendingFileVisits.incrementAndGet(); service.execute(new Runnable() { public void run() { updateTotalSizeOfFilesInDir(child); } }); } } } } totalSize.addAndGet(fileSize); //递减锁存器的计数,如果计数到达零,则释放所有等待的线程。 if(pendingFileVisits.decrementAndGet() == 0) latch.countDown(); } private long getTotalSizeOfFile(final String fileName) throws InterruptedException { service = Executors.newFixedThreadPool(100); pendingFileVisits.incrementAndGet(); try { updateTotalSizeOfFilesInDir(new File(fileName)); // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。 latch.await(100, TimeUnit.SECONDS); return totalSize.longValue(); } finally { service.shutdown(); } } public static void main(final String[] args) throws InterruptedException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSizeWLatch() .getTotalSizeOfFile("D:/idea_ws"); final long end = System.nanoTime(); System.out.println("Total Size: " + total); System.out.println("Time taken: " + (end - start)/1.0e9); }}
这个版本的代码比较上一个版本,简洁了很多,下面我们看看运行的结果:
Total Size: 276592105Time taken: 0.114999062
结果比较上一个版本,性能上差不多。然而,即使简洁,高性能,但是这个版本的代码存在着访问共享可变变量的风险。我们将在后面研究,如何二者兼得的方法。
上面2个版本的代码中,我们通过Future和AtomicLong来实现数据的交换功能。当任务完成时能获取返回值时,Future就能派上用场,而AtomicLong的线程安全的原子操作对处理单个共享数据的值来说,是非常有用的。
如果,只是想在两个线程间交换数据,我们可以用Exchanger类。它可以看做一个同步点,两个线程在改同步点上,可以线程安全的方式互换数据,如果两个线程的运行速度不一样,则运行较快的会被阻塞,直到慢的线程赶到同步点时,才能开始数据的互换。
所以,如果想在线程间互发多组数据,则BlockingQueue接口可以派上用场。队列想必大家都比较熟悉了,队列没用空间,插入时会被阻塞;队列里没有可用数据,删除时会被阻塞。若想要插入和删除操作一一对应,可以使用SynchronousQueue类。该类的作用是本线程的每一个插入操作与其他线程相应的删除操作相匹配,以完成类似的手递手形式的数据传输。如果希望数据根据某种优先级在队列中上下浮动,则可以使用PriorityBlockingQueue。另外,如果只是要一个简单的阻塞队列,我们可以用链表的实现LinkedBlockingQueue或者数组的实现ArrayBlockingQueue。
第3版的代码如下:
public class ConcurrentTotalFileSizeWQueue { private ExecutorService service; //定义队列,存放每个线程得到的fileSize final private BlockingQueuefileSizes = new ArrayBlockingQueue (500); //记录文件夹数 final AtomicLong pendingFileVisits = new AtomicLong(); private void startExploreDir(final File file) { pendingFileVisits.incrementAndGet(); service.execute(new Runnable() { public void run() { exploreDir(file); } }); } private void exploreDir(final File file) { long fileSize = 0; if (file.isFile()) fileSize = file.length(); else { final File[] children = file.listFiles(); if (children != null) for(final File child : children) { if (child.isFile()) fileSize += child.length(); else { startExploreDir(child); } } } try { fileSizes.put(fileSize); } catch(Exception ex) { throw new RuntimeException(ex); } pendingFileVisits.decrementAndGet(); } private long getTotalSizeOfFile(final String fileName) throws InterruptedException { service = Executors.newFixedThreadPool(100); try { startExploreDir(new File(fileName)); long totalSize = 0; while(pendingFileVisits.get() > 0 || fileSizes.size() > 0) { final Long size = fileSizes.poll(10, TimeUnit.SECONDS); totalSize += size; } return totalSize; } finally { service.shutdown(); } } public static void main(final String[] args) throws InterruptedException { final long start = System.nanoTime(); final long total = new ConcurrentTotalFileSizeWQueue() .getTotalSizeOfFile("D:/idea_ws"); final long end = System.nanoTime(); System.out.println("Total Size: " + total); System.out.println("Time taken: " + (end - start)/1.0e9); }}
这个版本的运行结果如下:
Total Size: 276591906Time taken: 0.124069344
这一版的性能与之前一版相仿,但是代码简化方面又有提升了,主要归功与阻塞队列帮我们完成了线程之间的数据交换和同步的操作。由于oschina博客的字数限制,之后一篇文章,将利用java7引入的新的api来进一步改进上面的方案。