This is an example of a Producer/Consumer pattern.
The Producer creates jobs (here represented by integers in the global table tab). The Consumers remove a job, process it, then insert the result back into the table.
Two condition variables and one mutex coordinate the threads. The Producer thread uses the first condition variable (cp): it pauses on cp whenever enough jobs are available for processing. The Consumer threads use the second (cc): when no jobs are available, they pause on cc until the Producer creates more jobs.
The mutex protects access to the global table tab. Waiting on a condition variable releases the mutex and reacquires it before the wait returns, so other threads can update tab in the meantime.
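Producer := proc( m, cp, cc, max, mindiff )
    global tab, e;
    local i, j, n;
    # Hold the mutex whenever tab is read or modified.
    # (The max parameter is not used in this procedure.)
    Threads[Mutex][Lock]( m );
    # Create the initial batch of jobs: tab[j] = 2*j for j = 1..mindiff.
    tab[ "maxjob" ] := mindiff;
    tab[ "curjob" ] := 1;
    for j from 1 to mindiff
    do
        tab[ j ] := 2*j;
    end do;
    # Tell the thread that started us that the Producer is running.
    Threads[ConditionVariable][Signal]( cp );
    n := false;
    # Keep producing until the global flag e is set to false.
    while ( e )
    do
        j := tab[ "maxjob" ];
        # Enough jobs are pending: sleep on cp (this releases m)
        # until a Consumer signals that it has run out of jobs.
        if ( j - tab[ "curjob" ] > mindiff/2 ) then
            n := true;
            Threads[ConditionVariable][Wait]( cp, m );
        end if;
        # Top up the table so that mindiff jobs lie ahead of the current job.
        for i from j to tab[ "curjob" ] + mindiff
        do
            tab[ i ] := 2*i;
        end do;
        tab[ "maxjob" ] := tab[ "curjob" ] + mindiff;
        # Wake any Consumers that were waiting for new jobs.
        if ( n ) then
            Threads[ConditionVariable][Broadcast]( cc );
            n := false;
        end if;
    end do;
    Threads[Mutex][Unlock]( m );
end proc: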
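Consumer := proc( m, cp, cc, max )
    global tab, e;
    local n, i, j, num;
    num := 0;
    Threads[Mutex][Lock]( m );
    # Process exactly max jobs, then exit.
    while ( num < max )
    do
        # No jobs available: wake the Producer, then sleep on cc (this releases m).
        # The loop rechecks the condition after every wakeup.
        while ( tab[ "curjob" ] = tab[ "maxjob" ] )
        do
            Threads[ConditionVariable][Signal]( cp );
            Threads[ConditionVariable][Wait]( cc, m );
        end do;
        # Claim the next job while holding the mutex.
        n := tab[ "curjob" ];
        j := tab[ n ];
        tab[ "curjob" ] := n + 1;
        # Do the actual work (sum 1..j) without holding the mutex.
        Threads[Mutex][Unlock]( m );
        j := add( i, i=1..j );
        num := num+1;
        # Reacquire the mutex to store the result.
        Threads[Mutex][Lock]( m );
        tab[ n ] := j;
    end do;
    Threads[Mutex][Unlock]( m );
end proc: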
Start the Producer thread. We wait on cp until the Producer thread has started.
Start the Consumer threads. There are 5 threads and each consumes 100 jobs, so 500 jobs should be processed in total.
Wait for the Consumer threads to finish.
Shut down the Producer thread.
Check the number of processed jobs.
Check the results of one job.
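The steps above can be sketched as the following driver. This is a minimal sketch under stated assumptions, not necessarily the commands the original worksheet used: the Producer arguments 31 and 10 (max and mindiff), the variable names pid, cid, t and k, and the choice to hold the mutex while clearing e are all illustrative.

```maple
# Assumed setup: shared state and synchronization objects.
tab := table():
e := true:
m := Threads[Mutex][Create]():
cp := Threads[ConditionVariable][Create]():
cc := Threads[ConditionVariable][Create]():

# Start the Producer. We lock m before creating the thread so that the
# Producer cannot send its start-up signal before we are waiting on cp.
# The values 31 and 10 are assumed, illustrative arguments.
Threads[Mutex][Lock]( m ):
pid := Threads[Create]( Producer( m, cp, cc, 31, 10 ) ):
Threads[ConditionVariable][Wait]( cp, m ):
Threads[Mutex][Unlock]( m ):

# Start 5 Consumer threads, each processing 100 jobs.
t := 5:
for k from 1 to t do
    cid[k] := Threads[Create]( Consumer( m, cp, cc, 100 ) );
end do:

# Wait for the Consumer threads to finish.
Threads[Wait]( seq( cid[k], k = 1..t ) ):

# Shut down the Producer: clear the flag and wake it. Holding m here is an
# assumed precaution so the Producer cannot miss the signal.
Threads[Mutex][Lock]( m ):
e := false:
Threads[ConditionVariable][Signal]( cp ):
Threads[Mutex][Unlock]( m ):
Threads[Wait]( pid ):

# Check the job counter and the result of one job.
tab[ "curjob" ];
tab[ 100 ];
```

Under these assumptions, tab[ "curjob" ] should end at 501, since it starts at 1 and is incremented once for each of the 500 jobs. tab[ 100 ] should hold add( i, i=1..200 ) = 20100, because job 100 is created as 2*100 = 200 and the Consumer replaces it with the sum of the integers from 1 to 200.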