
Chapter 10  Condition variables

Many simple synchronization problems can be solved using mutexes as shown in the previous chapter. In this chapter I introduce a bigger challenge, the well-known “Producer-Consumer problem”, and a new tool to solve it, the condition variable.

10.1  The work queue

In some multi-threaded programs, threads are organized to perform different tasks. Often they communicate with each other using a queue, where some threads, called “producers”, put data into the queue and other threads, called “consumers”, take data out.

For example, in applications with a graphical user interface, there might be one thread that runs the GUI, responding to user events, and another thread that processes user requests. In that case, the GUI thread might put requests into a queue and the “back end” thread might take requests out and process them.

To support this organization, we need a queue implementation that is “thread safe”, which means that both threads (or more than two) can access the queue at the same time. And we need to handle at least one special case: when the queue is empty. If the size of the queue is bounded, we also have to handle the case when the queue is full.

I’ll start with a simple queue that is not thread safe, then we’ll see what goes wrong and fix it. The code for this example is in the repository for this book, in a folder called queue. The file queue.c contains a basic implementation of a circular buffer, which you can read about at https://en.wikipedia.org/wiki/Circular_buffer.

Here’s the structure definition:

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
} Queue;

array is the array that contains the elements of the queue. For this example the elements are ints, but more generally they would be structures that contain user events, items of work, etc.

length is the length of the array. next_in is an index into the array that indicates where the next element should be added; similarly, next_out is the index of the next element that should be removed.

make_queue allocates space for this structure and initializes the fields:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  return queue;
}

The initial value for next_out needs some explaining. Since the queue is initially empty, there is no next element to remove, so next_out is invalid. We treat next_out == next_in as a special case that indicates that the queue is empty, so we can write:

int queue_empty(Queue *queue)
{
  return (queue->next_in == queue->next_out);
}

Now we can add elements to the queue using queue_push:

void queue_push(Queue *queue, int item) {
  if (queue_full(queue)) {
    perror_exit("queue is full");
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
}

If the queue is full, queue_push prints an error message and exits. I will explain queue_full soon.

If the queue is not full, queue_push inserts the new element and then increments next_in using queue_incr:

int queue_incr(Queue *queue, int i)
{
  return (i+1) % queue->length;
}

When the index, i, gets to the end of the array, it wraps around to 0. And that’s where we run into a tricky part. If we keep adding elements to the queue, eventually next_in wraps around and catches up with next_out. But if next_in == next_out, we would incorrectly conclude that the queue was empty.

To avoid that, we define another special case to indicate that the queue is full:

int queue_full(Queue *queue)
{
  return (queue_incr(queue, queue->next_in) == queue->next_out);
}

If incrementing next_in lands on next_out, that means we can’t add another element without making the queue seem empty. So we stop one element before the “end” (keeping in mind that the end of the queue can be anywhere, not necessarily the end of the array).
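
One consequence is that a queue created with length n can hold at most n-1 elements. Here is a small sketch, not part of the book's example code, showing queue_full becoming true after only two pushes into a queue of length 3:

Queue *queue = make_queue(3);
queue_push(queue, 1);    // next_in: 0 -> 1
queue_push(queue, 2);    // next_in: 1 -> 2
// queue_incr(queue, 2) == 0 == next_out, so queue_full returns true;
// a third queue_push would print an error message and exit.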

Now we can write queue_pop, which removes and returns the next element from the queue:

int queue_pop(Queue *queue) {
  if (queue_empty(queue)) {
    perror_exit("queue is empty");
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  return item;
}

If you try to pop from an empty queue, queue_pop prints an error message and exits.
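
Before adding threads, it can be useful to exercise the queue from a single thread. This sketch is not part of the book's example code; it assumes the functions defined above and shows the indices wrapping around:

int main()
{
  Queue *queue = make_queue(4);        // holds at most 3 elements
  int i;

  for (i = 0; i < 3; i++) {
    queue_push(queue, i);              // next_in goes 1, 2, 3
  }
  // queue_full(queue) is now true

  for (i = 0; i < 3; i++) {
    printf("%d\n", queue_pop(queue));  // prints 0, 1, 2; next_out goes 1, 2, 3
  }
  // next_in == next_out == 3, so queue_empty(queue) is true again
  return 0;
}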

10.2  Producers and consumers

Now let’s make some threads to access this queue. Here’s the producer code:

void *producer_entry(void *arg)
{
  int i;
  Shared *shared = (Shared *) arg;

  for (i=0; i<QUEUE_LENGTH-1; i++) {
    printf("adding item %d\n", i);
    queue_push(shared->queue, i);
  }
  pthread_exit(NULL);
}

Here’s the consumer code:

void *consumer_entry(void *arg)
{
  int i;
  int item;
  Shared *shared = (Shared *) arg;

  for (i=0; i<QUEUE_LENGTH-1; i++) {
    item = queue_pop(shared->queue);
    printf("consuming item %d\n", item);
  }
  pthread_exit(NULL);
}

Here’s the parent code that starts the threads and waits for them:

  int i;
  pthread_t child[NUM_CHILDREN];

  Shared *shared = make_shared();

  child[0] = make_thread(producer_entry, shared);
  child[1] = make_thread(consumer_entry, shared);

  for (i=0; i<NUM_CHILDREN; i++) {
    join_thread(child[i]);
  }

And finally here’s the shared structure that contains the queue:

typedef struct {
  Queue *queue;
} Shared;

Shared *make_shared()
{
  Shared *shared = check_malloc(sizeof(Shared));
  shared->queue = make_queue(QUEUE_LENGTH);
  return shared;
}
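
The parent code also uses make_thread and join_thread, thin wrappers around the POSIX thread functions. Here is a minimal sketch of what they might look like; the actual versions are in utils.c in the book's repository and may differ in details:

pthread_t make_thread(void *(*entry)(void *), Shared *shared)
{
  pthread_t thread;
  int n = pthread_create(&thread, NULL, entry, (void *) shared);
  if (n != 0) perror_exit("pthread_create failed");
  return thread;
}

void join_thread(pthread_t thread)
{
  int n = pthread_join(thread, NULL);
  if (n != 0) perror_exit("pthread_join failed");
}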

The code we have so far is a good starting place, but it has several problems:

  • Access to the queue is not thread safe. Different threads could access array, next_in, and next_out at the same time and leave the queue in a broken, “inconsistent” state.
  • If the consumer is scheduled first, it finds the queue empty, prints an error message, and exits. We would rather have the consumer block until the queue is not empty. Similarly, we would like the producer to block if the queue is full.

In the next section, we solve the first problem with a Mutex. In the following section, we solve the second problem with condition variables.

10.3  Mutual exclusion

We can make the queue thread safe with a mutex. This version of the code is in queue_mutex.c.
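
Mutex is the wrapper for pthread_mutex_t introduced in the previous chapter. As a reminder, here is a minimal sketch of the wrapper functions; the actual versions are in utils.c and may differ in details:

typedef pthread_mutex_t Mutex;

Mutex *make_mutex()
{
  Mutex *mutex = check_malloc(sizeof(Mutex));
  int n = pthread_mutex_init(mutex, NULL);
  if (n != 0) perror_exit("make_mutex failed");
  return mutex;
}

void mutex_lock(Mutex *mutex)
{
  int n = pthread_mutex_lock(mutex);
  if (n != 0) perror_exit("mutex_lock failed");
}

void mutex_unlock(Mutex *mutex)
{
  int n = pthread_mutex_unlock(mutex);
  if (n != 0) perror_exit("mutex_unlock failed");
}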

First we add a Mutex pointer to the queue structure:

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
  Mutex *mutex;          //-- this line is new
} Queue;

And initialize the Mutex in make_queue:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  queue->mutex = make_mutex();   //-- new
  return queue;
}

Next we add synchronization code to queue_push:

void queue_push(Queue *queue, int item) {
  mutex_lock(queue->mutex);   //-- new
  if (queue_full(queue)) {
    mutex_unlock(queue->mutex);   //-- new
    perror_exit("queue is full");
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
  mutex_unlock(queue->mutex);   //-- new
}

Before checking whether the queue is full, we have to lock the mutex. If the queue is full, we have to unlock the mutex before exiting; otherwise the thread would leave the mutex locked and no other threads could proceed.

The synchronization code for queue_pop is similar:

int queue_pop(Queue *queue) {
  mutex_lock(queue->mutex);
  if (queue_empty(queue)) {
    mutex_unlock(queue->mutex);
    perror_exit("queue is empty");
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  mutex_unlock(queue->mutex);
  return item;
}

Note that the other Queue functions, queue_full, queue_empty, and queue_incr, do not try to lock the mutex. Any thread that calls these functions is required to lock the mutex first; this requirement is part of the documented interface for these functions.

With this additional code, the queue is thread safe; if you run it, you should not see any synchronization errors. But it is likely that the consumer will exit at some point because the queue is empty, or the producer will exit because the queue is full, or both.

The next step is to add condition variables.

10.4  Condition variables

A condition variable is a data structure associated with a condition; it allows threads to block until the condition becomes true. For example, queue_push might want to check whether the queue is full and, if so, block until it is not full. So the “condition” we are interested in is “queue not full”.

Similarly, queue_pop might want to wait for a condition like “queue not empty”.

Here’s how we can add these capabilities to the queue code. First we add two condition variables to the Queue structure:

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
  Mutex *mutex;
  Cond *nonempty;   //-- new
  Cond *nonfull;    //-- new
} Queue;

And initialize them in make_queue:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  queue->mutex = make_mutex();
  queue->nonempty = make_cond();   //-- new
  queue->nonfull = make_cond();    //-- new
  return queue;
}

Now in queue_pop, if we find the queue empty, we don’t exit; instead we use the condition variable to block:

int queue_pop(Queue *queue) {
  mutex_lock(queue->mutex);
  while (queue_empty(queue)) {
    cond_wait(queue->nonempty, queue->mutex);  //-- new
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  mutex_unlock(queue->mutex);
  cond_signal(queue->nonfull);   //-- new
  return item;
}

cond_wait is complicated, so let’s take it slow. The first argument is the condition variable; in this case, the condition we are waiting for is “queue not empty”. The second argument is the mutex that protects the queue. Before you call cond_wait, you have to lock the mutex. Otherwise the whole thing doesn’t work.

When the thread that locked the mutex calls cond_wait, it unlocks the mutex and then blocks. This is important. If cond_wait did not unlock the mutex before blocking, no other thread would be able to access the queue, no more items could be added, and the queue would always be empty.

So while the consumer is blocked on nonempty, the producer can run. Let’s see what happens when the producer runs queue_push:

void queue_push(Queue *queue, int item) {
  mutex_lock(queue->mutex);
  while (queue_full(queue)) {
    cond_wait(queue->nonfull, queue->mutex);    //-- new
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
  mutex_unlock(queue->mutex);
  cond_signal(queue->nonempty);  //-- new
}

Let’s assume that the queue is not full, so the producer doesn’t run cond_wait and doesn’t block. It adds a new element to the queue and then unlocks the mutex. But before returning, it does one more thing: it “signals” the condition variable nonempty.

Signalling a condition variable indicates that the condition is true, or at least that it might be. If there are no threads waiting on the condition variable, the signal has no effect.

If there are threads waiting on the condition variable, one of them gets unblocked and resumes execution of cond_wait. But before the awakened thread can return from cond_wait, it has to wait for the mutex and lock it again.

Now go back to queue_pop and see what happens when the thread returns from cond_wait. It loops back to the top of the while loop and checks the condition again. I’ll explain why in just a second, but for now let’s assume that the condition is true; that is, the queue is not empty.

When the thread exits the while loop, we know two things: (1) the condition is true, so there is at least one item in the queue, and (2) the mutex is locked, so it is safe to access the queue.

After removing an item, queue_pop unlocks the mutex, signals that the queue is not full, and returns.

In the next section I’ll show you how my Cond code works, but first I want to answer two frequently asked questions:

  • Why is cond_wait inside a while loop rather than an if statement; that is, why do we have to check the condition again after returning from cond_wait?

    The primary reason you have to re-check the condition is the possibility of an intercepted signal. Suppose Thread A is waiting on nonempty. Thread B adds an item to the queue and signals nonempty. Thread A wakes up and tries to lock the mutex, but before it gets the chance, Evil Thread C swoops in, locks the mutex, pops the item from the queue, and unlocks the mutex. Now the queue is empty again, but Thread A is not blocked any more. Thread A could lock the mutex and return from cond_wait. If Thread A did not check the condition again, it would try to pop an element from an empty queue, and probably cause an error. The sketch after this list shows this wait-in-a-loop pattern in its general form.

  • The other question that comes up when people learn about condition variables is “How does the condition variable know what condition it is associated with?”

    This question is understandable because there is no explicit connection between a Cond structure and the condition it relates to. The connection is implicit in the way it is used.

    Here’s one way to think of it: the condition associated with a Cond is the thing that is false when you call cond_wait and true when you call cond_signal. Of course, there might be many conditions that are false in the first case and true in the second. The right one is only in the mind of the programmer, so it should be explained carefully in the documentation.
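
To make the pattern from the first question concrete, here is a general sketch of how a condition variable is normally used. The names condition_holds, use_shared_state, and change_shared_state are hypothetical placeholders for whatever shared state and predicate the Cond is associated with:

// Waiting side: block until the condition holds.
mutex_lock(mutex);
while (!condition_holds()) {     // hypothetical predicate on shared state
  cond_wait(cond, mutex);        // unlocks mutex, blocks, re-locks before returning
}
// Here the condition holds and the mutex is locked.
use_shared_state();              // hypothetical
mutex_unlock(mutex);

// Signaling side: make the condition true, then signal.
mutex_lock(mutex);
change_shared_state();           // hypothetical: makes the condition true
mutex_unlock(mutex);
cond_signal(cond);               // wakes one waiter, if any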

10.5  Condition variable implementation

The Cond structure I used in the previous section is a wrapper for a type called pthread_cond_t, which is defined in the POSIX threads API. It is very similar to Mutex, which is a wrapper for pthread_mutex_t. Both wrappers are defined in utils.c and utils.h.

Here’s the typedef:

typedef pthread_cond_t Cond;

make_cond allocates space, initializes the condition variable, and returns a pointer:

Cond *make_cond()
{
  Cond *cond = check_malloc(sizeof(Cond)); 
  int n = pthread_cond_init(cond, NULL);
  if (n != 0) perror_exit("make_cond failed");
 
  return cond;
}

And here are the wrappers for cond_wait and cond_signal:

void cond_wait(Cond *cond, Mutex *mutex)
{
  int n = pthread_cond_wait(cond, mutex);
  if (n != 0) perror_exit("cond_wait failed");
}

void cond_signal(Cond *cond)
{
  int n = pthread_cond_signal(cond);
  if (n != 0) perror_exit("cond_signal failed");
}
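
These wrappers, like the queue code, rely on two small helpers: check_malloc, which exits if malloc fails, and perror_exit, which prints an error message and exits. Here is a minimal sketch of both; the actual versions in utils.c may differ in details:

void perror_exit(char *s)
{
  perror(s);
  exit(-1);
}

void *check_malloc(int size)
{
  void *p = malloc(size);
  if (p == NULL) perror_exit("malloc failed");
  return p;
}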

At this point, nothing here should be too surprising.
