Multithreaded Producer-Consumer: The Easy Way 
When your threaded application needs multiple consumer threads, it's time to consider using a thread pool. Fortunately, Windows makes this easy.
Anderson Bailey  12/5/2006 

Overview

The producer-consumer design uses one or more threads to produce data that is consumed by one or more other threads. A typical example is one thread reading data and passing it to one or more threads that process it. This specific design is particularly helpful when I/O is slow and you want to get processing started right away.

Code for the single-producer thread/single-consumer thread design, along with an explanation of it, was presented in my earlier article. There, one thread read data and placed it in blocks in a queue from which a consumer thread retrieved them. A series of signals helped handle the situations in which the queue was either full or empty.

When multiple consumer threads are involved, the management of the queue can become very complex. Not only does the queue code become more elaborate, but a whole layer of thread management must be added. Threads must be put into a dormant state when there's no work to be done and woken up when there is; and a separate thread generally has to keep track of the state of the various consumer threads. This code can be complex to develop, but even more importantly, such code is almost impossible to test completely. It is hard to envision all the possible ways the threads might interact and create those exact scenarios, since, as we know, developers have no control over when threads are scheduled for execution—this is the exclusive prerogative of the operating system. So reproducing problems of thread interactions can be very difficult.

Pools to the Rescue

Many operating systems and runtime frameworks (such as Java and .NET) solve the problem of multiple consumer threads by using a thread pool. At a high level, this construct varies little among different implementations.

Essentially, the operating system or runtime framework creates a number of threads (the actual number is rarely known beforehand), which it reuses repeatedly. A queue is set up that accepts tasks for the threads to complete. The pool logic assigns the next item in the queue to the first available thread in the pool. It keeps assigning tasks until it runs out of work or it runs out of threads. In the latter case, it either expands the pool or waits until a working thread finishes. At that point, that thread is assigned the work.

All this happens smoothly, efficiently, and without need for active management by the developer. In fact, in most cases, all the developer does is send work to the pool, and it is done automatically.
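The queue-and-workers arrangement described above can be sketched in portable C with POSIX threads. This is only an illustration of the general idea, not how Windows implements its pool: every name here (pool_submit, pool_worker, pool_drain, run_pool_demo) is hypothetical, the queue is a fixed array, and the number of workers is fixed rather than growing on demand.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_TASKS 128

/* Hypothetical minimal pool: a fixed array used as a FIFO queue,
 * guarded by one mutex, with a condition variable that wakes idle
 * workers. None of these names come from Windows. */
typedef void (*task_fn)( void *arg );

struct task { task_fn fn; void *arg; };

static struct task     queue[MAX_TASKS];
static int             head = 0, tail = 0;  /* next to run / next free */
static int             closing = 0;         /* no more work will arrive */
static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  qcond = PTHREAD_COND_INITIALIZER;

/* Producer side: append one task and wake one sleeping worker.
 * Returns 1 on success, 0 if the fixed queue is full. */
int pool_submit( task_fn fn, void *arg )
{
    int ok = 0;
    pthread_mutex_lock( &qlock );
    if ( tail < MAX_TASKS ) {
        queue[tail].fn = fn;
        queue[tail].arg = arg;
        tail++;
        ok = 1;
        pthread_cond_signal( &qcond );
    }
    pthread_mutex_unlock( &qlock );
    return ok;
}

/* Consumer side: sleep while the queue is empty, otherwise take
 * the next task in line and run it outside the lock. */
static void *pool_worker( void *unused )
{
    (void) unused;
    for ( ;; ) {
        struct task t;
        pthread_mutex_lock( &qlock );
        while ( head == tail && !closing )
            pthread_cond_wait( &qcond, &qlock );
        if ( head == tail ) {          /* empty and closing: exit */
            pthread_mutex_unlock( &qlock );
            return NULL;
        }
        t = queue[head++];
        pthread_mutex_unlock( &qlock );
        t.fn( t.arg );
    }
}

/* Tell the workers no more work is coming, then wait for them. */
void pool_drain( pthread_t *tids, int n )
{
    int i;
    pthread_mutex_lock( &qlock );
    closing = 1;
    pthread_cond_broadcast( &qcond );
    pthread_mutex_unlock( &qlock );
    for ( i = 0; i < n; i++ )
        pthread_join( tids[i], NULL );
}

/* Demo task: counts how many work items have run. */
static int done_count = 0;
static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;

static void count_task( void *arg )
{
    (void) arg;
    pthread_mutex_lock( &done_lock );
    done_count++;
    pthread_mutex_unlock( &done_lock );
}

/* Starts n_workers threads, queues n_tasks items, waits for the
 * queue to drain, and returns the number of items processed. */
int run_pool_demo( int n_workers, int n_tasks )
{
    pthread_t tids[16];
    int i, started = 0;
    assert( n_workers > 0 && n_workers <= 16 && n_tasks <= MAX_TASKS );
    head = tail = closing = 0;
    done_count = 0;
    for ( i = 0; i < n_workers; i++ ) {
        if ( pthread_create( &tids[i], NULL, pool_worker, NULL ) != 0 )
            break;
        started++;
    }
    for ( i = 0; i < n_tasks; i++ )
        pool_submit( count_task, NULL );
    pool_drain( tids, started );
    return done_count;
}
```

Calling run_pool_demo( 3, 20 ) starts three workers, queues twenty trivial tasks, and returns 20 once the workers have emptied the queue. On most systems the code needs to be built with the compiler's pthread option (for example, -pthread).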

In the native Windows API (that is, Win32), sending work to the queue is accomplished by calling the QueueUserWorkItem() function. Various parameters that we'll discuss shortly identify what work needs to be done and provide some guidance on how to do it. The first time this function is called, Windows sets up the thread pool and adds work to the queue; subsequent calls only add work. In most cases, as in our example, when all the work is complete, you leave the queue alone. The threads are dormant and no overhead is caused by them waiting in hibernation, except perhaps that they do occupy some RAM. (And later, if you do assign them some work, they'll be right there, ready to go.)

The Sample Program

The code in Listing 1 shows a single producer thread that reads a file specified on the command line. It reads blocks of 5000 bytes and then drops a data item into the queue that contains two integers: the number of the data block that was just read and the total number of bytes read so far.

The consumer threads in the pool retrieve these integer pairs and print the data block number to the screen. To simulate the overhead of real work, they also pause for a duration specified on the command line in milliseconds.

/*************************************************************
* Win32 implementation of a producer-consumer arrangement. It
* uses a single producer thread that queues data blocks and a
* thread pool for the consumers. **Win32 specific**
*
* This program is placed in the public domain.
* Anderson Bailey, 2006
*************************************************************/

#define _WIN32_WINNT 0x0500

#include <stdio.h>
#include <stdlib.h>   // malloc(), free(), atol()
#include <windows.h>

struct dataBlock {
    int blockNum;
    int bytesRead;
};

int addToQueue( struct dataBlock *pdblk );

long delayMillisecs;
// starts at 1: the producer holds one count until it is done
volatile LONG activeItems = 1L;
HANDLE hHandle;

#define BLK_SIZE 5000

int main ( int argc, char *argv[] )
{
    FILE *infile;
    char *inbuf;
    int bytes_read = 0;
    long bytes_read_total = 0;
    long block_number = 0;
    struct dataBlock *dblk;

    if ( argc != 3 )
    {
        printf (
            "Usage: threadQwithPool file delay-duration(ms)\n" );
        return ( -1 );
    }

    // read-only binary mode; the program never writes the file
    infile = fopen ( argv[1], "rb" );
    if ( infile == NULL )
    {
        printf ( "Error opening %s\n", argv[1] );
        return ( -1 );
    }

    inbuf = (char*) malloc ( BLK_SIZE+1 );
    if ( inbuf == NULL )
    {
        printf ( "Could not allocate read buffer\n" );
        return ( -1 );
    }

    delayMillisecs = atol( argv[2] );

    // hHandle is a manual-reset event created unsignaled.
    // When the count of active work items reaches 0, the last
    // thread signals this event. The signal tells this main
    // line that it can exit (see the WaitForSingleObject()
    // call below).
    hHandle = CreateEvent( NULL, TRUE, FALSE, NULL );

    // now start reading (so, becoming the producer thread)
    bytes_read = (int) fread ( inbuf, 1, BLK_SIZE, infile );
    if ( bytes_read < BLK_SIZE )
    {
        printf (
            "Need a file longer than %d bytes\n", BLK_SIZE );
        return ( -1 );
    }
    else
    {
        bytes_read_total += bytes_read;
        dblk = (struct dataBlock *) malloc(
            sizeof( struct dataBlock ));
        dblk->blockNum = ++block_number;
        dblk->bytesRead = bytes_read_total;
        addToQueue( dblk );
    }

    // loop on the result of fread() rather than on feof(), so
    // that no empty block is queued when the file runs out
    while ( ( bytes_read =
        (int) fread ( inbuf, 1, BLK_SIZE, infile ) ) > 0 )
    {
        bytes_read_total += bytes_read;
        dblk = (struct dataBlock *) malloc(
            sizeof( struct dataBlock ));
        dblk->blockNum = ++block_number;
        dblk->bytesRead = bytes_read_total;
        addToQueue( dblk );
        //Sleep( 50 ); // if you want to, simulate I/O delay
    }

    printf ( "\nRead a total of %d bytes\n",
        (int) bytes_read_total );

    // cannot exit right away or you'll kill the threads
    // executing in the thread pool, so wait to give pool time to
    // finish all processing. We wait for the event handle to
    // be signaled (no work items left) or for 5 seconds (that
    // is, 5000 milliseconds), whichever comes first.

    // release the producer's own count, and test the value the
    // decrement returns instead of re-reading the variable
    if ( InterlockedDecrement( &activeItems ) != 0 )
    {
        WaitForSingleObject( hHandle, 5000 );
    }

    fclose( infile );
    free( inbuf );
    return ( 0 );
}

/*
* The consumer thread. Pauses for command-line specified number
* of milliseconds and then prints out the block number.
*/
DWORD CALLBACK ProcessData( void* pv )
{
    struct dataBlock *pdblk = (struct dataBlock *) pv;
    Sleep( delayMillisecs );
    printf( "processing block %d\n", pdblk->blockNum );

    // the producer allocated this block; release it here
    free( pdblk );

    // now, decrement the number of active work items. Testing
    // the value the decrement returns ensures that exactly one
    // thread sees the count reach 0.
    if ( InterlockedDecrement( &activeItems ) == 0 )
    {
        // if this is the last thread out, then signal the
        // event, so that the main line knows it can exit.
        printf( "activeItems = 0, signaling main thread\n" );
        SetEvent( hHandle );
    }
    return( 0 );
}

/*
* Add items to the queue.
*/

int addToQueue( struct dataBlock *pdblk )
{
    // interlocked increments increase a long integer as a
    // single atomic operation, so they're safe for use by
    // multiple threads.
    InterlockedIncrement( &activeItems );

    BOOL ret =
        QueueUserWorkItem(
            ProcessData,
            (PVOID) pdblk,
            //WT_EXECUTEDEFAULT ); << don't use, despite docs.
            WT_EXECUTELONGFUNCTION );

    if ( ! ret )
    {
        printf(
            "Error occurred with blk %d, bytes read: %d\n",
            pdblk->blockNum, pdblk->bytesRead );
        // the item was never queued, so no consumer will count
        // it down or release it; do both here
        InterlockedDecrement( &activeItems );
        free( pdblk );
        return( -1 );
    }

    return( 0 );
}

Listing 1. Using Windows thread pool for multiple consumer threads.

This program accepts two command-line arguments: the name of a file and the duration to pause each consumer thread when it processes a block. The key routine is the last function, addToQueue(), which queues a block containing the two integers explained previously. A pointer to the block is the sole parameter to this function. It calls the QueueUserWorkItem() API mentioned earlier.

This Windows function takes three parameters. The first is a pointer to the function that the consumer thread should run. In our case, this function is ProcessData(), which prints the block number to the screen. Notice that it returns a DWORD and is declared with the CALLBACK calling convention, both of which are required by Windows. The second parameter to QueueUserWorkItem() is a pointer to void, a generic pointer to the single argument passed to the called function. If you have multiple values to pass, you create a structure containing the various data items, as we did here, and then pass in the pointer to that structure.

This arrangement of using a function pointer and a pointer to a void will be immediately familiar to developers who have used threads in Windows and other operating systems. It is the common metaphor for tying a thread to a specific piece of work.
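The function-pointer-plus-void-pointer pattern can be tried in plain C without any threads at all. In the minimal sketch below, the struct matches the one in Listing 1, but work_fn, show_block, and run_now are hypothetical names chosen for illustration, and run_now simply calls the function on the current thread where a real pool would hand it to a worker.

```c
#include <assert.h>
#include <stdio.h>

/* Same layout as the struct in Listing 1. */
struct dataBlock {
    int blockNum;
    int bytesRead;
};

/* Hypothetical stand-in for the callback shape a pool expects:
 * one untyped pointer in, an unsigned status code out. */
typedef unsigned long (*work_fn)( void *pv );

/* The callback casts the void pointer back to the real type
 * before touching any of the fields. */
unsigned long show_block( void *pv )
{
    struct dataBlock *pdblk = (struct dataBlock *) pv;
    assert( pdblk != NULL );
    printf( "block %d, %d bytes so far\n",
            pdblk->blockNum, pdblk->bytesRead );
    return (unsigned long) pdblk->blockNum;
}

/* Hypothetical dispatcher: runs the work function at once on the
 * calling thread; a real pool would pass it to a worker thread. */
unsigned long run_now( work_fn fn, void *pv )
{
    return fn( pv );
}
```

Given a block declared as struct dataBlock blk = { 7, 35000 }, the call run_now( show_block, &blk ) prints "block 7, 35000 bytes so far" and returns 7. The dispatcher never needs to know what the pointer refers to; only the callback does.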

The final parameter to the API, however, is unique to Windows. It is a flag that lets the developer tell Windows something about managing this piece of work. The Microsoft recommendation is to default to WT_EXECUTEDEFAULT, which means the work item is treated as generic work for a non-I/O thread. (Work that should run on an I/O thread would use the WT_EXECUTEINIOTHREAD flag.)

On my test system, the program written with the WT_EXECUTEDEFAULT flag created three threads: the main thread (which becomes the producer) and two consumer threads (one for each core on my processor). This arrangement worked satisfactorily, but speed was not great. Normally, keeping the number of executing threads equal to or slightly greater than the number of execution pipelines (or cores, in this case) is efficient. However, here, this was not so: Consider that when a consumer thread finishes, it must wait for the pool to fetch the next piece of work, get the parameters, and call the function again. Ideally, you'd like to have plenty of threads in the pool already loaded with work this way and raring to go.

This can be done by fooling Windows with the WT_EXECUTELONGFUNCTION flag. It tells Windows that this piece of work might take a very long time to execute. In response, Windows won't wait for it to return, but will instead add a new thread to the pool and load it with the new work. This little trick, which gets rid of the latency between a thread finishing one item and starting the next, is so well known that there is speculation that future releases of the API will make WT_EXECUTELONGFUNCTION the default.

If we run this code with this faster flag on a file of 300KB, part of the output is shown in Figure 1. Notice several things: the blocks are not necessarily executed in order. More importantly, notice the break after block 36. This line prints the total amount of data read. As you can see in the code, this print out occurs when input I/O is complete. Its presence here shows that we processed 36 of 60 blocks while the input I/O was occurring. This advantage of processing data while I/O is ongoing is the whole point behind this threading work.

Figure 1. Sample output from the program.
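The block count quoted for the sample run follows from simple arithmetic on the file size and the 5000-byte block size. The helper below is a hypothetical illustration (blocks_in_file is not part of Listing 1); it counts only non-empty blocks and assumes that 300KB in the text means 300,000 bytes, which the article does not state.

```c
#include <assert.h>

/* Number of non-empty blocks needed to cover file_size bytes when
 * reading blk_size bytes at a time. The division rounds up so that
 * a final partial block is counted too. */
long blocks_in_file( long file_size, long blk_size )
{
    assert( file_size >= 0 && blk_size > 0 );
    return ( file_size + blk_size - 1 ) / blk_size;
}
```

Under that assumption, blocks_in_file( 300000L, 5000L ) returns 60, while a file one byte longer needs 61 blocks.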

Notice that the remaining blocks (24 of the 60) are processed after the producer thread finishes. If the producer thread is the main thread, therefore, you can't just have it exit when it completes its work, because exiting will kill off the thread pool as well as all the threads still working. For this reason, you'll notice that the code waits, using the call WaitForSingleObject(hHandle, 5000). This API waits for an event (hHandle) to be signaled or for 5000 milliseconds to elapse, whichever comes first. hHandle is a standard Windows handle (the equivalent of a pointer to a system resource). It is mapped to an event just before the main processing loop. This event is signaled by a consumer thread when the count of work items reaches 0 (that is, when all work is complete). This count (activeItems) is incremented when work is queued and decremented by the consumer threads as they finish. The increments use the InterlockedIncrement() API, which performs an atomic increment, that is, one that is thread-safe. (This is a Windows-only construct, but it can be emulated easily on Linux and other systems by placing a mutex around an increment statement.)

A corresponding API, InterlockedDecrement(), is used to decrease the number by one. If the consumer threads do not finish within 5 seconds after the main line begins waiting on them, the call to WaitForSingleObject() times out and returns; the main line then returns, which ends the process and abandons any remaining work. The 5-second limit is used here for illustration, but in most programs you'll still want to have some ultimate time limit so that a wayward thread will not hold up the entire program.
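The mutex-based emulation mentioned above might look like the following on a POSIX system. This is a sketch under stated assumptions, not a drop-in replacement: locked_increment, locked_decrement, and count_last_out are hypothetical names, and the helpers return the new value so that a caller can test for zero without reading the counter a second time.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical names: a mutex-guarded counter standing in for
 * InterlockedIncrement() and InterlockedDecrement(). */
static pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
static long active_items = 0;
static long saw_zero = 0;  /* how many threads saw the count hit 0 */

/* Each helper returns the NEW value of the counter. */
long locked_increment( long *p )
{
    long v;
    pthread_mutex_lock( &count_lock );
    v = ++(*p);
    pthread_mutex_unlock( &count_lock );
    return v;
}

long locked_decrement( long *p )
{
    long v;
    pthread_mutex_lock( &count_lock );
    v = --(*p);
    pthread_mutex_unlock( &count_lock );
    return v;
}

static void *worker( void *unused )
{
    (void) unused;
    /* Test the returned value, not a second read of active_items. */
    if ( locked_decrement( &active_items ) == 0 )
        locked_increment( &saw_zero );
    return NULL;
}

/* Starts n workers against a count of n and returns how many of
 * them observed the count reach 0. */
long count_last_out( int n )
{
    pthread_t tid[64];
    int i, started = 0;
    assert( n > 0 && n <= 64 );
    active_items = n;
    saw_zero = 0;
    for ( i = 0; i < n; i++ ) {
        if ( pthread_create( &tid[i], NULL, worker, NULL ) != 0 )
            break;
        started++;
    }
    for ( i = 0; i < started; i++ )
        pthread_join( tid[i], NULL );
    return saw_zero;
}
```

When all threads start successfully, count_last_out( 8 ) returns 1: because each decrement and its result are produced under the same lock, exactly one thread can be the one that takes the count to zero.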

If the working thread count reaches 0 before this time limit (as it should), a note to this effect appears as the last line of the output, as shown here:

processing block 55
processing block 57
processing block 56
processing block 58
activeItems = 0, signaling main thread

Conclusion

This program has shown how straightforward it is to provide a mechanism for multiple threads to consume the data generated by a producer thread. There are alternative solutions and a number of difficulties for more complex implementations that we haven't touched on here. We will cover those advanced topics in a future article, also touching on some of the changes that will be found in Windows Vista's thread pool API and implementation.

Anderson Bailey is a developer with a longstanding interest in the techniques for using code to exploit processor features. He can be reached at chip.coder@gmail.com.
