31 Ekim 2014 Cuma

Hadoop Block Size ve Split Size Farkı

Hadoop bir dosyayı HDFS e atarken bloklar halinde atmakta. Blok büyüklükleri de varsayılan olarak 64 MB olarak atanmıştır. Bu değer istenildiğinde değiştirilebilir.

Splitler ise her bir map task parçacığının işleyeceği veri kümesidir. Örnek verecek olursak:

1. Durum:

SplitSize  = 64 MB
BlockSize = 64 MB

Bu durumda her split 1 bloğa yerleştirilecektir. Ve her map task bir split işleyeceğinden bir blokdaki veriyi işleyecektir.

2. Durum:

SplitSize  = 128 MB
BlockSize = 64 MB

Bu durumda her split 2 bloğa yerleştirilecektir. Ve her map task bir split işleyeceğinden iki blokdaki veriyi sırası ile alarak işleyecektir.

Her bir map task çok hızlı bir şekilde çalışıp sonlanıyor ise splitSize max seviyesine çıkartılarak bir map task'ın daha fazla veri işlemesi sağlanabilir. Böylelikle map task sayısı düşecektir. Map tasklarının ayağa kaldırılma süresinden kazanç sağlandığından toplam zamandan kazanç sağlanacaktır.

Hadoop - Jop Reduce Sayısı

Bir Hadoop Job da çalıştırılacak toplam reduce sayısı varsayılan olarak birdir. En uygun şekilde bu sayısı belirleyebilmek için hadoop kümesindeki toplam reduce slot sayısına bakmak gerekir. Toplam reduce sayısından hatalı olabilecek task larıda göz önünde bulundurarak bi miktar az sayıda reduce task atanmasıdır. Toplam reduce slot sayısı JobTracker arayüzünden görülebilir.


30 Ekim 2014 Perşembe

Dosyada Her Satırda Belirtilen Sıradaki Kelimeyi Alma

1. Sıradakini almak için yazılmış örnek komut:

cat new_file.txt | awk '{print $1;}' > urlUnique.txt

24 Ekim 2014 Cuma

Hadoop WebUI Admin Yetkileri

Hadoop JobTracker Web UI üzerinden job sonlandırma gibi işlemleri yapabilmek için core-site.xml dosyasında aşağıdaki parametre eklenir.

<property>
   <name>webinterface.private.actions</name>
      <value>true</value>
      <description> If set to true, the web interfaces of JT and NN may contain actions, such as kill job,         delete file, etc., that should not be exposed to public. Enable this option if the interfaces are only          reachable by those who have the right authorization.
      </description>
</property>

Hadoop - SkipBadRecord

-D keep.failed.task.files=true
-D mapred.max.map.failures.percent=50
-D mapred.max.reduce.failures.percent=50
-D mapred.map.max.attempts=10
-D mapred.skip.mode.enabled=true
-D mapred.skip.map.max.skip.records=1
-D mapred.skip.attempts.to.start.skipping=2

Hadoop - MapReduce-1 Job Fail Durumları


  • Task içeride yapılan işlemlerden dolayı fail edebilir.
  • Task uzun süre JobTracker'a durum ilgisi göndermediğinde (10 dakika), JobTracker tarafından fail etmiş kabul edilir ve JVM 'si kill edilir.
  • Bir task fail ettiğinde tekrar başka bir düğümde ayağa kaldırılır ve çalıştırılır.
  • Aynı task 4 kere fail eder ise tekrar çalıştırılması denenmez. Bu derğer parametriktir ve mapred.map.max.attempts ve mapred.reduce.max.attempts paremetreleri ile değiştirilebilir.
  • Bir job içerisinde belirli sayıda task fail eder ise job fail etmiş sayılır ve çalışan task larda kill edilir. Bu sayı mapred.max.map.failures.percent ve mapred.max.reduce.failures.percent parametreleri ile değiştirebilir.
  • Bir job speculative çalıştırma esnasında diğerinin bitmesinden dolayıda fail durumuna çekilip kill edilebilir.
  • Bir tasktracker hata aldığında çalıştırdığı tüm task'lar da  hatalı duruma çekilip kill edilir.

23 Ekim 2014 Perşembe

Hadoop - YARN Job Çalışma Adımları

YARN kullanılarak başlatılan job'larda, job submit edildiğinde bazı adımlar gerçekleşmekte ve bu adımların başarım durumlarına göre job işletilmeye başlanmaktadır:

  1. Resource Manager dan Application ID  alınır.
  2. Output dosyası mevcut mu kontrol edilir.
  3. Input dosyası mevcut mu kontrol edilir.
  4. Job için gerekli kaynak dosyaların (jar dosyası yada konfigürasyon dosyaları gibi) düğümlere dağıtımı yapılır. 
  5. Tum işlemler başarılı ise job  çalışmaya hazır olduğu belirtilir. 
Buraya kadar olan bölüm bir job'ın çalışmaya başlamadan önceki kontrol adımları şeklinde idi ve job'ın çalıştırıldığı istemci makinasında gerçekleşen adımlardı. Bundan sonra Resource Manager sazı eline alır ve job ilklendirilir, çalışmaya başlatılır:
  1. Schedular bir container oluşturur.
  2. Resource Manager oluşturulan container içerisinde None Manager yönetiminde Application Manager oluşturur. 
  3. Application Manager tasklar'ın durumlarını takip etmek ve counterlarını almak için processler oluşturur ve job'ı başlatır. 
  4. Application Manager her bir split adedince map task oluşturur.
  5. Reduce sayısı ise job da tanımlı "mapred.reduce.tasks" özelliğinin değerine göre belirlenir ve bu sayıda reduce task oluşturulur. 
  6. Eğer uygulama küçük bir uygulama ise [1] Application Manager  job 'ı kendisi ile aynı JVM üzerinde çalıştırır. Bun tür uygulamalara uberized denilmekte.
  7. Herhangi bir map task başlamadan önce Job setup metodu çalıştırılır. Örneğin job başlamadan önce output dosyası burada oluşturulabilir. MapReduce - 1 deki TaskTracker'ların  bu metodu çağırmasının aksine, YARD da  bu method doğrudan Application Master tarafından çağırılır.
  8. Tüm reduce task'lar tamamlandıktan sonra Job cleanup metodu çalıştırılır .Örneğin task bittikten sonra, oluşturulan geçici dosyalar burada silinebilir. MapReduce - 1 deki TaskTracker'ların  bu metodu çağırmasının aksine, YARN da  bu method doğrudan Application Master tarafından çağırılır.
  9. Job uberized olarak seçilmedi ise Application Master tüm map ve reduce task'ları için Resource Manager dan container talebinde bulunur. Bu işlem hearthbeat ler ile yapılır. Hearthbeat çağrısının içinde her bir map taskın versinin nerede olduğu ve input splitlerin hangi makinalarda ve rack'lerde olduğu bilgisi yeralır. Resource Manager locality prensibini göz önünde bulundurarak tasklerin düğümlere atamasını gerçekleştirir. Task atamasında verinin olduğu makinaya ilgili task atanmaya çalışılır ancak bu mümkün değil ise veri ile aynı rack üzerinde başka bir makinaya atama yapılır. Application Master  hearthbeat çağrısı içerisinde task için gerekli olan bellek miktarınıda iletir. Varsayılan ayarlarda bu hem map hemde reduce task için 2014 MB'dır. Bu değer "mapreduce.map.memory.mb" ve "mapreduce.reduce.memory.mb" parametrelerini ile değiştirilebilir.

Bu aşamaya kadar bir job için taskların oluşturulması, containerların oluşturulması ve düğümlere atanmasına kadar geldik. Bundan sonra taskların işletimi için sazı eline Application Master alacak:

  1. Application Master tasklar için oluşturulmuş containerlar'ın bulunduğu düğümler ile iletişime geçerek container'ları başlatır. 
  2. Taksk'ın çalışması için gereken jar dosyaları ve konfigürasyon dosyaları distributed cache den alıp task'ın çalışacağı dosya sistemine kopyalar.
  3. Task için lokal bir çalışma alanı oluşturur ve jar dosyası içeriğini buraya un-jar eder.
  4. Task'ın çalışacağı yeni bir JVM ayağa kaldırır. (YARN da Mapreduce-1 'in aksine JVM tekrar kullanılabilirliği yoktur.)
  5. Task'lar durum bilgilerini ve sayaç bilgilerini 3 saniyede bir  Application Master a iletirler.
  6. İstemci ise her saniyede bir Application Master dan sayaç bilgilerini alır.
  7. İstemci her 5 saniyede bir Application Master dan tamamlanma bilgisini zalır.
  8. Job tamamlandıktan sonra Application Master ve Task Container kendi JVM si içinde setup ve cleanup aksiyonlarını çalıştırır. Böylelikle geçici olarak yazılan dosyalar gibi job a özel şeyler temizlenir.


NOTLAR:
[1]: 10 mapper yada daha az ve 1 reducer'a sahip ve veri boyutu bir HDFS blok boyutundan daha küçük ise küçük bir job olarak kabul edilir. Ancak bu değerler parametrik olarak değiştirebilmekte. "mapreduce.job.ubertask.maxmaps", "mapreduce.job.ubertask.maxreduces", "mapreduce.job.ubertask.maxbytes"  özellikleri ile yeni değerler atanabilir. "mapreduce.job.ubertask.enable" özelliği "false" yapılarak  bu özellik kapatılabilir. 

Hadoop - YARN Temel Bileşenleri

YARN temelde aşağıdaki 4 bileşenden oluşmaktadır:
  1. Verinin okunup yazabileceği dağıtık dosya sistemi olan (HDFS).
  2. Çalıştıracağı job'ı başlatacağı istemci.
  3. Kaynakların yönetileceği "resource manager".
  4. Cluster da bulunan düğümlerdeki kayakların durumlarını izleyen "node manager".
  5. MapReduce job!larında bulunan task'ları yöneten "application master". Application master ve mapreduce taskları resource manager tarafından zamanlanırlar ve node manager tarafından yönetilirler.

Hadoop - YARN 'ın Sağladığı Artılar


  1. JobTracker'ın sorumlulukları ayrıştırıldı:
    1. "Resource Manager":  kaynakların yönetimi.
    2. "Application Manager": uygulamaların yönetimi.
  2. Aynı anda farklı türde ytgulamalar clusterda çalışabilmekte. MapReduce ve MPI uygulaması aynı anda çalışabilmekte.
  3. Her uygulama kendi container'ında çalıştığından uygulamalardaki sorunlar birbirini etkilememekte.
  4. Kullanıcılar aynı cluster üzerinde farklı mapreduce versiyonları üzerinde çalışabilmekte. Böylelikle upgrade işlemleri daha sorunsuz gerçekleşebilmekte.

22 Ekim 2014 Çarşamba

Hadoop - MapReduce (Classic) Job Çalışma Adımları

Hadoop da klasik MapReduce framework kullanılarak başlatılan job'larda, job submit edildiğinde bazı adımlar gerçekleşmekte ve bu adımların başarım durumlarına göre job işletilmeye başlanmaktadır:

  1. JobId alınır.
  2. Output dosyası mevcutmu kontrol edilir.
  3. Input dosyası mevcut mu kontrol edilir.
  4. Job için gerekli kaynak dosyaların (jar dosyası yada konfigürasyondosyaları gibi) düğümlere dağıtımı yapılır. Bu işlem yedekli olarak yapılır. Kaç tane yedeği olacağı "mapred.submit.replication" özelliğinde belirtilir. Bu değer varsayılan olarak 10 dur.
  5. Tum işlemler başarılı ise job  çalışmaya hazır olduğu belirtilir. 
Buraya kadar olan bölüm bir job'ın çalışmaya başlamadan önceki kontrol adımları şeklinde idi ve job'ın çalıştırıldığı istemci makinasında gerçekleşen adımlardı. Bundan sonra JobTracker sazı eline alır ve job ilklendirilir, çalışmaya başlatılır:
  1. Job'ın işleyeceği splitler incelenir.
  2. Her bir split adedince map task oluşturulur.
  3. Reduce sayısı ise job da tanımlı "mapre.reduce.tasks" özelliğinin değerine göre belirlenir ve bu sayıda reduce task oluşturulur. 
  4. Oluşturulan map ve reduce task'larına ID ataması yapılır.
  5. 1 adet job setup task oluşturulur. Bu task map task başlamadan önceki yapılacak işlemlerin tanımlanabileceği bir tasktır. Örneğin job başlamadan önce output dosyası burada oluşturulabilir.
  6. 1 adet job cleanup task oluşturulur. Bu task reduce task'lar tamamlandıktan sonra yapılacak işlemlerin tanımlanabileceği bir tasktır. Örneğin task bittikten sonra, oluşturulan geçici dosyalar burada silinebilir. 
  7. TakTrackerlar belirli aralıklar ile ayakta olduklarına dair jobTracker' a hearthbeat mesajı gönderirler (minimum 5 saniyede bir, büyük cluster'larda bu daha uzun bir süre). Bu mesaja ek olarak yeni bir task alabilmeye uygunluk durumlarınıda JobTracker'a bildirirler.
  8. Eğer tasktracker yeni bir task almaya müsait ise hearthbeat mesajının dönüş değeri olarak JobTracker TaskTracker'a yeni bir task atar. Hangi taskin öncelikli atanacağı belirlenirken scheduler olarak ne kullanıldığı gözönünde bulundurulur. Ek olarak JobTacker eğer tasktracker hem map hemde reduce task almaya musait ise, JobTracker tarafından öncelikle map task kapasitesi doldurulur ve daha sonra reduce task ataması yapılır. Atanacak map task seçilirken "data locality" prensibine dikkat edilir. Ancak reduce task için bu durum göz önünde bulundurulmaz.
Bu aşamaya kadar bir job için taskların oluşturulması ve düğümlere atanmasına kadar geldik. Bundan sonra taskların işletimi için sazı eline TaskTracker alacak:


  1. Tasktracker çalıştıracağı job' a ait jar dosyasını ve gerekli diğer dosyaları distributed cache den alıp tasktrackerın dosya sistemine kopyalar.
  2. Task için lokal bir çalışma alanı oluşturur ve jar dosyası içeriğini buraya un-jar eder.
  3. Task ı çalıştırabilmek için bir TaskRunner çalıştırır.
  4. TaskRunner yeni bir JVM ayağa kaldırır. (JVM tekrar kullanılabilir olarak ayarlama yapıldı ise tekrar ayağa kaldırılmaz. )
  5. Her task kendi JVM si içinde setup ve cleanup aksiyonlarını çalıştırabilir.
  6. Task çalışmaya başladığında statüsünü "progress" e çeker.
  7. TaskTracker hearthbeatler ile sürekli durumunu JobTracker'a bildirir. Task ile ilgili counterlar ise 5 saniyeden saha seyrek olarak JobTracker'a iletilir. Böylelikle durum izlenebilirliği sağlanır.
  8. JobTracker son task için de tamamlandı bilgisini aldıktan sonrajob ın durum bilgisini "success" olarak günceller.
  9. Job ile ilgili conterların ve istatistiklerin bilgileri konsola yazılır.
  10. JobTracker tamamlanan job ile ilgili cleanup işlemini yapar ve TaskTracker' ada aynı işlemi yapmalarını söyler. Böylelikle geçici olarak yazılan dosyalar gibi job a özel şeyler temizlenir.

Hadoop - MapReduce - 1 Temel Bileşenleri

Hadooop da veri işlemek için geliştirilmiş frameworklerden ilki olan MapReduce - 1 (Classic) temelde aşağıdaki 4 bileşenden oluşmaktadır:


  1. Verinin okunup yazabileceği dağıtık dosya sistemi olan (HDFS)
  2. Çalıştıracağı job'ı başlatacağı istemci.
  3. Jobların tamamının paralel olarak işlenmesini kontrol eden JobTracker
  4. Her bir düğüme JobTracker tarafından gönderilmiş olan işlerin düğümlerdeki işletimini sağlayan ve kontrol eden TaskTracker.

16 Ekim 2014 Perşembe

Hadoop Konfigürasyon Ayarları

Hadoop da konfigürasyonlar Configuration sınıfı üzerinden yönetilir. Konfigürasyonlar sisteme xml dosyaları şeklinde import edilebilir. Bunun için farklı yöntemler mevcut.

  • Bu dosyaları kendimiz ayrıştırabileceğimiz gibi GenericOptionsParse sınıfı ile de ayrıştırma işlemi yapılabilir.
  • xml dosyalarının ayrıştırılması ve job ların başlatılması gibi işlemleri soyutlamak için Tool ve ToolRunner sınıfları bulunmakta. Bu sınıfları kullanarak kodlama yapıldığında konfigürasyon dosyalarıdaki parametreler otomatik olarak yüklenmekte ve job'lar çalıştırılmaktadır.
  • Birden çok xml dosyası konfigürasyon ayarları için kodsal olarak eklenebilmektedir: 

    • Configuration.addDefaultResource("hdfs-default.xml");
    • Configuration.addDefaultResource("hdfs-site.xml");
    • Configuration.addDefaultResource("mapred-default.xml");
    • Configuration.addDefaultResource("mapred-site.xml");

  • Konfigürasyon dosyaları job çalıştırılırken komut satırından da eklenebilmektedir:
    • -conf filename
  • Konfigürasyon dosyalarında aynı parametreler var ise en son yüklenen dosyadaki parametre diğerlerini ezmektedir.


  • Bir paremetrenin hiç bir zaman başka değerler ile eilmemesini istiyor isek bu parametrede <final>true</final> özelliği eklenmelidir.
  • Konfigürasyon parametreleri jon çalıştırılırken komut satırından da verilebilmektedir. Bu şekilde ayarlanan parametrelerin önceden aynı isimde tanımlanmış olan değerlere göre önceliği vardır:
    • HADOOP_OPTS="-Dmapred.reduce.tasks=10"

13 Ekim 2014 Pazartesi

Hadoop - Yedek Verilerini (Replica) Neye Göre Konumlandırılıyor?

Hadoop dosya sistemlerinden HDFS de bir veri oluşturulduğunda, bu verinin istenilen adette yedeği altyapı tarafından oluşturulmakta. Yedek sayısı varsayılan olarak 3 adet. Bu değiştirilebilen bir değer.
Peki HDFS üzerindeki verileri yedeklenirken, hangi verinin hangi sunucularda yedekleneceği neye göre belirleniyor.

Burada bir kaç faktör var. Bunlardan en önemlisi bant genişliği. Bant genişliği göz önünde bulundurulduğunda verinin tüm yedeklerini aynı düğümde tutmak yada aynı rack üzerinde tutmak bandwith bakımından kazanç sağlayacaktır. Ancak burada oluşacak bir arıza veri kaybına neden olacaktır. Bu nedenle bu yöntem tercih edilmemekte.

Hadoop verilerin yedeklerini dağıtırken: ilk yedeği istemcinin çalıştığı düğüm üzerinde tutumakta. Eğer istemci hadoop kümesi dışında çalışıyor ise, ilk düğümü çok dolu yada çok meşgul olamayan düğümler arasından rastgele seçmekte. İkinci yedek ise, birinci yedeğin bulunduğu rack haricinden  rack'ler içinden rastgele seçilerek yerleştirilir. Üçüncü yedek ise ikinci ile aynı rack üzerinde, ikinci yedeğin bulunduğu düğümden farklı rastgele bir düğüme yerleştirilir. Hadoop sisteminin ağ topolojisini nasıl öğrendiği  Hadoop - Ağ Topolojisi Tanımlama yazısında detaylandırılmıştır.

Hadoop 1.x den itibaren bu strateji kullanıcı tarafından değiştirilebilir şekildedir.

10 Ekim 2014 Cuma

Hadoop Altyapısının Desteklediği Dosya Sistemleri

  • Local 
  • HDFS
    • Hadoop Distributed Filesystem
  • HFTP
    • HDFS 'e Http üzerinden sadece okuma amaçlı erişim sağlayan dosya sistemi
  • HSFTP
    • HDFS 'e Https üzerinden sadece okuma amaçlı erişim sağlayan dosya sistemi
  • WebHDFS
    • HDFS 'e Http üzerinden okuma ve yazma amaçlı güvenli erişim sağlayan dosya sistemi
  • HAR
  • KFS (Cloud-Store)
    • C++ ile yazılmış HDFS yada GFS benzeri dağıtık dosya sistemi
  • FTP
  • S3 (native)
    • Amazon S3 üzerinde bir dosya sistemi
  • S3(block based)
    • Amazon s3 üzerinde, dosyaları bloklarda saklayan dosya sistemi
  • Distributed RAID
    • HDFS'in RAID versiyonu
  • View
    • İstemci tarafında oluşturulan bir mount tablosudur

    Hadoop Namespace Image, Edit Log, Secondary Namenode

    Hadoop dosyalarını HDFS de saklamakta. HDFS de hangi dosyanın nerde tutulduğunu , replikalarının nerde olduğunu ise kalıcı olarak yerel diskde saklamakta. Bu bilgileri "namespace image" ve "edit log" dosyalarında tutmakta.

    Namenode gittiğinde hdfs'in metadası niteliğindeki bu bilgilerde kaybolacağından hdfs'i ayağa kaldırmak mümkün değildir. Bunu engellemek için hadoop üzerinde belirli ayarlamalar yapılarak bu bilgilerin yedeklenmesi amacıyla kalıcı başka bir diske de senkron olarak yazılması sağlanabilmektedir.

    Secondary namenode ise bir namenode değildir. Namenode da tutulan "namespace image" ve "edit log" dosyalarının belirli periyodlar ile merge edilmesini sağlamaktadır. Böylelikle "edit log" dosyalarının şişmesi engellenmiş olur.

    Secondary namenode farklı bir fiziksel makinada çalıştırılmalıdır. Ve en az namenode kadar belleğe sahip ve CPU gücü yüksek bir makina olmalıdır. Kendi içinde merge işleminden sonra "namespace image" in kopyasını tutmaktadır. Namenode çökerse bu yedekden de dönülebilir ancak veri kaybı yaşanması olasıdır.

    Namenode da bulunan verilerin bir NFS e senkron olarak kopyalanmasını sağlayarak, namenode çöktüğünde bu veriler secondary namenode 'a kopyalanıp burada sistemin ayağa kaldırılması veri kaybı yaşanmadan sistemin tekrar çalışmasını sağlayacaktır.

    Hadoop - Jop Map Sayısı

    Hadoop üzerinde bir job başlatıldığında kaç adet map task oluşturulacağı o job da kullanılacak data nın büyüklüğü ile orantılıdır. Hadoop veriyi HDFS'e atarken splitler şeklinde parçalayarak atmaktadır. Bir job başlatıldığında kullanacağı veri kaç splitten oluşuyor ise o kadar map taski oluşturulmakta. Bu işlem Hadoop altyapısının yönetiminde gerçekleşiyor. Programsal olarak map sayısınına müdahale edilememekte.

    Hadoop veriyi split olarak parçalarken 64 MB lık bölümler oluşturmakta. Ancak veri küçük dosyalardan oluşuyor ise 64 MB dan küçük splitler oluşacaktır. Bu durumda bir job çalıştırıldığında daha fazla map task açılmış olacaktır. Bu işlem de zamandan kayıba neden olacaktır.

    Bunu bir örnek ile açıklayacak olur isek:

    1. Durum

    2 adet 128 MB dosyamız olsun , bu dosyaları HDFS e attığımızda 4 adet split oluşacaktır.
    Tüm veriyi kullanan bir job çalıştırdığımızda 4 adet map task oluşacaktır.

    2. Durum

    16 adet 16 MB dosyamız olsun, bu dosyaları HDFS e attığımızda her bir dosya 64 MB dan küçük olduğundan hepsi için bir split yapılarak  16 adet split oluşacaktır.
    Tüm veriyi kullanan bir job çalıştırdığımızda 16 adet map task oluşacaktır.

    NOT: Bir job da çalışacak reduce sayısı değiştirilebilmektedir.