D Programlama Dili – Programlama dersleri ve D referansı
Ali Çehreli

eniyileştirme: [optimization], kodun daha hızlı çalışacak biçimde davranışı bozulmadan değiştirilmesi
eş zamanlı programlama: [concurrency], iş parçacıklarının birbirlerine bağımlı olarak işlemeleri
fonksiyonel programlama: [functional programming], yan etki üretmeme ilkesine dayalı programlama yöntemi
görev: [task], programın geri kalanıyla koşut işletilebilen işlem birimi
hevesli: [eager], işlemlerin, ürettikleri sonuçların kullanılacaklarından emin olunmadan gerçekleştirilmeleri
iş parçacığı: [thread], işletim sisteminin program işletme birimi
kapsam: [scope], küme parantezleriyle belirlenen bir alan
koşut işlemler: [parallelization], bağımsız işlemlerin aynı anda işletilmeleri
mikro işlemci: [CPU], bilgisayarın beyni
mikro işlemci çekirdeği: [CPU core], başlı başına mikro işlemci olarak kullanılabilen işlemci birimi
tembel değerlendirme: [lazy evaluation], işlemlerin gerçekten gerekene kadar geciktirilmesi
uç birim: [terminal], bilgisayar sistemlerinin kullanıcıyla etkileşen giriş/çıkış birimi; "konsol", "komut satırı", "cmd penceresi", "DOS ekranı", vs.
... bütün sözlük



İngilizce Kaynaklar


Diğer




Koşut İşlemler

Günümüzdeki mikro işlemciler her birisi bağımsız işlem birimi olarak kullanılabilen birden fazla çekirdekten oluşurlar. Çekirdekler farklı programların farklı bölümlerini aynı anda işletebilirler. std.parallelism modülü bu çekirdeklerin aynı anda işletilmelerini ve programın bu sayede daha hızlı çalışmasını sağlayan olanaklar içerir.

Bu bölümde aşağıdaki olanakların ayrıntılarını göreceğiz. Bu olanakları yalnızca işlemler birbirlerinden bağımsız olduklarında kullanabilirsiniz:

Daha önce kullandığımız bütün örneklerdeki bütün kodlarda işlemlerin yazıldıkları sırada işletildiklerini varsaydık:

    ++i;
    ++j;

Yukarıdaki kodda önce i'nin değerinin, ondan sonra da j'nin değerinin arttırılacağını biliyoruz. Aslında bu her zaman doğru değildir: Derleyicinin kodun daha hızlı işlemesi için uyguladığı eniyileştirmeler sonucunda her iki değişken de mikro işlemcinin yazmaçlarında depolanmış olabilirler. Bu yazmaçlar da birbirlerinden bağımsız olduklarından, mikro işlemci o iki işlemi aynı anda işletebilir.

Bu tür eniyileştirmeler yararlıdırlar ama çok alt düzeydeki işlemlerden daha üst düzey kapsamlarda uygulanamazlar. Bir grup üst düzey işlemin birbirlerinden bağımsız olduklarına ve bu yüzden de aynı anda işletilebileceklerine çoğu durumda yalnızca programcı karar verebilir.

Aşağıdaki foreach döngüsündeki elemanların başından sonuna kadar ve teker teker işletileceklerini biliyoruz:

    auto öğrenciler =
        [ Öğrenci(1), Öğrenci(2), Öğrenci(3), Öğrenci(4) ];

    foreach (öğrenci; öğrenciler) {
        öğrenci.uzunBirİşlem();
    }

Yukarıdaki kod, işletim sisteminin o programı çalıştırmak için seçmiş olduğu tek çekirdek üzerinde işletilir. foreach döngüsü de öğrencileri başından sonuna kadar işlettiği için uzunBirİşlem() öğrencilere sırayla ve teker teker uygulanır. Oysa çoğu durumda bir öğrencinin işletilebilmesi için önceki öğrencilerin işlemlerinin tamamlanmış olmaları gerekmez. Öğrenci işlemlerinin birbirlerinden bağımsız oldukları durumlarda diğer çekirdeklerden yararlanılmıyor olması zaman kaybına yol açacaktır.

Aşağıdaki örneklerdeki işlemlerin hissedilir derecede uzun süren işlemlere benzemeleri için core.thread modülündeki Thread.sleep'ten yararlanacağım. Thread.sleep işlemleri belirtilen süre kadar durdurur. Ne kadar bekleneceğini bildirmenin bir yolu, "süre" anlamına gelen "duration"ın kısaltması olan dur'u kullanmaktır. dur'un şablon parametresi zaman birimini belirler: milisaniye için "msecs", saniye için "seconds". Thread.sleep işlemciyi hiç meşgul etmeden zaman geçirdiği için buradaki örneklerde fazla yapay kalıyor; buna rağmen, koşut işlemlerin amaçlarını göstermede yeterince etkilidir.

import std.stdio;
import core.thread;

struct Öğrenci {
    int numara;

    void uzunBirİşlem() {
        writeln(numara,
                " numaralı öğrencinin işlemi başladı");

        /* Gerçekte yavaş olduklarını varsaydığımız işlemlerin
         * yavaşlıklarına benzesin diye 1 saniye bekliyoruz */
        Thread.sleep(1.seconds);

        writeln(numara, " numaralı öğrencinin işlemi bitti");
    }
}

void main() {
    auto öğrenciler =
        [ Öğrenci(1), Öğrenci(2), Öğrenci(3), Öğrenci(4) ];

    foreach (öğrenci; öğrenciler) {
        öğrenci.uzunBirİşlem();
    }
}

Yukarıdaki programın çalışma süresi uç birimde time komutu ile ölçülebilir:

$ time ./deneme
1 numaralı öğrencinin işlemi başladı
1 numaralı öğrencinin işlemi bitti
2 numaralı öğrencinin işlemi başladı
2 numaralı öğrencinin işlemi bitti
3 numaralı öğrencinin işlemi başladı
3 numaralı öğrencinin işlemi bitti
4 numaralı öğrencinin işlemi başladı
4 numaralı öğrencinin işlemi bitti

real    0m4.003s    ← toplam 4 saniye
user    0m0.000s
sys     0m0.000s

Öğrenci işlemleri sırayla işletildiklerinden ve her işlem 1 saniye tuttuğundan toplam süre beklendiği gibi yaklaşık olarak 4 saniye olmaktadır. Oysa 4 öğrencinin işlemleri örneğin 4 çekirdeğin bulunduğu bir ortamda aynı anda ve tek seferde işletilebilseler bütün işlem 1 saniye tutabilir.

Bunun nasıl gerçekleştirildiğine geçmeden önce, programın çalıştırıldığı ortamda kaç çekirdek bulunduğunun std.parallelism.totalCPUs'un değeri ile belirlenebildiğini göstermek istiyorum:

import std.stdio;
import std.parallelism;

void main() {
    writefln("Bu ortamda toplam %s çekirdek var.", totalCPUs);
}

Bu bölümü yazdığım ortamda şu çıktıyı alıyorum:

Bu ortamda toplam 4 çekirdek var.
taskPool.parallel()

Bu işlev kısaca parallel() diye de çağrılabilir.

parallel(), bir aralığın elemanlarına bütün çekirdeklerden yararlanarak koşut olarak erişmeye yarar. Yukarıdaki programa std.parallelism modülünü eklemek ve öğrenciler yerine parallel(öğrenciler) yazmak bütün çekirdeklerden yararlanmak için yeterlidir:

import std.parallelism;

// ...

    foreach (öğrenci; parallel(öğrenciler)) {

Yapı ve Sınıflarda foreach bölümünde gördüğümüz gibi, foreach döngüsünün kapsamı nesnelerin opApply işlevlerine bir delegate olarak gönderilir. parallel()'in döndürdüğü geçici nesne bu delegate'i her eleman için farklı bir çekirdek üzerinde işleten bir aralık nesnesidir.

Asıl topluluğu parallel() işlevine göndererek kullanmak, programın 4 çekirdek bulunan bu ortamda 1 saniyede tamamlanması için yeterli olur:

$ time ./deneme
2 numaralı öğrencinin işlemi başladı
1 numaralı öğrencinin işlemi başladı
3 numaralı öğrencinin işlemi başladı
4 numaralı öğrencinin işlemi başladı
2 numaralı öğrencinin işlemi bitti
3 numaralı öğrencinin işlemi bitti
1 numaralı öğrencinin işlemi bitti
4 numaralı öğrencinin işlemi bitti

real    0m1.004s    ← şimdi 1 saniye
user    0m0.000s
sys     0m0.000s

Not: Programın çalışma süresi sizin ortamınızda farklı olabilir; kabaca "4 saniye bölü çekirdek sayısı" hesabının sonucu kadar sürede tamamlanacağını bekleyebiliriz.

Programların işletilmeleri sırasında mikro işlemcinin kodların üzerinden belirli geçişlerine iş parçacığı denir. Programlar aynı anda etkin olarak işletilen birden fazla iş parçacığından oluşuyor olabilirler. İşletim sistemi her iş parçacığını bir çekirdek üzerinde başlatır, işletir, ve diğer iş parçacıkları da işletilebilsinler diye duraklatır. Her iş parçacığının işletilmesi bir çok kere başlatılması ve duraklatılması ile devam eder.

Mikro işlemcinin bütün çekirdekleri işletim sistemindeki bütün iş parçacıkları tarafından paylaşılır. Bu iş parçacıklarının hangi sırayla başlatıldıklarına ve hangi koşullarda duraksatıldıklarına işletim sistemi karar verir. Bu yüzden uzunBirİşlem() içinde yazdırdığımız mesajların sıralarının karışık olarak çıktıklarını görüyoruz. Döngü içindeki işlemler her öğrenci için bağımsız oldukları sürece hangisinin daha önce sonlandığının programın işleyişi açısından bir önemi yoktur.

parallel() yardımıyla aynı anda işletilen işlemlerin gerçekten birbirlerinden bağımsız oldukları programcının sorumluluğundadır. Örneğin, yukarıdaki mesajların çıkışta belirli bir sırada görünmeleri gerekseydi bunu sağlamak elimizde olmadığından parallel()'in kullanılması bir hata olarak kabul edilirdi. İş parçacıklarının birbirlerine bağımlı oldukları durumlarda eş zamanlı programlamadan yararlanılır. Onu bir sonraki bölümde göreceğiz.

foreach tamamlandığında bütün işlemler de tamamlanmıştır. Program işleyişine bütün öğrenci işlemlerinin tamamlanmış oldukları garantisiyle devam edebilir.

İş birimi büyüklüğü

parallel()'in ikinci parametresinin anlamı duruma göre farklılık gösterir ve bazen bütünüyle gözardı edilir:

    /* ... */ = parallel(aralık, iş_birimi_büyüklüğü = 100);
Görev türü Task

Programdaki başka işlemlerle aynı anda işletilebilen işlemlere görev denir. Görevler std.parallelism.Task türü ile ifade edilirler.

parallel() her iş parçacığı için foreach bloğundaki işlemlerden oluşan farklı bir Task nesnesi kurar ve o görevi otomatik olarak başlatır. foreach döngüsünden çıkmadan önce de başlattığı bütün görevlerin tamamlanmalarını bekler. Kurma, başlatma, ve tamamlanmasını bekleme işlemlerini otomatik olarak yürüttüğü için çok yararlıdır.

Aynı anda işletilebilen işlemlerin herhangi bir topluluk ile doğrudan ilgileri olmayan durumlarda kurma, başlatma, ve bekleme işlevlerinin bir Task nesnesi üzerinden açıkça çağrılmaları gerekir. Görev nesnesi kurmak için task(), görevi başlatmak için executeInNewThread(), görevin tamamlanmasını beklemek için de yieldForce() kullanılır. Bu işlevleri aşağıdaki programın açıklama satırlarında anlatıyorum.

Aşağıdaki programdaki birİşlem() iki farklı iş için iki kere başlatılmaktadır. Hangi iş ile ilgili olarak işlediğini görebilmemiz için kimlik'in baş harfini çıkışa yazdırıyor.

Not: Standart çıkışa yazdırılan bilgiler çoğu durumda çıkışta hemen belirmezler; satır sonu karakteri gelene kadar bir ara bellekte bekletilirler. write satır sonu karakteri yazdırmadığından, programın işleyişini izleyebilmek için o ara belleğin hemen çıkışa gönderilmesini stdout.flush() ile sağlıyoruz.

import std.stdio;
import std.parallelism;
import std.array;
import core.thread;

/* kimlik'in baş harfini yarım saniyede bir çıkışa yazdırır */
int birİşlem(string kimlik, int süre) {
    writefln("%s %s saniye sürecek", kimlik, süre);

    foreach (i; 0 .. (süre * 2)) {
        Thread.sleep(500.msecs);  /* yarım saniye */
        write(kimlik.front);
        stdout.flush();
    }

    return 1;
}

void main() {
    /* birİşlem()'i işletecek olan bir görev kuruluyor.
     * Burada belirtilen işlev parametreleri görev işlevine
     * parametre olarak gönderilirler. */
    auto görev = task!birİşlem("görev", 5);

    /* 'görev' başlatılıyor */
    görev.executeInNewThread();

    /* 'görev' işine devam ederken başka bir işlem
     * başlatılıyor */
    immutable sonuç = birİşlem("main içindeki işlem", 3);

    /* Bu noktada main içinde başlatılan işlemin
     * tamamlandığından eminiz; çünkü onu görev olarak değil,
     * her zaman yaptığımız gibi bir işlev çağrısı olarak
     * başlattık. */

    /* Öte yandan, bu noktada 'görev'in işini tamamlayıp
     * tamamlamadığından emin olamayız. Gerekiyorsa
     * tamamlanana kadar beklemek için yieldForce()'u
     * çağırıyoruz. yieldForce() ancak görev tamamlanmışsa
     * döner. Dönüş değeri görev işlevinin, yani
     * birİşlem()'in dönüş değeridir. */
    immutable görevSonucu = görev.yieldForce();

    writeln();
    writefln("Hepsi tamam; sonuç: %s", sonuç + görevSonucu);
}

Programın çıktısı benim denediğim ortamda aşağıdakine benziyor. İşlemlerin aynı anda gerçekleştiklerini m ve g harflerinin karışık olarak yazdırılmalarından anlıyoruz:

main içindeki işlem 3 saniye sürecek
görev 5 saniye sürecek
mgmggmmgmgmggggg
Hepsi tamam; sonuç: 2

Yukarıdaki task!birİşlem kullanımında görev işlevi task'e şablon parametresi olarak belirtilmektedir. Bu yöntem çoğu duruma uygun olsa da, Şablonlar bölümünde gördüğümüz gibi, bir şablonun her farklı gerçekleştirmesi farklı bir türdendir. Bu fark, aynı türden olmalarını bekleyeceğimiz görev nesnelerinin aslında farklı türden olmalarına ve bu yüzden birlikte kullanılamamalarına neden olabilir.

Örneğin, aşağıdaki iki işlevin parametre ve dönüş türleri aynı olduğu halde task() işlev şablonu yoluyla elde edilen iki Task şablon gerçekleştirmesi farklı türdendir. Bu yüzden, aynı dizinin elemanı olamazlar:

import std.parallelism;

double foo(int i) {
    return i * 1.5;
}

double bar(int i) {
    return i * 2.5;
}

void main() {
    auto tasks = [ task!foo(1),
                   task!bar(2) ];    // ← derleme HATASI
}

Derleyici, "uyumsuz türler" anlamına gelen bir hata mesajı verir:

Error: incompatible types for ((task(1)) : (task(2))):
'Task!(foo, int)*' and 'Task!(bar, int)*'

task()'in başka bir yüklemesi görev işlevini şablon parametresi olarak değil, işlev parametresi olarak alır:

    void işlem(int sayı) {
        // ...
    }

    auto görev = task(&işlem, 42);

Bu yöntem farklı şablon gerçekleştirmeleri kullanmadığından, farklı işlev kullanıyor olsalar bile farklı Task nesneleri aynı dizinin elemanı olabilirler:

import std.parallelism;

double foo(int i) {
    return i * 1.5;
}

double bar(int i) {
    return i * 2.5;
}

void main() {
    auto tasks = [ task(&foo, 1),
                   task(&bar, 2) ];    // ← derlenir
}

Gerektiğinde isimsiz bir işlev veya opCall() işlecini tanımlamış olan bir türün bir nesnesi de kullanılabilir. Örneğin bir isimsiz işlev ile şöyle çağrılabilir:

    auto görev = task((int sayı) {
                          /* ... */
                      }, 42);
Atılan hatalar

Görevler farklı iş parçacıklarında işletildiklerinden, attıkları hatalar onları başlatan iş parçacığı tarafından yakalanamaz. Bu yüzden, atılan hatayı görevin kendisi yakalar ve yieldForce() çağrılana kadar bekletir. Aynı hata yieldForce() çağrıldığında tekrar atılır ve böylece görevi başlatmış olan iş parçacığı tarafından yakalanabilir.

import std.stdio;
import std.parallelism;
import core.thread;

void hataAtanİşlem() {
    writeln("hataAtanİşlem() başladı");
    Thread.sleep(1.seconds);
    writeln("hataAtanİşlem() hata atıyor");
    throw new Exception("Atılan hata");
}

void main() {
    auto görev = task!hataAtanİşlem();
    görev.executeInNewThread();

    writeln("main devam ediyor");
    Thread.sleep(3.seconds);

    writeln("main, görev'in sonucunu alıyor");
    görev.yieldForce();
}

Görev sırasında atılan hatanın programı hemen sonlandırmadığını programın çıktısında görüyoruz:

main devam ediyor
hataAtanİşlem() başladı
hataAtanİşlem() hata atıyor                   ← atıldığı zaman
main, görev'in sonucunu alıyor
object.Exception@deneme.d(10): Atılan hata    ← farkedildiği zaman

Görevin attığı hata, istendiğinde yieldForce()'u sarmalayan bir try-catch bloğu ile yakalanabilir. Bunun alışılmışın dışında bir kullanım olduğuna dikkat edin: try-catch bloğu normalde hatayı atan kodu sarmalar. Görevlerde ise yieldForce()'u sarmalar:

    try {
        görev.yieldForce();

    } catch (Exception hata) {
        writefln("görev sırasında bir hata olmuş: '%s'",
                 hata.msg);
    }

Programın şimdiki çıktısı:

main devam ediyor
hataAtanİşlem() başladı
hataAtanİşlem() hata atıyor                   ← atıldığı zaman
main, görev'in sonucunu alıyor
görev sırasında bir hata olmuş: 'Atılan hata' ← yakalandığı zaman
Task işlevleri

Görevin tamamlanmasını beklemek için üç farklı işlev vardır:

Bunlar arasından çoğu durumda en uygun olan yieldForce()'tur. spinForce(), her ne kadar mikro işlemciyi meşgul etse de görevin çok kısa bir süre sonra tamamlanacağının bilindiği durumlarda yararlıdır. workForce(), görev beklenene kadar başka bir görevin başlatılmasının istendiği durumlara uygundur.

Task'in diğer üye işlevleri için internet üzerindeki Phobos belgelerine bakınız.

taskPool.asyncBuf()

Bu işlev normalde sırayla ilerletilen InputRange aralıklarının koşut olarak ilerletilmelerini sağlar. asyncBuf() koşut olarak ilerlettiği aralığın elemanlarını kendisine ait bir ara bellekte bekletir ve gerektikçe buradan sunar.

Ancak, olasılıkla bütünüyle tembel olan giriş aralığının bütünüyle hevesli hale gelmesini önlemek için elemanları dalgalar halinde ilerletir. Belirli sayıdaki elemanı koşut olarak hazırladıktan sonra onlar popFront() ile aralıktan çıkartılana kadar başka işlem yapmaz. Daha sonraki elemanları hesaplamaya başlamadan önce hazırdaki o elemanların tamamen kullanılmalarını bekler.

Parametre olarak bir aralık ve seçime bağlı olarak her dalgada kaç eleman ilerletileceği bilgisini alır. Bu bilgiyi ara bellek uzunluğu olarak adlandırabiliriz:

    auto elemanlar = taskPool.asyncBuf(aralık, ara_bellek_uzunluğu);

asyncBuf()'ın etkisini görmek için hem ilerletilmesi hem de foreach içindeki kullanımı yarım saniye süren bir aralık olduğunu varsayalım. Bu aralık, kurulurken belirtilmiş olan sınır değere kadar elemanlar üretiyor:

import std.stdio;
import core.thread;

struct BirAralık {
    int sınır;
    int i;

    bool empty() const {
        return i >= sınır;
    }

    int front() const {
        return i;
    }

    void popFront() {
        writefln("%s değerinden sonrası hesaplanıyor", i);
        Thread.sleep(500.msecs);
        ++i;
    }
}

void main() {
    auto aralık = BirAralık(10);

    foreach (eleman; aralık) {
        writefln("%s değeri kullanılıyor", eleman);
        Thread.sleep(500.msecs);
    }
}

Aralık tembel olarak kullanıldıkça elemanları teker teker hesaplanır ve döngü içinde kullanılır. Her elemanın hesaplanması ve kullanılması toplam bir saniye sürdüğü için 10 elemanlı aralığın işlemleri 10 saniye sürer:

$ time ./deneme
0 değeri kullanılıyor
0 değerinden sonrası hesaplanıyor
1 değeri kullanılıyor
1 değerinden sonrası hesaplanıyor
2 değeri kullanılıyor
...
8 değerinden sonrası hesaplanıyor
9 değeri kullanılıyor
9 değerinden sonrası hesaplanıyor

real	0m10.007s    ← toplam 10 saniye
user	0m0.004s
sys	0m0.000s

Elemanların sırayla hesaplandıkları ve kullanıldıkları görülüyor.

Oysa, bir sonraki elemanın hazırlanmasına başlamak için öndeki elemanların işlemlerinin sonlanmaları gerekmeyebilir. Öndeki elemanın kullanılması ile bir sonraki elemanın hesaplanması aynı anda gerçekleşebilseler, bütün süre kabaca yarıya inebilir. asyncBuf() bunu sağlar:

import std.parallelism;
//...
    foreach (eleman; taskPool.asyncBuf(aralık, 2)) {

Yukarıdaki kullanımda asyncBuf() her seferinde iki elemanı hazırda bekletecektir. Yeni elemanların hazırlanmaları döngü işlemleri ile koşut olarak gerçekleştirilir ve toplam süre azalır:

$ time ./deneme
0 değerinden sonrası hesaplanıyor
1 değerinden sonrası hesaplanıyor
0 değeri kullanılıyor
2 değerinden sonrası hesaplanıyor
1 değeri kullanılıyor
3 değerinden sonrası hesaplanıyor
2 değeri kullanılıyor
4 değerinden sonrası hesaplanıyor
3 değeri kullanılıyor
5 değerinden sonrası hesaplanıyor
4 değeri kullanılıyor
6 değerinden sonrası hesaplanıyor
5 değeri kullanılıyor
7 değerinden sonrası hesaplanıyor
6 değeri kullanılıyor
8 değerinden sonrası hesaplanıyor
7 değeri kullanılıyor
9 değerinden sonrası hesaplanıyor
8 değeri kullanılıyor
9 değeri kullanılıyor

real	0m6.007s    ← şimdi 6 saniye
user	0m0.000s
sys	0m0.004s

Hangi ara bellek uzunluğunun daha hızlı sonuç vereceği her programa ve her duruma göre değişebilir. Ara bellek uzunluğunun varsayılan değeri 100'dür.

asyncBuf() foreach döngüleri dışında da yararlıdır. Aşağıdaki kod asyncBuf()'ın dönüş değerini bir InputRange aralığı olarak kullanıyor:

    auto aralık = BirAralık(10);
    auto koşutAralık = taskPool.asyncBuf(aralık, 2);
    writeln(koşutAralık.front);
taskPool.map()

Koşut map()'i anlamadan önce std.algorithm modülündeki map()'i anlamak gerekir. Çoğu fonksiyonel dilde de bulunan std.algorithm.map, belirli bir işlevi belirli bir aralıktaki bütün elemanlara teker teker uygular. Sonuç olarak o işlevin sonuçlarından oluşan yeni bir aralık döndürür. İşleyişi tembeldir; işlevi elemanlara ancak gerektikçe uygular. std.algorithm.map tek çekirdek üzerinde işler.

map()'in tembel işleyişi bir çok programda hız açısından yararlıdır. Ancak, işlevin nasıl olsa bütün elemanlara da uygulanacağı ve o işlemlerin birbirlerinden bağımsız oldukları durumlarda bu tembellik aksine yavaşlığa neden olabilir. std.parallelism modülündeki taskPool.map() ve taskPool.amap() ise bütün işlemci çekirdeklerinden yararlanırlar ve bu gibi durumlarda daha hızlı işleyebilirler.

Bu üç algoritmayı yine Öğrenci örneği üzerinde karşılaştıralım. Elemanlara uygulanacak olan işlev örneği olarak Öğrenci türünün not ortalaması döndüren bir işlevi olduğunu varsayalım. Koşut programlamanın etkisini görebilmek için bu işlevi de Thread.sleep ile yapay olarak yavaşlatalım.

std.algorithm.map, uygulanacak olan işlevi şablon parametresi olarak, aralığı da işlev parametresi olarak alır. İşlevin elemanlara uygulanmasından oluşan sonuç değerleri başka bir aralık olarak döndürür:

    auto sonuç_aralık = map!işlev(aralık);

İşlev map()'e önceki bölümlerde de gördüğümüz gibi isimsiz işlev olarak verilebilir. Aşağıdaki örnekteki ö parametresi işlevin uygulanmakta olduğu elemanı belirler:

import std.stdio;
import std.algorithm;
import core.thread;

struct Öğrenci {
    int numara;
    int[] notlar;

    double ortalamaNot() {
        writeln(numara,
                " numaralı öğrencinin işlemi başladı");
        Thread.sleep(1.seconds);

        const ortalama = notlar.sum / notlar.length;

        writeln(numara, " numaralı öğrencinin işlemi bitti");
        return ortalama;
    }
}

void main() {
    Öğrenci[] öğrenciler;

    foreach (i; 0 .. 10) {
        /* Her öğrenciye 80'li ve 90'lı iki not */
        öğrenciler ~= Öğrenci(i, [80 + i, 90 + i]);
    }

    auto sonuçlar = map!(ö => ö.ortalamaNot)(öğrenciler);

    foreach (sonuç; sonuçlar) {
        writeln(sonuç);
    }
}

Programın çıktısı map()'in tembel olarak işlediğini gösteriyor; ortalamaNot() her sonuç için foreach ilerledikçe çağrılır:

$ time ./deneme
0 numaralı öğrencinin işlemi başladı
0 numaralı öğrencinin işlemi bitti
85              ← foreach ilerledikçe hesaplanır
1 numaralı öğrencinin işlemi başladı
1 numaralı öğrencinin işlemi bitti
86
...
9 numaralı öğrencinin işlemi başladı
9 numaralı öğrencinin işlemi bitti
94

real	0m10.006s    ← toplam 10 saniye
user	0m0.000s
sys	0m0.004s

std.algorithm.map hevesli bir algoritma olsaydı, işlemlerin başlangıç ve bitişleriyle ilgili mesajların hepsi en başta yazdırılırlardı.

std.parallelism modülündeki taskPool.map(), temelde std.algorithm.map ile aynı biçimde işler. Tek farkı, işlevleri aynı anda işletmesidir. Ürettiği sonuçları uzunluğu ikinci parametresi ile belirtilen bir ara belleğe yerleştirir ve buradan sunar. Örneğin, aşağıdaki kod işlevleri her adımda üç eleman için aynı anda işletir:

import std.parallelism;
// ...
double ortalamaNot(Öğrenci öğrenci) {
    return öğrenci.ortalamaNot;
}
// ...
    auto sonuçlar = taskPool.map!ortalamaNot(öğrenciler, 3);

Not: Yukarıdaki ortalamaNot() işlevi temsilcilerin şablonlarla kullanımları ile ilgili bir kısıtlama nedeniyle gerekmiştir. Daha kısa olan aşağıdaki satır, TaskPool.map'in bir "sınıf içi şablon" olması nedeniyle derlenemez:

auto sonuçlar =
    taskPool.map!(ö => ö.ortalamaNot)(öğrenciler, 3); // ← derleme HATASI

Bu sefer işlemlerin üçer üçer aynı anda ama belirsiz sırada işletildiklerini görüyoruz:

$ time ./deneme
0 numaralı öğrencinin işlemi başladı ← aynı anda
2 numaralı öğrencinin işlemi başladı ← ama belirsiz sırada
1 numaralı öğrencinin işlemi başladı
0 numaralı öğrencinin işlemi bitti
2 numaralı öğrencinin işlemi bitti
1 numaralı öğrencinin işlemi bitti
85
86
87
5 numaralı öğrencinin işlemi başladı
3 numaralı öğrencinin işlemi başladı
4 numaralı öğrencinin işlemi başladı
5 numaralı öğrencinin işlemi bitti
4 numaralı öğrencinin işlemi bitti
3 numaralı öğrencinin işlemi bitti
88
89
90
8 numaralı öğrencinin işlemi başladı
6 numaralı öğrencinin işlemi başladı
7 numaralı öğrencinin işlemi başladı
8 numaralı öğrencinin işlemi bitti
6 numaralı öğrencinin işlemi bitti
7 numaralı öğrencinin işlemi bitti
91
92
93
9 numaralı öğrencinin işlemi başladı
9 numaralı öğrencinin işlemi bitti
94

real	0m4.007s    ← toplam 4 saniye
user	0m0.000s
sys	0m0.004s

İşlevin belgesinde bufSize olarak geçen ikinci parametrenin anlamı asyncBuf()'ın ikinci parametresi ile aynı anlamdadır. Bu parametre, üretilen sonuçların depolandığı ara belleğin uzunluğunu belirtir ve varsayılan değeri 100'dür. Üçüncü parametre ise parallel()'de olduğu gibi iş birimi büyüklüğü anlamındadır. Farkı, varsayılan değerinin size_t.max olmasıdır:

    /* ... */ = taskPool.map!işlev(aralık,
                                   ara_bellek_uzunluğu = 100,
                                   iş_birimi_büyüklüğü = size_t.max);
taskPool.amap()

İki fark dışında taskPool.map() ile aynı biçimde işler:

    auto sonuçlar = taskPool.amap!ortalamaNot(öğrenciler);

Hevesli olduğu için amap()'ten dönüldüğünde bütün sonuçlar hesaplanmışlardır:

$ time ./deneme
0 numaralı öğrencinin işlemi başladı ← hepsi en başta
2 numaralı öğrencinin işlemi başladı
1 numaralı öğrencinin işlemi başladı
3 numaralı öğrencinin işlemi başladı
0 numaralı öğrencinin işlemi bitti
4 numaralı öğrencinin işlemi başladı
1 numaralı öğrencinin işlemi bitti
5 numaralı öğrencinin işlemi başladı
3 numaralı öğrencinin işlemi bitti
6 numaralı öğrencinin işlemi başladı
2 numaralı öğrencinin işlemi bitti
7 numaralı öğrencinin işlemi başladı
4 numaralı öğrencinin işlemi bitti
8 numaralı öğrencinin işlemi başladı
5 numaralı öğrencinin işlemi bitti
9 numaralı öğrencinin işlemi başladı
6 numaralı öğrencinin işlemi bitti
7 numaralı öğrencinin işlemi bitti
9 numaralı öğrencinin işlemi bitti
8 numaralı öğrencinin işlemi bitti
85
86
87
88
89
90
91
92
93
94

real	0m3.005s    ← toplam 3 saniye
user	0m0.000s
sys	0m0.004s

amap() koşut map()'ten daha hızlı işler ama bütün sonuçları alacak kadar büyük bir dizi kullanmak zorundadır. Hız kazancının karşılığı olarak daha fazla bellek kullanır.

amap()'in isteğe bağlı olan ikinci parametresi de parallel()'de olduğu gibi iş birimi büyüklüğü anlamındadır:

    auto sonuçlar = taskPool.amap!ortalamaNot(öğrenciler, 2);

Sonuçlar dönüş değeri olarak elde edilmek yerine üçüncü parametre olarak verilen bir RandomAccessRange aralığına da yazılabilirler. O aralığın uzunluğu elemanların uzunluğuna eşit olmalıdır:

    double[] sonuçlar;
    sonuçlar.length = öğrenciler.length;
    taskPool.amap!ortalamaNot(öğrenciler, 2, sonuçlar);
taskPool.reduce()

Koşut reduce()'u anlamadan önce std.algorithm modülündeki reduce()'u anlamak gerekir.

std.algorithm.reduce, daha önce Aralıklar bölümünde gördüğümüz fold()'un eşdeğeridir. En belirgin farkı, işlev parametrelerinin sırasının fold'un tersi olmasıdır. (Bu yüzden, koşut olmayan işlemlerde std.algorithm.reduce yerine UFCS'e olanak veren std.algorithm.fold'u yeğlemenizi öneririm.)

reduce() başka dillerde de bulunan üst düzey bir algoritmadır. map()'te olduğu gibi, şablon parametresi olarak bir veya birden fazla işlev alır. İşlev parametreleri olarak da bir başlangıç değeri ve bir aralık alır. Belirtilen işlevleri o andaki sonuca ve her elemana uygular. Açıkça başlangıç değeri verilmediği zaman aralığın ilk elemanını başlangıç değeri olarak kullanır.

Nasıl işlediği, kendi içinde tanımlamış olduğu varsayılan sonuç isimli bir değişken üzerinden aşağıdaki gibi ifade edilebilir:

  1. sonuç'u başlangıç değeri ile ilkler.
  2. Her bir eleman için sonuç = işlev(sonuç, eleman) ifadesini işletir.
  3. sonuç'un son değerini döndürür.

Örneğin bir dizinin bütün elemanlarının karelerinin toplamı aşağıdaki gibi hesaplanabilir:

import std.stdio;
import std.algorithm;

void main() {
    writeln(reduce!((a, b) => a + b * b)(0, [5, 10]));
}

İşlev yukarıdaki gibi dizgi olarak belirtildiğinde a belirli bir andaki sonuç değerini, b de eleman değerini temsil eder. İlk işlev parametresi başlangıç değeridir (yukarıdaki 0).

Program sonuçta 5 ve 10'un kareleri olan 25 ve 100'ün toplamını yazdırır:

125

Tarifinden de anlaşılacağı gibi reduce() kendi içinde bir döngü işletir. O döngü tek çekirdek üzerinde işlediğinden, elemanların işlemlerinin birbirlerinden bağımsız oldukları durumlarda yavaş kalabilir. Böyle durumlarda std.parallelism modülündeki taskPool.reduce() kullanılarak işlemlerin bütün çekirdekler üzerinde işletilmeleri sağlanabilir.

Bunun örneğini görmek için reduce()'u yine yapay olarak yavaşlatılmış olan bir işlevle kullanalım:

import std.stdio;
import std.algorithm;
import core.thread;

int birHesap(int sonuç, int eleman) {
    writefln("başladı    - eleman: %s, sonuç: %s",
             eleman, sonuç);

    Thread.sleep(1.seconds);
    sonuç += eleman;

    writefln("tamamlandı - eleman: %s, sonuç: %s",
             eleman, sonuç);

    return sonuç;
}

void main() {
    writeln("Sonuç: ", reduce!birHesap(0, [1, 2, 3, 4]));
}

reduce() elemanları sırayla ve teker teker kullanır ve bu yüzden program 4 saniye sürer:

$ time ./deneme
başladı    - eleman: 1, sonuç: 0
tamamlandı - eleman: 1, sonuç: 1
başladı    - eleman: 2, sonuç: 1
tamamlandı - eleman: 2, sonuç: 3
başladı    - eleman: 3, sonuç: 3
tamamlandı - eleman: 3, sonuç: 6
başladı    - eleman: 4, sonuç: 6
tamamlandı - eleman: 4, sonuç: 10
Sonuç: 10

real	0m4.003s    ← 4 saniye
user	0m0.000s
sys	0m0.000s

parallel() ve map() örneklerinde olduğu gibi, bu programa da std.parallelism modülünü eklemek ve reduce() yerine taskPool.reduce()'u çağırmak bütün çekirdeklerden yararlanmak için yeterlidir:

import std.parallelism;
// ...
    writeln("Sonuç: ", taskPool.reduce!birHesap(0, [1, 2, 3, 4]));

Ancak, taskPool.reduce()'un işleyişinin önemli farklılıkları vardır.

Yukarıda gördüğümüz koşut algoritmalarda olduğu gibi taskPool.reduce() da elemanları birden fazla göreve paylaştırarak koşut olarak işletir. Her görev kendisine verilen elemanları kullanarak farklı bir sonuç hesaplar. Yalnızca tek başlangıç değeri olduğundan, her görevin hesapladığı sonuç o değerden başlar (yukarıdaki 0).

Görevlerin hesapları tamamladıkça, onların ürettikleri sonuçlar son bir kez aynı sonuç hesabından geçirilirler. Bu son hesap koşut olarak değil, tek çekirdek üzerinde işletilir. O yüzden taskPool.reduce() bu örnekte olduğu gibi az sayıda elemanla kullanıldığında daha yavaş sonuç verebilir. Bunu aşağıdaki çıktıda göreceğiz.

Aynı başlangıç değerinin bütün görevler tarafından kullanılıyor olması taskPool.reduce()'un hesapladığı sonucun normal reduce()'dan farklı çıkmasına neden olabilir. Bu sonuç aynı nedenden dolayı yanlış da olabilir. O yüzden başlangıç değeri bu örnekteki toplama işleminin başlangıç değeri olan 0 gibi etkisiz bir değer olmak zorundadır.

Ek olarak, elemanlara uygulanan işlevin aldığı parametrelerin türü ve işlevin dönüş türü ya aynı olmalıdır ya da birbirlerine otomatik olarak dönüşebilmelidirler.

taskPool.reduce() ancak bu özellikleri anlaşılmışsa kullanılmalıdır.

import std.parallelism;
// ...
    writeln("Sonuç: ", taskPool.reduce!birHesap(0, [1, 2, 3, 4]));

Çıktısında önce birden fazla görevin aynı anda, onların sonuçlarının ise sırayla işletildiklerini görüyoruz. Sırayla işletilen işlemleri işaretli olarak gösteriyorum:

$ time ./deneme
başladı    - eleman: 1, sonuç: 0 ← önce görevler aynı anda
başladı    - eleman: 2, sonuç: 0
başladı    - eleman: 3, sonuç: 0
başladı    - eleman: 4, sonuç: 0
tamamlandı - eleman: 1, sonuç: 1
başladı    - eleman: 1, sonuç: 0 ← onların sonuçları sırayla
tamamlandı - eleman: 2, sonuç: 2
tamamlandı - eleman: 3, sonuç: 3
tamamlandı - eleman: 4, sonuç: 4
tamamlandı - eleman: 1, sonuç: 1
başladı    - eleman: 2, sonuç: 1
tamamlandı - eleman: 2, sonuç: 3
başladı    - eleman: 3, sonuç: 3
tamamlandı - eleman: 3, sonuç: 6
başladı    - eleman: 4, sonuç: 6
tamamlandı - eleman: 4, sonuç: 10
Sonuç: 10

real	0m5.006s    ← bu örnekte koşut reduce daha yavaş
user	0m0.004s
sys	0m0.000s

Matematik sabiti pi'nin (π) seri yöntemiyle hesaplanması gibi başka hesaplarda koşut reduce() daha hızlı işleyecektir.

Birden çok işlev ve çokuzlu sonuçlar

Hem std.algorithm modülündeki map() hem de std.parallelism modülündeki map(), amap(), ve reduce() birden fazla işlev alabilirler. O durumda bütün işlevlerin sonuçları bir arada Çokuzlular bölümünde gördüğümüz Tuple türünde döndürülür. Her işlevin sonucu, o işlevin sırasına karşılık gelen çokuzlu üyesidir. Örneğin, ilk işlevin sonucu çokuzlunun 0 numaralı üyesidir.

Aşağıdaki program birden fazla işlev kullanımını std.algorithm.map üzerinde gösteriyor. Dikkat ederseniz çeyreği() ve onKatı() işlevlerinin dönüş türleri farklıdır. Öyle bir durumda çokuzlu sonuçların üyelerinin türleri de farklı olur.

import std.stdio;
import std.algorithm;
import std.conv;

double çeyreği(double değer) {
    return değer / 4;
}

string onKatı(double değer) {
    return to!string(değer * 10);
}

void main() {
    auto sayılar = [10, 42, 100];
    auto sonuçlar = map!(çeyreği, onKatı)(sayılar);

    writefln("  Çeyreği  On Katı");

    foreach (çeyrekSonucu, onKatSonucu; sonuçlar) {
        writefln("%8.2f%8s", çeyrekSonucu, onKatSonucu);
    }
}

Çıktısı:

  Çeyreği  On Katı
    2.50     100
   10.50     420
   25.00    1000

taskPool.reduce() kullanımında sonuçların ilk değerlerinin de çokuzlu olarak verilmeleri gerekir:

    taskPool.reduce!(foo, bar)(tuple(0, 1), [1, 2, 3, 4]);
TaskPool

std.parallelism modülünün bütün koşut algoritmalarının perde arkasında yararlandıkları görevler bir TaskPool topluluğunun parçalarıdır. Normalde, bütün algoritmalar aynı taskPool isimli topluluğu kullanırlar.

taskPool programın çalışmakta olduğu ortama uygun sayıda göreve sahip olduğundan çoğu durumda ondan başkaca TaskPool nesnesine gerek duyulmaz. Buna rağmen bazen özel bir görev topluluğunun açıkça oluşturulması ve bazı koşut işlemler için onun kullanılması istenebilir.

TaskPool kaç iş parçacığı kullanacağı bildirilerek kurulur. İş parçacığı adedinin varsayılan değeri ortamdaki çekirdek adedinin bir eksiğidir. Bu bölümde gördüğümüz bütün olanaklar açıkça kurulmuş olan bir TaskPool nesnesi üzerinden kullanılabilirler.

Aşağıdaki örnekte parallel() ile nasıl kullanıldığını görüyoruz:

import std.stdio;
import std.parallelism;

void main() {
    auto işçiler = new TaskPool(2);

    foreach (i; işçiler.parallel([1, 2, 3, 4])) {
        writefln("%s kullanılıyor", i);
    }

    işçiler.finish();
}

Görevler tamamlandığında TaskPool nesnesinin iş parçacıklarının sonlandırılmaları için TaskPool.finish() çağrılır.

Özet