Forum: D Programlama Dili RSS
std.thread eski, std.concurrency yeni
acehreli (Moderatör) #1
Kullanıcı başlığı: Ali Çehreli
Üye Haz 2009 tarihinden beri · 4527 mesaj
Grup üyelikleri: Genel Moderatörler, Üyeler
Profili göster · Bu konuya bağlantı
Konu adı: std.thread eski, std.concurrency yeni
Ön özet:  :-) İş parçacıkları kullanmak istiyorsanız en sondaki programdaki gibi mesajlaştırın.

Son bir kaç günde iş parçacıklarıyla (thread) ilgili konuştuk ve programlar paylaştık. İş parçacıkların çok zor olduklarını hatırladık ve D'nin std.thread modülüyle oynadık.

Ben D'yi başka dillerle karşılaştırırken çok ölçülü davranmaya çalışıyorum. D'yi övmek yerine, güçlü taraflarının kendiliklerinden görüneceklerini biliyorum.

İş parçacıkları konusu da öyle bir konu: D, bu konuda çok üstün! :) Bu sefer heyecanımı gizlemek istemiyorum! :) std.concurrency, hem dilin üstünlüğünü, hem de kütüphane tasarımı konusundaki üstünlüğünü gösteriyor. Biraz aşağıdaki örnekteki gibi arayüzde kapama (delegate) kullanabilmek filan nefis şeyler! :) (Bugün Haskell'ci bir arkadaşıma bahsettiğimde, bunların çok tanıdık geldiğini söyledi. Bu da D'nin üst düzey olanaklarının fonksiyonel programlama dillerine benzediğini gösteriyor.)

İş parçacıkları, TDPL'de "Concurrency" (eş zamanlı çalışma) başlığı altında anlatılıyor. Orada std.thread yerine, std.concurrency kullanmamız gerektiğini anladım. Aslında std.concurrency ile de oynamaya başlamıştım ama içindeki açıklama satırlarından henüz deneysel bir durumda olduğunu düşünmüştüm. Daha tamamlanmadığı da doğru; ama iş parçacığı desteği hemen hemen bitmiş.

TDPL, iş parçacıklarını bütünüyle std.concurrency ile anlatıyor. Alexandrescu uzun sayfalar ayırarak donanım ve yazılımın bu konudaki kısa hikayesini veriyor, iş parçacıklarının güçlüklerine değiniyor, ve D'nin bu konuda ne kadar şanslı bir durumda olduğunu anlatıyor. Çok çekirdekli mikro işlemcilerin yaygın olduğu bir ortamda yaşıyoruz. D, bu konuda gereken her şeye sahip.

Burada daha fazla ayrıntıya girmeden bir kaç önemli noktayı aktaracağım:

- D'de veriler özellikle belirtilmediği zaman tek bir iş parçacığına aittir; bu, C ve C++ gibi dillere göre büyük bir fark ve büyük bir üstünlük... Örneğin evrensel değişkenler, özellikle belirtilmediklerinde her bir iş parçacığının özel verisi olurlar:

import std.stdio;
import core.thread;
 
/*
 * Bu program, iş parçacıkları arasındaki iletişimi bu evrensel değişkenle
 * sağlamaya çalışıyor. İşçi bunu gözleyecek, ve 'true' olduğunda
 * sonlanacak. 'main'in çalıştığı ana iş parçacığı da bunu 'true' yaparak
 * işçiyi sonlandırmaya çalışacak.
 */
bool bitti_mi = false;
 
void bekle(double saniye)
{
    core.thread.Thread.sleep(cast(long)(saniye * 10_000_000));
}
 
class İşçi : Thread
{
    this()
    {
        super(&başlat);
    }
 
    void başlat()
    {
        int sayaç;
        while (!bitti_mi) {
            ++sayaç;
            writeln(sayaç, ' ', sayaç % 2 ? "/ tik \\" : "\\ tak /");
            bekle(1);
        }
    }
}
 
void main()
{
    // İş parçacığını başlatıyoruz
    auto işçi = new İşçi;
    işçi.start();
 
    // Belirli bir süre çalışmasını istiyoruz; bu arada kendimiz de iş
    // yapabiliriz (zaten iş parçacıklarının amacı bu)
    foreach (i; 0 .. 10) {
        bekle(0.5);
        writeln("Ana iş parçacığı da meşgul: ", i);
    }
 
    // Bitmesini istiyoruz
    writeln("Bitsin...");
    bitti_mi = true;
 
    // Bitmesini bekliyoruz
    thread_joinAll();
}

O programdaki evrensel bitti_mi değişkeni yöntemi ne iyi ki D'de işe yaramaz ve işçi hiç sonlanmaz. Çünkü, programdaki her iki iş parçacığının kendisine özel bitti_mi değişkeni olur. main'in 'true' yaptığı bitti_mi ile işçi'nin bitti_mi'si ayrıdır.

Bu, D'nin bir üstünlüğü. Örneğin C ve C++'da bu gibi veriler otomatik olarak paylaşılırlar. Doğru olarak paylaşılmalarına özel olarak dikkat edilmesi gerekir.

Yukarıdaki programın D'de çalışabilmesi için, bitti_mi'nin paylaşılacağının özellikle 'shared' ile belirtilmesi gerekir:

shared bool bitti_mi = false;

Artık bütün iş parçacıkları tarafından paylaşılan tek bitti_mi vardır; ve programdaki işçi sonlanır.

Gelelim verilerin böyle paylaşılmalarının sorunlarına... :) Yukarıdaki gibi bool bir değişkene tek iş parçacığı tarafından yazılmasında ve onun tek iş parçacığı tarafından okunmasında bir sakınca olmayabilir.

Şimdi şöyle bir şey yapalım: evrensel bir 'int' dizisi olsun; ana iş parçacığı o diziye kareleri alınacak değerler yerleştirsin; 4 tane işçi de o diziden sayılar çekerek karelerini alsınlar ve sonuçları çıkışa yazdırsınlar. Yani tek istemci olsun, dört işçi de onun istediklerini yerine getirsinler:

import std.array;
import std.stdio;
import core.thread;
 
shared bool bitti_mi;
shared int[] istenenler;
 
void bekle(double saniye)
{
    core.thread.Thread.sleep(cast(long)(saniye * 10_000_000));
}
 
class KareAlıcı : Thread
{
    int işçiNumarası;
 
    this(int işçiNumarası)
    {
        super(&başlat);
        this.işçiNumarası = işçiNumarası;
    }
 
    void başlat()
    {
        while (true) {
            if (istenenler.empty) {
                if (bitti_mi) {
                    break;
                }
 
            } else {
                // Sayı var; en baştakini diziden çekelim
                int sayı = istenenler[0];
 
                // Herhangi bir nedenle bir yavaşlık olsun
                bekle(0.1);
 
                // Sayıyı diziden çıkartalım da başka bir işçi daha onu
                // kullanmasın
                istenenler = istenenler[1 .. $];
 
                writeln("işçi: ", işçiNumarası,
                        " sayı: ", sayı,
                        " karesi: ", sayı * sayı);
            }
        }
    }
}
 
void main()
{
    // Kareleri alınacak sayıları istenenler dizisine ekliyoruz
    foreach (sayı; 0 .. 10) {
        istenenler ~= sayı;
    }
 
    // İşçileri oluşturuyoruz ve başlatıyoruz
    foreach (i; 0 .. 4) {
        auto işçi = new KareAlıcı(i);
        işçi.start();
    }
 
    // Bittiğini bildirelim
    bitti_mi = true;
 
    // Bitmelerini bekliyoruz
    thread_joinAll();
}

O programı çalıştırırsak, dizi indeksleme hatasıyla sonlandığını görürüz:

işçi: 0 sayı: 0 karesi: 0
işçi: 1 sayı: 0 karesi: 0
işçi: 2 sayı: 0 karesi: 0
işçi: 3 sayı: 0 karesi: 0
işçi: 0 sayı: 1 karesi: 1
işçi: 1 sayı: 2 karesi: 4
işçi: 2 sayı: 3 karesi: 9
işçi: 3 sayı: 4 karesi: 16
işçi: 0 sayı: 5 karesi: 25
işçi: 1 sayı: 6 karesi: 36
core.exception.RangeError@deneme(18066): Range violation

Nedeni, yukarıdaki çıktıda görülüyor: İşçilerin dördü de 'istenenler' dizisini dolu buluyorlar, dördü de en baştaki değeri alıyor ve dördü de istenenler = istenenler[1 .. $] yapıyor. Sonuçta 0 sayısı dört kere işleniyor ama 1, 2, ve 3 sayıları görülemiyor bile.

Sonunda başka bir hata daha oluyor: işçilerden birisi diziyi dolu, yani !istenenler.empty durumunda yakalıyor, ama sayıyı daha istenenler[0] ile alamadan başka birisi diziyi boşaltmış oluyor.

İşte synchronized anahtar sözcüğü bu gibi "işçi yarışlarını" (race condition) ortadan kaldırır. Çok kabaca, yukarıdaki programın doğru çalışması için bir synchronized bloğu ekleyebiliriz:

    void başlat()
    {
        while (true) {
            bekle(0.001);
 
            synchronized {
                // ... while(true)'nun eski içeriği ile aynı ...
            }
        }
    }

(Birden fazla işçiye şans tanımak için bu örnek için bir bekle satırı daha ekledim.)

Artık program doğru çalışır. Ama bütün iş synchronized bloğu içine alındığı için belirli bir anda yalnızca bir işçi çalışmaktadır ve asıl amacımıza ters düşmüş oluruz. Dört işçi olduğu halde belirli bir anda tek işçi çalışabiliyor! Ne anladık bu işten! :)

Umarım o örnek iş parçacıkları kullanan programların sorunları hakkında bir örnek olmuştur. Başka sorunlar da vardır. Örneğin kilit (lock) kullanıldığında bir işçi kilidi eline geçirebilir ve bırakmayı unutabilir. O zaman başka hiçbir işçi çalışamaz; hatta kendisi de çalışamaz...

D'nin olanaklarına dönelim...

D'nin C ve C++ gibi dillerde bulunmayan değişmez veri (immutable) özelliği, bazı durumlarda hiç kilit gerektirmeden veri güvenliği getirir. Değişmez veriler iş parçacıkları tarafından serbestçe paylaşılabilirler; ve D, onların değişmeyeceklerini garanti ettiği için paylaşılmalarında kilit (lock, mutex, vs.) gibi düzenekler de gerekmez.

Çoklu iş parçacıkları kullanan programlamada kabul edilmiş olan güvenli bir yöntem, mesajlaşma (message passing) yöntemidir. Phobos, iş parçacıkları arasındaki iletişimde bu yöntemi benimser. Mesajlaşma, daha başka ortamlarda da denenmiş ve çoklu iş parçacıkları (multi-threading) gereken yerlerde en güvenli yöntem olarak kabul edilmiştir. Özellikle Erlang dili, mesajlaşmanın ne kadar sağlam olduğunu ispatlamış ve bu yöntemin problem büyüklüğünden bağımsız olarak etkin olarak çalıştığını ortaya koymuştur (scalable). (Bu paragraflar çeviri gibi oluyor; ama aslında kafadan yazıyorum. :))

İşte D'nin std.concurrency modülü, iş parçacıklarının sağlam mesajlaşmalarını sağlıyor.

Mesajlaşma aslında alt düzey bir kavram; ama bu modül iş parçacıklarıyla da çalışıyor.

Mesajlaşmanın üstünlüğü, iş parçacıklarının verileri doğrudan paylaşmaları yerine, birbirlerine mesaj göndermeleri üzerine kurulu. Böylece veri erişimini denetim altına almak için kilitler gerekmiyor.

Mesajlar send ile gönderiliyor ve receiveOnly veya receive ile alınıyor.

Yukarıdaki programı şimdi std.concurrency ile yazacağım:

import std.stdio;
import std.concurrency;
 
void main()
{
    /*
     * std.concurrency'deki iş parçacıklarının kimliklerinin türü "thread
     * id"den gelen Tid
     */
    Tid[] işçiler;
 
    // İşçileri oluşturuyoruz ve başlatıyoruz
    foreach (i; 0 .. 4) {
        /*
         * spawn, bir iş parçacığı başlatır. İlk parametresi işçi
         * işlevidir.  Geri kalan parametreler, oldukları gibi ve sıra ile
         * o işçi işleve gönderilirler.
         *
         * thisTid, şu anda çalışmakta olan iş parçacığını temsil
         * eder. Biz, sahibi olarak, başlattığımız iş parçacığına
         * kendimizi o şekilde tanıtıyoruz. Böylece işçiler sonuçları mesaj
         * olarak kime göndermeleri gerektiğini bilebilecekler.
         *
         * Daha sonradan kendimiz tanıyabilelim diye, her işçiye bir de
         * kendimiz numara atıyoruz (i parametresi). Böylece hangi sonucun
         * hangi işçiden geldiğini hiç olmazsa kendimiz için öğretici olsun
         * diye bilebileceğiz.
         */
        Tid tid = spawn(&kareAlıcı, thisTid, i);
        işçiler ~= tid;
    }
 
    immutable sayıAdedi = 10;
 
    /*
     * Görevleri dağıtırken işçileri sıra ile seçelim. % işlecini
     * kullanırsak işçiler dizisinin elemanlarını 0,1,2,3,0,1... diye
     * kullanmış oluruz
     */
    foreach (sayı; 0 .. sayıAdedi) {
        int hangiİşçi = sayı % 4;
        Tid işçi = işçiler[hangiİşçi];
 
        /*
         * send, işçiye mesaj gönderir. Mesajın türü, işçinin beklediği
         * mesaj türlerine uymak zorundadır. Bu durumda, karesi alınacak
         * sayıyı bir int olarak gönderiyoruz.
         */
        işçi.send(sayı);
    }
 
    // Şimdi sonuçları teker teker toplayalım
    foreach (i; 0 .. sayıAdedi) {
        /*
         * receive, mesaj kutusundan bir mesaj çeker. receive'in
         * parametreleri işlev göstergesi olabilecekleri gibi, burada
         * olduğu gibi bir 'kapama hazır değeri'i (delegate literal) de
         * olabilirler.
         *
         * Birinci parametre her zaman için mesajı gönderen işçiyi
         * belirtir. Geri kalan parametreler de o işçinin gönderdiği
         * mesajlardır.
         */
        receive((int işçiNumarası, int sayı, int sonuç)
                {
                    writeln("işçi: ", işçiNumarası,
                            " sayı: ", sayı,
                            " sonuç: ", sonuç);
                });
    }
 
    // Hepsini edindik; işçilerin sonlanmalarını sağlayalım
    foreach (işçi; işçiler) {
        /*
         * Ben, sonlanmalarını Bitsin türünde bir mesaj göndererek
         * sağlamayı seçtim
         */
        işçi.send(Bitsin());
    }
}
 
// Bu, yalnızca işçilere işlemlerin bittiğini belirten bir tür
struct Bitsin
{}
 
void kareAlıcı(Tid sahibim, int benimNumaram)
{
    bool bitti_mi = false;
 
    while (!bitti_mi) {
        /*
         * Buradaki receive'in iki adet 'kapama'sı (delegate) var. Birinci
         * kapama, göndericiden beklediğimiz 'int' durumunu, ikinci kapama
         * da Bitsin durumunu hallediyor.
         *
         * receive'in iki parametre aldığına ve ikisinin de kapama hazır
         * değeri olduğuna dikkat edin. İkisi de ismi olmayan işlevcikler
         * gibi düşünülebilir.
         */
        receive((int sayı)
                {
                    /*
                     * Sonucu sahibimize gönderiyoruz. Şart olmasa da, bu
                     * örnekte ne olup bittiğini daha iyi anlayalım diye,
                     * sahibimizin bize vermiş olduğu numarayı da
                     * gönderiyoruz. Böylece sonucun hangi işçiden
                     * geldiğini de çıkışa yazdırabileceğiz.
                     */
                    sahibim.send(benimNumaram, sayı, sayı * sayı);
                },
 
                (Bitsin)
                {
                    // Bitme mesajını aldık; sonlanmamızı sağlayalım
                    bitti_mi = true;
                });
    }
}

Nefis bir çözüm! :)

Beni bu konuda heyecanlandıran en az iki nokta oldu:

1) Çoklu iş parçacıkları kullanmanın bilinen en iyi yöntemlerinden birisi elimizin altında. Artık "iş parçacıklarından kaçının" değil, tam tersine "mesajlaşan iş parçacıkları kullanın" diyeceğiz. TDPL'de okuduğum ve Erlang dilinin önerdiği bir ilke: "mikro işlemcileri olabildiğince meşgul edin." Çok çekirdekli işlemcilerimizi süs diye almadık. :)

2) std.concurrency'nin receive işlevindeki gibi birden fazla kapama verilmesini daha önce hiçbir kütüphanede görmemiştim. Parametre olarak verilen kapamalar içinden parametre listesi ilk uyan seçiliyor ve işletiliyor. Bu olayın otomatikliği çok hoşuma gitti. :) Böyle bir şey C'de veya C++'da mümkün değil. Bu, yeni bir dil öğrenmenin insanın bakış açısını nasıl değiştirdiğini de gösteriyor. Şimdi C++'da olsa nasıl yapardım diye düşünmeye de başladım. :)

Özet: std.thread'i doğrudan kullanmak yerine, onlara mesajlaşma desteği ile kullandıran std.concurrency kullanılsın. Böylece iş parçacıkları veri paylaşmasınlar, mesajlaşsınlar... :)

Ali
Doğrulama Kodu: VeriCode Lütfen resimde gördüğünüz doğrulama kodunu girin:
İfadeler: :-) ;-) :-D :-p :blush: :cool: :rolleyes: :huh: :-/ <_< :-( :'( :#: :scared: 8-( :nuts: :-O
Özel Karakterler:
Bağlı değilsiniz. · Şifremi unuttum · ÜYELİK
This board is powered by the Unclassified NewsBoard software, 20100516-dev, © 2003-10 by Yves Goergen
Şu an: 2017-11-21, 15:18:26 (UTC -08:00)