Fork-Join Nedir?
Verilen işin sürekli daha küçük parçalara bölünerek işlenmesi ve sonucun birleştirilerek elde edilmesidir. Fork-join ve Java 8 ile gelen stream framework bazı açılardan benzeşiyor. Her ikisi de işleri daha küçük parçalara ayırıyorlar.
ForkJoin anladığım kadarıyla Doug Lea'nin bir makalesi üzerine geliştirilmiş.
ForkJoin Pool sınıfı
ForkjoinPool böl ve fethet yöntemi uygulanabilen işlerde çok faydalı.Diğer thread pool'lardan farklı olarak threadler daemon olarak yaratılıyorlar. Dolayısıyla shutdown() metodunun çağırılmasına gerek kalmıyor.
ForkJoinPool ile iki çeşit ForkJoinTask kullanılabiliyor. Bunlar RecursiveAction ve RecursiveTask.
RecursiveAction ile sonuç dönülemezken, RecursiveTask ile dönülebiliyor.
Not : ForkJoinPool'un amacı dışında böl ve fethet yöntemi kullanılmadan, sadece bir sabit newFixedThreadPool gibi kullanıldığı bir örnek burada.
RecursiveTask Nasıl Çalışır?
Bu örnekte alt görevler fork() metodu ile başlatılıyor ve join() metodu ile sonuçları birleştiriliyor. Aşağıdaki şekli buradan aldım.
Result solve(Problem problem) {
if (problem is small)
directly solve problem else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
directly solve problem else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
invoke Metodu ile Sonuç Nasıl Alınır?
Pool'dan sonucu alma örneği
Burada ForkJoinPool kullanımına da bir başka örnek var.ForkJoinPool pool = new ForkJoinPool();Long result = pool .invoke (new MyRecursiveTask()); //Sonucu al
Bu sınıf ile kullanılan RecursiveTask'ın compute(), fork() ve join() metodlarını iyi anlamak lazım. Büyük lokma önce daha küçük parçalara ayrılıyor. Daha sonra küçük lokmaların 1 tanesi hariç, diğerleri iş kuyruğuna ekleniyorlar. Kuyruğa eklenmeyen küçük lokma mevcut thread tarafından çalıştırılıyor.
fork() metodu daha küçük parçaya ayrılan yeni işi kuyruğa ekler. Her thread'in kendi kuyruğu var, ancak gerekirse diğer thread'ler iş çalma (work stealing) yapabilirler.
compute() metodu daha küçük parçaya ayrılan yeni işi kuyruğa eklemeden mevcut thread ile çalıştırır.
join() metodu kuyruktaki çalıştırılan işin bitmesini bekler.
Bir başka örnekte ise verilen dizinin alt dizinlerini listeme var.
RecursiveAction Örneği :
Bu kullanım şekilinde genellikle global tüm action nesneleri tarafından erişilebilen bir nesneye işlem uygulanıyor. Örneği buradan aldım
//Default parallelism level = Runtime.getRuntime().availableProcessors()
ForkJoinPool fjpool = new ForkJoinPool(64);
int arrayLength = 100000000;
int array[] = new int[arrayLength];
RecursiveAction task = new RandomFillAction(array, 0, array.length);
fjpool.invoke(task);
class RandomFillAction extends RecursiveAction {
int low;
int high;
private int[] array;
splitSize = 40000; //Some threshold size to spit the task
public RandomFillAction(int[] array, int low, int high) {
this.low = low;
this.high = high;
this.array = array;
}
@Override
protected void compute() {
if (high - low > splitSize) {
// task is huge so divide in half
int mid = (low + high) >>> 1;
invokeAll(Arrays.asList(new RandomFillAction(array, low, mid),
int arrayLength = 100000000;
int array[] = new int[arrayLength];
RecursiveAction task = new RandomFillAction(array, 0, array.length);
fjpool.invoke(task);
class RandomFillAction extends RecursiveAction {
int low;
int high;
private int[] array;
splitSize = 40000; //Some threshold size to spit the task
public RandomFillAction(int[] array, int low, int high) {
this.low = low;
this.high = high;
this.array = array;
}
@Override
protected void compute() {
if (high - low > splitSize) {
// task is huge so divide in half
int mid = (low + high) >>> 1;
invokeAll(Arrays.asList(new RandomFillAction(array, low, mid),
new RandomFillAction(array, mid, high))
);
} else {
//Some calculation logic
Random random = new Random();
for (int i = low; i < high; i++) {
array[i] = random.nextInt(10000);
}
}
}
}
} else {
//Some calculation logic
Random random = new Random();
for (int i = low; i < high; i++) {
array[i] = random.nextInt(10000);
}
}
}
}
Bir başka örnekte ise RecursiveAction nesnesi global bir listedeki nesneleri metodlarını çağırıyor. Örnekteki computeDirectly metodu en küçük bölünmenin gerçekleştiği yeri gösteriyor.
ForJoinPool ve Thread Sayısı
ForJoinPool (int parallelism)
ile kaç tane thread ile çalışmasını istediğimizi belirtiyoruz. Örnek:
Eğer parametre vermezsek bu sayı CPU sayısına eşit oluyor. Örnek:
ForkJoinPool ve Thread'in Beklemesi
Buradaki soruda ForkJoinPool'un CPU sayısından daha fazla thread'e sahip olamayacağı yazıyor. Eğer thread'lerden birisi I/O için beklerse, diğer bekleyen işler için yeni bir thread yaratılmıyor. Burada bu konuyu açıklayan şimdiye kadar gördüğüm en iyi cevap var.
Müthiş bir yazı olmuş elinize sağlık.
YanıtlaSil