Kafka getting started tutorial

Download Kafka from the Apache website.

The steps in this tutorial are:

Start ZooKeeper and Kafka
Create a topic
Send some events to it
Read them back as a consumer

Let's do it.


Extract the Kafka archive to a directory of your choice, then change into it.
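The extraction step can be scripted. The sketch below is only an illustration: the archive name kafka_2.10-0.8.1.1.tgz and the Downloads location are assumptions (the post only shows the extracted directory), and extract_kafka is a hypothetical helper, not part of Kafka.

```shell
extract_kafka() {
    # $1 = path to the downloaded archive, $2 = directory to unpack into
    if [ ! -f "$1" ]; then
        echo "archive not found: $1" >&2
        return 1
    fi
    mkdir -p "$2" && tar -xzf "$1" -C "$2" || return 1
    # Assumption: the archive unpacks into a directory named after the file (minus .tgz)
    echo "$2/$(basename "$1" .tgz)"
}

# Hypothetical locations; adjust both to your machine.
KAFKA_HOME=$(extract_kafka "$HOME/Downloads/kafka_2.10-0.8.1.1.tgz" "$HOME/dev/install_local") \
    && ls "$KAFKA_HOME"
```

If the archive is present, the final ls should show the same bin, config and libs entries as the listing that follows.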

jj@1204master64bit:~/dev/code/open$ cd '/home/jj/dev/install_local/kafka_2.10-0.8.1.1'


jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ ls
bin  config  libs  LICENSE  NOTICE


jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ cd config/
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1/config$ ls
consumer.properties  server.properties       zookeeper.properties
log4j.properties     test-log4j.properties
producer.properties  tools-log4j.properties


jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1/config$ cd ..
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ pwd
/home/jj/dev/install_local/kafka_2.10-0.8.1.1
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/
kafka-console-consumer.sh            kafka-server-stop.sh
kafka-console-producer.sh            kafka-simple-consumer-perf-test.sh
kafka-consumer-perf-test.sh          kafka-simple-consumer-shell.sh
kafka-preferred-replica-election.sh  kafka-topics.sh
kafka-producer-perf-test.sh          windows/
kafka-reassign-partitions.sh         zookeeper-server-start.sh
kafka-replay-log-producer.sh         zookeeper-server-stop.sh
kafka-run-class.sh                   zookeeper-shell.sh
kafka-server-start.sh              


Start ZooKeeper


jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/zookeeper-server-start.sh config/zookeeper.properties

Truncated log

[2014-10-26 15:41:29,784] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-10-26 15:41:29,784] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2014-10-26 15:41:29,804] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-10-26 15:41:29,805] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2014-10-26 15:41:29,812] INFO Server environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:host.name=1204master64bit (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.version=1.6.0_45 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.vendor=Sun Microsystems Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2014-10-26 15:41:29,812] INFO Server environment:java.home=/home/jj/dev/tools/jdk1.6.0_45/jre (org.apache.zookeeper.server.ZooKeeperServer)


Start the Kafka server (in a second terminal, since ZooKeeper keeps running in the first)


jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$  bin/kafka-server-start.sh config/server.properties

Truncated log


[2014-10-26 15:42:38,869] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-10-26 15:42:39,017] INFO Registered broker 0 at path /brokers/ids/0 with address 1204master64bit:9092. (kafka.utils.ZkUtils$)
[2014-10-26 15:42:39,040] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-10-26 15:42:39,068] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-10-26 15:44:02,373] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [calc_events,0] (kafka.server.ReplicaFetcherManager)
[2014-10-26 15:44:02,403] INFO Completed load of log calc_events-0 with log end offset 0 (kafka.log.Log)
[2014-10-26 15:44:02,407] INFO Created log for partition [calc_events,0] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-10-26 15:44:02,408] WARN Partition [calc_events,0] on broker 0: No checkpointed highwatermark is found for partition [calc_events,0] (kafka.cluster.Partition)
[2014-10-26 15:47:05,989] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-10-26 15:47:17,479] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-10-26 15:48:53,366] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
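Both servers above run in the foreground and each occupies a terminal. One way to avoid that is to start them in the background and send their output to log files. This is a rough sketch, not the official way to run Kafka: start_in_background, LOG_DIR and the 5 second pause are all assumptions made for illustration, and KAFKA_HOME should point at your own install.

```shell
KAFKA_HOME="${KAFKA_HOME:-$HOME/dev/install_local/kafka_2.10-0.8.1.1}"
LOG_DIR="${LOG_DIR:-/tmp/kafka-tutorial-logs}"   # hypothetical log location

start_in_background() {
    # $1 = name used for the log file, $2 = start script, $3 = properties file
    nohup "$KAFKA_HOME/bin/$2" "$KAFKA_HOME/config/$3" > "$LOG_DIR/$1.log" 2>&1 &
    echo "$1 started with PID $!"
}

if [ -x "$KAFKA_HOME/bin/kafka-server-start.sh" ]; then
    mkdir -p "$LOG_DIR"
    start_in_background zookeeper zookeeper-server-start.sh zookeeper.properties
    sleep 5   # crude pause so ZooKeeper is listening before the broker connects
    start_in_background kafka kafka-server-start.sh server.properties
else
    echo "Kafka not found under $KAFKA_HOME" >&2
fi
```

The log lines shown above (for example "[Kafka Server 0], started") would then appear in the log files under LOG_DIR instead of on the terminal.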



Create a topic with 1 partition

Running kafka-topics.sh without the required --zookeeper argument fails and prints the usage:



jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$  bin/kafka-topics.sh --create --topic calc_events \
Missing required argument "[zookeeper]"
Option                                  Description                          
------                                  -----------                          
--alter                                 Alter the configuration for the topic.
--config <name=value>                   A topic configuration override for the
                                          topic being created or altered.    
--create                                Create a new topic.                  
--deleteConfig <name>                   A topic configuration override to be  
                                          removed for an existing topic      
--describe                              List details for the given topics.    
--help                                  Print usage information.              
--list                                  List all available topics.            
--partitions <Integer: # of partitions> The number of partitions for the topic
                                          being created or altered (WARNING:  
                                          If partitions are increased for a  
                                          topic that has a key, the partition
                                          logic or ordering of the messages  
                                          will be affected                    
--replica-assignment                    A list of manual partition-to-broker  
  <broker_id_for_part1_replica1 :         assignments for the topic being    
  broker_id_for_part1_replica2 ,          created or altered.                
  broker_id_for_part2_replica1 :                                              
  broker_id_for_part2_replica2 , ...>                                        
--replication-factor <Integer:          The replication factor for each      
  replication factor>                     partition in the topic being created.
--topic <topic>                         The topic to be create, alter or      
                                          describe. Can also accept a regular
                                          expression except for --create option
--topics-with-overrides                 if set when describing topics, only  
                                          show topics that have overridden    
                                          configs                            
--unavailable-partitions                if set when describing topics, only  
                                          show partitions whose leader is not
                                          available                          
--under-replicated-partitions           if set when describing topics, only  
                                          show under replicated partitions    
--zookeeper <urls>                      REQUIRED: The connection string for  
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be    
                                          given to allow fail-over.

With --zookeeper, --replication-factor and --partitions supplied, the topic is created:

jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$  bin/kafka-topics.sh --create --topic calc_events --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "calc_events".
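If you rerun the tutorial, the topic will already be there. A small wrapper can check the --list output first and only create the topic when it is missing. This is a sketch under assumptions: topic_exists is a hypothetical helper, and it relies on --list printing one topic name per line, as in the listing in this post.

```shell
KAFKA_HOME="${KAFKA_HOME:-$HOME/dev/install_local/kafka_2.10-0.8.1.1}"
ZOOKEEPER="${ZOOKEEPER:-localhost:2181}"
TOPIC="${TOPIC:-calc_events}"

topic_exists() {
    # Reads `kafka-topics.sh --list` output on stdin; succeeds only on an
    # exact whole-line match for the topic name given as $1
    grep -qxF -- "$1"
}

if [ -x "$KAFKA_HOME/bin/kafka-topics.sh" ]; then
    if "$KAFKA_HOME/bin/kafka-topics.sh" --list --zookeeper "$ZOOKEEPER" | topic_exists "$TOPIC"; then
        echo "topic $TOPIC already exists"
    else
        "$KAFKA_HOME/bin/kafka-topics.sh" --create --topic "$TOPIC" \
            --zookeeper "$ZOOKEEPER" --replication-factor 1 --partitions 1
    fi
fi
```

The whole-line match matters: a plain substring search would treat a topic such as calc_events_2 as if it were calc_events.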

Show the topics

jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$  bin/kafka-topics.sh --list --zookeeper localhost:2181
calc_events



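Start the console producer. Be careful with the dashes: in the first attempt below the option was typed as –-topic (an en dash followed by a hyphen, which can happen when a command is copied from a web page), so the producer does not see a topic and prints its usage:

jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$  bin/kafka-console-producer.sh --broker-list localhost:9092 –-topic calc_events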
Missing required argument "[topic]"
Option                                  Description                          
------                                  -----------                          
--batch-size <Integer: size>            Number of messages to send in a single
                                          batch if they are not being sent    
                                          synchronously. (default: 200)      
--broker-list <broker-list>             REQUIRED: The broker list string in  
                                          the form HOST1:PORT1,HOST2:PORT2.  
--compress                              If set, messages batches are sent    
                                          compressed                          
--key-serializer <encoder_class>        The class name of the message encoder
                                          implementation to use for          
                                          serializing keys. (default: kafka.  
                                          serializer.StringEncoder)          
--line-reader <reader_class>            The class name of the class to use for
                                          reading lines from standard in. By  
                                          default each line is read as a      
                                          separate message. (default: kafka.  
                                          producer.                          
                                          ConsoleProducer$LineMessageReader)  
--message-send-max-retries <Integer>    Brokers can fail receiving the message
                                          for multiple reasons, and being    
                                          unavailable transiently is just one
                                          of them. This property specifies the
                                          number of retires before the        
                                          producer give up and drop this      
                                          message. (default: 3)              
--property <prop>                       A mechanism to pass user-defined      
                                          properties in the form key=value to
                                          the message reader. This allows    
                                          custom configuration for a user-    
                                          defined message reader.            
--queue-enqueuetimeout-ms <Long: queue  Timeout for event enqueue (default:  
  enqueuetimeout ms>                      2147483647)                        
--queue-size <Long: queue_size>         If set and the producer is running in
                                          asynchronous mode, this gives the  
                                          maximum amount of  messages will    
                                          queue awaiting suffient batch size.
                                          (default: 10000)                    
--request-required-acks <Integer:       The required acks of the producer    
  request required acks>                  requests (default: 0)              
--request-timeout-ms <Integer: request  The ack timeout of the producer      
  timeout ms>                             requests. Value must be non-negative
                                          and non-zero (default: 1500)        
--retry-backoff-ms <Long>               Before each retry, the producer      
                                          refreshes the metadata of relevant  
                                          topics. Since leader election takes
                                          a bit of time, this property        
                                          specifies the amount of time that  
                                          the producer waits before refreshing
                                          the metadata. (default: 100)        
--socket-buffer-size <Integer: size>    The size of the tcp RECV size.        
                                          (default: 102400)                  
--sync                                  If set message send requests to the  
                                          brokers are synchronously, one at a
                                          time as they arrive.                
--timeout <Long: timeout_ms>            If set and the producer is running in
                                          asynchronous mode, this gives the  
                                          maximum amount of time a message    
                                          will queue awaiting suffient batch  
                                          size. The value is given in ms.    
                                          (default: 1000)                    
--topic <topic>                         REQUIRED: The topic id to produce    
                                          messages to.                        
--value-serializer <encoder_class>      The class name of the message encoder
                                          implementation to use for          
                                          serializing values. (default: kafka.
                                          serializer.StringEncoder)          
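The "Missing required argument" error above is easy to hit when a command is pasted from a web page, because a typographic dash looks almost the same as a hyphen. A quick check for characters outside printable ASCII can catch this before running the command. The helper name has_non_ascii is made up for this sketch, and it assumes a grep that treats every byte as a character in the C locale.

```shell
has_non_ascii() {
    # Succeeds (exit 0) when $1 contains any byte outside printable ASCII
    # (space through tilde), e.g. the bytes of an en dash
    printf '%s' "$1" | LC_ALL=C grep -q '[^ -~]'
}

# The two candidate commands differ only in the dashes before "topic".
for cmd in \
    'bin/kafka-console-producer.sh --broker-list localhost:9092 –-topic calc_events' \
    'bin/kafka-console-producer.sh --broker-list localhost:9092 --topic calc_events'
do
    if has_non_ascii "$cmd"; then
        echo "suspicious characters in: $cmd"
    else
        echo "looks clean: $cmd"
    fi
done
```

Note that this also flags tabs and any legitimately non-ASCII argument, so treat it as a hint and not as a verdict.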

Send some events to the topic.

Start the producer with the correct --topic option, then type each message followed by <Enter>: event 1, event 2, and so on.

When you are done, press Ctrl+D to exit the producer.


 
jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic calc_events
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
event 1
event 2
event 3
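The producer's usage text says each line read from standard input becomes a separate message, so the events can also be piped in instead of typed. The sketch below assumes the broker from this post is running on localhost:9092; generate_events is a hypothetical helper, not a Kafka tool. When the pipe closes, the producer sees end of input, just as with Ctrl+D.

```shell
KAFKA_HOME="${KAFKA_HOME:-$HOME/dev/install_local/kafka_2.10-0.8.1.1}"

generate_events() {
    # Prints "event 1" through "event N" (N = $1), one message per line
    i=1
    while [ "$i" -le "$1" ]; do
        echo "event $i"
        i=$((i + 1))
    done
}

if [ -x "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
    generate_events 3 | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
        --broker-list localhost:9092 --topic calc_events
fi
```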




Read back the events that were sent, using the console consumer. It keeps waiting for new messages, so press Ctrl+C to stop it.





jj@1204master64bit:~/dev/install_local/kafka_2.10-0.8.1.1$  bin/kafka-console-consumer.sh --topic calc_events --from-beginning --zookeeper localhost:2181
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
event 1
event 2
event 3
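To check the round trip without watching the terminal, the consumer output can be counted. This sketch assumes the console consumer in this version accepts --max-messages so that it exits after the expected number of messages; confirm that against the usage text the tool prints before relying on it. count_events and EXPECTED are names made up here.

```shell
KAFKA_HOME="${KAFKA_HOME:-$HOME/dev/install_local/kafka_2.10-0.8.1.1}"
EXPECTED=3   # number of events sent by the producer earlier

count_events() {
    # Counts stdin lines that look exactly like "event <number>"
    grep -c '^event [0-9][0-9]*$'
}

if [ -x "$KAFKA_HOME/bin/kafka-console-consumer.sh" ]; then
    # Assumption: --max-messages is supported; SLF4J warnings go to stderr.
    received=$("$KAFKA_HOME/bin/kafka-console-consumer.sh" --topic calc_events \
        --from-beginning --zookeeper localhost:2181 \
        --max-messages "$EXPECTED" 2>/dev/null | count_events)
    echo "read $received of $EXPECTED events"
fi
```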


This completes the first Kafka tutorial.
