Using R and Hadoop for Big Data

The following packages help when working with Big Data from R.

1. rmr2
2. rhdfs
3. HadoopStreaming
4. Rhipe
5. h2o
6. SparkR

The links to documentation and tutorials for each of them are below.

All of these packages use Hadoop Streaming to run the work on the cluster instead of on a single R node. If you are new to Hadoop, read the basics of Hadoop Streaming at https://hadoop.apache.org/docs/stable2/hadoop-streaming/HadoopStreaming.html and a short tutorial on writing streaming jobs in Python: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
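For example, rmr2 expresses a MapReduce job directly in R and ships it through Hadoop Streaming. A minimal sketch, assuming rmr2 is installed and the HADOOP_CMD / HADOOP_STREAMING environment variables point at your Hadoop installation:

library(rmr2)
# Write a small vector to HDFS, then count values per bucket (last digit)
input  <- to.dfs(1:1000)
result <- mapreduce(
  input  = input,
  map    = function(k, v) keyval(v %% 10, 1),   # emit (bucket, 1) pairs
  reduce = function(k, vv) keyval(k, sum(vv))   # sum counts per bucket
)
from.dfs(result)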

Useful tutorials / readings for each package:

rmr2
Web links: https://github.com/RevolutionAnalytics/rmr2/blob/master/docs/tutorial.md
Book: R in a Nutshell, 2nd edition (Chapter 26): http://shop.oreilly.com/product/0636920022008.do

rhdfs
Wiki: https://github.com/RevolutionAnalytics/RHadoop/wiki/user%3Erhdfs%3EHome

HadoopStreaming
CRAN package documentation: https://cran.r-project.org/web/packages/HadoopStreaming/HadoopStreaming.pdf

Rhipe
Web links: http://tessera.io/docs-RHIPE/#install-and-push

h2o
Documentation on using h2o from R: http://h2o-release.s3.amazonaws.com/h2o/rel-slater/1/docs-website/h2o-docs/index.html#%E2%80%A6%20From%20R
R h2o package documentation (~140 pages): http://h2o-release.s3.amazonaws.com/h2o/rel-slater/1/docs-website/h2o-r/h2o_package.pdf

SparkR
API: https://spark.apache.org/docs/latest/api/R/index.html
Documentation: https://spark.apache.org/docs/latest/api/R/index.html
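For h2o in particular, the R package starts (or attaches to) an H2O instance and then moves data into it. A minimal hedged sketch, assuming the h2o R package is installed:

library(h2o)
# Start a local H2O instance; nthreads = -1 uses all available cores
h2o.init(nthreads = -1)
# Copy an R data.frame into H2O as an H2OFrame
iris_hex <- as.h2o(iris)
summary(iris_hex)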

java.lang.NoClassDefFoundError: org/apache/spark/sql/types/AtomicType

When using spark-csv with Spark 1.3.0 and spark-csv's type inference enabled, this error is thrown:

15/09/19 12:17:10 INFO DAGScheduler: Job 0 finished: take at CsvRelation.scala:174, took 1.900011 s
Error: application failed with exception
java.lang.NoClassDefFoundError: org/apache/spark/sql/types/AtomicType
at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:155)
at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:70)
at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:138)
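The inference path is what the stack trace points at (CsvRelation.inferSchema). The kind of read that triggers it looks roughly like the sketch below, written for SparkR 1.4+ where read.df and the spark-csv inferSchema option exist; the path, package version, and option values are illustrative, not from the original note.

library(SparkR)
sc <- sparkR.init(appName = "csv-read",
                  sparkPackages = "com.databricks:spark-csv_2.10:1.2.0")
sqlContext <- sparkRSQL.init(sc)
# inferSchema = "true" makes spark-csv scan the data to guess column types,
# which is where CsvRelation.inferSchema is invoked
df <- read.df(sqlContext, "hdfs:///tmp/sample.csv",
              source = "com.databricks.spark.csv",
              header = "true", inferSchema = "true")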




Install an R package behind a proxy


Sys.setenv(http_proxy="http://username:PASSWORD@proxyurl:8080")

install.packages("RPostgreSQL", dependencies=TRUE)
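If the CRAN mirror is served over HTTPS, the https_proxy variable may also need to be set (same format; the credentials and port below are placeholders):

Sys.setenv(https_proxy="http://username:PASSWORD@proxyurl:8080")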

Initial job has not accepted any resources; check your cluster UI

15/09/19 10:28:44 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

This message in the log means that the Hadoop cluster does not have the resources the job is asking for.

I submitted a Spark job on my local test cluster and got this message.

The additional logs also show the following:

15/09/19 10:23:20 INFO yarn.YarnRMClient: Registering the ApplicationMaster
15/09/19 10:23:20 INFO yarn.YarnAllocator: Will request 2 executor containers, each with 1 cores and 1408 MB memory including 384 MB overhead
15/09/19 10:23:20 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:1408, vCores:1>)
15/09/19 10:23:20 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:1408, vCores:1>)
15/09/19 10:23:20 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 5000, initial allocation : 200) intervals
My cluster had only 3 GB of RAM, so YARN could not allocate what Spark was asking for.
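If the job is started from SparkR, one way to shrink the request so YARN can satisfy it is to ask for fewer and smaller executors. A hedged sketch assuming SparkR 1.4+; the values are illustrative and should be tuned to your cluster:

library(SparkR)
# Request a single small executor so the allocation fits a small cluster
sc <- sparkR.init(master = "yarn-client",
                  appName = "small-footprint-job",
                  sparkEnvir = list(spark.executor.instances = "1",
                                    spark.executor.memory = "512m"))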

Update R packages

update.packages(ask = FALSE, lib = '/usr/lib64/R/library', repos = "http://local/cran-mirror", dependencies = TRUE)

Insert data into Hive from Spark

Using the SchemaRDD / DataFrame API via HiveContext

Assuming you are using a recent Spark version, it looks something like this:

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
import hc.implicits._
existedRdd.toDF().insertInto("hivetable")
or

existedRdd.toDF().registerTempTable("mydata")
hc.sql("insert into table hivetable select xxx from mydata")

Install an R package only if it is not already installed

options(echo = TRUE) # if you want to see commands in the output file
args <- commandArgs(trailingOnly = TRUE)
print(args)
# trailingOnly = TRUE means that only your arguments are returned, check:
# print(commandArgs(trailingOnly = FALSE))

# Install only if not already installed
# http://stackoverflow.com/questions/9341635/check-for-installed-packages-before-running-install-packages
pkgInstall <- function(x) {
  if (!require(x, character.only = TRUE)) {
    install.packages(pkgs = x, lib = '/usr/lib64/R/library',
                     repos = "http://local/cran-mirror",
                     dependencies = TRUE, verbose = FALSE, quiet = FALSE)
    if (!require(x, character.only = TRUE)) stop("Package not found")
  }
}

pkgInstall(args[1])


Save the above script as installpackage.R

Use it as

sudo Rscript installpackage.R pkgname

Install and configure the HttpFS Hadoop service

Build the HttpFS package

Clone the Hadoop code

git clone https://github.com/apache/hadoop.git
cd hadoop/hadoop-hdfs-project/hadoop-hdfs-httpfs/
mvn package -Pdist -DskipTests
cd target/
tar -czf hadoop-hdfs-httpfs-2.6.0.tar.gz hadoop-hdfs-httpfs-2.6.0

Go to the server where you want to set up the HttpFS service

Extract the tar file

tar -xvf hadoop-hdfs-httpfs-2.6.0.tar.gz
cd hadoop-hdfs-httpfs-2.6.0

In your cluster manager (e.g. Ambari, Cloudera Manager, etc.), change core-site.xml so that the httpfs user can act as a proxy user:

<property>
  <name>hadoop.proxyuser.#HTTPFSUSER#.hosts</name>
  <value>httpfs-host.foo.com</value>
</property>
<property>
  <name>hadoop.proxyuser.#HTTPFSUSER#.groups</name>
  <value>*</value>
</property>

Change #HTTPFSUSER# to the exact user who will be starting the HttpFS service.

Restart cluster services

Copy core-site.xml and hdfs-site.xml from your cluster to the /etc/hadoop directory

Start HttpFS

./sbin/httpfs.sh start

Check

curl -i "http://<HTTPFSHOSTNAME>:14000?user.name=YourHadoopusername&op=homedir"

Output should be

HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked

{"homeDir":"http:\/\/<HTTPFS_HOST>:14000\/user\/YourHadoopusername"}