Example
Home ] Up ]

 

 

Here's an example of the use of a blocking queue, modified from Core Java, Vol 1, Fundamentals.

It uses a blocking queue to coordinate communications between a FileEnumerationTask, and a set of threads each running a SearchFileTask, in order to find instances of a specified keyword in all the .java files within a specified directory and its subdirectories.

package threads;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;

/**
 * Class to enumerate all files in a directory and its subdirectories.
 */
public class FileEnumerationTask implements Runnable
{
  /**
   * A sentinel to indicate to the worker thread that there is
   * no more work to do.
   */
  public static final File END_WORK = new File("");
  
  /**
   * Creates a new instance of FileEnumerationTask
   *
   * @param queue the blocking queue to which the enumerated files are added
   * @param startAt the directory in which to start the enumeration
   */
  public FileEnumerationTask(BlockingQueue<File> queue, File startAt)
  {
    m_queue = queue;
    m_startingDirectory = startAt;
  }

  /**
   * The method that does the work of the thread.
   */
  public void run()
  {
    try
    {
      enumerate(m_startingDirectory);
      m_queue.put(END_WORK);  // Flag/sentinel
    }
    catch (InterruptedException ie)
    {
      // Do nothing
    }
  }
  
  /**
   * Method to recursively enumerate all the files in a given
   * directory and its subdirectories
   * 
   * @param directory the directory in which to start.
   */
  private void enumerate(File directory) throws InterruptedException
  {
    // For simplicity, restrict our interest in .java files only
    File[] files = directory.listFiles( new FileFilter()
      {
        public boolean accept(File file)
        {
          return file.isDirectory() || file.getName().endsWith(".java");
        }
      }
    );
    for (File file : files)
    {
      if (file.isDirectory())
        enumerate(file);
      else
        m_queue.put(file);
    }
  }
  
  ///// Private data /////
  private BlockingQueue<File> m_queue;
  private File m_startingDirectory;
}
package threads;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;

/**
 * Class to search a file for a specified keyword.
 */
public class SearchFileTask implements Runnable
{
  /**
   * Creates a new instance of SearchFileTask
   *
   * @param queue the queue from which to take files
   * @param keyword the keyword to search for
   */
  public SearchFileTask(BlockingQueue<File> queue, String keyword)
  {
    m_queue = queue;
    m_keyword = keyword;
  }
  
  /**
   * The method that does the work for the thread
   */
  public void run()
  {
    try
    {
      boolean done = false;
      while (!done)
      {
        File file = m_queue.take();
        if (file == FileEnumerationTask.END_WORK)
        {
          m_queue.put(file);
          done = true;
        }
        else
        {
          search(file);
        }
      }
    }
    catch (IOException ioe)
    {
      ioe.printStackTrace();
    }
    catch (InterruptedException ie)
    {
      // Do nothing
    }
  }
  
  /**
   * Searchs a file for a given keyword, and prints all matching lines.
   *
   * @param file the file to search
   */
  private void search(File file) throws IOException
  {
    Scanner in = null;
    try
    {
      in = new Scanner(new FileReader(file));
      int lineNumber = 0;
      while (in.hasNextLine())
      {
        lineNumber++;
        String line = in.nextLine();
        if (line.contains(m_keyword))
        {
          System.out.printf("%s [%d] : %s\n",
            file.getPath(), lineNumber, line);
        }
      }
    }
    finally
    {
      in.close();
    }
  }
  
  //// Private data /////
  private BlockingQueue<File> m_queue;
  private String m_keyword;
}
package threads;

import java.io.File;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Class to demonstrate the use of a BlockingQueue
 */
public class BlockingQueueDemo
{
  /**
   * Main entry point
   */
  public static void main(String[] args)
  {
    Scanner in = new Scanner(System.in);
    System.out.print("Enter starting directory: ");
    String directory = in.nextLine();
    System.out.print("Enter keyword to search for: ");
    String keyword = in.nextLine();
    
    final int FILE_QUEUE_SIZE = 10;
    final int SEARCH_THREADS = 100;
    
    BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
    
    FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
    new Thread(enumerator).start();
    
    for (int i = 1; i <= SEARCH_THREADS; i++)
    {
      new Thread( new SearchFileTask(queue, keyword) ).start();
    }
  }
}

Here's an example of the output of this program (we searched for the keyword for):

...\Threads\src\threads\CountDownProgress.java [5] :  * for the CountDownThread.
...\Threads\src\threads\CountDownThreadTester.java [17] :     for (int i = 1; i <= threadCount; i++)
...\Threads\src\threads\BlockingQueueDemo.java [21] :     System.out.print("Enter keyword to search for: ");
...\Threads\src\threads\CountDownRunnable.java [20] :       for (int i = m_countDown; i >= 0; i--)
...\Threads\src\threads\Data.java [4] :  * Dummy Data class for ReadWriteDictionary.
...\Threads\src\threads\CountDownRunnableTester.java [17] :     for (int i = 1; i <= threadCount; i++)
...\Threads\src\threads\FindFactors.java [13] :     // one for each RANGE of divisors to test for.
...\Threads\src\threads\LanguageBigots.java [19] :         for (int i = 0; i < 5; i++)
...\Threads\src\threads\ClockApplet.java [70] :               m_timeLabel.setText(dateFormatter.format(date));
...\Threads\src\threads\SearchFileTask.java [10] :  * Class to search a file for a specified keyword.
...\Threads\src\threads\Producer.java [45] :       // Loop, asking for input from the user until EOF
...\Threads\src\threads\ExplicitLockAccountTransfer3.java [71] :     System.out.println(getName() + " awaiting sufficient funds for " + amount);
...\Threads\src\threads\TestThreadGroup.java [54] :     for (int i = 0; i < 10000; i++)
...\Threads\src\threads\CountDownThread.java [23] :         sleep(1000);    // Sleep for 1 second
...\Threads\src\threads\ThreadListerApplet.java [29] :   public void actionPerformed(ActionEvent e)
...\Threads\src\unsynchronized\SteamBoiler.java [16] :     for (int burner = 0; burner < BURNER_COUNT; burner++)
...\Threads\src\threads\FileEnumerationTask.java [63] :     for (File file : files)
...\Threads\src\threads\ExplicitLockAccountTransfer2.java [94] :         // Try to acquire the lock for the from account
...\Threads\src\threads\SteamBoilerApplet.java [55] :       public void actionPerformed(ActionEvent event)
...\Threads\src\threads\PipedProducer.java [53] :       // Loop, asking for input from the user until EOF
...\Threads\src\threads\SteamBoiler.java [16] :     for (int burner = 0; burner < BURNER_COUNT; burner++)
...\Threads\src\threads\BlockingQueueDemo.java [32] :     for (int i = 1; i <= SEARCH_THREADS; i++)
...\Threads\src\unsynchronized\SteamBoilerApplet.java [56] :       public void actionPerformed(ActionEvent event)
...\Threads\src\threads\CountDownRunnable.java [22] :         Thread.sleep(1000);    // Sleep for 1 second
...\Threads\src\threads\FindFactors.java [17] :     for (int thread = 0; thread < threadCount; thread++)
...\Threads\src\threads\SearchFileTask.java [18] :    * @param keyword the keyword to search for
...\Threads\src\threads\Producer.java [68] :  *  This is a Consumer thread that waits for input from
...\Threads\src\threads\ExplicitLockAccountTransfer3.java [117] :         // Try to acquire the lock for the from account
...\Threads\src\threads\TestThreadGroup.java [85] :     for (int i = 0; i < m_indent; i++)
...\Threads\src\threads\ThreadListerApplet.java [75] :     for(Enumeration e = appContext.getApplets(); 
...\Threads\src\unsynchronized\SteamBoiler.java [41] :     for (int burner = 0; burner < BURNER_COUNT; burner++)
...\Threads\src\threads\ExplicitLockAccountTransfer2.java [102] :             // Try to acquire the lock for the to account
...\Threads\src\threads\SteamBoilerApplet.java [70] :       public void actionPerformed(ActionEvent event)
...\Threads\src\threads\PipedProducer.java [76] :  *  This is a PipedConsumer thread that waits for input from
...\Threads\src\threads\SteamBoiler.java [41] :     for (int burner = 0; burner < BURNER_COUNT; burner++)
...\Threads\src\unsynchronized\SteamBoilerApplet.java [71] :       public void actionPerformed(ActionEvent event)
...\Threads\src\threads\FindFactors.java [46] :     for (long div = m_from; div <= m_to && div < m_number; div++)
...\Threads\src\threads\SearchFileTask.java [27] :    * The method that does the work for the thread
...\Threads\src\threads\ExplicitLockAccountTransfer3.java [128] :             // Try to acquire the lock for the to account
...\Threads\src\threads\TestThreadGroup.java [97] :     for (int i = 0; i < threads.length; i++)
...\Threads\src\unsynchronized\SteamBoiler.java [45] :         burners[burner].join();     // Wait for thread to finish
...\Threads\src\threads\ExplicitLockAccountTransfer2.java [115] :               // We succeeded, so break out of the for while loop.
...\Threads\src\threads\SteamBoiler.java [45] :         burners[burner].join();     // Wait for thread to finish
...\Threads\src\threads\SearchFileTask.java [59] :    * Searchs a file for a given keyword, and prints all matching lines.
...\Threads\src\threads\ExplicitLockAccountTransfer3.java [141] :               // We succeeded, so break out of the for while loop.
...\Threads\src\threads\TestThreadGroup.java [111] :     for (int i = 0; i < threadGroups.length; i++)
...\Threads\src\threads\TestThreadGroup.java [128] :     for (int i = 0; i < m_indent; i++)

 (Note that the ...\ represents uninteresting, machine-specific path information.)

 
The page was last updated February 19, 2008