Kafta getting started tutorial
Download KAfta from Apahe website
What steps are being done.
Start Kafta
Create some Topic
Send some events to it
Read them as consumer
Lets do it
Extract the kafta to some place
jj@1204master64bit:~/dev/code/open$ cd '/home/jj/dev/install_local/kafka_2.10-0.8.1.1'
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ ls
bin config libs LICENSE NOTICE
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ ls
bin config libs LICENSE NOTICE
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ cd config/
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1/config$ ls
consumer.properties server.properties zookeeper.properties
log4j.properties test-log4j.properties
producer.properties tools-log4j.properties
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1/config$ cd ..
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ pwd
/home/jj/dev/install_local/kafka_2.10-0.8.1.1
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/
kafka-console-consumer.sh kafka-server-stop.sh
kafka-console-producer.sh kafka-simple-consumer-perf-test.sh
kafka-consumer-perf-test.sh kafka-simple-consumer-shell.sh
kafka-preferred-replica-election.sh kafka-topics.sh
kafka-producer-perf-test.sh windows/
kafka-reassign-partitions.sh zookeeper-server-start.sh
kafka-replay-log-producer.sh zookeeper-server-stop.sh
kafka-run-class.sh zookeeper-shell.sh
kafka-server-start.sh
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ pwd
/home/jj/dev/install_local/kafka_2.10-0.8.1.1
Start the Zookeeper
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/zookeeper-server-start.sh config/zookeeper.properties
Truncated log
[2014-10-26 15:41:29,784] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-10-26 15:41:29,784] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2014-10-26 15:41:29,804] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-10-26 15:41:29,805] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2014-10-26 15:41:29,812] INFO Server environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:host.name=1204master64bit (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.version=1.6.0_45 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.vendor=Sun Microsystems Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.home=/home/jj/dev/tools/jdk1.6.0_45/jre (org.apache.zookeeper.server.ZooKeeperServer)
Start the Kafta server
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-server-start.sh config/server.properties
Truncated log
[2014-10-26 15:42:38,869] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-10-26 15:42:39,017] INFO Registered broker 0 at path /brokers/ids/0 with address 1204master64bit:9092. (kafka.utils.ZkUtils$)
[2014-10-26 15:42:39,040] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-10-26 15:42:39,068] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-10-26 15:44:02,373] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [calc_events,0] (kafka.server.ReplicaFetcherManager)
[2014-10-26 15:44:02,403] INFO Completed load of log calc_events-0 with log end offset 0 (kafka.log.Log)
[2014-10-26 15:44:02,407] INFO Created log for partition [calc_events,0] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-10-26 15:44:02,408] WARN Partition [calc_events,0] on broker 0: No checkpointed highwatermark is found for partition [calc_events,0] (kafka.cluster.Partition)
[2014-10-26 15:47:05,989] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-10-26 15:47:17,479] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-10-26 15:48:53,366] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
Create a topic with 1 partition
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --create --topic calc_events \
Missing required argument "[zookeeper]"
Option Description
------ -----------
--alter Alter the configuration for the topic.
--config <name=value> A topic configuration override for the
topic being created or altered.
--create Create a new topic.
--deleteConfig <name> A topic configuration override to be
removed for an existing topic
--describe List details for the given topics.
--help Print usage information.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected
--replica-assignment A list of manual partition-to-broker
<broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being created.
--topic <topic> The topic to be create, alter or
describe. Can also accept a regular
expression except for --create option
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--zookeeper <urls> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --create --topic calc_events --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "calc_events".
Show the topics
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --list --zookeeper localhost:2181
calc_events
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 –-topic calc_events
Missing required argument "[topic]"
Option Description
------ -----------
--batch-size <Integer: size> Number of messages to send in a single
batch if they are not being sent
synchronously. (default: 200)
--broker-list <broker-list> REQUIRED: The broker list string in
the form HOST1:PORT1,HOST2:PORT2.
--compress If set, messages batches are sent
compressed
--key-serializer <encoder_class> The class name of the message encoder
implementation to use for
serializing keys. (default: kafka.
serializer.StringEncoder)
--line-reader <reader_class> The class name of the class to use for
reading lines from standard in. By
default each line is read as a
separate message. (default: kafka.
producer.
ConsoleProducer$LineMessageReader)
--message-send-max-retries <Integer> Brokers can fail receiving the message
for multiple reasons, and being
unavailable transiently is just one
of them. This property specifies the
number of retires before the
producer give up and drop this
message. (default: 3)
--property <prop> A mechanism to pass user-defined
properties in the form key=value to
the message reader. This allows
custom configuration for a user-
defined message reader.
--queue-enqueuetimeout-ms <Long: queue Timeout for event enqueue (default:
enqueuetimeout ms> 2147483647)
--queue-size <Long: queue_size> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of messages will
queue awaiting suffient batch size.
(default: 10000)
--request-required-acks <Integer: The required acks of the producer
request required acks> requests (default: 0)
--request-timeout-ms <Integer: request The ack timeout of the producer
timeout ms> requests. Value must be non-negative
and non-zero (default: 1500)
--retry-backoff-ms <Long> Before each retry, the producer
refreshes the metadata of relevant
topics. Since leader election takes
a bit of time, this property
specifies the amount of time that
the producer waits before refreshing
the metadata. (default: 100)
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: 102400)
--sync If set message send requests to the
brokers are synchronously, one at a
time as they arrive.
--timeout <Long: timeout_ms> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of time a message
will queue awaiting suffient batch
size. The value is given in ms.
(default: 1000)
--topic <topic> REQUIRED: The topic id to produce
messages to.
--value-serializer <encoder_class> The class name of the message encoder
implementation to use for
serializing values. (default: kafka.
serializer.StringEncoder)
Send some events to the topic.
Type the messages event 1 <Enter>
event 2 etc
In the end Control D to exit and come out.
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic calc_events
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
event 1
event 2
event 3
List the events which have been sent
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-console-consumer.sh --topic calc_events --from-beginning --zookeeper localhost:2181
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
event 1
event 2
event 3
This completes the first Kafta tutorial
Download KAfta from Apahe website
What steps are being done.
Start Kafta
Create some Topic
Send some events to it
Read them as consumer
Lets do it
Extract the kafta to some place
jj@1204master64bit:~/dev/code/open$ cd '/home/jj/dev/install_local/kafka_2.10-0.8.1.1'
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ ls
bin config libs LICENSE NOTICE
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ ls
bin config libs LICENSE NOTICE
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ cd config/
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1/config$ ls
consumer.properties server.properties zookeeper.properties
log4j.properties test-log4j.properties
producer.properties tools-log4j.properties
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1/config$ cd ..
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ pwd
/home/jj/dev/install_local/kafka_2.10-0.8.1.1
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/
kafka-console-consumer.sh kafka-server-stop.sh
kafka-console-producer.sh kafka-simple-consumer-perf-test.sh
kafka-consumer-perf-test.sh kafka-simple-consumer-shell.sh
kafka-preferred-replica-election.sh kafka-topics.sh
kafka-producer-perf-test.sh windows/
kafka-reassign-partitions.sh zookeeper-server-start.sh
kafka-replay-log-producer.sh zookeeper-server-stop.sh
kafka-run-class.sh zookeeper-shell.sh
kafka-server-start.sh
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ pwd
/home/jj/dev/install_local/kafka_2.10-0.8.1.1
Start the Zookeeper
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/zookeeper-server-start.sh config/zookeeper.properties
Truncated log
[2014-10-26 15:41:29,784] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-10-26 15:41:29,784] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2014-10-26 15:41:29,804] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-10-26 15:41:29,805] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2014-10-26 15:41:29,812] INFO Server environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:host.name=1204master64bit (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.version=1.6.0_45 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.vendor=Sun Microsystems Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.home=/home/jj/dev/tools/jdk1.6.0_45/jre (org.apache.zookeeper.server.ZooKeeperServer)
Start the Kafta server
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-server-start.sh config/server.properties
Truncated log
[2014-10-26 15:42:38,869] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-10-26 15:42:39,017] INFO Registered broker 0 at path /brokers/ids/0 with address 1204master64bit:9092. (kafka.utils.ZkUtils$)
[2014-10-26 15:42:39,040] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-10-26 15:42:39,068] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-10-26 15:44:02,373] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [calc_events,0] (kafka.server.ReplicaFetcherManager)
[2014-10-26 15:44:02,403] INFO Completed load of log calc_events-0 with log end offset 0 (kafka.log.Log)
[2014-10-26 15:44:02,407] INFO Created log for partition [calc_events,0] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-10-26 15:44:02,408] WARN Partition [calc_events,0] on broker 0: No checkpointed highwatermark is found for partition [calc_events,0] (kafka.cluster.Partition)
[2014-10-26 15:47:05,989] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-10-26 15:47:17,479] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-10-26 15:48:53,366] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
Create a topic with 1 partition
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --create --topic calc_events \
Missing required argument "[zookeeper]"
Option Description
------ -----------
--alter Alter the configuration for the topic.
--config <name=value> A topic configuration override for the
topic being created or altered.
--create Create a new topic.
--deleteConfig <name> A topic configuration override to be
removed for an existing topic
--describe List details for the given topics.
--help Print usage information.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected
--replica-assignment A list of manual partition-to-broker
<broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being created.
--topic <topic> The topic to be create, alter or
describe. Can also accept a regular
expression except for --create option
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--zookeeper <urls> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --create --topic calc_events --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "calc_events".
Show the topics
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-topics.sh --list --zookeeper localhost:2181
calc_events
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 –-topic calc_events
Missing required argument "[topic]"
Option Description
------ -----------
--batch-size <Integer: size> Number of messages to send in a single
batch if they are not being sent
synchronously. (default: 200)
--broker-list <broker-list> REQUIRED: The broker list string in
the form HOST1:PORT1,HOST2:PORT2.
--compress If set, messages batches are sent
compressed
--key-serializer <encoder_class> The class name of the message encoder
implementation to use for
serializing keys. (default: kafka.
serializer.StringEncoder)
--line-reader <reader_class> The class name of the class to use for
reading lines from standard in. By
default each line is read as a
separate message. (default: kafka.
producer.
ConsoleProducer$LineMessageReader)
--message-send-max-retries <Integer> Brokers can fail receiving the message
for multiple reasons, and being
unavailable transiently is just one
of them. This property specifies the
number of retires before the
producer give up and drop this
message. (default: 3)
--property <prop> A mechanism to pass user-defined
properties in the form key=value to
the message reader. This allows
custom configuration for a user-
defined message reader.
--queue-enqueuetimeout-ms <Long: queue Timeout for event enqueue (default:
enqueuetimeout ms> 2147483647)
--queue-size <Long: queue_size> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of messages will
queue awaiting suffient batch size.
(default: 10000)
--request-required-acks <Integer: The required acks of the producer
request required acks> requests (default: 0)
--request-timeout-ms <Integer: request The ack timeout of the producer
timeout ms> requests. Value must be non-negative
and non-zero (default: 1500)
--retry-backoff-ms <Long> Before each retry, the producer
refreshes the metadata of relevant
topics. Since leader election takes
a bit of time, this property
specifies the amount of time that
the producer waits before refreshing
the metadata. (default: 100)
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: 102400)
--sync If set message send requests to the
brokers are synchronously, one at a
time as they arrive.
--timeout <Long: timeout_ms> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of time a message
will queue awaiting suffient batch
size. The value is given in ms.
(default: 1000)
--topic <topic> REQUIRED: The topic id to produce
messages to.
--value-serializer <encoder_class> The class name of the message encoder
implementation to use for
serializing values. (default: kafka.
serializer.StringEncoder)
Send some events to the topic.
Type the messages event 1 <Enter>
event 2 etc
In the end Control D to exit and come out.
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic calc_events
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
event 1
event 2
event 3
List the events which have been sent
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-console-consumer.sh --topic calc_events --from-beginning --zookeeper localhost:2181
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
event 1
event 2
event 3
This completes the first Kafta tutorial