Upgrading Large Hadoop Cluster
Using R and Hadoop Bigdata
1. rmr2
2. rhdfs
3. HadoopStreaming
4. Rhipe
5. h2o
6. SparkR
The links to documentation and tutorials for each of them are below.
These packages build on Hadoop Streaming to run the work on the cluster instead of on a single R node. If you are new to Hadoop, read the basics of Hadoop Streaming at https://hadoop.apache.org/docs/stable2/hadoop-streaming/HadoopStreaming.html and a short tutorial on writing streaming jobs in Python: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
Package name and useful tutorials / readings:

rmr2
- Web links: https://github.com/RevolutionAnalytics/rmr2/blob/master/docs/tutorial.md
- Book: R in a Nutshell, 2nd edition (Chapter 26), http://shop.oreilly.com/product/0636920022008.do

rhdfs
- Wiki: https://github.com/RevolutionAnalytics/RHadoop/wiki/user%3Erhdfs%3EHome

HadoopStreaming
- CRAN package documentation: https://cran.r-project.org/web/packages/HadoopStreaming/HadoopStreaming.pdf

Rhipe
- Web links: http://tessera.io/docs-RHIPE/#install-and-push

h2o
- Documentation on using h2o from R: http://h2o-release.s3.amazonaws.com/h2o/rel-slater/1/docs-website/h2o-docs/index.html#%E2%80%A6%20From%20R
- R h2o package documentation (~140 pages): http://h2o-release.s3.amazonaws.com/h2o/rel-slater/1/docs-website/h2o-r/h2o_package.pdf

SparkR
- API documentation: https://spark.apache.org/docs/latest/api/R/index.html
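To give a feel for what running R code through Hadoop Streaming looks like at the command line, here is a minimal sketch. The streaming jar path, the input/output paths, and the mapper.R / reducer.R scripts are illustrative assumptions; they vary with your distribution and job.

# mapper.R and reducer.R are your own Rscript programs that read stdin and write stdout
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -files mapper.R,reducer.R \
  -mapper "Rscript mapper.R" \
  -reducer "Rscript reducer.R" \
  -input /data/input \
  -output /data/output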
Install and configure the HttpFS Hadoop service
Clone the Hadoop code
git clone https://github.com/apache/hadoop.git
cd hadoop/hadoop-hdfs-project/hadoop-hdfs-httpfs/
mvn package -Pdist -DskipTests
cd target/
tar -czf hadoop-hdfs-httpfs-2.6.0.tar.gz hadoop-hdfs-httpfs-2.6.0
Go to the server where you want to set up the HttpFS service.
Extract the tar file:
tar -xvf hadoop-hdfs-httpfs-2.6.0.tar.gz
cd hadoop-hdfs-httpfs-2.6.0
In your cluster manager (e.g. Ambari, Cloudera Manager, etc.),
make a change to core-site.xml so that the httpfs user can act as a proxy user:
<property>
<name>hadoop.proxyuser.#HTTPFSUSER#.hosts</name>
<value>httpfs-host.foo.com</value>
</property>
<property>
<name>hadoop.proxyuser.#HTTPFSUSER#.groups</name>
<value>*</value>
</property>
Change #HTTPFSUSER# to the exact user who will be starting the HttpFS service.
Restart cluster services
Copy core-site.xml and hdfs-site.xml from your cluster to /etc/hadoop directory
Start Httpfs
./sbin/httpfs.sh start
Check
curl -i "http://<HTTPFSHOSTNAME>:14000?user.name=YourHadoopusername&op=homedir"
Output should be
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{"homeDir":"http:\/\/<HTTPFS_HOST>:14000\/user\/YourHadoopusername"}
Operating system level tuning for Hadoop
1)
Transparent huge pages (THP) should be disabled:
echo never > /sys/kernel/mm/redhat_transparent_hugepage/defrag
echo never > /sys/kernel/mm/redhat_transparent_hugepage/enabled
Add the above to /etc/rc.local so it survives reboots.
2)
Mount the data disks with the noatime option to speed up reads.
3)
Raise the open file limit (ulimit -n) to a high number such as 32000.
4)
Turn off caching on the disk controller.
5)
Increase the socket listen backlog: net.core.somaxconn=1024
6)
Disable SELinux: SELINUX=disabled
7)
Set umask 022
8)
In sysctl.conf:
vm.swappiness=0
vm.overcommit_memory = 1
vm.overcommit_ratio = 100
9)
Disable IPv6.
10)
Make sure host DNS is set up properly.
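A sketch of how a few of these settings can be applied from the shell on a RHEL-style node. The mount point /data01 and the hdfs user are illustrative; adapt them to your layout and make the changes persistent in the proper config files.

sysctl -w net.core.somaxconn=1024
sysctl -w vm.swappiness=0
sysctl -w vm.overcommit_memory=1
sysctl -w vm.overcommit_ratio=100
echo "hdfs - nofile 32000" >> /etc/security/limits.conf   # open file limit
mount -o remount,noatime /data01                          # repeat for every Hadoop data disk
setenforce 0    # SELinux off until reboot; also set SELINUX=disabled in /etc/selinux/config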
References
http://www.slideshare.net/vgogate/hadoop-configuration-performance-tuning
Hadoop Yarn tuning calculator
I just started an Excel sheet where we can plug in a node's number of cores, disks, and RAM,
and it will give us the values for various YARN properties.
It is based on Hortonworks' suggestions.
You can see it at
https://goo.gl/gzSz27
Hortonworks also has a Python script:
https://github.com/hortonworks/hdp-configuration-utils
But keep in mind that the Python script does not take into consideration things
which are already running on the machine.
Those we can configure manually in the Excel sheet.
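For reference, the Hortonworks-style manual calculation the sheet is based on can be sketched in a few lines of shell. The input numbers below are illustrative; the minimum container size and the memory you reserve for the OS and other services depend on your nodes.

#!/bin/bash
CORES=14            # cores per datanode
DISKS=8             # data disks per datanode
AVAIL_MB=81920      # RAM left for YARN after OS / Hadoop / monitoring overhead
MIN_CONTAINER_MB=2048

BY_CORE=$(( 2 * CORES ))
BY_DISK=$(( 18 * DISKS / 10 ))            # ~1.8 * disks
BY_MEM=$(( AVAIL_MB / MIN_CONTAINER_MB ))
CONTAINERS=$(printf '%s\n' "$BY_CORE" "$BY_DISK" "$BY_MEM" | sort -n | head -1)
CONTAINER_MB=$(( AVAIL_MB / CONTAINERS ))

echo "yarn.nodemanager.resource.memory-mb  = $(( CONTAINERS * CONTAINER_MB ))"
echo "yarn.scheduler.minimum-allocation-mb = ${CONTAINER_MB}"
echo "mapreduce.map.memory.mb              = ${CONTAINER_MB}"
echo "mapreduce.reduce.memory.mb           = $(( 2 * CONTAINER_MB ))"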
Why is my Hive / Sqoop job failing?
Find out a few basics about the cluster configuration from your administrator. A sample conversation could be:
Question: How many nodes does the cluster have, and what is their configuration?
Answer:
Each node has 120 GB RAM. Out of that, the memory we can ask for our jobs is about 80 GB.
Each datanode has 14 CPU cores (we have 5 datanodes); right now the maximum we can ask for processing from each datanode is 8 cores,
leaving the rest for other processes like the OS, Hadoop daemons, and monitoring services.
When we run any job in the MapReduce world, a task gets a minimum of 2 GB RAM, and the maximum any task can ask for is 80 GB (see the capacity above).
Given that we are running big jobs for one-off loads, tell the system to give your job more RAM (request RAM in increments of 1024 MB).
Besides RAM you can also specify how many CPU cores you want.
The maximum cores a given node can provide for processing is 8.
This can be controlled via the following parameters:
Hive jobs
mapreduce.map.memory.mb
mapreduce.reduce.memory.mb
mapreduce.map.java.opts
mapreduce.reduce.java.opts
mapreduce.map.cpu.vcores
mapreduce.reduce.cpu.vcores
Typically, if your job fails while inserting into a Hive table, check whether you need to tune any memory parameter; Hive insert jobs run as reduce tasks.
Since you are inserting a large amount of data in one go, you may hit memory overruns.
Always look at the logs; the reason the job is failing is always mentioned there.
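As an illustration (the table names and the exact numbers are placeholders), these parameters can be overridden per session when running a heavy insert; the -Xmx values are kept to roughly 80% of the container sizes:

hive -e "
SET mapreduce.map.memory.mb=4096;
SET mapreduce.map.java.opts=-Xmx3276m;
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.reduce.java.opts=-Xmx6553m;
INSERT OVERWRITE TABLE target_table SELECT * FROM staging_table;
"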
Sqoop jobs
Sqoop jobs spawn only map tasks.
So if a Sqoop job is not moving along, the usual indicator is a memory issue on our side.
Just add the following parameters to the command:
-Dmapreduce.map.memory.mb=5120 -Dmapreduce.map.speculative=false
Tune the 5120 value above based on your needs.
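A hedged example of where those -D options go in a Sqoop command (the JDBC URL, credentials, table, and paths are illustrative; the generic -D options must come before the tool-specific arguments):

sqoop import \
  -Dmapreduce.map.memory.mb=5120 \
  -Dmapreduce.map.speculative=false \
  --connect jdbc:mysql://dbhost/sales \
  --username etl_user -P \
  --table transactions \
  --target-dir /data/raw/transactions \
  --num-mappers 8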
Where to see logs and job status
You can see the status of your job and its logs in the Resource Manager UI:
http://resource:8088/cluster
You can also log in to Ambari to see what value has been set as the default for a given property:
http://ambari:8080
Ask your administrator for a username and password with read-only access.
Find out what the current default values are, for example:
mapreduce.map.java.opts=-Xmx5012m
mapreduce.reduce.java.opts=-Xmx6144m
mapreduce.map.memory.mb=4096
mapreduce.reduce.memory.mb=8192
What if my job is not even accepted by the cluster? :)
You are asking for resources which the cluster doesn't have, i.e. the request crosses the maximum limit of the cluster. So check what your job is really asking for versus what the cluster can provide.
Why is my job being killed?
If your job crosses the resource limit it originally asked the ResourceManager for, YARN will kill your job.
You will see something like below in logs
Killing container....
Remember, Google is your friend
The moment you see your job has failed, look at the error in the logs above and search for it on Google; try to find which parameters people have suggested changing in the job.
Resources
https://altiscale.zendesk.com/hc/en-us/articles/200801519-Configuring-Memory-for-Mappers-and-Reducers-in-Hadoop-2
Hadoop conf files in Pivotal Hadoop
Design and Architecture considerations for storing time series data
Know your access patterns in advance
- Are we going to analyze a full day of data, or data for just one hour? Having advance notes on the use cases with which the data will be consumed is highly recommended.
- The granularity of information required by the client application helps decide the underlying data model for storing the information.
- Frequency with which data is generated
- Identify the speed with which data is being produced by the source system. Do you produce multiple data points every second?
Though we might need to persist all the time series data, more often than not we don't need to store each data point as a separate record in the database.
Most time series problems are similar in nature, and the predominant issues come when we need to scale the system. Making the system evolve with a changing schema is another dimension which adds to the complexity. All these problems show similar patterns, with variations only in the data model.
If we can define a time window and store all the readings for that period of time as an array, then
we can significantly cut the number of actual records persisted in the database, improving the performance of the system.
For example
Stock tick information is generated once a second for each stock, i.e. roughly 86K ticks per stock per day.
If we store that many records as separate rows, the time complexity to access this information would be huge, so we can instead group 5 minutes, 1 hour, or one day worth of readings into a single vector record.
The benefit of storing information in larger chunks is obvious: you do far fewer lookups into the
NoSQL store to fetch the information for a specific period of time. The other point to remember is that
if your window size is very small you will be doing a lot of read/write operations, and if it is
too big then durability becomes a concern, since you can lose information in the event of a system failure.
So you need to balance both forces.
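A quick back-of-the-envelope check of the effect of the window size, using the one-tick-per-second example above:

echo $(( 24 * 60 * 60 ))        # 86400 rows per stock per day with one record per tick
echo $(( 86400 / (5 * 60) ))    # 288 rows per day with 5-minute buckets stored as arrays
echo $(( 86400 / 3600 ))        # 24 rows per day with 1-hour buckets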
There is no single size that fits all; each time series problem is different. Fine-tune the system based on the requirements and the access patterns.
If the access pattern changes in the future, you might have to re-index or re-calculate the array size to optimize your queries.
So each time-series application is very custom-made: you can apply the best practices, but you cannot just import the data modelling templates from a different time-series problem.
Links
http://www.infoq.com/articles/nosql-time-series-data-management
https://blog.twitter.com/ko/node/4403
Migrating Large Hadoop Cluster
- Testing of sample production jobs on the new cluster
- Total automation and setup of the new cluster via the Cloudera Manager API (see the sketch after this list)
- Code migration of over 300 production Oozie jobs which pump data into the cluster and produce scores
- Trickle-feed resumption of the jobs in batches
- A roster of shifts in which people will work, to complete the activity within one weekend
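As a hedged illustration of the automation piece, the Cloudera Manager REST API can be driven with plain curl; the host, credentials, API version, and cluster name below are assumptions to adapt to your environment:

# List clusters, then the services of one cluster
curl -u admin:admin "http://cm-host:7180/api/v10/clusters"
curl -u admin:admin "http://cm-host:7180/api/v10/clusters/cluster1/services"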
Architecture pattern for real time processing in Bigdata
The flow is as follows:
1) Real-time data is ingested into the system via stream ingestion tools like Flume, Kafka, or Samza. The choice among these depends on factors such as whether we need guaranteed event delivery and whether we care about the ordering of events; that is a topic for another post.
2) The required processing is applied to the incoming data via the Spark Streaming API, making it ready to be consumed by third-party apps.
3) Using a REST gateway job server for Spark, third-party apps trigger further Spark jobs to process the data and return results back to the applications (see the sketch below).
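A sketch of step 3, assuming something like the open-source spark-jobserver as the REST gateway (host, port, jar, and class names are illustrative):

# Upload the application jar once, then trigger jobs over REST
curl --data-binary @scoring-job.jar http://jobserver-host:8090/jars/scoring
curl -d "input.path=/data/stream/latest" \
  "http://jobserver-host:8090/jobs?appName=scoring&classPath=com.example.ScoringJob"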
Does not contain a valid host:port authority: logicaljt
Making Oozie HBase actions work with a Kerberos-enabled cluster
Add an HBase credential to your workflow definition:
<credentials>
<credential name='hbaseauth' type='hbase'>
</credential>
</credentials>
Within any action that needs HBase access, reference the credential:
<action name="process" cred="hbaseauth">
Also add details about hbase-site.xml
<job-xml>${hbaseSite}</job-xml>
<file>${hbaseSite}#hbase-site.xml</file>
Complete example
<action name="process" cred="hbaseauth">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>${hbaseSite}</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>${process_classname}</main-class>
<file>${hbaseSite}#hbase-site.xml</file>
<capture-output/>
</java>
<ok to="success"/>
<error to="failed"/>
</action>
Hadoop 2.3 Centralized Cache feature comparison to Spark RDD
- Support for a heterogeneous storage hierarchy in HDFS (HDFS-2832)
- In-memory cache for data resident in HDFS via datanodes (HDFS-4949); the cacheadmin commands below show the user-facing side
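A minimal sketch of the hdfs cacheadmin command introduced with the centralized cache feature (pool and path names are illustrative):

hdfs cacheadmin -addPool hot-data
hdfs cacheadmin -addDirective -path /data/lookup_tables -pool hot-data
hdfs cacheadmin -listDirectives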
Teradata meets Hadoop SQL-H
Where is hadoop examples jar
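The location varies by distribution and version, so the paths below are only illustrative; find the jar and then run one of the bundled examples:

find / -name "hadoop*examples*.jar" 2>/dev/null
# e.g. /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar on many packaged installs
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi 10 100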
Build and compile Hadoop from source
apt-get -y install maven build-essential protobuf-compiler autoconf automake \
  libtool cmake zlib1g-dev pkg-config libssl-dev
Checkout code from repo
svn co http://svn.apache.org/repos/asf/hadoop/common/trunk/ hadoop
cd hadoop/
mvn compile
This gave me
[ERROR] Could not find goal 'protoc' in plugin org.apache.hadoop:hadoop-maven-plugins:3.0.0-SNAPSHOT among available goals -> [Help 1]
https://issues.apache.org/jira/browse/HADOOP-9383
Then, to debug, I re-ran with
mvn compile -X
I found in the log that I did not have version 2.5 of protobuf, which is required to build Hadoop.
Follow the steps here to install protobuf on your system
http://jugnu-life.blogspot.com/2013/09/install-protobuf-25-on-ubuntu.html
Check the installed version of protobuf; the requirement is also documented in
https://svn.apache.org/repos/asf/hadoop/common/trunk/BUILDING.txt
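A quick way to confirm which protobuf compiler the build will pick up:

protoc --version
# should print: libprotoc 2.5.0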
Okay, after fixing the above (if required), here is the final build command:
mvn package -Pdist -DskipTests -Dtar
Further reading
http://wiki.apache.org/hadoop/HowToContribute
Hadoop get filesize and directory from ls command
You can change the awk field numbers to extract different columns from the listing.
Example
hadoop fs -ls /my/folder | awk '{print $6, $8}' > only_date_directory.txt
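For example, to get the file size and path instead ($5 is the size column in hadoop fs -ls output):

hadoop fs -ls /my/folder | awk '{print $5, $8}' > only_size_directory.txt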
Hive Shark Impala Comparison
This post is nothing but a reproduction of the benchmark work done at AMPLab. If you want the latest and most detailed results, I suggest you go there.
The big data world is beautiful: research in this field is moving at such a fast pace that big data is no longer synonymous with long-running queries; it is becoming live data everywhere.
The work compares the computation time of Redshift, Hive, Impala, and Shark across different types of queries.
The performance of Shark in memory has been consistent across all four query types. It would be interesting to see the comparison when Hive 0.11 is used, as it adds several performance improvements driven by work at Hortonworks.
Open Stack meets Hadoop
There have been some good announcements this week.
OpenStack allows us to create a cloud inside the corporate walls, using the solid foundation of standards set up worldwide through the collaboration of many good companies.
Now Mirantis has announced a project, Savanna, which allows you to provision Hadoop-based clusters easily within an organization. It uses Ambari as its background engine to do all this magic.
Quoting some of the information and architecture from the official website.
Read more at
Summary of above components
The Savanna product communicates with the following OpenStack components:
- Horizon - provides GUI with ability to use all of Savanna’s features;
- Keystone - authenticates users and provides security token that is used to work with the OpenStack, hence limiting user abilities in Savanna to his OpenStack privileges;
- Nova - is used to provision VMs for Hadoop Cluster;
- Glance - Hadoop VM images are stored there, each image containing an installed OS and Hadoop; the pre-installed Hadoop should give us good handicap on node start-up;
- Swift - can be used as a storage for data that will be processed by Hadoop jobs.
The main advantages include
- You can choose which Hadoop version to run
- Run everything against existing infrastructure
- Fast provisioning of Hadoop clusters on OpenStack for dev and QA
- Utilization of unused compute power from a general-purpose OpenStack IaaS cloud
Read the link above for awesome details.
Thanks for reading.
Please leave your comments below, or connect with me via LinkedIn.
Cloudera Certified Specialist in Apache HBase CCSHB exam topics and syllabus
Test Name: Cloudera Certified Specialist in Apache HBase
Current Version: CCB-400
Number of Questions: 45
Time Limit: 90 minutes
Passing Score: 69%
Languages: English, Japanese
Core HBase Concepts
Recognize the fundamental characteristics of Apache HBase and its role in a big data ecosystem. Identify differences between Apache HBase and a traditional RDBMS. Describe the relationship between Apache HBase and HDFS. Given a scenario, identify application characteristics that make the scenario an appropriate application for Apache HBase.
Data Model
Describe how an Apache HBase table is physically stored on disk. Identify the differences between a Column Family and a Column Qualifier. Given a data loading scenario, identify how Apache HBase will version the rows. Describe how Apache HBase cells store data. Detail what happens to data when it is deleted.
Architecture
Identify the major components of an Apache HBase cluster. Recognize how regions work and their benefits under various scenarios. Describe how a client finds a row in an HBase table. Understand the function and purpose of minor and major compactions. Given a region server crash scenario, describe how Apache HBase fails over to another region server. Describe RegionServer splits.
Schema Design
Describe the factors to be considered with creating Column Families. Given an access pattern, define the row keys for optimal read performance. Given an access pattern, define the row keys for locality.
API
Describe the functions and purpose of the HBaseAdmin class. Given a table and rowkey, use the get() operation to return specific versions of that row. Describe the behavior of the checkAndPut() method.
Administration
Recognize how to create, describe, and access data in tables from the shell. Describe how to bulk load data into Apache HBase. Recognize the benefits of managed region splits.
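As a quick refresher for the shell part of this objective, here is a minimal session (the table name, column family, and values are illustrative):

hbase shell <<'EOF'
create 'weblog', 'cf'
put 'weblog', 'row1', 'cf:url', '/index.html'
get 'weblog', 'row1'
scan 'weblog', {LIMIT => 10}
describe 'weblog'
EOF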
Sample Questions
Question 1
You want to store the comments from a blog post in HBase. Your data consists of the following:
a. the blog post id
b. the name of the comment author
c. the body of the comment
d. the timestamp for each comment
Which rowkey would you use if you wanted to retrieve the comments from a scan with the most recent first?
A. <(Long)timestamp>
B. <blog_post_id><Long.MAX_VALUE – (Long)timestamp>
C. <timestamp><Long.MAX_VALUE>
D. <Long.MAX_VALUE><timestamp>
Question 2
Your application needs to retrieve 200 to 300 non-sequential rows from a table with one billion rows. You know the rowkey of each of the rows you need to retrieve. Which does your application need to implement?
A. Scan without range
B. Scan with start and stop row
C. HTable.get(Get get)
D. HTable.get(List<Get> gets)
Question 3
You perform a check and put operation from within an HBase application using the following:
table.checkAndPut(Bytes.toBytes("rowkey"),
Bytes.toBytes("colfam"),
Bytes.toBytes("qualifier"),
Bytes.toBytes("barvalue"), newrow);
Which describes this check and put operation?
A. Check if rowkey/colfam/qualifier exists and the cell value "barvalue" is equal to newrow. Then return “true”.
B. Check if rowkey/colfam/qualifier and the cell value "barvalue" is NOT equal to newrow. Then return “true”.
C. Check if rowkey/colfam/qualifier and has the cell value "barvalue". If so, put the values in newrow and return “false”.
D. Check if rowkey/colfam/qualifier and has the cell value "barvalue". If so, put the values in newrow and return “true”.
Question 4
What is the advantage of the using the bulk load API over doing individual Puts for bulk insert operations?
A. Writes bypass the HLog/MemStore reducing load on the RegionServer.
B. Users doing bulk Writes may disable writing to the WAL which results in possible data loss.
C. HFiles created by the bulk load API are guaranteed to be co-located with the RegionServer hosting the region.
D. HFiles written out via the bulk load API are more space efficient than those written out of RegionServers.
Question 5
You have a “WebLog” table in HBase. The Row Keys are the IP Addresses. You want to retrieve all entries that have an IP Address of 75.67.12.146. The shell command you would use is:
A. get 'WebLog', '75.67.21.146'
B. scan 'WebLog', '75.67.21.146'
C. get 'WebLog', {FILTER => '75.67.21.146'}
D. scan 'WebLog', {COLFAM => 'IP', FILTER => '75.67.12.146'}
Answers
Question 1: B
Question 2: D
Question 3: D
Question 4: A
Question 5: A
