[root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --list --zookeeper server1:2181,server2:2181,server3:2181
test
5. Send some messages
[root@server1 kafka_2.10-0.8.2-beta]# bin/kafka-console-producer.sh --broker-list server1:9092 --topic test
This is a message
This is another message
[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/test/partitions/3/state
{"controller_epoch":1,"leader":3,"version":1,"leader_epoch":0,"isr":[3,1]}
[zk: localhost:2181(CONNECTED) 12] get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1431447378690","host":"server3","version":1,"port":9092}
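The state JSON above says that partition 3 of topic test is led by broker 3 and that brokers 3 and 1 are in sync. As a rough sketch, the fields can be pulled out of that string in a script. The helper names state_field and state_isr are hypothetical, and the sed patterns assume the flat single-line JSON shown above; they are not a general JSON parser.

```shell
#!/usr/bin/env bash

# Print one numeric field (for example leader or leader_epoch) from the
# partition state JSON returned by "get /brokers/topics/<topic>/partitions/<n>/state".
# Hypothetical helper: relies on the flat, single-line layout shown above.
state_field() {
  local field="$1" json="$2"
  printf '%s\n' "$json" | sed -n "s/.*\"$field\":\([0-9-]*\).*/\1/p"
}

# Print the in-sync replica list as comma-separated broker ids.
state_isr() {
  printf '%s\n' "$1" | sed -n 's/.*"isr":\[\([0-9,]*\)\].*/\1/p'
}
```

For example, passing the JSON above as the second argument of state_field leader gives the id of the leading broker, which can then be looked up under /brokers/ids/ as in the second zkCli command.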
6. Start consuming messages
[root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic test --from-beginning
This is a message
This is another message

[root@server3 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg...

[root@server4 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg.
Messages are written to this directory:

[root@server2 bin]# ls /tmp/kafka-logs/test-2
00000000000000000000.index 00000000000000000000.log
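The names in that listing follow a pattern: the directory test-2 is topic test, partition 2, and each segment file is named after the offset of its first message, zero-padded to 20 digits. A minimal sketch of that naming follows; the function names partition_dir and segment_file are made up for illustration, and /tmp/kafka-logs is simply the log directory used above.

```shell
#!/usr/bin/env bash

# Build the on-disk directory for a topic partition: <log.dirs>/<topic>-<partition>.
partition_dir() {
  local log_dir="$1" topic="$2" partition="$3"
  printf '%s/%s-%s\n' "$log_dir" "$topic" "$partition"
}

# Build a segment file name from its base offset (decimal, no leading zeros)
# and an extension such as "log" or "index".
segment_file() {
  local base_offset="$1" ext="$2"
  printf '%020d.%s\n' "$base_offset" "$ext"
}
```

With these, partition_dir /tmp/kafka-logs test 2 yields the directory listed above, and segment_file 0 log yields the name of its first segment.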
9. Delete the topic
$ kafka-topics.sh --zookeeper server1:2181,server2:2181,server3:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
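As the note in the output says, the topic is only marked for deletion unless the brokers run with delete.topic.enable=true. Before deleting, it may help to check each broker's configuration. The sketch below assumes the setting lives in a server.properties-style file; the function name topic_deletion_enabled is hypothetical, and a missing key is treated as disabled on the assumption that the default is false in this Kafka version.

```shell
#!/usr/bin/env bash

# Succeed (exit 0) if the given properties file contains an uncommented
# "delete.topic.enable=true" line, allowing whitespace around "=".
# Simplified check: it does not resolve duplicate keys or line continuations.
topic_deletion_enabled() {
  local config_file="$1"
  grep -Eq '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$config_file"
}
```

A typical call would be topic_deletion_enabled config/server.properties || echo "deletion is disabled on this broker", run from the Kafka installation directory on each server.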
Running It

This is a small web app. You can run it locally or on a server, as long as you have access to the ZooKeeper nodes controlling Kafka.

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --zk zk-server1,zk-server2 \
     --port 8080 \
     --refresh 10.seconds \
     --retain 2.days

The arguments are:

- zk: the ZooKeeper hosts
- port: the port on which the app will be available
- refresh: how often the app should refresh and store a point in the DB
- retain: how long points should be kept in the DB
- dbName: where to store the history (default 'offsetapp')
[2016-06-02 05:08:59,151] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn't match stored brokerId 1 in meta.properties
        at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:175)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
        at kafka.Kafka$.main(Kafka.scala:67)
        at kafka.Kafka.main(Kafka.scala)
[2016-06-02 05:08:59,152] INFO shutting down (kafka.server.KafkaServer)
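This error means the broker.id in the broker's configuration (2 here) differs from the broker.id that an earlier start recorded in meta.properties inside the log directory (1 here). It commonly shows up after copying a configuration file or a data directory from another broker. The sketch below compares the two values before startup; the function names are hypothetical, and the paths passed in are assumptions that should be replaced with the actual server.properties and log.dirs locations.

```shell
#!/usr/bin/env bash

# Print the broker.id value found in a properties file
# (works for both server.properties and meta.properties).
broker_id_of() {
  sed -n 's/^[[:space:]]*broker\.id[[:space:]]*=[[:space:]]*\([0-9-]*\).*/\1/p' "$1" | tail -n 1
}

# Compare the configured broker.id with the one stored in <log_dir>/meta.properties.
# Returns 1 on a mismatch, 0 otherwise.
check_broker_id() {
  local config_file="$1" log_dir="$2" configured stored
  if [ ! -f "$log_dir/meta.properties" ]; then
    echo "no meta.properties in $log_dir"
    return 0
  fi
  configured="$(broker_id_of "$config_file")"
  stored="$(broker_id_of "$log_dir/meta.properties")"
  if [ "$configured" = "$stored" ]; then
    echo "OK: broker.id $configured"
  else
    echo "MISMATCH: configured $configured, stored $stored"
    return 1
  fi
}
```

For instance, check_broker_id config/server.properties /tmp/kafka-logs would report the mismatch above. Which side to correct depends on the situation: if the data directory really belongs to this broker, the configured broker.id should be changed to match it; if the directory was copied from another broker by mistake, the stale meta.properties (and likely the copied data) is the part to clean up.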