19 Kasım 2014 Çarşamba

Hadoop Bellek Kullanımı

Varsayılan Değerler

Her bir makinada 2 ader map ve reduce task ayağa kalkar.

DataNode : 1000MB
TaskTracker:1000MB
1 Map Task : 200MB (-Xmx200m)
1 Reduce Task : 200MB (-Xmx200m)

Toplamda: 1000+1000+200*2+200*2= 2800MB bellek gerekir.

*****************************************
1 task'ın kullanabileceği bellek "mapred.child.java.opts" parametresi ile değiştirilebilir. Aynı anda çalışabilecek task sayısı makinadaki işlemci sayısı ile doğru orantılıdır. 5 çekirdekli bir makinada aynı anda 4 adet task çalışabilir(1 slotu asktracker ve datanode kullanır). Bu durumda 200*4 map+ 200*4 reduce+1000datanode+100tasktracker= 3600  MB belleğe ihtiyaç olur.


18 Kasım 2014 Salı

HDFS Açılırken ve Kapanırken Adımlar

start-dfs.sh script'i çalıştırılır:

  • script hangi makinada çalışır ise o makina NameNode ayağa kaldırılır.
  • slaves dosyasında tanımlı tüm  makinalarda DataNode servisleri ayağa kaldırılır.
start-mapred.sh script'i çalıştırılır:

  • script hangi makinada çalışır ise o makina JobTracker ayağa kaldırılır.
  • slaves dosyasında tanımlı tüm  makinalarda TaskTracker servisleri ayağa kaldırılır.
Sistem kapatılırken stop-mapred.sh ve stop-dfs.sh scriptleri çalıştırılır. Sistem tarafından sırası ile TaskTracker servisi, JobTracker servisi, DataNode servisi ve en son NameNode servisi kapatılır.

Hadoop Konfigürasyon Yönetimi


  • Hadoop kümesindeki konfigürasyon dosyaları herbir düğümde ayrı ayrı tutulmakta. Bu dosyaların aynı değerlere sahip olmasını istediğimizde rsync, pdsh yada  dsh gibi komutlar ile dağıtım yapabiliriz.
  • Komut sistemi ile dosyalar dağıtıldığında kapalı durumda olan makinalara ilgili dosyalar iletilmemiş olacaktır. Bu durumda aynı konfigürasyonların tüm düğümlerde aktif olduğuda herzaman garanti edilemez. Bu gibi açıklardan dolayı, komutlar satırı ile dosyaların dağıtılmasından daha gelişmiş olarak Chef, Puppet, cfengine ve bcfg2 gibi araçlarda konfigürasyon yönetimi için kullanılabilir.
  • Bir başka uygulanabilecek yaklaşım ise hadoop-env.sh dosyasında "HADOOP_MASTER" parametresine namenode yada jobtracker ip adresi tanımlanır. Datanode makinalarında servisler başlatılırken konfigürasyonm dosyaları HADOOP_MASTER olarak tanımlanmış olan makinadan konfigürasyonları çekerler ve sonra servisler başlatılır. Bu işlemin aktif olabilmesi için hadoop-env.sh dosyası  HADOOP_MASTER parametresi tanımlandıktan sonra tüm datanodelara dağıtılması gerekir. Yeni eklenen makinalardada bu özelliğin aktif olabilmesi için kickstart scriptlerine eklenmelidir. Bu özellik aktif edildiğinde datanode'lar ayağa kalkarken aynı anda HADOOP_MASTER olarak tanımlı makinaya rsync yapmaya çalışırlar. Büyük ölçekli hadoop kümelerinde bu fazlasıyla yük getirecek ve hatalara sebep olacaktır. Bunun engellenebilmesi için "HADOOP_SLAVE_SLEEP" parametresine 0.1 saniye gibi bir değer verilir. Böylelikle datanode'lar 0.1 saniye aralıklar ile ayağa kaldırılır.
Hadoop namenode, secondarynamenode ve datanode makinalarının IP lerini konfigürasyon dosyalarından öğrenmektedir:
  • masters: secondary namenode IP si bu dosyada tanımlanır.
  • slaves  : datanode makinalarının IP leri bu dosyada tanımlanır.
Bu iki dosyanın sadece namenode ve jobtracker servislerinin çalışacağı makinalarda olması yeterlidir. Datanode'lara dağıtılmasına gerek yoktur.

DataNode ve TaskTracker servislerinin hangi makinalarda çalışacağını slaves dosyasında tanımlayabiliryoruz. Ancak bu durumda Hadoop sistemi çalışmaya devam ederken bir makinayı çıkartma yada ekleme işlemini (Commissioning, Decommissioning) gerçekleştiremiyoruz. Bu işlemleri sağlıklı bir şekilde yapabilmek için :

  • mapred-site.xml dosyasında  "mapred.hosts" özelliğine tasktracker servislerinin çalışacağı makinaları tanımlayacağımız dosya adı atanır.
  • mapred-site.xml dosyasında  "mapred.hosts.exclude" özelliğine tasktracker servisini kapatmak istediğimiz makinaları tanımlayacağımız dosya adı atanır.
  • hdfs-site.xml dosyasında  "dfs.hosts" özelliğine datanode servislerinin çalışacağı makinaları tanımlayacağımız dosya adı atanır.
  • hdfs-site.xml dosyasında  "dfs.hosts.exclude" özelliğine datanode servisini kapatmak istediğimiz makinaları tanımlayacağımız dosya adı atanır.
Ek Konfigürasyonlar:
  • Hadoop varsayılan ayarlarında I/O buffer olarak 4 KB kullanmaktadır. Ancak bu değer genellikle 128 KB olarak ayarlanır. Bu değeri core-site.xml dosyasındaki "io.file.buffer.size" özelliği ile değiştirilebilmektedir.
  • HDFS Block size varsayılan ayarlarda 64 MB  olarak kullanılmaktadır. Ancak genellikle namenode' a fazla yük bindirmemek amacı ile ve map task'lara daha fazla veri vermek amacı ile bu değer 128 MB olarak kullanılmaktadır. Bu değer hdfs-site.xml  dosyasındaki dfs.block.size özelliği  ile değiştirilebilmektedir.
  • HDFS silinen verileri doğrudan silmemekte, çöp kutusuna taşımaktadır. Belirli aralıklar ile çöp kutusu boşaltılmaktadır. Çöp kutusunun hangi aralıklar ile boşaltılacağı core-site.xml dosyasındaki fs.trash.interval özelliği ile belirlenmekte. Varsayılan ayarlarında bu değer "0" olarak  kullanıldığından veriler silindiğinde çöp kutusuna atılmamakta. Verilerin doğrudan silinmesini istemediğimiz durumlarda bu değer değiştirilmelidir.

Hadoop - Ağ Topolojisi Tanımlama

Hadoop sisteminin yedek verileri (replica) dağıtırken ağ topolojisini göz önünde bulundurduğunu
Hadoop - Yedek Verilerini (Replica) Neye Göre Konumlandırılıyor? yazısında belirtmiştik. Peki Hadoop bizim ağ yapımızı nerden biliyor?
Hadoop sistemine ağ topolojimizi bizim tanımlamamız gerekiyor. Aksi durumda tüm sunucuların default-rack  üzerinde bulunduğunu varsayıyor. Bu tanımlamayı yapabilmek için "topology.script.file.name"  parametresinde belirtilen dosya adında bir dosya oluşturularak hadoop_conf dizinine bırakılır. Bu dosyanın örneği:

HADOOP_CONF=/etc/hadoop/conf 

while [ $# -gt 0 ] ; do
  nodeArg=$1
  exec< ${HADOOP_CONF}/topology.data 
  result="" 
  while read line ; do
    ar=( $line ) 
    if [ "${ar[0]}" = "$nodeArg" ] ; then
      result="${ar[1]}"
    fi
  done 
  shift 
  if [ -z "$result" ] ; then
    echo -n "/default/rack "
  else
    echo -n "$result "
  fi
done 

Bu dosyada belirtilen "topology.data" dosyası hazırlanır ve burada hangi sunucunun hangi rack üzerinde olduğu belirtilir.

hadoopdata1.ec.com     /dc1/rack1
hadoopdata1            /dc1/rack1
10.1.1.1               /dc1/rack2

Örnekler: http://wiki.apache.org/hadoop/topology_rack_awareness_scripts adresinden alınmıştır.

Hadoop MapReduce Dosyalar ile Çalışma

Hoddop kümseinde paralel olarak çalışacak mapReduce programlarında bazen dışarıdan verilere ihtiyaç duyulabilir. Bu durumda join işlemlerinin getireceği maliyet gözönünde bulundurularak bu tür kayıtlar çalışacak Job'a konfigürasyon  dosyası olarak eklenir. Hadoop bir job başlatılırken "-files" "-archives", "-jars" ile gönderilen parametreleri Distributed Cache'e yükler. TaskTracker "run" metodunu çağırmadan önce Distributed Cache deki dosyaları lokal diske kopyalar.  Böylelikle bu dosyalar Hadoop Distributed cache'e atılacağından program içerisinden rahatlık ile kullanılabilir.

Bir dosyayı çalışma anında bir job'da kullnabilmek için job başlatılırken "-files dosyaAdi.txt" şeklinde parametre olarak geçilebilir. Birden fazla dosya yüklenecek ise virgül ile ayrılır. Bu işlem kod içerisinden de "addCacheFile" gibi metodlar ile gerçekleştirilebilir. Yüklenen bu dosyaların Job içerisindeki tanımlanan "setup"   methodları içerisende "new File (dosyaAdi.txt)" şeklinde çağırılıp kullanılabilir. Ya da kod içerisinde "context.getLocalCacheFiles()" yada eski API de bulunan "DistributedCache.getLocalCacheFiles(conf)" metodları ile bu dosyalara erişilip kullanılabilir.

Tüm bunlara ek olarak dosyalar Distributed Cachede bulunan dosyalar lokal diske yazılırken, "-libjar"  parametresi ile tanımlanmış dosyalar task'ın classpath'ine eklenir. Task tracker aynı zamanda cache de bulunan bir dosyanın kaç task tarafından kullanıldığı ve kullanımının sona erip ermediği bilgisinide tutar. Bir dosya hiç bir task tarafından kullanılmıyor ise silinecekler listesine eklenir ve cache size 10 GB'a eriştiğinde silinir. 10 GB değeride parametrik olarak "local.cache.size" parametresi ile tanımlanabilmektedir.


12 Kasım 2014 Çarşamba

Hadop Kullanıcı Oluşturma Kota Tanımlama

Kullanıcı Tanımlama

hadoop fs -mkdir /user/yeniKullanici
hadoop fs -chown yeniKullanici/yeniKullanici /user/yeniKullanici

Kullanıcıya Kota Atama

hadoop dfsadmin -setSpaceQuota 1t /user/yeniKullanici

11 Kasım 2014 Salı

MapReduce Sayaçlar (Counter)

MapReduce programları çalışırken sayaç (counter) kayıtlarındaki değerle çalışma esnasındaki gerçekleşen işlemler ile ilgili bize çeşitli bilgiler sunabilir. Hadoop altyapısında bulunan standart sayaçlara ek olarak kullanıcıların ihtiyaçları doğrultusunda yeni sayaçlarda tanımlanabilmektedir.

Tanımlanan sayaçlar Java enum yapısında olmakta. Örn:

public class OdemeAnaliz extends Configured implements Tool {

//Sayaç tanımı
enum OdemeTuru{
KrediKarti,
Nakit
}

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> 
 {......
//Sabit Sayaç
context.getCounter(OdemeTuru.KrediKarti).increment(1);
......
//Dinamik Sayaç
context.getConter("OdemeYil", getOdemeTarih()).increment(1);
......
  }
}

Örnekte 2 adet sabit 1 adet dinamik sayaç tanımlandı. Sabit enum olarak tanımlanan sayaçlar jobTracker arayüzünde enumda tanımlandığı ismi ile görülmekte. Bu ekilde görüntülenmesi çok kullanıcı dostu değil. Ekranda bu sayaçların farklı şekilde görüntülenmesini istersek bir .properties dosyası oluşturularak burada tanımlamalar yapılmakta. Bu dosya sayacın tanımlandığı sınıf ile aynı dizine konulmalı.
Bu dosyanın isimlendirmesi sınıfIsmi_enumIsmi.properties kalıbında olmalı. Dostya içindeki tanımlamalar ise aşağıdaki kalıpta olmalı:

CounterGroupName=Ödeme Türleri
KrediKarti.name=Kredi Kartı ile Ödeme Sayısı
Nakit.name=Nakit Ödeme Sayısı

Bu tür bir dosya tanımı yapıarak sistemin farklı dilleri desteklemesi sağlanabilmekte. İngilizce için sınıfIsmi_enumIsmi_en_EN.properties Türkçe için sınıfIsmi_enumIsmi_tr_TR.properties olarak dosyalar tanımlanarak sistemin farklı dillerde sayaç bilgilerini sunması sağlanabilmekte.

3 Kasım 2014 Pazartesi

Hadoop - Dosyaların Split Olarak Bölünmeden Atılması

Bazı durumlarda HDFS e atılan dosyaların bütün olarak işlenmesi gerekebilir. Çözüm olacak iki yöntem mevcut:

  1. Split size min. değeri verilebilecek en yüksek değer olarak(Long.MAX_VALUE) yada en büyük boyuttaki dosya değeri olarak ayarlanır.
  2. TextInputFormat sınıfından yeni bir sınıf oluşturulup isSplittable metodu override edilerek false değeri döndürülür.

Hadoop - Küçük Dosyalar

Hadoop büyük boyutlardaki dosyalar üzerinde daha verimli çalışmaktadır. Bunun nedenlerinden bazıları:

  • Küçük dosyalar blok boyutundan daha küçük olacağından bloklar verimli kullanılmamaış olacak ve bir çok blok ve split oluşacaktır.
  • Daha fazla split oluşacağından dolayı, bloklarda az veri olsada çok sayıda map task oluşacak ve zamandan kayıp olacaktır.
  • Çok fazla blok oluştuğundan namenode belleğinde daha fazla yer işgal edilecektir.
Bu problemleri ortadan kaldırmak için dosyalar ilk HDFS e atılırken FileInputFormat olarak atılabilir, eğer dosyalar zaten HDFS de ise CombineInputFormat kullanılır. 

Eğer map tasklar çok kısa sürede tamamlanıyor ise split size max. seviyesine ayarlanarak CombineInputFormat kullanmak zamandan kazanç sağlayacaktır.