17 Aralık 2013 Salı

Paralel İşlemler, PLINQ ve Task Parallel Library

Not : Yazı çok uzun ve bir çok konuya değiniyor. Zaman içinde bu yazıları daha küçük yazılara bölmeyi planlıyorum.

.Net Task Parallel Library

Konuyu Task Parallel Library başlıklı yazıya taşıdım.

.Net ThreadPool

.Net 4.0 ile gelen ThreadPool sınıfı yaratılabilecek azami thread sayısını dinamik olarak hesaplıyor.

MSDN'nin dediğine göre :

There is one thread pool per process. Beginning with the .NET Framework version 4, the default size of the thread pool for a process depends on several factors, such as the size of the virtual address space. A process can call the GetMaxThreads method to determine the number of threads. The number of threads in the thread pool can be changed by using the SetMaxThreads method. Each thread uses the default stack size and runs at the default priority. 
ThreadPool.SetMaxThreads için örnek : İlk parametre worker thread sayısını, ikinci parametre ise I/O completion thread sayısını belirtiyor.

4 tane Core i5 CPU'ya sahip 3 GB hafıza taşıyan sistemim için bu rakamlar :

1023  tane worker thread ve 1000 tane de I/O completion thread çıkınca bayağı şaşırdım. Bu kadar çok thread açılabileceğini hiç sanmazdım.

I/O Completion Thread Nedir ?
.Net ile gelen bazı sınıflar (Stream sınıfları, SqlConnection, WebClient gibi)  BeginXXX, EndXX metodları sunuyorlar. Bu metodları çağırınca I/O Completion threadleri tarafından işletiliyorlar.Aşağıdaki şekilde Completion Port tarafından yönetiliyormuş gibi gösterilen threadleri görebiliriz.  Ama yukarıdaki açıklamadan da anlaşıldığı gibi aslında threadler Completion Port tarafından değil, ThreadPool tarafından yönetiliyor.
Unutmadan eklemek lazım I/O Completion Ports made easy örneğinde de görüldüğü gibi GetQueuedCompletionStatus metodunu çağıran thread'leri biz yaratıyoruz. Bu yüzden bizim yarattıklarımız ThreadPool tarafından yönetilenlere dahil değil.

Limit number of processors used in ThreadPool sorusunda da cevaplandığı gibi sistemdeki işlemci sayısından daha düşük bir sayı verilmesi engellenmiş. Ancak aşağıdaki örnekte olduğu gibi kullanılması istenilen işlemci sayısını düşürerek mevcut işlemcilerin tamamını değil de daha azının kullanılmasını istemek mümkün olabiliyor.

Peki ThreadPool sınıfını kimler kullanıyor ? Are ThreadPool settings global or only for work items? sorusundaki cevaba göre 

  •     ThreadPool
  •     System.Threading.Timer
  •     System.Timers.Timer
  •     TPL Tasks
işleri için ThreadPool sınıfının threadleri kullanılıyor. System.Windows.Threading.DispatcherTimer ve System.Windows.Forms.Timer sınıfları ise GUI thread'i üzerinde çalıştıkları için etkilenmiyorlar.

QueueUserWorkItem
QueueUserWorkItem metodu aynı Java'daki execute metodu gibi bir işi teslim ediyor ve geriye dönüş yapılmıyor.
Aslında bu metodu kullanma artık demode oldu. Onun yerine yeni TPL ile gelen Task.Factory.StartNew(()=>waitCB);
metodunu kullanmak lazım.
QueueUserWorkItem metodundan UI thread'e mesaj yollamak

Konuyu buraya taşıdım.
 .Net Winforms
BackGroundWorker

Konuyu buraya taşıdım.  

QT

QtConcurrent.run
QtConcurrent sınıfı Parallel.Invoke sınıfına benzer şekilde bazı metodları paralel çalıştırma imkanı verir. Dokümantasyonunda da açıklandığı gibi bu metod aslında global QThreadPool nesnesine yeni bir iş atar. Sebebini anlamadığım bir şekilde dokümantasyonda atanan işi iptal etmenin imkanı yoktur ancak QFuture nesnesi ile işin sonucu alınabilir deniyor.

Java'daki CompletionService  veya .Net'teki ContinueWith yapısına benzer şekilde verilen iş bitince bir kod çalıştırmak için buradaki soruda da açıklandığı gibi QFutureWatcher sınıfı kullanılabilir.


Java
Java ile gelen Paralel sınıflar aşağıdaki gibi.
Bu sınıflar arasında en karmaşık yapıya sahip olan ThreadPoolExecutor sınıfının detaylarını gösteren bir şekil ise aşağıda.

Eğer ThreadPool ExecutorService kullanılmadan yaratılmak istenirse aşağıdaki gibi bir kod parçası kullanılabilir.

PriorityBlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>(20, yourPriorityComparator);
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, workQueue);

Bir ThreadPoolExecutor içindeki thread sayısı corePoolSize ve maxPoolSize parametreleri tarafından belirlenir. Bu sayılar kaç tane thread'in sürekli hayatta kalacağını ve eğer gerekirse kaç tane daha yeni thread yaratılabileceğini belirtir. Aşağıdaki şekilde bunu görebiliriz.

getActiveCount
Kaç tane threadin çalıştığını gösterir. Burada anlatıldığı gibi metodun döndürdüğü sayı sayı yaklaşık bir değerdir. Buradaki örnekte, her thread işe başlarken ve bitirirken çağırılan beforeExecute ve afterExecute metodları kullanılarak daha kesin bir sonuç elde etme yöntemi gösterilmiş.Kod parçası aşağıda.

private AtomicInteger activeCount = new AtomicInteger();

@Override
public int getActiveCount() {
    return activeCount.get();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
    activeCount.incrementAndGet();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
    activeCount.decrementAndGet();
}

ExecutorService ile yaratılabilen thread pool tipleri

newSingleThreadExecutor (tek thread ile çalışır, sınırsız sayıda iş eklenebilir)
newSingleThreadExecutor tek bir thread yaratır. Eğer thread sayısından fazla iş gelirse yeni işler bir kuyrukta bekletilir.

Thread pool ile istenirse ThreadFactory de kullanılabilir. Regarding daemon thread providing some service to non daemon thread sorusunda tüm thread'leri daemon olarak yaratan bir factory örneği var. Aşağıdaki örnekte thread'e isim de veriliyor. Thread daemon olunca GUI thread'i sonlanırken diğer thread'lerin bitmesini beklemek zorunda kalmıyor.
Passing a ThreadFactory to the ScheduledThreadPoolExecutor sorusunda ise threadlere sadece isim veren bir başka örnek var.
Assign Priority for Callable type threads sorusunda ise threadlere öncelik atayan örnek var.


newFixedThreadPool (thread sayısı sabittir, sınırsız sayıda iş eklenebilir)
newFixedThreadPool  tipinde verilen sayı kadar sabit thread yaratılır ve threadler boş kalsalar bile yok edilmezler. Sayı her zaman sabittir. Eğer thread sayısından fazla iş gelirse yeni işler bir kuyrukta bekletilir.
Kuyruk sınırsız olduğu için de sınırsız sayıda iş eklenebilir.

Eper sabit sayıda thread ve sınırlı sayıda iş eklenmesini istiyorsak aşağıdaki kodu kullanabiliriz. Kuyruk dolunca default AbortPolicy çağırılır. Bu policy de verilen iş kabul edemeyeceğini belirten RejectedExecutionException atar.
new ThreadPoolExecutor(threadPoolSize,
                        threadPoolSize,
                        0L,
                        MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>(1000));
  
Aşağıdaki Executors.newFixedThreadPool metodunun içini gösteren kod parçası sabit sayıda thread yaratıldığını gösteriyor.

Aşağıdaki örnekte sistemdeki çekirdek sayısı kadar sabit thread yaratma kodu var.

Runtime.getRuntime().availableProcessors() ile alınan çekirdek sayısı sadece bir örnek. Eğer I/O için bloke olma yüzdesi fazlaysa daha fazla sayıda thread bile yaratılabilir.

Specify task order execution in Java sorusunda bu thread pool çeşidine verilen işlere öncelik atanması çözümü ilginç. Örnek çözüm aşağıda.


newCachedThreadPool (thread sayısı dinamik olarak ayarlanır)
newCachedThreadPool tipinde threadler belli bir süreden fazla (60 saniye) boş kaldıkları zaman kaynak kullanımını azaltmak için öldürülürler.Eğer aktif thread sayısından fazla iş gelirse yeni iş için yeni bir thread yaratılır.
 Yalnız dikkat edilmesi gereken bir nokta var .
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
cümlesinden de anlaşıldığı gibi eğer azami thread sayısı verilmişse ve tüm threadler meşgul ise yeni reddedilir. 
Ancak newCachedThreadPool  kullanımında azami thread sayısı Integer.MAX_VALUE olarak kullanıldığı için yaratılabilecek thread sayısına bir üst sınır konulmamış oluyor. Dolayısıyla sistem yeni bir thread açabildiği müddetçe tüm threadler meşgul olsa bile yeni iş reddedilmez.

Impossible to make a cached thread pool with a size limit? sorusunda ThreadPoolExecutor sınıfının ne zaman yeni thread yaratacağı açıklanmış.

  1. If the thread pool has not reached the core size, it creates new threads.
  2. If the core size has been reached and there is no idle threads, it queues tasks.
  3. If the core size has been reached, there is no idle threads, and the queue becomes full, it creates new threads (until it reaches the max size). 
  4. If the max size has been reached, there is no idle threads, and the queue becomes full, the rejection policy kicks in.
Aşağıdaki şekilde de algoritma görülebilir.

newCachedThreadPool ile yaratılan sınıfın core thread sayısından fazla iş gelince yeni thread yaratılmasının sebebi  SynchronousQueue kullanması. Bu sınıfın boyu her zaman 0. Dolayısıyla yeni iş gelince madde 3'e göre kuyruk hep dolu olacağı için yeni bir thread yaratılacak.

60 saniye keep alive süresi olarak adlandırılıyor ve açıklamada da görüldüğü gibi setKeepAliveTime metodu ile süre değiştirilebiliyor. 


newCachedThreadPool core pool size olarak 0 ile başlatılıyor. Bu yüzden hiç iş gelmezse 0 tane thread ile de kalabilir. Eğer 0'dan farklı bir sayı ile başlatılsaydı ve hiç iş gelmezse core thread'lerin bile yok edilmesi istenseydi allowCoreThreadTimeOut metodu ile bu işlem gerçekleştirilebilirdi.

newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutor tipinde periyodik olarak çalışması gereken işleri çalıştırılan bir yapı yaratılır. ScheduledExecutorService ve Timer arasındaki farkı görmek için bu soruya bakılabilir.

Tek Seferlik (SingleShot) İşler 
İşi gecikmeli olarak tek bir sefer çalıştırmak istiyorsak aşağıdaki gibi yapılabilir.

Periyodik İşler
İşi periyodik olarak çalıştırmak için aşağıdaki gibi yapılabilir.

Bir başka örnekte ise her 5 saniyede çalışan kod içinse aşağıdaki gibi yapmak lazım.
Executors.newSingleThreadScheduledExecutor()
    .scheduleAtFixedRate(new MyRunnable(),
                        0, //initial delay
                        5, //delay
                        TimeUnit.SECONDS);
Bu çalışma şeklinde aynı anda iki timer expire ederse tek thread olduğu için sadece biri çalıştırılabiliyor. Diğeri sırasını beklemek zorunda.
 
newScheduledThreadPool
ScheduledExecutorService yazısına taşıdım.

Executor.execute 
Bu metod Runnable arayüzü ile kullanılırsa QT'dekinden bile daha ilkel görünüyor. Çünkü ne bir şekilde işi iptal etme ne de Future nesnesi ile sonucu alabilme imkanı var. Sebebi ise  metodun imzasının şöyle olmasıvoid execute (Runnable command) ;

ExecutorService es = Executors.newCachedThreadPool(50);
es.execute(new Job());

public class Job implements Runnable {
    public void run() {
    }
}
Why cannot run() of Runnable throw checked Exceptions? sorusunda da açıklandığı gibi Runnable aslında en eski arayüzlerden birisi ve daha çok bir kodlama yöntemiyle bir thread yaratıp onun içinde çalıştırmak üzere tasarlanmış. 

Daha sonra ortaya çıkan Callable arayüzü aşağıda da görülebileceği gibi Future nesnesi aracılığıyla bir sonuç dönme imkanı da sunuyor.

Eğer bu metod FutureTask ile kullanılırsa kullanması daha kolay bir yapı ortaya çıkıyor. Örnek için Paralel Örüntüler başlıklı yazıya bakabilirsiniz.

ExecutorService.submit
Bu metod ile Future nesnesi alabilme imkanı var. CompletionService ile beraber kullanılınca, kullanımı çok zevkli bir yapı ortaya çıkıyor.
    ExecutorService es = Executors.newCachedThreadPool();

    Future<Boolean> f = es.submit(new Job());

    public class Job implements Callable{

        public boolean call() {

        }

    }

Burada dikkat edilmesi gereken nokta eğer worker thread exception atarsa Future.get()  metodu çağırılınca ExecutionException alabilir. Bu yüzden thread'in sonucunu alırken aşağıdakine benzer bir kod işe yarayabilir.


ExecutorCompletionService
CompletionService .Net'teki ContinueWith yapısına benzese de bence aynı kolaylığı kesinlikle sağlamıyor.

CompletionService örneği aşağıda. Bu yapıyla tüm task'ların sonucunu almak çok kolay.
Under-appreciated Java Classes Part I: CompletionService yazısında da bu sınıfı kullanmak için örnekler var.

Bu sınıfı kullanırken dikkat edilmesi gereken nokta ecs.take().get() metodunu çağırırken dikkatli olmak yoksa bloke olabiliriz. Bu durumdan kaçınmak için yapılması gereken ya verilen iş sayısını sayarak henüz bitmemiş olduğundan eminsek çağırmak ya da poll metodunu kullanmak.


ExecutorService.execute ile başlatılan işi iptal etme
İptidai bir yöntem olarak aşağıdakine benzer bir kod kullanılabilir.

ExecutorService es = Executors.newFixedThreadPool(5);
Worker worker = new Worker();                  
es.execute(worker);
//Arada bir zaman geçsin
worker.interrupt();
//veya
es.shutdownNow();
Worker thread ise iptal edilme işlemini aşağıdakine benzer bir kod parçası ile anlayabilir.

while(!Thread.currentThread().isInterrupted()){
    try{
        // birşeyler yap
    }
    catch(InterruptedException e){
        Thread.currentThread().interrupt();
    }
}
ExecutorService.submit ile başlatılan işi iptal etme
Time limit on individual threads with ExecutorService sorusunda ise başlanılan işi belli bir süre sonra iptal etme örneği verilmiş.
ExecutorService.submit ile başlatılan işi duraklatma
Java ExecutorService pause/resume a specific thread sorusunda submit edilen bir işi duraklatma ve devam ettirme örneği var.
 

ExecutorService.invokeAll
invokeAll metodu ile çağıran thread tüm işler bitinceye kadar bekler. 


invokeAll metodunun bekleme döngüsünü gösteren kod parçası aşağıda.

ForkJoinPool.invoke
Konuyu Fork-Join Framework başlıklı yazıya taşıdım.

ExecutorService ile .Net'teki ContinueWith benzeri bir yapı
submit metodu ile döndürülen Future nesnesi biten işin sonucunu verir ancak submit işlemini yapan thread'in Future nesnesi üzerinden beklemesi gerekir. Eğer beklemek istemiorsak Java executors: how to be notified, without blocking, when a task completes? sorundaki gibi bir sınıf kullanmamız gerekir.



ExecutorService kapatma yöntemleri

shutDown
shutDown metodu ile tüm işlerin bitirilmesi ve yeni işlerin kabul edilmemesi ExecutorService sıfınına bildirilir.

shutDownNow
shutDownNow tüm threadleri derhal durdurmaya çalıır. Bu metod Thread.interrupt() metodunu kullanır. Eğer bizim threadimiz bu metod çağırılınca gönderilen InterruptedException'ı bir şekilde yutuyorsa problem çıkabilir.

awaitTermination
awaitTermination metodu shutDown metodu çağırıldıktan sonra, verilen süre kadar veya tüm işler bitirilinceye kadar bekleyebilmeyi sağlar. Örnek :

service.shutDown();//Yeni gelen işleri reddet
service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
awaitTermination aşağıdaki kodu aşağıdaki örnekte görüldüğü gibi çalışan tüm threadlere join yapılmasına benzer.


awaitTermination metodu shutDown metodu çağırılmadan kullanılmamalıdır çünkü bu metod çalışan thread sayısı gösteren bir AtomicInteger üzerinde for döngüsü içinde spin wait yapar.
Verilen sürede tüm işler bitirilmeyebilir. Herşeyin bitmesini beklemenin en iyi yolu döngü içinde beklemektir.
 while(!pool.isTerminated()){ //Tüm işlerin bitmesini bekle
    try {
     pool.awaitTermination(1000, TimeUnit.SECONDS);
     } catch (InterruptedException ex) {
        Logger.getLogger(ThreadManagement.class.getName()).log(Level.SEVERE,null, ex);
     }
}
İşleri teslim edip bitmesini beklemenin en güzel yolu invokeAll metodunu çağırmaktır.
Aşağıdaki örnekte CTRL+C tuşuna basılınca threadleri kapatmak kodu var.

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                //...
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
ThreadLocal
Thread'ler ile ilgili yazılarda bir de ThreadLocal sınıfına bakmakta fayda var.
Java Executors and per-thread (not per-work unit) objects? yazısından aldığım örnek aşağda.

Future.get
Future sınıfı ile thread'in sonucu alınmaya çalışılırken timeout değeri verilebilir. Eğer istenilen süre içinde cevap alınmazsa bile, çalışan thread iptal edilmiyor.