23 Temmuz 2014 Çarşamba

Fork-Join Framework

Fork-Join Nedir?
Verilen işin sürekli daha küçük parçalara bölünerek işlenmesi ve sonucun birleştirilerek elde edilmesidir. Fork-join ve Java 8 ile gelen stream framework bazı açılardan benzeşiyor. Her ikisi de işleri daha küçük parçalara ayırıyorlar

ForkJoin anladığım kadarıyla Doug Lea'nin bir makalesi üzerine geliştirilmiş.

ForkJoin Pool sınıfı

ForkjoinPool böl ve fethet yöntemi uygulanabilen işlerde çok faydalı.Diğer thread pool'lardan farklı olarak threadler daemon olarak yaratılıyorlar. Dolayısıyla shutdown() metodunun çağırılmasına gerek kalmıyor.

ForkJoinPool ile iki çeşit ForkJoinTask kullanılabiliyor. Bunlar RecursiveAction ve RecursiveTask.
RecursiveAction ile sonuç dönülemezken, RecursiveTask ile dönülebiliyor

Not : ForkJoinPool'un amacı dışında böl ve fethet yöntemi kullanılmadan, sadece bir sabit newFixedThreadPool gibi kullanıldığı bir örnek burada.


RecursiveTask Nasıl Çalışır?

Bu örnekte alt görevler fork() metodu ile başlatılıyor ve join() metodu ile sonuçları birleştiriliyor. Aşağıdaki şekli buradan aldım.

ForkJoin kullanımını eleştiren bir yazıdan ise aşağıdaki şekli aldım.
Result solve(Problem problem) {
       if (problem is small)
         directly solve problem
       else {
        split problem into independent parts
        fork
 new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

invoke Metodu ile Sonuç Nasıl Alınır?
Pool'dan sonucu alma örneği
ForkJoinPool pool = new ForkJoinPool();
Long result = pool .invoke (new MyRecursiveTask()); //Sonucu al
Burada ForkJoinPool kullanımına da bir başka örnek var.
Bu sınıf ile kullanılan RecursiveTask'ın compute(), fork() ve join() metodlarını iyi anlamak lazım. Büyük lokma önce daha küçük parçalara ayrılıyor. Daha sonra küçük lokmaların 1 tanesi hariç, diğerleri iş kuyruğuna ekleniyorlar. Kuyruğa eklenmeyen küçük lokma mevcut thread tarafından çalıştırılıyor.

fork() metodu daha küçük parçaya ayrılan yeni işi kuyruğa ekler. Her thread'in kendi kuyruğu var, ancak gerekirse diğer thread'ler iş çalma (work stealing) yapabilirler.
compute() metodu daha küçük parçaya ayrılan yeni işi kuyruğa eklemeden mevcut thread ile çalıştırır.
join() metodu kuyruktaki çalıştırılan işin bitmesini bekler.



Bir başka örnekte ise verilen dizinin alt dizinlerini listeme var.

RecursiveAction Örneği :
Bu kullanım şekilinde genellikle global tüm action nesneleri tarafından erişilebilen bir nesneye işlem uygulanıyor. Örneği buradan aldım

//Default parallelism level = Runtime.getRuntime().availableProcessors()
ForkJoinPool fjpool = new ForkJoinPool(64);
int arrayLength = 100000000;
int array[] = new int[arrayLength];
RecursiveAction task = new RandomFillAction(array, 0, array.length);
fjpool.invoke(task);

class RandomFillAction extends RecursiveAction {
    int low;
    int high;
    private int[] array;
    splitSize = 40000; //Some threshold size to spit the task
   
    public RandomFillAction(int[] array, int low, int high) {
        this.low = low;
        this.high = high;
        this.array = array;
    }

    @Override
    protected void compute() {
        if (high - low > splitSize) {
            // task is huge so divide in half
            int mid = (low + high) >>> 1;
            invokeAll(Arrays.asList(new RandomFillAction(array, low, mid), 
                                                 new RandomFillAction(array, mid, high))
                          );
        } else {
            //Some calculation logic
            Random random = new Random();
            for (int i = low; i < high; i++) {
                array[i] = random.nextInt(10000);
            }
        }
    }
}  

Bir başka örnekte ise RecursiveAction nesnesi global bir listedeki nesneleri metodlarını çağırıyor. Örnekteki computeDirectly metodu en küçük bölünmenin gerçekleştiği yeri gösteriyor.

ForJoinPool ve Thread Sayısı
ForJoinPool (int parallelism)
ile kaç tane thread ile çalışmasını istediğimizi belirtiyoruzÖrnek:

Eğer parametre vermezsek bu sayı CPU sayısına eşit oluyor. Örnek:

ForkJoinPool ve Thread'in Beklemesi
Buradaki soruda ForkJoinPool'un CPU sayısından daha fazla thread'e sahip olamayacağı yazıyor. Eğer thread'lerden birisi I/O için beklerse, diğer bekleyen işler için yeni bir thread yaratılmıyorBurada bu konuyu açıklayan şimdiye kadar gördüğüm en iyi cevap var.

1 yorum: