Programming in D – Tutorial and Reference
Ali Çehreli

Other D Resources

Data Sharing Concurrency

The previous chapter was about threads sharing information through message passing. As it has been mentioned in that chapter, message passing is a safe method of concurrency.

Another method involves more than one thread reading from and writing to the same data. For example, the owner thread can start the worker with the address of a bool variable and the worker can determine whether to terminate or not by reading the current value of that variable. Another example would be where the owner starts multiple workers with the address of the same variable so that the variable gets modified by more than one worker.

One of the reasons why data sharing is not safe is race conditions. A race condition occurs when more than one thread accesses the same mutable data in an uncontrolled order. Since the operating system pauses and starts individual threads in unspecified ways, the behavior of a program that has race conditions is unpredictable.

The examples in this chapter may look simplistic. However, the issues that they convey appear in real programs at greater scales. Also, although these examples use the std.concurrency module, the concepts of this chapter apply to the core.thread module as well.

Sharing is not automatic

Unlike most other programming languages, data is not automatically shared in D; data is thread-local by default. Although module-level variables may give the impression of being accessible by all threads, each thread actually gets its own copy:

import std.stdio;
import std.concurrency;
import core.thread;

int variable;

void printInfo(string message) {
    writefln("%s: %s (@%s)", message, variable, &variable);
}

void worker() {
    variable = 42;
    printInfo("Before the worker is terminated");
}

void main() {
    spawn(&worker);
    thread_joinAll();
    printInfo("After the worker is terminated");
}

variable that is modified inside worker() is not the same variable that is seen by main(). This fact can be observed by printing both the values and the addresses of the variables:

Before the worker is terminated: 42 (@7F26C6711670)
After the worker is terminated: 0 (@7F26C68127D0)

Since each thread gets its own copy of data, spawn() does not allow passing references to thread-local variables. For example, the following program that tries to pass the address of a bool variable to another thread cannot be compiled:

import std.concurrency;

void worker(bool * isDone) {
    while (!(*isDone)) {
        // ...
    }
}

void main() {
    bool isDone = false;
    spawn(&worker, &isDone);      // ← compilation ERROR

    // ...

    // Hoping to signal the worker to terminate:
    isDone = true;

    // ...
}

A static assert inside the std.concurrency module prevents accessing mutable data from another thread:

src/phobos/std/concurrency.d(329): Error: static assert
"Aliases to mutable thread-local data not allowed."

The address of the mutable variable isDone cannot be passed between threads.

An exception to this rule is a variable that is defined as __gshared:

__gshared int globallyShared;

There is only one copy of such a variable in the entire program and all threads can share that variable. __gshared is necessary when interacting with libraries of languages like C and C++ where data sharing is automatic by default.

shared to share mutable data between threads

Mutable variables that need to be shared must be defined with the shared keyword:

import std.concurrency;

void worker(shared(bool) * isDone) {
    while (*isDone) {
        // ...
    }
}

void main() {
    shared(bool) isDone = false;
    spawn(&worker, &isDone);

    // ...

    // Signalling the worker to terminate:
    isDone = true;

    // ...
}

Note: Prefer message-passing to signal a thread.

On the other hand, since immutable variables cannot be modified, there is no problem with sharing them directly. For that reason, immutable implies shared:

import std.stdio;
import std.concurrency;
import core.thread;

void worker(immutable(int) * data) {
    writeln("data: ", *data);
}

void main() {
    immutable(int) i = 42;
    spawn(&worker, &i);         // ← compiles

    thread_joinAll();
}

The output:

data: 42

Note that since the lifetime of i is defined by the scope of main(), it is important that main() does not terminate before the worker thread. The call to core.thread.thread_joinAll above is to make a thread wait for all of its child threads to terminate.

A race condition example

The correctness of the program requires extra attention when mutable data is shared between threads.

To see an example of a race condition let's consider multiple threads sharing the same mutable variable. The threads in the following program receive the addresses as two variables and swap their values a large number of times:

import std.stdio;
import std.concurrency;
import core.thread;

void swapper(shared(int) * first, shared(int) * second) {
    foreach (i; 0 .. 10_000) {
        int temp = *second;
        *second = *first;
        *first = temp;
    }
}

void main() {
    shared(int) i = 1;
    shared(int) j = 2;

    writefln("before: %s and %s", i, j);

    foreach (id; 0 .. 10) {
        spawn(&swapper, &i, &j);
    }

    // Wait for all threads to finish their tasks
    thread_joinAll();

    writefln("after : %s and %s", i, j);
}

Although the program above gets compiled successfully, in most cases it would work incorrectly. Observe that it starts ten threads that all access the same two variables i and j. As a result of the race conditions that they are in, they inadvertently spoil the operations of other threads.

Also observe that total number of swaps is 10 times 10 thousand. Since that amount is an even number, it is natural to expect that the variables end up having values 1 and 2, their initial values:

before: 1 and 2
after : 1 and 2    ← expected result

Although it is possible that the program can indeed produce that result, most of the time the actual outcome would be one of the following:

before: 1 and 2
after : 1 and 1    ← incorrect result
before: 1 and 2
after : 2 and 2    ← incorrect result

It is possible but highly unlikely that the result may even end up being "2 and 1" as well.

The reason why the program works incorrectly can be explained by the following scenario between just two threads that are in a race condition. As the operating system pauses and restarts the threads at indeterminate times, the following order of execution of the operations of the two threads is likely as well.

Let's consider the state where i is 1 and j is 2. Although the two threads execute the same swapper() function, remember that the local variable temp is separate for each thread and it is independent from the other temp variables of other threads. To identify those separate variables, they are renamed as tempA and tempB below.

The chart below demonstrates how the 3-line code inside the for loop may be executed by each thread over time, from top to bottom, operation 1 being the first operation and operation 6 being the last operation. Whether i or j is modified at each step is indicated by highlighting that variable:

Operation        Thread A                             Thread B
────────────────────────────────────────────────────────────────────────────

  1:   int temp = *second; (tempA==2)
  2:   *second = *first;   (i==1, j==1)

          (Assume that A is paused and B is started at this point)

  3:                                        int temp = *second; (tempB==1)
  4:                                        *second = *first;   (i==1, j==1)

          (Assume that B is paused and A is restarted at this point)

  5:   *first = temp;    (i==2, j==1)

          (Assume that A is paused and B is restarted at this point)

  6:                                        *first = temp;    (i==1, j==1)

As can be seen, at the end of the previous scenario both i and j end up having the value 1. It is not possible that they can ever have any other value after that point.

The scenario above is just one example that is sufficient to explain the incorrect results of the program. Obviously, the race conditions would be much more complicated in the case of the ten threads of this example.

synchronized to avoid race conditions

The incorrect program behavior above is due to more than one thread accessing the same mutable data (and at least one of them modifying it). One way of avoiding these race conditions is to mark the common code with the synchronized keyword. The program would work correctly with the following change:

    foreach (i; 0 .. 10_000) {
        synchronized {
            int temp = *b;
            *b = *a;
            *a = temp;
        }
    }

The output:

before: 1 and 2
after : 1 and 2      ← correct result

The effect of synchronized is to create a lock behind the scenes and to allow only one thread hold that lock at a given time. Only the thread that holds the lock can be executed and the others wait until the lock becomes available again when the executing thread completes its synchronized block. Since one thread executes the synchronized code at a time, each thread would now swap the values safely before another thread does the same. The state of the variables i and j would always be either "1 and 2" or "2 and 1" at the end of processing the synchronized block.

Note: It is a relatively expensive operation for a thread to wait for a lock, which may slow down the execution of the program noticeably. Fortunately, in some cases program correctness can be ensured without the use of a synchronized block, by taking advantage of atomic operations that will be explained below.

When it is needed to synchronize more than one block of code, it is possible to specify one or more locks with the synchronized keyword.

Let's see an example of this in the following program that has two separate code blocks that access the same shared variable. The program calls two functions with the address of the same variable, one function incrementing and the other function decrementing it equal number of times:

void incrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        *value = *value + 1;
    }
}

void decrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        *value = *value - 1;
    }
}

Note: If the shorter equivalents of the expression above are used (i.e. ++(*value) and ‑‑(*value)), then the compiler warns that such read-modify-write operations on shared variables are deprecated.

Unfortunately, marking those blocks individually with synchronized is not sufficient, because the anonymous locks of the two blocks would be independent. So, the two code blocks would still be accessing the same variable concurrently:

import std.stdio;
import std.concurrency;
import core.thread;

enum count = 1000;

void incrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        synchronized { // ← This lock is different from the one below.
            *value = *value + 1;
        }
    }
}

void decrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        synchronized { // ← This lock is different from the one above.
            *value = *value - 1;
        }
    }
}

void main() {
    shared(int) number = 0;

    foreach (i; 0 .. 100) {
        spawn(&incrementer, &number);
        spawn(&decrementer, &number);
    }

    thread_joinAll();
    writeln("Final value: ", number);
}

Since there are equal number of threads that increment and decrement the same variable equal number of times, one would expect the final value of number to be zero. However, that is almost never the case:

Final value: -672    ← not zero

For more than one block to use the same lock or locks, the lock objects must be specified within the synchronized parentheses:

Note: This feature is not supported by dmd 2.098.1.

    // Note: dmd 2.098.1 does not support this feature.
    synchronized (lock_object, another_lock_object, ...)

There is no need for a special lock type in D because any class object can be used as a synchronized lock. The following program defines an empty class named Lock to use its objects as locks:

import std.stdio;
import std.concurrency;
import core.thread;

enum count = 1000;

class Lock {
}

void incrementer(shared(int) * value, shared(Lock) lock) {
    foreach (i; 0 .. count) {
        synchronized (lock) {
            *value = *value + 1;
        }
    }
}

void decrementer(shared(int) * value, shared(Lock) lock) {
    foreach (i; 0 .. count) {
        synchronized (lock) {
            *value = *value - 1;
        }
    }
}

void main() {
    shared(Lock) lock = new shared(Lock)();
    shared(int) number = 0;

    foreach (i; 0 .. 100) {
        spawn(&incrementer, &number, lock);
        spawn(&decrementer, &number, lock);
    }

    thread_joinAll();
    writeln("Final value: ", number);
}

Because this time both synchronized blocks are connected by the same lock, only one of them is executed at a given time and the result is zero as expected:

Final value: 0       ← correct result

Class types can be defined as synchronized as well. This means that all of the non-static member functions of that type are synchronized on a given object of that class:

synchronized class Class {
    void foo() {
        // ...
    }

    void bar() {
        // ...
    }
}

The following is the equivalent of the class definition above:

class Class {
    void foo() {
        synchronized (this) {
            // ...
        }
    }

    void bar() {
        synchronized (this) {
            // ...
        }
    }
}

When blocks of code need to be synchronized on more than one object, those objects must be specified together. Otherwise, it is possible that more than one thread may have locked objects that other threads are waiting for, in which case the program may be deadlocked.

A well known example of this problem is a function that tries to transfer money from one bank account to another. For this function to work correctly in a multi-threaded environment, both of the accounts must first be locked. However, the following attempt would be incorrect:

void transferMoney(shared BankAccount from,
                   shared BankAccount to) {
    synchronized (from) {           // ← INCORRECT
        synchronized (to) {
            // ...
        }
    }
}

The error can be explained by an example where one thread attempting to transfer money from account A to account to B while another thread attempting to transfer money in the reverse direction. It is possible that each thread may have just locked its respective from object, hoping next to lock its to object. Since the from objects correspond to A and B in the two threads respectively, the objects would be in locked state in separate threads, making it impossible for the other thread to ever lock its to object. This situation is called a deadlock.

The solution to this problem is to define an ordering relation between the objects and to lock them in that order, which is handled automatically by the synchronized statement. In D, it is sufficient to specify the objects in the same synchronized statement for the code to avoid such deadlocks:

Note: This feature is not supported by dmd 2.098.1.

void transferMoney(shared BankAccount from,
                   shared BankAccount to) {
    // Note: dmd 2.098.1 does not support this feature.
    synchronized (from, to) {       // ← correct
        // ...
    }
}
shared static this() for single initialization and shared static ~this() for single finalization

We have already seen that static this() can be used for initializing modules, including their variables. Because data is thread-local by default, static this() must be executed by every thread so that module-level variables are initialized for all threads:

import std.stdio;
import std.concurrency;
import core.thread;

static this() {
    writeln("executing static this()");
}

void worker() {
}

void main() {
    spawn(&worker);

    thread_joinAll();
}

The static this() block above would be executed once for the main thread and once for the worker thread:

executing static this()
executing static this()

This would cause problems for shared module variables because initializing a variable more than once would be wrong especially in concurrency due to race conditions. (That applies to immutable variables as well because they are implicitly shared.) The solution is to use shared static this() blocks, which are executed only once per program:

int a;              // thread-local
immutable int b;    // shared by all threads

static this() {
    writeln("Initializing per-thread variable at ", &a);
    a = 42;
}

shared static this() {
    writeln("Initializing per-program variable at ", &b);
    b = 43;
}

The output:

Initializing per-program variable at 6B0120    ← only once
Initializing per-thread variable at 7FBDB36557D0
Initializing per-thread variable at 7FBDB3554670

Similarly, shared static ~this() is for final operations that must be executed only once per program.

Atomic operations

Another way of ensuring that only one thread mutates a certain variable is by using atomic operations, functionality of which are provided by the microprocessor, the compiler, or the operating system.

The atomic operations of D are in the core.atomic module. We will see only two of its functions in this chapter:

atomicOp

This function applies its template parameter to its two function parameters. The template parameter must be a binary operator like "+", "+=", etc.

import core.atomic;

// ...

        atomicOp!"+="(*value, 1);    // atomic

The line above is the equivalent of the following line, with the difference that the += operation would be executed without interruptions by other threads (i.e. it would be executed atomically):

        *value += 1;                 // NOT atomic

Consequently, when it is only a binary operation that needs to be synchronized, then there is no need for a synchronized block, which is known to be slow because of needing to acquire a lock. The following equivalents of the incrementer() and decrementer() functions that use atomicOp are correct as well. Note that there is no need for the Lock class anymore either:

import core.atomic;

//...

void incrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        atomicOp!"+="(*value, 1);
    }
}

void decrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        atomicOp!"-="(*value, 1);
    }
}

atomicOp can be used with other binary operators as well.

cas

The name of this function is the abbreviation of "compare and swap". Its behavior can be described as mutate the variable if it still has its currently known value. It is used by specifying the current and the desired values of the variable at the same time:

    bool is_mutated = cas(address_of_variable, currentValue, newValue);

The fact that the value of the variable still equals currentValue when cas() is operating is an indication that no other thread has mutated the variable since it has last been read by this thread. If so, cas() assigns newValue to the variable and returns true. On the other hand, if the variable's value is different from currentValue then cas() does not mutate the variable and returns false.

The following functions re-read the current value and call cas() until the operation succeeds. Again, these calls can be described as if the value of the variable equals this old value, replace with this new value:

void incrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        int currentValue;

        do {
            currentValue = *value;
        } while (!cas(value, currentValue, currentValue + 1));
    }
}

void decrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        int currentValue;

        do {
            currentValue = *value;
        } while (!cas(value, currentValue, currentValue - 1));
    }
}

The functions above work correctly without the need for synchronized blocks.

In most cases, the features of the core.atomic module can be several times faster than using synchronized blocks. I recommend that you consider this module as long as the operations that need synchronization are less than a block of code.

Atomic operations enable lock-free data structures as well, which are beyond the scope of this book.

You may also want to investigate the core.sync package, which contains classic concurrency primitives in the following modules:

Summary