Synchronizing Threads
OBJECTIVE
The purpose of this programming project is to explore process synchronization by writing a simulation of the Producer / Consumer problem described below. Your simulation will be implemented using Pthreads.
THE PRODUCER / CONSUMER PROBLEM
We developed a model of a system consisting of cooperating sequential processes or threads, all running asynchronously and possibly sharing data. We illustrated this model with the producer – consumer problem, which is representative of operating systems.
We described a technique using a circular buffer that can hold BUFFER_SIZE-1 items. By using a shared memory location count, the buffer can hold all BUFFER_SIZE items. This count is initialized to 0 and is incremented every time an item is placed into the buffer and decremented every time an item is removed from the buffer. The count data item can also be implemented as a counting semaphore.
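As a rough illustration of the count-based technique, the following single-threaded sketch shows a circular buffer that uses a count field so that all BUFFER_SIZE slots can be occupied. The names struct ring, ring_put(), and ring_get() are hypothetical and are not part of buffer.h; no synchronization is included here.

```c
#include <assert.h>

#define BUFFER_SIZE 5

/* Hypothetical ring type; the names are illustrative, not from buffer.h. */
struct ring {
    int data[BUFFER_SIZE];
    unsigned int head;   /* next slot to write */
    unsigned int tail;   /* next slot to read */
    unsigned int count;  /* number of items currently stored */
};

/* Store one item; returns 0 when all BUFFER_SIZE slots are in use. */
int ring_put(struct ring *r, int item)
{
    if (r->count == BUFFER_SIZE)
        return 0;
    r->data[r->head] = item;
    r->head = (r->head + 1) % BUFFER_SIZE;
    r->count++;
    assert(r->count <= BUFFER_SIZE);
    return 1;
}

/* Remove the oldest item; returns 0 when the buffer is empty. */
int ring_get(struct ring *r, int *item)
{
    if (r->count == 0)
        return 0;
    *item = r->data[r->tail];
    r->tail = (r->tail + 1) % BUFFER_SIZE;
    r->count--;
    return 1;
}
```

Without the count field, head == tail would be ambiguous (empty or full), which is why the count-free variant can only hold BUFFER_SIZE-1 items.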
The producer can place items into the buffer only if the buffer has a free memory location to store the item. The producer cannot add items to a full buffer. The consumer can remove items from the buffer if the buffer is not empty. The consumer must wait to consume items if the buffer is empty.
The “items” stored in this buffer will be integers. Your producer process will have to insert random numbers into the buffer. The consumer process will consume a number and detect if the number is prime.
PROJECT SPECIFICATIONS
The buffer used between producer and consumer processes will consist of a fixed-size array of type buffer_item. The queue of buffer_item objects will be manipulated using a circular array. The buffer will be manipulated with two functions, buffer_insert_item() and buffer_remove_item(), which are called by the producer and consumer threads, respectively. A skeleton outlining these functions can be found in buffer.h.
The buffer_insert_item() and buffer_remove_item() functions will synchronize the producer and consumer. The buffer will also require an initialization function (not supplied in buffer.h) that initializes the mutual exclusion object “mutex” along with the “empty” and “full” semaphores.
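One way the initialization function and the two semaphores might fit together is sketched below, using the classical scheme: wait on "empty" before writing, wait on "full" before reading, and hold the mutex only while touching the array. The names buffer_init(), buffer_put(), and buffer_get() are assumptions made for this sketch; the project itself requires buffer_insert_item() and buffer_remove_item(), and a complete version would also check the return values of the wait and lock calls.

```c
#include <pthread.h>
#include <semaphore.h>

#define BUFFER_SIZE 5

static int slots[BUFFER_SIZE];
static unsigned int in_pos, out_pos;
static pthread_mutex_t mutex;
static sem_t empty;   /* counts free slots */
static sem_t full;    /* counts filled slots */

/* Hypothetical initializer: returns 0 on success, -1 on failure. */
int buffer_init(void)
{
    in_pos = out_pos = 0;
    if (pthread_mutex_init(&mutex, NULL) != 0) return -1;
    if (sem_init(&empty, 0, BUFFER_SIZE) != 0) return -1;
    if (sem_init(&full, 0, 0) != 0) return -1;
    return 0;
}

/* Blocks while the buffer is full. */
void buffer_put(int item)
{
    sem_wait(&empty);                 /* reserve a free slot */
    pthread_mutex_lock(&mutex);
    slots[in_pos] = item;
    in_pos = (in_pos + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&mutex);
    sem_post(&full);                  /* announce one more item */
}

/* Blocks while the buffer is empty. */
int buffer_get(void)
{
    int item;
    sem_wait(&full);                  /* wait for an item */
    pthread_mutex_lock(&mutex);
    item = slots[out_pos];
    out_pos = (out_pos + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&mutex);
    sem_post(&empty);                 /* announce one more free slot */
    return item;
}
```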
The producer thread will alternate between sleeping for a random period of time and generating an integer that it then attempts to insert into the buffer. Random numbers will be generated using the rand_r() function. The sleep function used must be a “thread safe” sleep function.
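A minimal sketch of a per-thread random delay follows, assuming nanosleep() is an acceptable thread-safe sleep on your platform. The helper names random_delay() and sleep_seconds() are hypothetical. Each thread is expected to keep its own seed variable, since rand_r() stores its state in the caller's seed.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <stdlib.h>
#include <time.h>

/* Pick a delay in [1, max_sec] using the caller's private seed.
 * Returns 0 when max_sec is 0 to avoid dividing by zero. */
unsigned int random_delay(unsigned int *seed, unsigned int max_sec)
{
    if (max_sec == 0)
        return 0;
    return (unsigned int)rand_r(seed) % max_sec + 1;
}

/* Sleep for sec seconds, resuming if a signal interrupts the call. */
void sleep_seconds(unsigned int sec)
{
    struct timespec req = { .tv_sec = sec, .tv_nsec = 0 };
    struct timespec rem;

    while (nanosleep(&req, &rem) == -1 && errno == EINTR)
        req = rem;   /* continue with the time that was left */
}
```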
The consumer thread will alternate between sleeping for a random period of time and removing a number from the buffer. The removed number will then be tested to determine whether it is prime.
The main function will initialize the buffer and create the separate producer and consumer threads. Once it has created the producer and consumer threads, the main() function will sleep (using a thread-safe sleep) for the duration of the simulation. Upon awakening, the main thread will signal the other threads to quit by setting a simulation flag, which is a global variable. The main thread will join with the other threads and then display the simulation statistics. The main() function will be passed five parameters on the command line:
- The length of time the main thread is to sleep before terminating (simulation length in seconds)
- The maximum length of time the producer and consumer threads will sleep prior to producing or consuming a buffer_item
- The number of producer threads
- The number of consumer threads
- A “yes” or “no” to output the individual buffer snapshots for each item produced and consumed
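The five parameters listed above could be validated along the following lines. This is only a sketch: struct sim_config, parse_count(), and parse_args() are hypothetical names, and the choice to reject anything other than "yes" or "no" is an assumption rather than a stated requirement.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for the five command-line parameters. */
struct sim_config {
    unsigned long run_time;    /* simulation length in seconds */
    unsigned long max_sleep;   /* maximum thread sleep time */
    unsigned long producers;   /* number of producer threads */
    unsigned long consumers;   /* number of consumer threads */
    int trace;                 /* 1 for "yes", 0 for "no" */
};

/* Parse one non-negative decimal integer; returns 0 on success. */
static int parse_count(const char *text, unsigned long *out)
{
    char *end;

    if (*text == '\0' || *text == '-')
        return -1;
    *out = strtoul(text, &end, 10);
    return *end == '\0' ? 0 : -1;
}

/* Fill cfg from argv; returns 0 on success, -1 on any malformed input. */
int parse_args(int argc, char *argv[], struct sim_config *cfg)
{
    if (argc != 6)
        return -1;
    if (parse_count(argv[1], &cfg->run_time) != 0 ||
        parse_count(argv[2], &cfg->max_sleep) != 0 ||
        parse_count(argv[3], &cfg->producers) != 0 ||
        parse_count(argv[4], &cfg->consumers) != 0)
        return -1;
    if (strcmp(argv[5], "yes") == 0)
        cfg->trace = 1;
    else if (strcmp(argv[5], "no") == 0)
        cfg->trace = 0;
    else
        return -1;
    return 0;
}
```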
A skeleton for the main function appears as:
#include "buffer.h"
int main( int argc, char *argv[] )
{
    /* Get command line arguments */
    /* Initialize buffer */
    /* Create producer thread(s) */
    /* Create consumer thread(s) */
    /* Sleep */
    /* Join Threads */
    /* Display Statistics */
    /* Exit */
}
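The create, sleep, signal, and join steps of the skeleton might look roughly like the sketch below. It is an illustration under assumptions, not the required design: run_workers(), worker_main(), and MAX_WORKERS are hypothetical names, the flag uses C11 atomics, and the workers only poll the flag between short naps. A thread blocked in sem_wait() would not notice the flag until it is woken, so a full solution needs some way to release blocked threads.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdatomic.h>
#include <time.h>

#define MAX_WORKERS 8

static atomic_int stop_requested;   /* the global simulation flag */

struct worker {
    pthread_t tid;
    unsigned long loops;            /* work done by this thread */
};

static void *worker_main(void *arg)
{
    struct worker *w = arg;
    struct timespec nap = { .tv_sec = 0, .tv_nsec = 1000000 };  /* 1 ms */

    while (!atomic_load(&stop_requested)) {
        w->loops++;
        nanosleep(&nap, NULL);
    }
    return NULL;
}

/* Start n workers, let them run for run_ms milliseconds, then set the
 * flag and join them. Returns the number joined, or -1 on error. */
int run_workers(int n, long run_ms)
{
    struct worker workers[MAX_WORKERS] = {0};
    struct timespec run = { .tv_sec = run_ms / 1000,
                            .tv_nsec = (run_ms % 1000) * 1000000L };
    int started = 0, joined = 0;

    if (n < 1 || n > MAX_WORKERS)
        return -1;
    atomic_store(&stop_requested, 0);
    for (; started < n; started++)
        if (pthread_create(&workers[started].tid, NULL,
                           worker_main, &workers[started]) != 0)
            break;
    nanosleep(&run, NULL);              /* main thread sleeps */
    atomic_store(&stop_requested, 1);   /* signal the workers to quit */
    for (int i = 0; i < started; i++)
        if (pthread_join(workers[i].tid, NULL) == 0)
            joined++;
    return started == n ? joined : -1;
}
```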
Creating Pthreads using the Pthreads API is discussed in Chapter 4 and in supplemental notes provided online. Please refer to those references for specific instructions regarding creation of the producer and consumer Pthreads.
The following code sample illustrates how mutex locks available in the Pthread API can be used to protect a critical section:
#include <pthread.h>
pthread_mutex_t mutex;
/* create the mutex lock */
pthread_mutex_init(&mutex, NULL );
/* acquire the mutex lock */
pthread_mutex_lock(&mutex );
/*** CRITICAL SECTION ***/
/* release the mutex lock */
pthread_mutex_unlock(&mutex );
Pthreads uses the pthread_mutex_t data type for mutex locks. A mutex is created with the pthread_mutex_init() function, with the first parameter being a pointer to the mutex. By passing NULL as a second parameter, we initialize the mutex to its default attributes. The mutex is acquired and released with the pthread_mutex_lock() and pthread_mutex_unlock() functions. If the mutex lock is unavailable when pthread_mutex_lock() is invoked, the calling thread is blocked until the owner invokes pthread_mutex_unlock(). All mutex functions return a value of 0 with correct operation; if an error occurs, these functions return a nonzero error code.
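To make the return-value convention concrete, here is a small sketch in which several threads increment a shared counter under a mutex and the nonzero error codes are checked. The names count_with_mutex(), add_many(), and DEMO_THREADS are hypothetical and exist only for this illustration.

```c
#include <pthread.h>

#define DEMO_THREADS 4

static pthread_mutex_t counter_lock;
static long counter;

/* Each thread adds 1 to the shared counter n times. */
static void *add_many(void *arg)
{
    long n = *(long *)arg;

    for (long i = 0; i < n; i++) {
        if (pthread_mutex_lock(&counter_lock) != 0)
            return NULL;                    /* nonzero means failure */
        counter++;                          /* critical section */
        pthread_mutex_unlock(&counter_lock);
    }
    return NULL;
}

/* Returns the final counter value, or -1 if a Pthreads call fails. */
long count_with_mutex(long per_thread)
{
    pthread_t tids[DEMO_THREADS];
    int started = 0;
    long result;

    counter = 0;
    if (pthread_mutex_init(&counter_lock, NULL) != 0)
        return -1;
    while (started < DEMO_THREADS &&
           pthread_create(&tids[started], NULL, add_many, &per_thread) == 0)
        started++;
    for (int i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    result = (started == DEMO_THREADS) ? counter : -1;
    pthread_mutex_destroy(&counter_lock);
    return result;
}
```

With the lock in place, the total should equal DEMO_THREADS times per_thread; without it, increments from different threads could be lost.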
Pthreads provides two types of semaphores: named and unnamed. For this project, we will use unnamed semaphores. The code below illustrates how a semaphore is created:
#include <semaphore.h>
sem_t sem;
/* create the semaphore and initialize it to 5 */
sem_init(&sem, 0, 5 );
The sem_init() function creates and initializes a semaphore. This function is passed three parameters: A pointer to the semaphore, a flag indicating the level of sharing, and the semaphore’s initial value. In this example, by passing the flag 0, we are indicating that this semaphore can only be shared by threads belonging to the same process that created the semaphore. A nonzero value would allow other processes to access the semaphore as well. In this example, we initialize the semaphore to the value 5.
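The effect of the initial value can be observed with a short experiment. The sketch below creates an unnamed semaphore and takes from it with the non-blocking sem_trywait() until the call reports EAGAIN. The function name drain_semaphore() is hypothetical.

```c
#include <errno.h>
#include <semaphore.h>

/* Create an unnamed semaphore holding `initial` units, then take units
 * with sem_trywait() until none are left. Returns how many were taken,
 * or -1 if a call fails for a reason other than EAGAIN. */
int drain_semaphore(unsigned int initial)
{
    sem_t sem;
    int taken = 0;

    if (sem_init(&sem, 0, initial) != 0)   /* 0: shared by threads only */
        return -1;
    while (sem_trywait(&sem) == 0)
        taken++;
    if (errno != EAGAIN)                   /* EAGAIN: value was already 0 */
        taken = -1;
    sem_destroy(&sem);
    return taken;
}
```

A semaphore initialized to 5 therefore admits five successful waits before a sixth would block, which is the behavior the "empty" semaphore relies on when it starts at BUFFER_SIZE.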
We described the classical wait() and signal() semaphore operations. Pthreads names the wait() and signal() operations sem_wait() and sem_post(), respectively. The code example below creates a binary semaphore mutex with an initial value of 1 and illustrates its use in protecting a critical section:
#include <semaphore.h>
sem_t mutex;
/* create the semaphore */
sem_init(&mutex, 0, 1 );
/* acquire the semaphore */
sem_wait(&mutex );
/*** CRITICAL SECTION ***/
/* release the semaphore */
sem_post(&mutex );
PROGRAM OUTPUT
Output for this simulation is critical for verifying that your simulation program is working correctly. Use the sample below to determine what your simulation should output when various conditions occur (buffer empty/full, location of next producer/consumer, etc.). Your program output format should be identical to the following:
% osproj3 30 3 2 2 yes
Starting Threads...
(buffers occupied: 0)
buffers: -1 -1 -1 -1 -1
---- ---- ---- ---- ----
WR
Producer 12348 writes 31
(buffers occupied: 1)
buffers: 31 -1 -1 -1 -1
---- ---- ---- ---- ----
R W
Producer 12349 writes 4
(buffers occupied: 2)
buffers: 31 4 -1 -1 -1
---- ---- ---- ---- ----
R W
Consumer 12350 reads 31 * * * PRIME * * *
(buffers occupied: 1)
buffers: 31 4 -1 -1 -1
---- ---- ---- ---- ----
R W
...SOME TIME GOES BY...
Consumer 12350 reads 4
(buffers occupied: 0)
buffers: 3 4 19 31 97
---- ---- ---- ---- ----
WR
All buffers empty. Consumer 12351 waits.
All buffers empty. Consumer 12350 waits.
...SOME TIME GOES BY...
Producer 12348 writes 41
(buffers occupied: 5)
buffers: 28 41 23 45 6
---- ---- ---- ---- ----
RW
All buffers full. Producer 12349 waits.
...SOME TIME GOES BY...
Producer 12349 writes 10
(buffers occupied: 1)
buffers: 11 10 18 68 94
---- ---- ---- ---- ----
R W
Consumer 12350 reads 10
(buffers occupied: 0)
buffers: 11 10 18 68 94
---- ---- ---- ---- ----
WR
...SOME TIME GOES BY...
PRODUCER / CONSUMER SIMULATION COMPLETE
=======================================
Simulation Time: 30
Maximum Thread Sleep Time: 3
Number of Producer Threads: 2
Number of Consumer Threads: 2
Size of Buffer: 5
Total Number of Items Produced: 50
Thread 1: 30
Thread 2: 20
Total Number of Items Consumed: 48
Thread 3: 22
Thread 4: 26
Number Of Items Remaining in Buffer: 2
Number Of Times Buffer Was Full: 3
Number Of Times Buffer Was Empty: 4
Solution
buffer.h
#ifndef _BUFFER_H_DEFINED_
#define _BUFFER_H_DEFINED_
typedef int buffer_item;
#define BUFFER_SIZE 5
/* bool must be defined by the including file before this header is included */
bool buffer_insert_item( buffer_item item );
bool buffer_remove_item( buffer_item *item );
#endif // _BUFFER_H_DEFINED_
osproj3.c
#define _POSIX_C_SOURCE 200809L /* request declarations for rand_r(), select(), and the Pthreads and semaphore APIs */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
#include <time.h> /* time(), used to seed rand_r() */
typedef unsigned int bool;
#include "buffer.h"
/* check if the given value is a prime number. */
bool is_prime(unsigned int value);
/* global variables to indicate status */
volatile int stop_flag = 0;
unsigned int run_time;
unsigned int max_wait_time;
unsigned int producer_num;
unsigned int consumer_num;
int trace;
unsigned int full_num = 0; /* number of times buffer was full */
unsigned int empty_num = 0; /* number of times buffer was empty */
/* the information of one thread */
struct thread_t {
    pthread_t id; /* thread id */
    unsigned int num; /* number of items produced or consumed */
};
struct thread_t *thds;
/* the pthread procedures for producers and consumers */
void *producer(void *param);
void *consumer(void *param);
void initialize_threads();
/* initialize the mutex and semaphores for synchronization
 * between producers and consumers. */
void initialize_sync();
/* initialize and create an empty buffer of BUFFER_SIZE size. */
void initialize_buffer();
void display_statistics();
void sleep_safely(unsigned int sec);
/* buffer related operations */
void print_buffer();
buffer_item buffer[BUFFER_SIZE];
unsigned int tail; /* the location where consumers read buffer */
unsigned int head; /* the location where producers write buffer */
unsigned int items_num; /* number of items in the buffer */
sem_t sem_full;
sem_t sem_empty;
pthread_mutex_t mutex;
int main(int argc, char *argv[]) {
    unsigned int i;
    /* parameter parsing. assuming 5 parameters are provided safely */
    if (argc != 6) {
        printf("Usage: %s run_time max_wait_time producer_num consumer_num [yes|no]\n", argv[0]);
        exit(-1);
    }
    run_time = atoi(argv[1]);
    max_wait_time = atoi(argv[2]);
    producer_num = atoi(argv[3]);
    consumer_num = atoi(argv[4]);
    trace = strcmp(argv[5], "yes") == 0;
    /* initialize buffer */
    initialize_buffer();
    /* initialize the mutex and semaphores */
    initialize_sync();
    /* create producer and consumer threads */
    if (trace) {
        printf("Starting Threads...\n");
        print_buffer();
    }
    initialize_threads();
    /* sleep for the given time */
    sleep_safely(run_time);
    /* join threads */
    stop_flag = 1;
    for (i = 0; i < producer_num + consumer_num; i++) {
        /* notify the thread so it can wake up from sem_wait, pthread_mutex_lock, etc. */
        pthread_cancel(thds[i].id);
        pthread_join(thds[i].id, NULL);
    }
    /* display statistics and exit */
    display_statistics();
    free(thds);
    return 0;
}
/* the producer function: loop until the main thread flags to stop */
void *producer(void *param) {
    struct thread_t *t = (struct thread_t *)param;
    /* seed from pthread_self(): t->id may not be stored yet when this thread starts */
    unsigned int seed = (unsigned int)time(NULL) + (unsigned int)(unsigned long)pthread_self();
    while (!stop_flag) {
        /* generate a random value to store into buffer */
        unsigned int value = rand_r(&seed) % 100;
        if (buffer_insert_item(value) == 0) {
            break;
        }
        t->num++;
        /* sleep a random time */
        value = rand_r(&seed) % max_wait_time + 1;
        sleep_safely(value);
    }
    return NULL;
}
/* the consumer function: loop until the main thread flags to stop */
void *consumer(void *param) {
    struct thread_t *t = (struct thread_t *)param;
    /* seed from pthread_self(): t->id may not be stored yet when this thread starts */
    unsigned int seed = (unsigned int)time(NULL) + (unsigned int)(unsigned long)pthread_self();
    while (!stop_flag) {
        buffer_item value;
        if (buffer_remove_item(&value) == 0) {
            break;
        }
        t->num++;
        /* sleep a random time */
        value = rand_r(&seed) % max_wait_time + 1;
        sleep_safely(value);
    }
    return NULL;
}
void initialize_sync() {
    pthread_mutex_init(&mutex, NULL);
    sem_init(&sem_full, 0, 0); /* no items in the buffer, thus, full = 0. */
    sem_init(&sem_empty, 0, BUFFER_SIZE); /* buffer is empty, thus empty = size of buffer */
}
void initialize_threads() {
    unsigned int i;
    /* create and initialize producer and consumer threads */
    thds = (struct thread_t *)calloc(producer_num + consumer_num, sizeof(struct thread_t));
    if (thds == NULL) {
        printf("could not allocate threads\n");
        exit(errno);
    }
    /* producer threads */
    for (i = 0; i < producer_num; i++) {
        errno = pthread_create(&thds[i].id, NULL, producer, (void *)&thds[i]);
        if (errno != 0) {
            printf("could not create producer threads\n");
            exit(errno);
        }
    }
    /* consumer threads */
    for (i = producer_num; i < producer_num + consumer_num; i++) {
        errno = pthread_create(&thds[i].id, NULL, consumer, (void *)&thds[i]);
        if (errno != 0) {
            printf("could not create consumer threads\n");
            exit(errno);
        }
    }
}
void sleep_safely(unsigned int sec) {
    /* implement a thread-safe sleep using select. */
    struct timeval timeout;
    timeout.tv_sec = sec;
    timeout.tv_usec = 0;
    select(0, NULL, NULL, NULL, &timeout);
}
void display_statistics() {
    /* calculate the total count */
    unsigned int total_produced = 0;
    unsigned int total_consumed = 0;
    unsigned int i;
    char cache[64];
    for (i = 0; i < producer_num; i++) {
        total_produced += thds[i].num;
    }
    for (i = producer_num; i < producer_num + consumer_num; i++) {
        total_consumed += thds[i].num;
    }
    printf("\nPRODUCER / CONSUMER SIMULATION COMPLETE\n");
    printf("=======================================\n");
    printf("%-40s %u\n", "Simulation Time:", run_time);
    printf("%-40s %u\n", "Maximum Thread Sleep Time:", max_wait_time);
    printf("%-40s %u\n", "Number of Producer Threads:", producer_num);
    printf("%-40s %u\n", "Number of Consumer Threads:", consumer_num);
    printf("%-40s %d\n", "Size of Buffer:", BUFFER_SIZE);
    printf("\n%-40s %u\n", "Total Number of Items Produced:", total_produced);
    for (i = 0; i < producer_num; i++) {
        sprintf(cache, "  Thread %u:", i + 1);
        printf("%-40s %u\n", cache, thds[i].num);
    }
    printf("\n%-40s %u\n", "Total Number of Items Consumed:", total_consumed);
    for (i = producer_num; i < producer_num + consumer_num; i++) {
        sprintf(cache, "  Thread %u:", i + 1);
        printf("%-40s %u\n", cache, thds[i].num);
    }
    printf("\n");
    /* report the actual number of items left rather than a constant 0 */
    printf("%-40s %u\n", "Number Of Items Remaining in Buffer:", items_num);
    printf("%-40s %u\n", "Number Of Times Buffer Was Full:", full_num);
    printf("%-40s %u\n", "Number Of Times Buffer Was Empty:", empty_num);
}
void initialize_buffer() {
    /* initialize every buffer entry to -1 */
    unsigned int i;
    for (i = 0; i < BUFFER_SIZE; i++) {
        buffer[i] = -1;
    }
    tail = 0;
    head = 0;
    items_num = 0;
}
bool buffer_insert_item(buffer_item item) {
    unsigned long id = (unsigned long)pthread_self();
    if (sem_trywait(&sem_empty) != 0) {
        /* any failure other than "would block" means no slot was reserved */
        if (errno != EAGAIN) {
            printf("could not wait on semaphore..\n");
            return 0;
        }
        if (trace) {
            printf("\nAll buffers full. Producer %lu waits.\n", id);
        }
        /* wait until there is space to write the data into the buffer */
        if (sem_wait(&sem_empty) != 0) {
            printf("could not wait on semaphore..\n");
            return 0;
        }
    }
    /* save the random number into the buffer */
    if (pthread_mutex_lock(&mutex) != 0) {
        printf("could not lock the mutex...\n");
        return 0;
    }
    buffer[head] = item;
    head = (head + 1) % BUFFER_SIZE;
    items_num++;
    if (items_num == BUFFER_SIZE) {
        full_num++;
    }
    /* print the buffer information */
    if (trace) {
        printf("\nProducer %lu writes %d\n", id, item);
        print_buffer();
    }
    if (pthread_mutex_unlock(&mutex) != 0) {
        printf("could not unlock the mutex...\n");
        return 0;
    }
    /* notify the consumers */
    sem_post(&sem_full);
    return 1;
}
bool buffer_remove_item(buffer_item *item) {
    buffer_item value;
    unsigned long id = (unsigned long)pthread_self();
    if (sem_trywait(&sem_full) != 0) {
        /* any failure other than "would block" means no item was reserved */
        if (errno != EAGAIN) {
            printf("could not wait on semaphore..\n");
            return 0;
        }
        if (trace) {
            printf("\nAll buffers empty. Consumer %lu waits.\n", id);
        }
        /* wait until there is an item available in the buffer */
        if (sem_wait(&sem_full) != 0) {
            printf("could not wait on semaphore..\n");
            return 0;
        }
    }
    /* read the data from the buffer */
    if (pthread_mutex_lock(&mutex) != 0) {
        printf("could not lock the mutex...\n");
        return 0;
    }
    value = buffer[tail];
    tail = (tail + 1) % BUFFER_SIZE;
    items_num--;
    if (items_num == 0) {
        empty_num++;
    }
    /* print the buffer information */
    if (trace) {
        if (is_prime(value)) {
            printf("\nConsumer %lu reads %d * * * PRIME * * *\n", id, value);
        } else {
            printf("\nConsumer %lu reads %d\n", id, value);
        }
        print_buffer();
    }
    if (pthread_mutex_unlock(&mutex) != 0) {
        printf("could not unlock the mutex...\n");
        return 0;
    }
    /* notify the producers */
    sem_post(&sem_empty);
    *item = value;
    return 1;
}
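bool is_prime(unsigned int value) {
    unsigned int i;
    /* 0 and 1 are not prime */
    if (value < 2) {
        return 0;
    }
    /* trial division by every candidate up to the square root */
    for (i = 2; i * i <= value; i++) {
        if (value % i == 0) {
            return 0;
        }
    }
    return 1;
}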
void print_buffer() {
    unsigned int i;
    /* note: the column widths below are an assumption; the original spacing was lost */
    printf("(buffers occupied: %u)\n", items_num);
    /* display data in the buffer */
    printf("buffers:");
    for (i = 0; i < BUFFER_SIZE; i++) {
        printf(" %4d", buffer[i]);
    }
    printf("\n");
    /* pad to the width of "buffers:" so the columns line up */
    printf("        ");
    for (i = 0; i < BUFFER_SIZE; i++) {
        printf(" ----");
    }
    printf("\n");
    /* display the location of the head & tail */
    printf("        ");
    for (i = 0; i < BUFFER_SIZE; i++) {
        printf("   %c%c", (head == i ? 'W' : ' '), (tail == i ? 'R' : ' '));
    }
    printf("\n");
}