D Programlama Dili – Programlama dersleri ve D referansı
Ali Çehreli

çokuzlu: [tuple], bir kaç parçanın diziye benzer biçimde bir araya gelmesinden oluşan yapı
değişmez: [immutable], programın çalışması süresince kesinlikle değişmeyen
eş zamanlı programlama: [concurrency], iş parçacıklarının birbirlerine bağımlı olarak işlemeleri
giriş/çıkış'a bağlı: [I/O bound], zamanının çoğunu giriş/çıkış işlemlerini bekleyerek geçiren
görev: [task], programın geri kalanıyla koşut işletilebilen işlem birimi
iş parçacığı: [thread], işletim sisteminin program işletme birimi
koşut işlemler: [parallelization], bağımsız işlemlerin aynı anda işletilmeleri
mesajlaşma: [message passing], iş parçacıklarının birbirlerine mesaj göndermeleri
mikro işlemciye bağlı: [CPU bound], zamanının çoğunu mikro işlemciyi işleterek geçiren
... bütün sözlük



İngilizce Kaynaklar


Diğer




Mesajlaşarak Eş Zamanlı Programlama

Eş zamanlı programlama bir önceki bölümde gördüğümüz koşut işlemlere çok benzer. İkisi de işlemlerin farklı iş parçacıkları üzerinde aynı anda işletilmeleri ile ilgilidir ve aslında koşut işlemler de perde arkasında eş zamanlı programlama ile gerçekleştirilir. Bu iki kavram bu yüzden çok karıştırılır.

Koşut işlemlerle eş zamanlı programlama arasındaki farklar şunlardır:

D hem mesajlaşmaya dayanan hem de veri paylaşımına dayanan eş zamanlı programlamayı destekler. Veri paylaşımı ile hatasız programlar üretmek çok zor olduğundan modern programcılıkta mesajlaşma yöntemi benimsenmiştir. Bu bölümde std.concurrency modülünün sağladığı mesajlaşma olanaklarını, bir sonraki bölümde ise veri paylaşımına dayalı eş zamanlı programlama olanaklarını göreceğiz.

Kavramlar

İş parçacığı (thread): İşletim sistemi bütün programları iş parçacığı adı verilen işlem birimleri ile işletir. Çalıştırılan her D programının main() ile başlayan işlemleri işletim sisteminin o programı çalıştırmak için seçmiş olduğu bir iş parçacığı üzerinde başlatılır. main()'in işlettiği bütün işlemler normalde hep aynı iş parçacığı üzerinde işletilirler. Program, gerektiğinde kendisi başka iş parçacıkları başlatabilir ve böylece aynı anda birden fazla iş yürütebilir. Örneğin bir önceki bölümde gördüğümüz her görev, std.parallelism'in olanakları tarafından başlatılmış olan bir iş parçacığını kullanır.

İşletim sistemi iş parçacıklarını önceden kestirilemeyecek anlarda duraksatır ve tekrar başlatır. Bunun sonucunda örneğin aşağıdaki kadar basit işlemler bile bir süre yarım kalmış olabilirler:

    ++i;

Yukarıdaki işlem aslında üç adımdan oluşur: Değişkenin değerinin okunması, değerin arttırılması ve tekrar değişkene atanması. İşletim sisteminin bu iş parçacığını duraksattığı bir anda bu adımlar sonradan devam edilmek üzere yarım kalmış olabilirler.

Mesaj (message): İş parçacıklarının işleyişleri sırasında birbirlerine gönderdikleri bilgilere mesaj denir. Mesaj her türden ve her sayıda değişkenden oluşabilir.

İş parçacığı kimliği (Tid): Her iş parçacığının bir kimliği vardır. Kimlik, gönderilen mesajın alıcısı olan iş parçacığını belirler.

Sahip (owner): İş parçacığı başlatan her iş parçacığı, başlatılan iş parçacığının sahibi olarak anılır.

İşçi (worker): Başlatılan iş parçacığına işçi denir.

İş parçacıklarını başlatmak

Yeni bir iş parçacığı başlatmak için spawn() kullanılır. spawn() parametre olarak bir işlev alır ve yeni iş parçacığını o işlevden başlatır. O işlevin belki de başka işlevlere de dallanarak devam eden işlemleri artık yeni iş parçacığı üzerinde işletilir. spawn() ile task() arasındaki bir fark, spawn() ile başlatılan iş parçacıklarının birbirlerine mesaj gönderebilmeleridir.

İşçinin başlatılmasından sonra sahip ve işçi birbirlerinden bağımsız iki alt program gibi işlemeye devam ederler:

import std.stdio;
import std.concurrency;
import core.thread;

void işçi() {
    foreach (i; 0 .. 5) {
        Thread.sleep(500.msecs);
        writeln(i, " (işçi)");
    }
}

void main() {
    spawn(&işçi);

    foreach (i; 0 .. 5) {
        Thread.sleep(300.msecs);
        writeln(i, " (main)");
    }

    writeln("main tamam");
}

İşlemlerin aynı anda işletildiklerini gösterebilmek için buradaki örneklerde de Thread.sleep'ten yararlanıyorum. Programın çıktısı main()'den ve işçi()'den başlamış olan iki iş parçacığının diğerinden bağımsız olarak işlediğini gösteriyor:

0 (main)
0 (işçi)
1 (main)
2 (main)
1 (işçi)
3 (main)
2 (işçi)
4 (main)
main tamam
3 (işçi)
4 (işçi)

Program bütün iş parçacıklarının tamamlanmasını otomatik olarak bekler. Bunu yukarıdaki çıktıda görüyoruz: main()'in sonundaki "main tamam" yazdırıldığı halde işçi() işlevinin de tamamlanması beklenmiştir.

İş parçacığını başlatan işlevin aldığı parametreler spawn()'a işlev isminden sonra verilirler. Aşağıdaki programdaki iki işçi, çıkışa dörder tane sayı yazdırıyor. Hangi değerden başlayacağını parametre olarak alıyor:

import std.stdio;
import std.concurrency;
import core.thread;

void işçi(int başlangıçDeğeri) {
    foreach (i; 0 .. 4) {
        Thread.sleep(500.msecs);
        writeln(başlangıçDeğeri + i);
    }
}

void main() {
    foreach (i; 1 .. 3) {
        spawn(&işçi, i * 10);
    }
}

İş parçacıklarından birisinin yazdırdıklarını işaretli olarak belirtiyorum. İşletim sisteminin iş parçacıklarını başlatmasına ve duraklatmasına bağlı olarak bu çıktıdaki satırlar farklı sırada da olabilirler:

10
20
11
21
12
22
13
23

İşletim sistemleri belirli bir anda işlemekte olan iş parçacıklarının sayısı konusunda kısıtlama getirir. Bu kısıtlamalar kullanıcı başına olabileceği gibi, bütün sistem başına veya herhangi başka bir kavramla ilgili olabilir. Mikro işlemciyi meşgul ederek işleyen iş parçacıklarının sayısı sistemdeki çekirdek sayısından fazla olduğunda bütün sistemin performansı düşebilir. Belirli bir anda mikro işlemciyi meşgul ederek işlemekte olan iş parçacıklarına mikro işlemciye bağlı denir. Öte yandan, bazı iş parçacıkları zamanlarının çoğunu iş yaparak değil, belirli bir olayın gerçekleşmesini bekleyerek geçirirler. Örneğin, kullanıcıdan veya ağ üzerinden bilgi gelmesinin veya bir Thread.sleep çağrısının sonlanmasının beklenmesi sırasında mikro işlemci meşgul değildir. Böyle durumdaki iş parçacıklarına giriş/çıkış'a bağlı denir. İş parçacıklarının çoğunluğunun giriş/çıkış'a bağlı olarak işlediği bir programın sistemdeki çekirdek sayısından daha fazla iş parçacığı başlatmasında bir sakınca yoktur. Program hızıyla ilgili her tasarım kararında olması gerektiği gibi, bu konudaki kararlarınızı da ölçümler yaparak vermenizi öneririm.

İş parçacıklarının kimlikleri

thisTid() iş parçacığının kendi kimliğini döndürür. İsmi "bu iş parçacığının kimliği" anlamına gelen "this thread's identifier"dan türemiştir. Bir işlev olmasına rağmen daha çok parantezsiz kullanılır:

import std.stdio;
import std.concurrency;

void kimlikBilgisi(string açıklama) {
    writefln("%s: %s", açıklama, thisTid);
}

void işçi() {
    kimlikBilgisi("işçi ");
}

void main() {
    spawn(&işçi);
    kimlikBilgisi("sahip");
}

Tid türündeki kimliğin değerinin program açısından bir önemi olmadığından bu türün toString işlevi bile tanımlanmamıştır. Bu yüzden programın aşağıdaki çıktısında yalnızca türün ismini görüyoruz:

sahip: Tid(std.concurrency.MessageBox)
işçi : Tid(std.concurrency.MessageBox)

Çıktıları aynı olsa da sahip ve işçinin kimlikleri farklıdır.

spawn()'ın bu noktaya kadar gözardı etmiş olduğum dönüş değeri de işçinin kimliğini sahibe bildirir:

    Tid işçim = spawn(&işçi);

Her işçinin sahibinin kimliği ise ownerTid() işlevi ile elde edilir.

Özetle, sahibin kimliği ownerTid değişkeni ile, işçinin kimliği de spawn'ın dönüş değeri ile elde edilmiş olur.

Mesajlaşma

Mesaj göndermek için send(), belirli türden mesaj beklemek için de receiveOnly() kullanılır. (Çeşitli türlerden mesaj bekleyen receive()'i ve belirli süreye kadar bekleyen receiveTimeout()'u daha aşağıda göstereceğim.)

Aşağıdaki programdaki sahip iş parçacığı işçisine int türünde bir mesaj göndermekte ve ondan double türünde bir mesaj beklemektedir. Bu iş parçacıkları sahip sıfırdan küçük bir değer gönderene kadar mesajlaşmaya devam edecekler. Önce sahip iş parçacığını gösteriyorum:

void main() {
    Tid işçi = spawn(&işçiİşlevi);

    foreach (değer; 1 .. 5) {
        işçi.send(değer);
        double sonuç = receiveOnly!double();
        writefln("gönderilen: %s, alınan: %s", değer, sonuç);
    }

    /* Sonlanmasını sağlamak için işçiye sıfırdan küçük bir
     * değer gönderiyoruz */
    işçi.send(-1);
}

main(), spawn()'ın döndürdüğü iş parçacığının kimliğini işçi ismiyle saklamakta ve bu kimliği send() ile mesaj gönderirken kullanmaktadır.

İşçi ise kullanacağı int'i bir mesaj olarak alıyor, onu bir hesapta kullanıyor ve ürettiği double'ı yine bir mesaj olarak sahibine gönderiyor:

void işçiİşlevi() {
    int değer = 0;

    while (değer >= 0) {
        değer = receiveOnly!int();
        double sonuç = cast(double)değer / 5;
        ownerTid.send(sonuç);
    }
}

Yukarıdaki iş parçacığı mesajdaki değerin beşte birini hesaplar. Programın çıktısı şöyle:

gönderilen: 1, alınan: 0.2
gönderilen: 2, alınan: 0.4
gönderilen: 3, alınan: 0.6
gönderilen: 4, alınan: 0.8

Birden fazla değer aynı mesajın parçası olarak gönderilebilir:

    ownerTid.send(thisTid, 42, 1.5);

Aynı mesajın parçası olarak gönderilen değerler alıcı tarafta bir çokuzlunun üyeleri olarak belirirler. receiveOnly()'nin şablon parametrelerinin mesajı oluşturan türlere uymaları şarttır:

    /* Tid, int, ve double türlerinden oluşan bir mesaj
     * bekliyoruz */
    auto mesaj = receiveOnly!(Tid, int, double)();

    /* Mesaj bir çokuzlu olarak alınır */
    auto gönderen = mesaj[0];    // Tid türünde
    auto tamsayı =  mesaj[1];    // int türünde
    auto kesirli =  mesaj[2];    // double türünde

Türler uymadığında "mesaj uyumsuzluğu" anlamına gelen MessageMismatch hatası atılır:

import std.concurrency;

void işçiİşlevi() {
    ownerTid.send("merhaba");    // ← string gönderiyor
}

void main() {
    spawn(&işçiİşlevi);

    auto mesaj = receiveOnly!double();    // ← double bekliyor
}

Çıktısı:

std.concurrency.MessageMismatch@std/concurrency.d(202):
Unexpected message type: expected 'double', got 'immutable(char)[]'
Örnek

Şimdiye kadar gördüğümüz kavramları kullanan basit bir benzetim programı tasarlayalım.

Bu örnek iki boyutlu düzlemdeki robotların birbirlerinden bağımsız ve rasgele hareketlerini belirliyor. Her robotu farklı bir iş parçacığı yönetiyor. Her iş parçacığı başlatılırken üç bilgi alıyor:

Yukarıdaki üç bilgiyi bir arada tutan bir İş yapısı şöyle tanımlanabilir:

struct İş {
    size_t robotNumarası;
    Yer başlangıç;
    Duration dinlenmeSüresi;
}

Bu iş parçacığının yaptığı tek iş, robotun numarasını ve hareketini bir sonsuz döngü içinde sahibine göndermek:

void gezdirici(İş iş) {
    Yer nereden = iş.başlangıç;

    while (true) {
        Thread.sleep(iş.dinlenmeSüresi);

        Yer nereye = rasgeleKomşu(nereden);
        Hareket hareket = Hareket(nereden, nereye);
        nereden = nereye;

        ownerTid.send(HareketMesajı(iş.robotNumarası, hareket));
    }
}

Sahip de sonsuz bir döngü içinde bu mesajları bekliyor. Aldığı mesajların hangi robotla ilgili olduğunu her mesajın parçası olan robot numarasından anlıyor:

    while (true) {
        auto mesaj = receiveOnly!HareketMesajı();

        writefln("%s %s",
                 robotlar[mesaj.robotNumarası],
                 mesaj.hareket);
    }

Bu örnekteki bütün mesajlar işçilerden sahibe gönderiliyor. Daha karmaşık programlarda her iki yönde ve çok çeşitli türlerden mesajlar da gönderilebilir. Programın tamamı şöyle:

import std.stdio;
import std.random;
import std.string;
import std.concurrency;
import core.thread;

struct Yer {
    int satır;
    int sütun;

    string toString() {
        return format("%s,%s", satır, sütun);
    }
}

struct Hareket {
    Yer nereden;
    Yer nereye;

    string toString() {
        return ((nereden == nereye)
                ? format("%s (durgun)", nereden)
                : format("%s -> %s", nereden, nereye));
    }
}

class Robot {
    string görünüm;
    Duration dinlenmeSüresi;

    this(string görünüm, Duration dinlenmeSüresi) {
        this.görünüm = görünüm;
        this.dinlenmeSüresi = dinlenmeSüresi;
    }

    override string toString() {
        return format("%s(%s)", görünüm, dinlenmeSüresi);
    }
}

/* 0,0 noktası etrafında rasgele bir yer döndürür */
Yer rasgeleYer() {
    return Yer(uniform!"[]"(-10, 10), uniform!"[]"(-10, 10));
}

/* Verilen değerin en fazla bir adım ötesinde bir değer
 * döndürür */
int rasgeleAdım(int şimdiki) {
    return şimdiki + uniform!"[]"(-1, 1);
}

/* Verilen Yer'in komşusu olan bir Yer döndürür; çapraz
 * komşusu olabileceği gibi tesadüfen aynı yer de olabilir. */
Yer rasgeleKomşu(Yer yer) {
    return Yer(rasgeleAdım(yer.satır),
               rasgeleAdım(yer.sütun));
}

struct İş {
    size_t robotNumarası;
    Yer başlangıç;
    Duration dinlenmeSüresi;
}

struct HareketMesajı {
    size_t robotNumarası;
    Hareket hareket;
}

void gezdirici(İş iş) {
    Yer nereden = iş.başlangıç;

    while (true) {
        Thread.sleep(iş.dinlenmeSüresi);

        Yer nereye = rasgeleKomşu(nereden);
        Hareket hareket = Hareket(nereden, nereye);
        nereden = nereye;

        ownerTid.send(HareketMesajı(iş.robotNumarası, hareket));
    }
}

void main() {
    /* Farklı hızlardaki robotlar */
    Robot[] robotlar = [ new Robot("A",  600.msecs),
                         new Robot("B", 2000.msecs),
                         new Robot("C", 5000.msecs) ];

    /* Her birisi için bir iş parçacığı başlatılıyor */
    foreach (robotNumarası, robot; robotlar) {
        spawn(&gezdirici, İş(robotNumarası,
                             rasgeleYer(),
                             robot.dinlenmeSüresi));
    }

    /* Artık hareket bilgilerini işçilerden toplamaya
     * başlayabiliriz */
    while (true) {
        auto mesaj = receiveOnly!HareketMesajı();

        /* Bu robotla ilgili yeni bilgiyi çıkışa
         * yazdırıyoruz */
        writefln("%s %s",
                 robotlar[mesaj.robotNumarası],
                 mesaj.hareket);
    }
}

Program sonlandırılana kadar robotların konumlarını çıkışa yazdırır:

A(600 ms) -3,3 -> -4,4
A(600 ms) -4,4 -> -4,3
A(600 ms) -4,3 -> -3,2
B(2 secs) -6,9 (durgun)
A(600 ms) -3,2 -> -2,2
A(600 ms) -2,2 -> -3,1
A(600 ms) -3,1 -> -2,0
B(2 secs) -6,9 -> -5,9
A(600 ms) -2,0 (durgun)
A(600 ms) -2,0 -> -3,-1
C(5 secs) -6,6 -> -6,7
A(600 ms) -3,-1 -> -4,-1
...

Mesajlaşmaya dayanan eş zamanlı programlamanın yararını bu örnekte görebiliyoruz. Her robotun hareketi aynı anda ve diğerlerinden bağımsız olarak hesaplanıyor. Bu basit örnekteki sahip yalnızca robotların hareketlerini çıkışa yazdırıyor; bütün robotları ilgilendiren başka işlemler de uygulanabilir.

Farklı çeşitlerden mesaj beklemek

receiveOnly() yalnızca belirtilen türden mesaj bekleyebilir. receive() ise farklı çeşitlerden mesajlar beklemek için kullanılır. Parametre olarak belirsiz sayıda mesajcı işlev alır. Gelen mesaj bu mesajcı işlevlere sırayla uydurulmaya çalışılır ve mesaj, mesajın türünün uyduğu ilk işleve gönderilir.

Örneğin aşağıdaki receive() çağrısı ilki int, ikincisi de string bekleyen iki mesajcı işlev kullanmaktadır:

void işçiİşlevi() {
    bool tamam_mı = false;

    while (!tamam_mı) {
        void intİşleyen(int mesaj) {
            writeln("int mesaj: ", mesaj);

            if (mesaj == -1) {
                writeln("çıkıyorum");
                tamam_mı = true;
            }
        }

        void stringİşleyen(string mesaj) {
            writeln("string mesaj: ", mesaj);
        }

        receive(&intİşleyen, &stringİşleyen);
    }
}

Gönderilen int mesajlar intİşleyen()'e, string mesajlar da stringİşleyen()'e uyarlar. O iş parçacığını şöyle bir kodla deneyebiliriz:

import std.stdio;
import std.concurrency;

// ...

void main() {
    auto işçi = spawn(&işçiİşlevi);

    işçi.send(10);
    işçi.send(42);
    işçi.send("merhaba");
    işçi.send(-1);        // ← işçinin sonlanması için
}

Mesajlar alıcı taraftaki uygun mesajcı işlevlere gönderilirler:

int mesaj: 10
int mesaj: 42
string mesaj: merhaba
int mesaj: -1
çıkıyorum

receive(), yukarıdaki normal işlevler yerine isimsiz işlevler veya opCall() üye işlevi tanımlanmış olan türlerin nesnelerini de kullanabilir. Bunun bir örneğini görmek için programı isimsiz işlevler kullanacak şekilde değiştirelim. Ek olarak, işçinin sonlanmasını da -1 gibi özel bir değer yerine ismi açıkça Sonlan olan özel bir türle bildirelim.

Aşağıda receive()'e parametre olarak üç isimsiz işlev gönderildiğine dikkat edin. Bu işlevlerin açma ve kapama parantezlerini sarı ile belirtiyorum:

import std.stdio;
import std.concurrency;

struct Sonlan {
}

void işçiİşlevi() {
    bool devam_mı = true;

    while (devam_mı) {
        receive(
            (int mesaj) {
                writeln("int mesaj: ", mesaj);
            },

            (string mesaj) {
                writeln("string mesaj: ", mesaj);
            },

            (Sonlan mesaj) {
                writeln("çıkıyorum");
                devam_mı = false;
            });
    }
}

void main() {
    auto işçi = spawn(&işçiİşlevi);

    işçi.send(10);
    işçi.send(42);
    işçi.send("merhaba");
    işçi.send(Sonlan());
}
Beklenmeyen mesaj almak

std.variant modülünde tanımlanmış olan Variant her türden veriyi sarmalayabilen bir türdür. receive()'e verilen diğer mesajcı işlevlere uymayan mesajlar Variant türünü bekleyen bir mesajcı tarafından yakalanabilirler:

import std.stdio;
import std.concurrency;

void işçiİşlev() {
    receive(
        (int mesaj) { /* ... */ },

        (double mesaj) { /* ... */ },

        (Variant mesaj) {
            writeln("Beklemediğim bir mesaj aldım: ", mesaj);
        });
}

struct ÖzelMesaj {
    // ...
}

void main() {
    auto işçi = spawn(&işçiİşlev);
    işçi.send(ÖzelMesaj());
}

Çıktısı:

Beklemediğim bir mesaj aldım: ÖzelMesaj()

Bu bölümün konusu dışında kaldığı için Variant'ın ayrıntılarına girmeyeceğim.

Mesajları belirli süreye kadar beklemek

Mesajların belirli bir süreden daha fazla beklenmesi istenmeyebilir. Gönderen iş parçacığı geçici olarak meşgul olmuş olabilir veya bir hata ile sonlanmış olabilir. Mesaj bekleyen iş parçacığının belki de hiç gelmeyecek olan bir mesajı sonsuza kadar beklemesini önlemek için receiveTimeout() çağrılır.

receiveTimeout()'un ilk parametresi mesajın en fazla ne kadar bekleneceğini bildirir. Dönüş değeri de mesajın o süre içinde gelip gelmediğini belirtir: Mesaj alındığında true, alınmadığında ise false değeridir.

import std.stdio;
import std.concurrency;
import core.thread;

void işçi() {
    Thread.sleep(3.seconds);
    ownerTid.send("merhaba");
}

void main() {
    spawn(&işçi);

    writeln("mesaj bekliyorum");
    bool alındı = false;
    while (!alındı) {
        alındı = receiveTimeout(600.msecs,
                                (string mesaj) {
                                    writeln("geldi: ", mesaj);
                                });

        if (!alındı) {
            writeln("... henüz yok");

            /* ... burada başka işlere devam edilebilir ... */
        }
    }
}

Yukarıdaki sahip, gereken mesajı en fazla 600 milisaniye bekliyor. Mesaj o süre içinde gelmezse başka işlerine devam edebilir:

mesaj bekliyorum
... henüz yok
... henüz yok
... henüz yok
... henüz yok
geldi: merhaba

Mesajın belirli süreden uzun sürmesi sonucunda çeşitli durumlarda başka kararlar da verilebilir. Örneğin, mesaj geç geldiğinde artık bir anlamı yoktur.

İşçide atılan hatalar

Hatırlayacağınız gibi, std.parallelism modülünün çoğu olanağı görevler sırasında atılan hataları yakalar ve görevle ilgili bir sonraki işlem sırasında tekrar atmak üzere saklar. Böylece örneğin bir görevin işlemesi sırasında atılmış olan hata daha sonra yieldForce() çağrıldığında görevi başlatan tarafta yakalanabilir:

    try {
        görev.yieldForce();

    } catch (Exception hata) {
        writefln("görev sırasında bir hata olmuş: '%s'",
                 hata.msg);
    }

std.concurrency genel hata türleri konusunda kolaylık sağlamaz. Atılan olası bir hatanın sahip iş parçacığına iletilebilmesi için açıkça yakalanması ve bir mesaj halinde gönderilmesi gerekir. Bir kolaylık olarak, biraz aşağıda göreceğimiz gibi, OwnerTerminated ve LinkTerminated hataları mesaj olarak da alınabilirler.

Aşağıdaki hesapçı() işlevi string türünde mesajlar alıyor; onları double türüne çeviriyor, 0.5 değerini ekliyor ve sonucu bir mesaj olarak gönderiyor:

void hesapçı() {
    while (true) {
        auto mesaj = receiveOnly!string();
        ownerTid.send(to!double(mesaj) + 0.5);
    }
}

Yukarıdaki to!double(), "merhaba" gibi double'a dönüştürülemeyen bir dizgi ile çağrıldığında hata atar. Atılan o hata hesapçı()'dan hemen çıkılmasına neden olacağından aşağıdaki üç mesajdan yalnızca birincisinin yanıtı alınabilir:

import std.stdio;
import std.concurrency;
import std.conv;

// ...

void main() {
    Tid hesapçı = spawn(&hesapçı);

    hesapçı.send("1.2");
    hesapçı.send("merhaba");  // ← hatalı veri
    hesapçı.send("3.4");

    foreach (i; 0 .. 3) {
        auto mesaj = receiveOnly!double();
        writefln("sonuç %s: %s", i, mesaj);
    }
}

Bu yüzden, sahip "1.2"nin sonucunu 1.7 olarak alır ama işçi sonlanmış olduğundan bir sonraki mesajı alamaz:

sonuç 0: 1.7
                 ← hiç gelmeyecek olan mesajı bekleyerek takılır

Hesapçı iş parçacığının bu konuda yapabileceği bir şey, kendi işlemleri sırasında atılabilecek olan hatayı try-catch ile yakalamak ve özel bir mesaj olarak iletmektir. Programı hatanın nedenini bir HesapHatası nesnesi olarak gönderecek şekilde aşağıda değiştiriyoruz. Ek olarak, iş parçacığının sonlanması da özel Sonlan türü ile sağlanıyor:

import std.stdio;
import std.concurrency;
import std.conv;

struct HesapHatası {
    string neden;
}

struct Sonlan {
}

void hesapçı() {
    bool devam_mı = true;

    while (devam_mı) {
        receive(
            (string mesaj) {
                try {
                    ownerTid.send(to!double(mesaj) + 0.5);

                } catch (Exception hata) {
                    ownerTid.send(HesapHatası(hata.msg));
                }
            },

            (Sonlan mesaj) {
                devam_mı = false;
            });
    }
}

void main() {
    Tid hesapçı = spawn(&hesapçı);

    hesapçı.send("1.2");
    hesapçı.send("merhaba");  // ← hatalı veri
    hesapçı.send("3.4");
    hesapçı.send(Sonlan());

    foreach (i; 0 .. 3) {
        writef("sonuç %s: ", i);

        receive(
            (double mesaj) {
                writeln(mesaj);
            },

            (HesapHatası hata) {
                writefln("HATA! '%s'", hata.neden);
            });
    }
}

Hatanın nedeninin "rakam bulunamadı" anlamına gelen "no digits seen" olduğunu bu sefer görebiliyoruz:

sonuç 0: 1.7
sonuç 1: HATA! 'no digits seen'
sonuç 2: 3.9

Bu konuda diğer bir yöntem, işçinin yakaladığı hatanın olduğu gibi sahibe gönderilmesidir. Aynı hata sahip tarafından kullanılabileceği gibi tekrar atılabilir de:

// ... işçi tarafta ...
                try {
                    // ...

                } catch (shared(Exception) hata) {
                    ownerTid.send(hata);
                }},

// ... sahip tarafta ...
        receive(
            // ...

            (shared(Exception) hata) {
                throw hata;
            });

Yukarıdaki shared belirteçlerine neden gerek olduğunu bir sonraki bölümde göreceğiz.

İş parçacıklarının sonlandıklarını algılamak

İş parçacıkları alıcı tarafın herhangi bir nedenle sonlanmış olduğunu algılayabilirler.

OwnerTerminated hatası

"Sahip sonlandı" anlamına gelen bu hata işçinin bu durumdan haberinin olmasını sağlar. Aşağıdaki programdaki aracı iş parçası iki mesaj gönderdikten sonra sonlanıyor. Bunun sonucunda işçi tarafta bir OwnerTerminated hatası atılıyor:

import std.stdio;
import std.concurrency;

void main() {
    spawn(&aracıİşlev);
}

void aracıİşlev() {
    auto işçi = spawn(&işçiİşlev);
    işçi.send(1);
    işçi.send(2);
}  // ← İki mesajdan sonra sonlanıyor.

void işçiİşlev() {
    while (true) {
        auto mesaj = receiveOnly!int(); // ← Sahip sonlanmışsa
                                        //   hata atılır.
        writeln("Mesaj: ", mesaj);
    }
}

Çıktısı:

Mesaj: 1
Mesaj: 2
std.concurrency.OwnerTerminated@std/concurrency.d(248):
Owner terminated

İstendiğinde o hata işçi tarafından yakalanabilir ve böylece işçinin de hatasız olarak sonlanması sağlanabilir:

void işçiİşlev() {
    bool devam_mı = true;

    while (devam_mı) {
        try {
            auto mesaj = receiveOnly!int();
            writeln("Mesaj: ", mesaj);

        } catch (OwnerTerminated hata) {
            writeln("Sahibim sonlanmış.");
            devam_mı = false;
        }
    }
}

Çıktısı:

Mesaj: 1
Mesaj: 2
Sahibim sonlanmış.

Bu hatanın mesaj olarak da alınabileceğini biraz aşağıda göreceğiz.

LinkTerminated hatası

spawnLinked() ile başlatılmış olan bir iş parçacığı sonlandığında sahibin tarafında LinkTerminated hatası atılır. spawnLinked(), spawn() ile aynı biçimde kullanılır:

import std.stdio;
import std.concurrency;

void main() {
    auto işçi = spawnLinked(&işçiİşlev);

    while (true) {
        auto mesaj = receiveOnly!int(); // ← İşçi sonlanmışsa
                                        //   hata atılır.
        writeln("Mesaj: ", mesaj);
    }
}

void işçiİşlev() {
    ownerTid.send(10);
    ownerTid.send(20);
}  // ← İki mesajdan sonra sonlanıyor.

İşçi yalnızca iki mesaj gönderdikten sonra sonlanıyor. İşçisini spawnLinked() ile başlatmış olduğu için sahip bu durumu bir LinkTerminated hatası ile öğrenir:

Mesaj: 10
Mesaj: 20
std.concurrency.LinkTerminated@std/concurrency.d(263):
Link terminated

OwnerTerminated hatasında olduğu gibi bu hata da yakalanabilir ve sahip de bu durumda düzenli olarak sonlanabilir:

    bool devam_mı = true;

    while (devam_mı) {
        try {
            auto mesaj = receiveOnly!int();
            writeln("Mesaj: ", mesaj);

        } catch (LinkTerminated hata) {
            writeln("İşçi sonlanmış.");
            devam_mı = false;
        }
    }

Çıktısı:

Mesaj: 10
Mesaj: 20
İşçi sonlanmış.

Bu hata mesaj olarak da alınabilir.

Hataları mesaj olarak almak

OwnerTerminated ve LinkTerminated hataları karşı tarafta mesaj olarak da alınabilirler. Aşağıdaki kod bunu OwnerTerminated hatası üzerinde gösteriyor:

    bool devam_mı = true;

    while (devam_mı) {
        receive(
            (int mesaj) {
                writeln("Mesaj: ", mesaj);
            },

            (OwnerTerminated hata) {
                writeln("Sahip sonlanmış; çıkıyorum.");
                devam_mı = false;
            }
        );
    }
Posta kutusu yönetimi

İş parçacıklarına gönderilen mesajlar her iş parçacığına özel bir posta kutusunda dururlar. Posta kutusundaki mesajların sayısı alıcının mesajları işleyiş hızına bağlı olarak zamanla artabilir ve azalabilir. Posta kutusunun aşırı büyümesi hem sistem belleğine fazla yük getirir hem de programın tasarımındaki bir hataya işaret eder. Posta kutusunun sürekli olarak büyümesi bazı mesajların hiçbir zaman alınamayacaklarını da gösteriyor olabilir.

Posta kutusunun uzunluğu setMaxMailboxSize() işlevi ile kısıtlanır. Bu işlevin ilk parametresi hangi iş parçacığına ait posta kutusunun kısıtlanmakta olduğunu, ikinci parametresi posta kutusunun en fazla kaç mesaj alabileceğini, üçüncü parametresi de posta kutusu dolu olduğunda ne olacağını belirler. Üçüncü parametre için dört seçenek vardır:

Bunun bir örneğini görmek için önce posta kutusunun sürekli olarak büyümesini sağlayalım. Aşağıdaki programdaki işçi hiç zaman geçirmeden art arda mesaj gönderdiği halde sahip iş parçacığı her mesaj için bir saniye zaman harcamaktadır:

/* UYARI: Bu program çalışırken sisteminiz aşırı derecede
 *        yavaşlayabilir. */
import std.concurrency;
import core.thread;

void işçiİşlev() {
    while (true) {
        ownerTid.send(42);    // ← Sürekli olarak mesaj üretiyor.
    }
}

void main() {
    spawn(&işçiİşlev);

    while (true) {
        receive(
            (int mesaj) {
                // Her mesajda zaman geçiriyor.
                Thread.sleep(1.seconds);
            });
    }
}

Mesajları tüketen taraf üreten taraftan yavaş kaldığı için yukarıdaki programın kullandığı bellek sürekli olarak artacaktır. Bunun önüne geçmek için ana iş parçacığının posta kutusu daha işçi başlatılmadan önce belirli bir mesaj sayısı ile kısıtlanabilir:

void main() {
    setMaxMailboxSize(thisTid, 1000, OnCrowding.block);

    spawn(&işçiİşlev);
// ...
}

Yukarıdaki setMaxMailboxSize() çağrısı ana iş parçacığının posta kutusunun uzunluğunu 1000 ile kısıtlamaktadır. OnCrowding.block, gönderen tarafın mesaja yer açılana kadar beklemesine neden olur.

OnCrowding.throwException kullanılan aşağıdaki örnekte ise mesajı gönderen taraf posta kutusunun dolu olduğunu atılan MailboxFull hatasından anlamaktadır:

import std.concurrency;
import core.thread;

void işçiİşlev() {
    while (true) {
        try {
            ownerTid.send(42);

        } catch (MailboxFull hata) {
            /* Gönderemedim; biraz sonra tekrar denerim. */
            Thread.sleep(1.msecs);
        }
    }
}

void main() {
    setMaxMailboxSize(thisTid, 1000, OnCrowding.throwException);

    spawn(&işçiİşlev);

    while (true) {
        receive(
            (int mesaj) {
                Thread.sleep(1.seconds);
            });
    }
}
Öncelikli mesajlar

prioritySend() ile gönderilen mesajlar önceliklidir. Bu mesajlar posta kutusunda beklemekte olan mesajlardan daha önce alınırlar:

    prioritySend(ownerTid, ÖnemliMesaj(100));

Alıcı tarafta prioritySend() ile gönderilmiş olan mesajın türünü bekleyen mesajcı işlev yoksa PriorityMessageException hatası atılır:

std.concurrency.PriorityMessageException@std/concurrency.d(280):
Priority message
İş parçacığı isimleri

Şimdiye kadar kullandığımız basit örneklerde sahip ve işçinin birbirlerinin kimliklerini kolayca edindiklerini gördük. Çok sayıda iş parçacığının görev aldığı programlarda ise iş parçacıklarının Tid değerlerini birbirlerini tanısınlar diye elden ele geçirmek karmaşık olabilir. Bunun önüne geçmek için iş parçacıklarına bütün program düzeyinde isimler atanabilir.

Aşağıdaki üç işlev bütün iş parçacıkları tarafından erişilebilen bir eşleme tablosu gibi düşünülebilirler:

Aşağıdaki program birbirlerini isimleriyle bulan iki eş iş parçacığı başlatıyor. Bu iş parçacıkları sonlanmalarını bildiren Sonlan mesajını alana kadar birbirlerine mesaj gönderiyorlar:

import std.stdio;
import std.concurrency;
import core.thread;

struct Sonlan {
}

void main() {
    // Eşinin ismi "ikinci" olan bir iş parçacığı
    auto birinci = spawn(&oyuncu, "ikinci");
    register("birinci", birinci);
    scope(exit) unregister("birinci");

    // Eşinin ismi "birinci" olan bir iş parçacığı
    auto ikinci = spawn(&oyuncu, "birinci");
    register("ikinci", ikinci);
    scope(exit) unregister("ikinci");

    Thread.sleep(2.seconds);

    prioritySend(birinci, Sonlan());
    prioritySend(ikinci, Sonlan());

    // unregister() çağrıları iş parçacıkları sonlandıktan
    // sonra işletilsinler diye main() beklemelidir.
    thread_joinAll();
}

void oyuncu(string eşİsmi) {
    Tid eş;

    while (eş == Tid.init) {
        Thread.sleep(1.msecs);
        eş = locate(eşİsmi);
    }

    bool devam_mı = true;

    while (devam_mı) {
        eş.send("merhaba " ~ eşİsmi);
        receive(
            (string mesaj) {
                writeln("Mesaj: ", mesaj);
                Thread.sleep(500.msecs);
            },

            (Sonlan mesaj) {
                writefln("%s, ben çıkıyorum.", eşİsmi);
                devam_mı = false;
            });
    }
}

main'in sonunda çağrıldığını gördüğümüz thread_joinAll, sahip iş parçacığının işçilerinin hepsinin sonlanmalarını beklemesini sağlar.

Çıktısı:

Mesaj: merhaba birinci
Mesaj: merhaba ikinci
Mesaj: merhaba birinci
Mesaj: merhaba ikinci
Mesaj: merhaba birinci
Mesaj: merhaba ikinci
Mesaj: merhaba ikinci
Mesaj: merhaba birinci
birinci, ben çıkıyorum.
ikinci, ben çıkıyorum.
Özet