Mesajlaşarak Eş Zamanlı Programlama
Eş zamanlı programlama bir önceki bölümde gördüğümüz koşut işlemlere çok benzer. İkisi de işlemlerin farklı iş parçacıkları üzerinde aynı anda işletilmeleri ile ilgilidir ve aslında koşut işlemler de perde arkasında eş zamanlı programlama ile gerçekleştirilir. Bu iki kavram bu yüzden çok karıştırılır.
Koşut işlemlerle eş zamanlı programlama arasındaki farklar şunlardır:
- Koşut işlemlerin temel amacı mikro işlemci çekirdeklerinden yararlanarak programın hızını arttırmaktır. Eş zamanlı programlama ise yalnızca tek çekirdeği bulunan ortamlarda bile gerekebilir ve programın aynı anda birden fazla iş yürütmesini sağlar. Örneğin bir sunucu program her istemcinin işini farklı bir iş parçacığında yürütebilir.
- Koşut işlemler birbirlerinden bağımsızdırlar. Hatta, birbirlerine bağlı olan işlemlerin koşut olarak işletilmeleri hata olarak kabul edilir. Eş zamanlı programlamada ise çoğu zaman iş parçacıkları birbirlerine bağlıdırlar. Örneğin, devam edebilmek için başka iş parçacıklarının ürettikleri verilere gerek duyarlar.
- Her iki yöntem de işletim sisteminin iş parçacıklarını kullanırlar. Koşut işlemler iş parçacıklarını görev kavramının arkasına gizlerler; eş zamanlı programlama ise doğrudan iş parçacıklarını kullanır.
- Koşut işlemler çok kolay kullanılırlar ve görevler bağımsız oldukları sürece program doğruluğu açısından güvenlidirler. Eş zamanlı programlama ise ancak mesajlaşma yöntemi kullanıldığında güvenlidir. Veri paylaşımına dayanan geleneksel eş zamanlı programlamada programın doğru çalışacağı kanıtlanamayabilir.
D hem mesajlaşmaya dayanan hem de veri paylaşımına dayanan eş zamanlı programlamayı destekler. Veri paylaşımı ile hatasız programlar üretmek çok zor olduğundan modern programcılıkta mesajlaşma yöntemi benimsenmiştir. Bu bölümde std.concurrency
modülünün sağladığı mesajlaşma olanaklarını, bir sonraki bölümde ise veri paylaşımına dayalı eş zamanlı programlama olanaklarını göreceğiz.
Kavramlar
İş parçacığı (thread): İşletim sistemi bütün programları iş parçacığı adı verilen işlem birimleri ile işletir. Çalıştırılan her D programının main()
ile başlayan işlemleri işletim sisteminin o programı çalıştırmak için seçmiş olduğu bir iş parçacığı üzerinde başlatılır. main()
'in işlettiği bütün işlemler normalde hep aynı iş parçacığı üzerinde işletilirler. Program, gerektiğinde kendisi başka iş parçacıkları başlatabilir ve böylece aynı anda birden fazla iş yürütebilir. Örneğin bir önceki bölümde gördüğümüz her görev, std.parallelism
'in olanakları tarafından başlatılmış olan bir iş parçacığını kullanır.
İşletim sistemi iş parçacıklarını önceden kestirilemeyecek anlarda duraksatır ve tekrar başlatır. Bunun sonucunda örneğin aşağıdaki kadar basit işlemler bile bir süre yarım kalmış olabilirler:
++i;
Yukarıdaki işlem aslında üç adımdan oluşur: Değişkenin değerinin okunması, değerin arttırılması ve tekrar değişkene atanması. İşletim sisteminin bu iş parçacığını duraksattığı bir anda bu adımlar sonradan devam edilmek üzere yarım kalmış olabilirler.
Mesaj (message): İş parçacıklarının işleyişleri sırasında birbirlerine gönderdikleri bilgilere mesaj denir. Mesaj her türden ve her sayıda değişkenden oluşabilir.
İş parçacığı kimliği (Tid): Her iş parçacığının bir kimliği vardır. Kimlik, gönderilen mesajın alıcısı olan iş parçacığını belirler.
Sahip (owner): İş parçacığı başlatan her iş parçacığı, başlatılan iş parçacığının sahibi olarak anılır.
İşçi (worker): Başlatılan iş parçacığına işçi denir.
İş parçacıklarını başlatmak
Yeni bir iş parçacığı başlatmak için spawn()
kullanılır. spawn()
parametre olarak bir işlev alır ve yeni iş parçacığını o işlevden başlatır. O işlevin belki de başka işlevlere de dallanarak devam eden işlemleri artık yeni iş parçacığı üzerinde işletilir. spawn()
ile task()
arasındaki bir fark, spawn()
ile başlatılan iş parçacıklarının birbirlerine mesaj gönderebilmeleridir.
İşçinin başlatılmasından sonra sahip ve işçi birbirlerinden bağımsız iki alt program gibi işlemeye devam ederler:
import std.stdio; import std.concurrency; import core.thread; void işçi() { foreach (i; 0 .. 5) { Thread.sleep(500.msecs); writeln(i, " (işçi)"); } } void main() { spawn(&işçi); foreach (i; 0 .. 5) { Thread.sleep(300.msecs); writeln(i, " (main)"); } writeln("main tamam"); }
İşlemlerin aynı anda işletildiklerini gösterebilmek için buradaki örneklerde de Thread.sleep
'ten yararlanıyorum. Programın çıktısı main()
'den ve işçi()
'den başlamış olan iki iş parçacığının diğerinden bağımsız olarak işlediğini gösteriyor:
0 (main) 0 (işçi) 1 (main) 2 (main) 1 (işçi) 3 (main) 2 (işçi) 4 (main) main tamam 3 (işçi) 4 (işçi)
Program bütün iş parçacıklarının tamamlanmasını otomatik olarak bekler. Bunu yukarıdaki çıktıda görüyoruz: main()
'in sonundaki "main tamam" yazdırıldığı halde işçi()
işlevinin de tamamlanması beklenmiştir.
İş parçacığını başlatan işlevin aldığı parametreler spawn()
'a işlev isminden sonra verilirler. Aşağıdaki programdaki iki işçi, çıkışa dörder tane sayı yazdırıyor. Hangi değerden başlayacağını parametre olarak alıyor:
import std.stdio; import std.concurrency; import core.thread; void işçi(int başlangıçDeğeri) { foreach (i; 0 .. 4) { Thread.sleep(500.msecs); writeln(başlangıçDeğeri + i); } } void main() { foreach (i; 1 .. 3) { spawn(&işçi, i * 10); } }
İş parçacıklarından birisinin yazdırdıklarını işaretli olarak belirtiyorum. İşletim sisteminin iş parçacıklarını başlatmasına ve duraklatmasına bağlı olarak bu çıktıdaki satırlar farklı sırada da olabilirler:
10 20 11 21 12 22 13 23
İşletim sistemleri belirli bir anda işlemekte olan iş parçacıklarının sayısı konusunda kısıtlama getirir. Bu kısıtlamalar kullanıcı başına olabileceği gibi, bütün sistem başına veya herhangi başka bir kavramla ilgili olabilir. Mikro işlemciyi meşgul ederek işleyen iş parçacıklarının sayısı sistemdeki çekirdek sayısından fazla olduğunda bütün sistemin performansı düşebilir. Belirli bir anda mikro işlemciyi meşgul ederek işlemekte olan iş parçacıklarına mikro işlemciye bağlı denir. Öte yandan, bazı iş parçacıkları zamanlarının çoğunu iş yaparak değil, belirli bir olayın gerçekleşmesini bekleyerek geçirirler. Örneğin, kullanıcıdan veya ağ üzerinden bilgi gelmesinin veya bir Thread.sleep
çağrısının sonlanmasının beklenmesi sırasında mikro işlemci meşgul değildir. Böyle durumdaki iş parçacıklarına giriş/çıkış'a bağlı denir. İş parçacıklarının çoğunluğunun giriş/çıkış'a bağlı olarak işlediği bir programın sistemdeki çekirdek sayısından daha fazla iş parçacığı başlatmasında bir sakınca yoktur. Program hızıyla ilgili her tasarım kararında olması gerektiği gibi, bu konudaki kararlarınızı da ölçümler yaparak vermenizi öneririm.
İş parçacıklarının kimlikleri
thisTid()
iş parçacığının kendi kimliğini döndürür. İsmi "bu iş parçacığının kimliği" anlamına gelen "this thread's identifier"dan türemiştir. Bir işlev olmasına rağmen daha çok parantezsiz kullanılır:
import std.stdio; import std.concurrency; void kimlikBilgisi(string açıklama) { writefln("%s: %s", açıklama, thisTid); } void işçi() { kimlikBilgisi("işçi "); } void main() { spawn(&işçi); kimlikBilgisi("sahip"); }
Tid
türündeki kimliğin değerinin program açısından bir önemi olmadığından bu türün toString
işlevi bile tanımlanmamıştır. Bu yüzden programın aşağıdaki çıktısında yalnızca türün ismini görüyoruz:
sahip: Tid(std.concurrency.MessageBox) işçi : Tid(std.concurrency.MessageBox)
Çıktıları aynı olsa da sahip ve işçinin kimlikleri farklıdır.
spawn()
'ın bu noktaya kadar gözardı etmiş olduğum dönüş değeri de işçinin kimliğini sahibe bildirir:
Tid işçim = spawn(&işçi);
Her işçinin sahibinin kimliği ise ownerTid()
işlevi ile elde edilir.
Özetle, sahibin kimliği ownerTid
değişkeni ile, işçinin kimliği de spawn
'ın dönüş değeri ile elde edilmiş olur.
Mesajlaşma
Mesaj göndermek için send()
, belirli türden mesaj beklemek için de receiveOnly()
kullanılır. (Çeşitli türlerden mesaj bekleyen receive()
'i ve belirli süreye kadar bekleyen receiveTimeout()
'u daha aşağıda göstereceğim.)
Aşağıdaki programdaki sahip iş parçacığı işçisine int
türünde bir mesaj göndermekte ve ondan double
türünde bir mesaj beklemektedir. Bu iş parçacıkları sahip sıfırdan küçük bir değer gönderene kadar mesajlaşmaya devam edecekler. Önce sahip iş parçacığını gösteriyorum:
void main() { Tid işçi = spawn(&işçiİşlevi); foreach (değer; 1 .. 5) { işçi.send(değer); double sonuç = receiveOnly!double(); writefln("gönderilen: %s, alınan: %s", değer, sonuç); } /* Sonlanmasını sağlamak için işçiye sıfırdan küçük bir * değer gönderiyoruz */ işçi.send(-1); }
main()
, spawn()
'ın döndürdüğü iş parçacığının kimliğini işçi
ismiyle saklamakta ve bu kimliği send()
ile mesaj gönderirken kullanmaktadır.
İşçi ise kullanacağı int
'i bir mesaj olarak alıyor, onu bir hesapta kullanıyor ve ürettiği double
'ı yine bir mesaj olarak sahibine gönderiyor:
void işçiİşlevi() { int değer = 0; while (değer >= 0) { değer = receiveOnly!int(); double sonuç = cast(double)değer / 5; ownerTid.send(sonuç); } }
Yukarıdaki iş parçacığı mesajdaki değerin beşte birini hesaplar. Programın çıktısı şöyle:
gönderilen: 1, alınan: 0.2 gönderilen: 2, alınan: 0.4 gönderilen: 3, alınan: 0.6 gönderilen: 4, alınan: 0.8
Birden fazla değer aynı mesajın parçası olarak gönderilebilir:
ownerTid.send(thisTid, 42, 1.5);
Aynı mesajın parçası olarak gönderilen değerler alıcı tarafta bir çokuzlunun üyeleri olarak belirirler. receiveOnly()
'nin şablon parametrelerinin mesajı oluşturan türlere uymaları şarttır:
/* Tid, int, ve double türlerinden oluşan bir mesaj * bekliyoruz */ auto mesaj = receiveOnly!(Tid, int, double)(); /* Mesaj bir çokuzlu olarak alınır */ auto gönderen = mesaj[0]; // Tid türünde auto tamsayı = mesaj[1]; // int türünde auto kesirli = mesaj[2]; // double türünde
Türler uymadığında "mesaj uyumsuzluğu" anlamına gelen MessageMismatch
hatası atılır:
import std.concurrency; void işçiİşlevi() { ownerTid.send("merhaba"); // ← string gönderiyor } void main() { spawn(&işçiİşlevi); auto mesaj = receiveOnly!double(); // ← double bekliyor }
Çıktısı:
std.concurrency.MessageMismatch@std/concurrency.d(202):
Unexpected message type: expected 'double', got 'immutable(char)[]'
Örnek
Şimdiye kadar gördüğümüz kavramları kullanan basit bir benzetim programı tasarlayalım.
Bu örnek iki boyutlu düzlemdeki robotların birbirlerinden bağımsız ve rasgele hareketlerini belirliyor. Her robotu farklı bir iş parçacığı yönetiyor. Her iş parçacığı başlatılırken üç bilgi alıyor:
- Robotun numarası: Gönderilen mesajın hangi robotla ilgili olduğu
- Başlangıç noktası: Robotun hareketinin başlangıç noktası
- Robotun dinlenme süresi: Robotun ne kadar zamanda bir yer değiştireceği
Yukarıdaki üç bilgiyi bir arada tutan bir İş
yapısı şöyle tanımlanabilir:
struct İş {
size_t robotNumarası;
Yer başlangıç;
Duration dinlenmeSüresi;
}
Bu iş parçacığının yaptığı tek iş, robotun numarasını ve hareketini koşulsuz bir döngüde sahibine göndermek:
void gezdirici(İş iş) { Yer nereden = iş.başlangıç; while (true) { Thread.sleep(iş.dinlenmeSüresi); Yer nereye = rasgeleKomşu(nereden); Hareket hareket = Hareket(nereden, nereye); nereden = nereye; ownerTid.send(HareketMesajı(iş.robotNumarası, hareket)); } }
Sahip de sonsuz bir döngü içinde bu mesajları bekliyor. Aldığı mesajların hangi robotla ilgili olduğunu her mesajın parçası olan robot numarasından anlıyor:
while (true) { auto mesaj = receiveOnly!HareketMesajı(); writefln("%s %s", robotlar[mesaj.robotNumarası], mesaj.hareket); }
Bu örnekteki bütün mesajlar işçilerden sahibe gönderiliyor. Daha karmaşık programlarda her iki yönde ve çok çeşitli türlerden mesajlar da gönderilebilir. Programın tamamı şöyle:
import std.stdio; import std.random; import std.string; import std.concurrency; import core.thread; struct Yer { int satır; int sütun; string toString() { return format("%s,%s", satır, sütun); } } struct Hareket { Yer nereden; Yer nereye; string toString() { return ((nereden == nereye) ? format("%s (durgun)", nereden) : format("%s -> %s", nereden, nereye)); } } class Robot { string görünüm; Duration dinlenmeSüresi; this(string görünüm, Duration dinlenmeSüresi) { this.görünüm = görünüm; this.dinlenmeSüresi = dinlenmeSüresi; } override string toString() { return format("%s(%s)", görünüm, dinlenmeSüresi); } } /* 0,0 noktası etrafında rasgele bir yer döndürür */ Yer rasgeleYer() { return Yer(uniform!"[]"(-10, 10), uniform!"[]"(-10, 10)); } /* Verilen değerin en fazla bir adım ötesinde bir değer * döndürür */ int rasgeleAdım(int şimdiki) { return şimdiki + uniform!"[]"(-1, 1); } /* Verilen Yer'in komşusu olan bir Yer döndürür; çapraz * komşusu olabileceği gibi tesadüfen aynı yer de olabilir. */ Yer rasgeleKomşu(Yer yer) { return Yer(rasgeleAdım(yer.satır), rasgeleAdım(yer.sütun)); } struct İş { size_t robotNumarası; Yer başlangıç; Duration dinlenmeSüresi; } struct HareketMesajı { size_t robotNumarası; Hareket hareket; } void gezdirici(İş iş) { Yer nereden = iş.başlangıç; while (true) { Thread.sleep(iş.dinlenmeSüresi); Yer nereye = rasgeleKomşu(nereden); Hareket hareket = Hareket(nereden, nereye); nereden = nereye; ownerTid.send(HareketMesajı(iş.robotNumarası, hareket)); } } void main() { /* Farklı hızlardaki robotlar */ Robot[] robotlar = [ new Robot("A", 600.msecs), new Robot("B", 2000.msecs), new Robot("C", 5000.msecs) ]; /* Her birisi için bir iş parçacığı başlatılıyor */ foreach (robotNumarası, robot; robotlar) { spawn(&gezdirici, İş(robotNumarası, rasgeleYer(), robot.dinlenmeSüresi)); } /* Artık hareket bilgilerini işçilerden toplamaya * başlayabiliriz */ while (true) { auto mesaj = receiveOnly!HareketMesajı(); /* Bu robotla ilgili yeni bilgiyi çıkışa * yazdırıyoruz */ writefln("%s %s", robotlar[mesaj.robotNumarası], mesaj.hareket); } }
Program sonlandırılana kadar robotların konumlarını çıkışa yazdırır:
A(600 ms) -3,3 -> -4,4 A(600 ms) -4,4 -> -4,3 A(600 ms) -4,3 -> -3,2 B(2 secs) -6,9 (durgun) A(600 ms) -3,2 -> -2,2 A(600 ms) -2,2 -> -3,1 A(600 ms) -3,1 -> -2,0 B(2 secs) -6,9 -> -5,9 A(600 ms) -2,0 (durgun) A(600 ms) -2,0 -> -3,-1 C(5 secs) -6,6 -> -6,7 A(600 ms) -3,-1 -> -4,-1 ...
Mesajlaşmaya dayanan eş zamanlı programlamanın yararını bu örnekte görebiliyoruz. Her robotun hareketi aynı anda ve diğerlerinden bağımsız olarak hesaplanıyor. Bu basit örnekteki sahip yalnızca robotların hareketlerini çıkışa yazdırıyor; bütün robotları ilgilendiren başka işlemler de uygulanabilir.
Farklı çeşitlerden mesaj beklemek
receiveOnly()
yalnızca belirtilen türden mesaj bekleyebilir. receive()
ise farklı çeşitlerden mesajlar beklemek için kullanılır. Parametre olarak belirsiz sayıda mesajcı işlev alır. Gelen mesaj bu mesajcı işlevlere sırayla uydurulmaya çalışılır ve mesaj, mesajın türünün uyduğu ilk işleve gönderilir.
Örneğin aşağıdaki receive()
çağrısı ilki int
, ikincisi de string
bekleyen iki mesajcı işlev kullanmaktadır:
void işçiİşlevi() { bool tamam_mı = false; while (!tamam_mı) { void intİşleyen(int mesaj) { writeln("int mesaj: ", mesaj); if (mesaj == -1) { writeln("çıkıyorum"); tamam_mı = true; } } void stringİşleyen(string mesaj) { writeln("string mesaj: ", mesaj); } receive(&intİşleyen, &stringİşleyen); } }
Gönderilen int
mesajlar intİşleyen()
'e, string
mesajlar da stringİşleyen()
'e uyarlar. O iş parçacığını şöyle bir kodla deneyebiliriz:
import std.stdio; import std.concurrency; // ... void main() { auto işçi = spawn(&işçiİşlevi); işçi.send(10); işçi.send(42); işçi.send("merhaba"); işçi.send(-1); // ← işçinin sonlanması için }
Mesajlar alıcı taraftaki uygun mesajcı işlevlere gönderilirler:
int mesaj: 10 int mesaj: 42 string mesaj: merhaba int mesaj: -1 çıkıyorum
receive()
, yukarıdaki normal işlevler yerine isimsiz işlevler veya opCall()
üye işlevi tanımlanmış olan türlerin nesnelerini de kullanabilir. Bunun bir örneğini görmek için programı isimsiz işlevler kullanacak şekilde değiştirelim. Ek olarak, işçinin sonlanmasını da -1 gibi özel bir değer yerine ismi açıkça Sonlan
olan özel bir türle bildirelim.
Aşağıda receive()
'e parametre olarak üç isimsiz işlev gönderildiğine dikkat edin. Bu işlevlerin açma ve kapama parantezlerini işaretlenmiş olarak belirtiyorum:
import std.stdio; import std.concurrency; struct Sonlan { } void işçiİşlevi() { bool devam_mı = true; while (devam_mı) { receive( (int mesaj) { writeln("int mesaj: ", mesaj); }, (string mesaj) { writeln("string mesaj: ", mesaj); }, (Sonlan mesaj) { writeln("çıkıyorum"); devam_mı = false; }); } } void main() { auto işçi = spawn(&işçiİşlevi); işçi.send(10); işçi.send(42); işçi.send("merhaba"); işçi.send(Sonlan()); }
Beklenmeyen mesaj almak
std.variant
modülünde tanımlanmış olan Variant
her türden veriyi sarmalayabilen bir türdür. receive()
'e verilen diğer mesajcı işlevlere uymayan mesajlar Variant
türünü bekleyen bir mesajcı tarafından yakalanabilirler:
import std.stdio; import std.concurrency; void işçiİşlev() { receive( (int mesaj) { /* ... */ }, (double mesaj) { /* ... */ }, (Variant mesaj) { writeln("Beklemediğim bir mesaj aldım: ", mesaj); }); } struct ÖzelMesaj { // ... } void main() { auto işçi = spawn(&işçiİşlev); işçi.send(ÖzelMesaj()); }
Çıktısı:
Beklemediğim bir mesaj aldım: ÖzelMesaj()
Bu bölümün konusu dışında kaldığı için Variant
'ın ayrıntılarına girmeyeceğim.
Mesajları belirli süreye kadar beklemek
Mesajların belirli bir süreden daha fazla beklenmesi istenmeyebilir. Gönderen iş parçacığı geçici olarak meşgul olmuş olabilir veya bir hata ile sonlanmış olabilir. Mesaj bekleyen iş parçacığının belki de hiç gelmeyecek olan bir mesajı sonsuza kadar beklemesini önlemek için receiveTimeout()
çağrılır.
receiveTimeout()
'un ilk parametresi mesajın en fazla ne kadar bekleneceğini bildirir. Dönüş değeri de mesajın o süre içinde gelip gelmediğini belirtir: Mesaj alındığında true
, alınmadığında ise false
değeridir.
import std.stdio; import std.concurrency; import core.thread; void işçi() { Thread.sleep(3.seconds); ownerTid.send("merhaba"); } void main() { spawn(&işçi); writeln("mesaj bekliyorum"); bool alındı = false; while (!alındı) { alındı = receiveTimeout(600.msecs, (string mesaj) { writeln("geldi: ", mesaj); }); if (!alındı) { writeln("... henüz yok"); /* ... burada başka işlere devam edilebilir ... */ } } }
Yukarıdaki sahip, gereken mesajı en fazla 600 milisaniye bekliyor. Mesaj o süre içinde gelmezse başka işlerine devam edebilir:
mesaj bekliyorum ... henüz yok ... henüz yok ... henüz yok ... henüz yok geldi: merhaba
Mesajın belirli süreden uzun sürmesi sonucunda çeşitli durumlarda başka kararlar da verilebilir. Örneğin, mesaj geç geldiğinde artık bir anlamı yoktur.
İşçide atılan hatalar
Hatırlayacağınız gibi, std.parallelism
modülünün çoğu olanağı görevler sırasında atılan hataları yakalar ve görevle ilgili bir sonraki işlem sırasında tekrar atmak üzere saklar. Böylece örneğin bir görevin işlemesi sırasında atılmış olan hata daha sonra yieldForce()
çağrıldığında görevi başlatan tarafta yakalanabilir:
try { görev.yieldForce(); } catch (Exception hata) { writefln("görev sırasında bir hata olmuş: '%s'", hata.msg); }
std.concurrency
genel hata türleri konusunda kolaylık sağlamaz. Atılan olası bir hatanın sahip iş parçacığına iletilebilmesi için açıkça yakalanması ve bir mesaj halinde gönderilmesi gerekir. Bir kolaylık olarak, biraz aşağıda göreceğimiz gibi, OwnerTerminated
ve LinkTerminated
hataları mesaj olarak da alınabilirler.
Aşağıdaki hesapçı()
işlevi string
türünde mesajlar alıyor; onları double
türüne çeviriyor, 0.5 değerini ekliyor ve sonucu bir mesaj olarak gönderiyor:
void hesapçı() { while (true) { auto mesaj = receiveOnly!string(); ownerTid.send(to!double(mesaj) + 0.5); } }
Yukarıdaki to!double()
, "merhaba" gibi double
'a dönüştürülemeyen bir dizgi ile çağrıldığında hata atar. Atılan o hata hesapçı()
'dan hemen çıkılmasına neden olacağından aşağıdaki üç mesajdan yalnızca birincisinin yanıtı alınabilir:
import std.stdio; import std.concurrency; import std.conv; // ... void main() { Tid hesapçı = spawn(&hesapçı); hesapçı.send("1.2"); hesapçı.send("merhaba"); // ← hatalı veri hesapçı.send("3.4"); foreach (i; 0 .. 3) { auto mesaj = receiveOnly!double(); writefln("sonuç %s: %s", i, mesaj); } }
Bu yüzden, sahip "1.2"nin sonucunu 1.7 olarak alır ama işçi sonlanmış olduğundan bir sonraki mesajı alamaz:
sonuç 0: 1.7
← hiç gelmeyecek olan mesajı bekleyerek takılır
Hesapçı iş parçacığının bu konuda yapabileceği bir şey, kendi işlemleri sırasında atılabilecek olan hatayı try-catch
ile yakalamak ve özel bir mesaj olarak iletmektir. Programı hatanın nedenini bir HesapHatası
nesnesi olarak gönderecek şekilde aşağıda değiştiriyoruz. Ek olarak, iş parçacığının sonlanması da özel Sonlan
türü ile sağlanıyor:
import std.stdio; import std.concurrency; import std.conv; struct HesapHatası { string neden; } struct Sonlan { } void hesapçı() { bool devam_mı = true; while (devam_mı) { receive( (string mesaj) { try { ownerTid.send(to!double(mesaj) + 0.5); } catch (Exception hata) { ownerTid.send(HesapHatası(hata.msg)); } }, (Sonlan mesaj) { devam_mı = false; }); } } void main() { Tid hesapçı = spawn(&hesapçı); hesapçı.send("1.2"); hesapçı.send("merhaba"); // ← hatalı veri hesapçı.send("3.4"); hesapçı.send(Sonlan()); foreach (i; 0 .. 3) { writef("sonuç %s: ", i); receive( (double mesaj) { writeln(mesaj); }, (HesapHatası hata) { writefln("HATA! '%s'", hata.neden); }); } }
Hatanın nedeninin "rakam bulunamadı" anlamına gelen "no digits seen" olduğunu bu sefer görebiliyoruz:
sonuç 0: 1.7 sonuç 1: HATA! 'no digits seen' sonuç 2: 3.9
Bu konuda diğer bir yöntem, işçinin yakaladığı hatanın olduğu gibi sahibe gönderilmesidir. Aynı hata sahip tarafından kullanılabileceği gibi tekrar atılabilir de:
// ... işçi tarafta ... try { // ... } catch (shared(Exception) hata) { ownerTid.send(hata); }}, // ... sahip tarafta ... receive( // ... (shared(Exception) hata) { throw hata; });
Yukarıdaki shared
belirteçlerine neden gerek olduğunu bir sonraki bölümde göreceğiz.
İş parçacıklarının sonlandıklarını algılamak
İş parçacıkları alıcı tarafın herhangi bir nedenle sonlanmış olduğunu algılayabilirler.
OwnerTerminated
hatası
"Sahip sonlandı" anlamına gelen bu hata işçinin bu durumdan haberinin olmasını sağlar. Aşağıdaki programdaki aracı iş parçası iki mesaj gönderdikten sonra sonlanıyor. Bunun sonucunda işçi tarafta bir OwnerTerminated
hatası atılıyor:
import std.stdio; import std.concurrency; void main() { spawn(&aracıİşlev); } void aracıİşlev() { auto işçi = spawn(&işçiİşlev); işçi.send(1); işçi.send(2); } // ← İki mesajdan sonra sonlanıyor. void işçiİşlev() { while (true) { auto mesaj = receiveOnly!int(); // ← Sahip sonlanmışsa // hata atılır. writeln("Mesaj: ", mesaj); } }
Çıktısı:
Mesaj: 1
Mesaj: 2
std.concurrency.OwnerTerminated@std/concurrency.d(248):
Owner terminated
İstendiğinde o hata işçi tarafından yakalanabilir ve böylece işçinin de hatasız olarak sonlanması sağlanabilir:
void işçiİşlev() { bool devam_mı = true; while (devam_mı) { try { auto mesaj = receiveOnly!int(); writeln("Mesaj: ", mesaj); } catch (OwnerTerminated hata) { writeln("Sahibim sonlanmış."); devam_mı = false; } } }
Çıktısı:
Mesaj: 1 Mesaj: 2 Sahibim sonlanmış.
Bu hatanın mesaj olarak da alınabileceğini biraz aşağıda göreceğiz.
LinkTerminated
hatası
spawnLinked()
ile başlatılmış olan bir iş parçacığı sonlandığında sahibin tarafında LinkTerminated
hatası atılır. spawnLinked()
, spawn()
ile aynı biçimde kullanılır:
import std.stdio; import std.concurrency; void main() { auto işçi = spawnLinked(&işçiİşlev); while (true) { auto mesaj = receiveOnly!int(); // ← İşçi sonlanmışsa // hata atılır. writeln("Mesaj: ", mesaj); } } void işçiİşlev() { ownerTid.send(10); ownerTid.send(20); } // ← İki mesajdan sonra sonlanıyor.
İşçi yalnızca iki mesaj gönderdikten sonra sonlanıyor. İşçisini spawnLinked()
ile başlatmış olduğu için sahip bu durumu bir LinkTerminated
hatası ile öğrenir:
Mesaj: 10
Mesaj: 20
std.concurrency.LinkTerminated@std/concurrency.d(263):
Link terminated
OwnerTerminated
hatasında olduğu gibi bu hata da yakalanabilir ve sahip de bu durumda düzenli olarak sonlanabilir:
bool devam_mı = true; while (devam_mı) { try { auto mesaj = receiveOnly!int(); writeln("Mesaj: ", mesaj); } catch (LinkTerminated hata) { writeln("İşçi sonlanmış."); devam_mı = false; } }
Çıktısı:
Mesaj: 10 Mesaj: 20 İşçi sonlanmış.
Bu hata mesaj olarak da alınabilir.
Hataları mesaj olarak almak
OwnerTerminated
ve LinkTerminated
hataları karşı tarafta mesaj olarak da alınabilirler. Aşağıdaki kod bunu OwnerTerminated
hatası üzerinde gösteriyor:
bool devam_mı = true; while (devam_mı) { receive( (int mesaj) { writeln("Mesaj: ", mesaj); }, (OwnerTerminated hata) { writeln("Sahip sonlanmış; çıkıyorum."); devam_mı = false; } ); }
Posta kutusu yönetimi
İş parçacıklarına gönderilen mesajlar her iş parçacığına özel bir posta kutusunda dururlar. Posta kutusundaki mesajların sayısı alıcının mesajları işleyiş hızına bağlı olarak zamanla artabilir ve azalabilir. Posta kutusunun aşırı büyümesi hem sistem belleğine fazla yük getirir hem de programın tasarımındaki bir hataya işaret eder. Posta kutusunun sürekli olarak büyümesi bazı mesajların hiçbir zaman alınamayacaklarını da gösteriyor olabilir.
Posta kutusunun uzunluğu setMaxMailboxSize()
işlevi ile kısıtlanır. Bu işlevin ilk parametresi hangi iş parçacığına ait posta kutusunun kısıtlanmakta olduğunu, ikinci parametresi posta kutusunun en fazla kaç mesaj alabileceğini, üçüncü parametresi de posta kutusu dolu olduğunda ne olacağını belirler. Üçüncü parametre için dört seçenek vardır:
-
OnCrowding.block
: Gönderen taraf posta kutusunda yer açılana kadar bekler. OnCrowding.ignore
: Mesaj gözardı edilir.-
OnCrowding.throwException
: Mesaj gönderilirkenMailboxFull
hatası atılır. bool function(Tid)
türünde işlev göstergesi: Belirtilen işlev çağrılır.
Bunun bir örneğini görmek için önce posta kutusunun sürekli olarak büyümesini sağlayalım. Aşağıdaki programdaki işçi hiç zaman geçirmeden art arda mesaj gönderdiği halde sahip iş parçacığı her mesaj için bir saniye zaman harcamaktadır:
/* UYARI: Bu program çalışırken sisteminiz aşırı derecede * yavaşlayabilir. */ import std.concurrency; import core.thread; void işçiİşlev() { while (true) { ownerTid.send(42); // ← Sürekli olarak mesaj üretiyor. } } void main() { spawn(&işçiİşlev); while (true) { receive( (int mesaj) { // Her mesajda zaman geçiriyor. Thread.sleep(1.seconds); }); } }
Mesajları tüketen taraf üreten taraftan yavaş kaldığı için yukarıdaki programın kullandığı bellek sürekli olarak artacaktır. Bunun önüne geçmek için ana iş parçacığının posta kutusu daha işçi başlatılmadan önce belirli bir mesaj sayısı ile kısıtlanabilir:
void main() { setMaxMailboxSize(thisTid, 1000, OnCrowding.block); spawn(&işçiİşlev); // ... }
Yukarıdaki setMaxMailboxSize()
çağrısı ana iş parçacığının posta kutusunun uzunluğunu 1000 ile kısıtlamaktadır. OnCrowding.block
, gönderen tarafın mesaja yer açılana kadar beklemesine neden olur.
OnCrowding.throwException
kullanılan aşağıdaki örnekte ise mesajı gönderen taraf posta kutusunun dolu olduğunu atılan MailboxFull
hatasından anlamaktadır:
import std.concurrency; import core.thread; void işçiİşlev() { while (true) { try { ownerTid.send(42); } catch (MailboxFull hata) { /* Gönderemedim; biraz sonra tekrar denerim. */ Thread.sleep(1.msecs); } } } void main() { setMaxMailboxSize(thisTid, 1000, OnCrowding.throwException); spawn(&işçiİşlev); while (true) { receive( (int mesaj) { Thread.sleep(1.seconds); }); } }
Öncelikli mesajlar
prioritySend()
ile gönderilen mesajlar önceliklidir. Bu mesajlar posta kutusunda beklemekte olan mesajlardan daha önce alınırlar:
prioritySend(ownerTid, ÖnemliMesaj(100));
Alıcı tarafta prioritySend()
ile gönderilmiş olan mesajın türünü bekleyen mesajcı işlev yoksa PriorityMessageException
hatası atılır:
std.concurrency.PriorityMessageException@std/concurrency.d(280):
Priority message
İş parçacığı isimleri
Şimdiye kadar kullandığımız basit örneklerde sahip ve işçinin birbirlerinin kimliklerini kolayca edindiklerini gördük. Çok sayıda iş parçacığının görev aldığı programlarda ise iş parçacıklarının Tid
değerlerini birbirlerini tanısınlar diye elden ele geçirmek karmaşık olabilir. Bunun önüne geçmek için iş parçacıklarına bütün program düzeyinde isimler atanabilir.
Aşağıdaki üç işlev bütün iş parçacıkları tarafından erişilebilen bir eşleme tablosu gibi düşünülebilirler:
-
register()
: İş parçacığını bir isimle eşleştirir. -
locate()
: Belirtilen isme karşılık gelen iş parçacığını döndürür. O isme karşılık gelen iş parçacığı yoksaTid.init
değerini döndürür. -
unregister()
: İş parçacığı ile ismin ilişkisini kaldırır.
Aşağıdaki program birbirlerini isimleriyle bulan iki eş iş parçacığı başlatıyor. Bu iş parçacıkları sonlanmalarını bildiren Sonlan
mesajını alana kadar birbirlerine mesaj gönderiyorlar:
import std.stdio; import std.concurrency; import core.thread; struct Sonlan { } void main() { // Eşinin ismi "ikinci" olan bir iş parçacığı auto birinci = spawn(&oyuncu, "ikinci"); register("birinci", birinci); scope(exit) unregister("birinci"); // Eşinin ismi "birinci" olan bir iş parçacığı auto ikinci = spawn(&oyuncu, "birinci"); register("ikinci", ikinci); scope(exit) unregister("ikinci"); Thread.sleep(2.seconds); prioritySend(birinci, Sonlan()); prioritySend(ikinci, Sonlan()); // unregister() çağrıları iş parçacıkları sonlandıktan // sonra işletilsinler diye main() beklemelidir. thread_joinAll(); } void oyuncu(string eşİsmi) { Tid eş; while (eş == Tid.init) { Thread.sleep(1.msecs); eş = locate(eşİsmi); } bool devam_mı = true; while (devam_mı) { eş.send("merhaba " ~ eşİsmi); receive( (string mesaj) { writeln("Mesaj: ", mesaj); Thread.sleep(500.msecs); }, (Sonlan mesaj) { writefln("%s, ben çıkıyorum.", eşİsmi); devam_mı = false; }); } }
main
'in sonunda çağrıldığını gördüğümüz thread_joinAll
, sahip iş parçacığının işçilerinin hepsinin sonlanmalarını beklemesini sağlar.
Çıktısı:
Mesaj: merhaba birinci Mesaj: merhaba ikinci Mesaj: merhaba birinci Mesaj: merhaba ikinci Mesaj: merhaba birinci Mesaj: merhaba ikinci Mesaj: merhaba ikinci Mesaj: merhaba birinci birinci, ben çıkıyorum. ikinci, ben çıkıyorum.
Özet
- İş parçacıklarının birbirlerine bağlı olmadıkları durumlarda bir önceki bölümün konusu olan
std.parallelism
modülünün sunduğu koşut programlamayı yeğleyin. Ancak iş parçacıkları birbirlerine bağlı olduklarındastd.concurrency
'nin sunduğu eş zamanlı programlamayı düşünün. - Veri paylaşımı çeşitli program hatalarına açık olduğundan eş zamanlı programlama gerçekten gerektiğinde bu bölümün konusu olan mesajlaşmayı yeğleyin.
spawn()
vespawnLinked()
iş parçacığı başlatır.thisTid
bu iş parçacığının kimliğidir.ownerTid
bu iş parçacığının sahibinin kimliğidir.send()
veprioritySend()
mesaj gönderir.receiveOnly()
,receive()
vereceiveTimeout()
mesaj bekler.Variant
her mesaja uyar.setMaxMailboxSize()
posta kutusunun büyüklüğünü kısıtlar.register()
,unregister()
velocate()
iş parçacıklarını isimle kullanma olanağı sağlar.- Mesajlaşma sırasında hata atılabilir:
MessageMismatch
,OwnerTerminated
,LinkTerminated
,MailboxFull
vePriorityMessageException
. - Sahip, işçiden atılan hataları otomatik olarak yakalayamaz.