Giving example of how to use Chain map class in hadoop to call code in sequence.
I am writing comments in between to explain
public class ChainDriver {
public static Logger log = Logger.getLogger(ChainDriver.class);
/**
* @param args
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
// Start main Chain Job and declare its conf and job
Configuration chainConf = new Configuration();
Job chainJob = Job.getInstance(chainConf);
// Variable names kept like conf1 etc to make code less cluttered
// Start Mapper for MyMapperA
Configuration conf1 = new Configuration(false);
// Example for Passing arguments to the mappers
conf1.set("myParameter", args[2]);
ChainMapper.addMapper(chainJob, MyMapperA.class,
LongWritable.class, Text.class, Text.class, Text.class, conf1);
// Start Mapper for Second replacement
Configuration conf2 = new Configuration(false);
// Dynamically take the class name from argument to make more Dynamic chain :)
// (MapperC OR MapperD)
ChainMapper.addMapper(chainJob,
(Class<? extends Mapper>) Class.forName(args[2]), Text.class,
Text.class, NullWritable.class, Text.class, conf2);
// Set the parameters for main Chain Job
chainJob.setJarByClass(ChainDriver.class);
FileInputFormat.addInputPath(chainJob, new Path(args[0]));
FileOutputFormat.setOutputPath(chainJob, new Path(args[1]));
System.exit(chainJob.waitForCompletion(true) ? 0 : 1);
}
}
Now in details few important points
1)
Configuration conf1 = new Configuration(false);
The Sub mappers configuration objects are initiated with boolean false
http://hadoop.apache.org/docs/r2.0.3-alpha/api/src-html/org/apache/hadoop/conf/Configuration.html#line.518
Using constructor
I am writing comments in between to explain
public class ChainDriver {
public static Logger log = Logger.getLogger(ChainDriver.class);
/**
* @param args
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
// Start main Chain Job and declare its conf and job
Configuration chainConf = new Configuration();
Job chainJob = Job.getInstance(chainConf);
// Variable names kept like conf1 etc to make code less cluttered
// Start Mapper for MyMapperA
Configuration conf1 = new Configuration(false);
// Example for Passing arguments to the mappers
conf1.set("myParameter", args[2]);
ChainMapper.addMapper(chainJob, MyMapperA.class,
LongWritable.class, Text.class, Text.class, Text.class, conf1);
// Start Mapper for Second replacement
Configuration conf2 = new Configuration(false);
// Dynamically take the class name from argument to make more Dynamic chain :)
// (MapperC OR MapperD)
ChainMapper.addMapper(chainJob,
(Class<? extends Mapper>) Class.forName(args[2]), Text.class,
Text.class, NullWritable.class, Text.class, conf2);
// Set the parameters for main Chain Job
chainJob.setJarByClass(ChainDriver.class);
FileInputFormat.addInputPath(chainJob, new Path(args[0]));
FileOutputFormat.setOutputPath(chainJob, new Path(args[1]));
System.exit(chainJob.waitForCompletion(true) ? 0 : 1);
}
}
Now in details few important points
1)
Configuration conf1 = new Configuration(false);
The Sub mappers configuration objects are initiated with boolean false
http://hadoop.apache.org/docs/r2.0.3-alpha/api/src-html/org/apache/hadoop/conf/Configuration.html#line.518
Using constructor
public Configuration(boolean loadDefaults)
loadDefaults
- specifies whether to load from the default files2)
Passing arguments
conf1.set("myParameter", args[2]);
You can use same code as we use in any Driver class
3)
ChainMapper.addMapper(chainJob, MyMapperA.class,
LongWritable.class, Text.class, Text.class, Text.class, conf1);
The method signature is like this
public static void addMapper(Job job,
Class<? extends Mapper> klass,
Class<?> inputKeyClass,
Class<?> inputValueClass,
Class<?> outputKeyClass,
Class<?> outputValueClass,
Configuration mapperConf)
The Job argument here is Job object of main Driver , chainJob
Then we tell which mapper to start and key value pairs as used by Mapper
Last argument is of Conf of Mapper being called
4)
You can call as many mappers , Reducers in chain but one thing to be kept in mind is that output of previous mapper ( or reducer) must be consumable directly by next in chain.
For example
Map 1
Map 2
are called in chain
If map 1 emits ,
Text as Key
Long as Value
Then
Map 2 should have
Text as Key Input
and Long and Value Input
The framework will not do any conversions for you.
5)
http://hadoop.apache.org/docs/r2.0.3-alpha/api/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.html
The ChainMapper class allows to use multiple Mapper classes within a single Map task.
Quoting from Javadocs
The key functionality of this feature is that the Mappers in the chain do not need to be aware that they are executed in a chain. This enables having reusable specialized Mappers that can be combined to perform composite operations within a single task.
Special care has to be taken when creating chains that the key/values output by a Mapper are valid for the following Mapper in the chain. It is assumed all Mappers and the Reduce in the chain use matching output and input key and value classes as no conversion is done by the chaining code.
Using the ChainMapper and the ChainReducer classes is possible to compose Map/Reduce jobs that look like
[MAP+ / REDUCE MAP*]
. And immediate benefit of this pattern is a dramatic reduction in disk IO. I found it pretty good tool while developing multiple processing pipelines. I just develop re usable classes of various tasks and call them in chain.
Update April 6
I would say based on experience using till now , chain mapper makes processing slow. So use it if and only if unless you really cannot avoid this.
Do you have some tips to improve performance of Chain mapper ? Please share below.
Happy Chaining :)
Happy Chaining :)
hi jungnu, thanks for putting up the tutorial.
ReplyDeleteI am facing trouble while using hadoop-core package 1.0.3. Are you aware if the ChainMapper class is going to be available from 2.0.3 version onward only? And if so, how can I use version 1.0.3 and still use ChainMapper class?
I notice the package hadoop-mapred (which has this class) conflicts with the hadoop-core-1.0.3 package.
Thanks in advance.
Hi
ReplyDeleteFor Hadoop 1.0 series
Please use this import class
http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/mapred/lib/ChainMapper.html
Thanks