kafka use

kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • 支持通过kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

卡夫卡的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。几个基本概念:
(1)message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer。
(2)Kafka是显式分布式的,多个producer、consumer和broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于consumer,多个consumer可以组成一个group,这个message只能传输给某个group中的某一个consumer.

1、kafka安装

http://kafka.apache.org/documentation.html#quickstart

wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

[root@server1 local]# tar -zxvf software/kafka_2.10-0.8.2-beta.tgz

#TCP TEST
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

# vim config/server.properties
brokerid:这个每个server(broker)必须唯一,写数字
host.name:这个也是唯一的,写ip或者hostname
[root@server1 config]# egrep -v "^$|#" server.properties  
broker.id=1
port=9092
host.name=server1  #所在节点的ip地址,如果不想配置全为localhost即可

num.network.threads=3

num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=65536
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=server1:2181,server2:2181,server3:2181
zookeeper.connection.timeout.ms=2000

[root@server1 config]# egrep -v "^$|#" zookeeper.properties 
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

[root@server1 local]# scp -rq kafka_2.10-0.8.2-beta server2:/usr/local/

2、启动kafka

#启动之前zookeeper必须启动!zookeeper请参考[zookeeper cluster deploy](http://itweet.github.io/2015/07/12/zk-cluster-deploy/)
[root@server1 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &

--建议下面方式启动否则关闭终端kafka就挂了
# nohup kafka/bin/kafka-server-start.sh kafka/config/server.properties > kafka/kafka-logs/kafka-server.log &

[root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &

[root@server3 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &

[root@server4 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &

[zk: localhost:2181(CONNECTED) 3] get /brokers/ids   
null
cZxid = 0x1d00000939
ctime = Tue May 12 23:52:57 CST 2015
mZxid = 0x1d00000939
mtime = Tue May 12 23:52:57 CST 2015
pZxid = 0x1d00000ca7
cversion = 4
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0

3、创建一个topic

[root@server1 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --create --zookeeper server1:2181,server2:2181,server3:2181 --replication-factor 3 --partitions 1 --topic test

4、命令行查看topic

[root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --list --zookeeper server1:2181,server2:2181,server3:2181
test

5、发送一些消息

[root@server1 kafka_2.10-0.8.2-beta]# bin/kafka-console-producer.sh --broker-list server1:9092 --topic test         
This is a message
This is another message

[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/test/partitions/3/state
{"controller_epoch":1,"leader":3,"version":1,"leader_epoch":0,"isr":[3,1]}
[zk: localhost:2181(CONNECTED) 12] get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1431447378690","host":"server3","version":1,"port":9092}

6、开始消费信息

[root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic test --from-beginning
This is a message
This is another message

[root@server3 kafka_2.10-0.8.2-beta]#  bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg...

[root@server4 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg.

8、查看集群topic详细信息

[root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --describe --zookeeper server1:2181,server2:2181,server3:2181 --topic test
Topic:test      PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: test     Partition: 0    Leader: 4       Replicas: 4,2   Isr: 4,2
        Topic: test     Partition: 1    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: test     Partition: 2    Leader: 2       Replicas: 2,4   Isr: 2,4
        Topic: test     Partition: 3    Leader: 3       Replicas: 3,1   Isr: 3,1

消息被写到目录:
[root@server2 bin]# ls /tmp/kafka-logs/test-2
00000000000000000000.index  00000000000000000000.log

9、删除topic

$ kafka-topics.sh --zookeeper server1:2181,server2:2181,server3:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

配置server.config , delete.topic.enable=true即可删除

10、kafka的webui

wget https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar

Running It
This is a small webapp, you can run it locally or on a server, as long as you have access to the ZooKeeper nodes controlling kafka.
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
—zk zk-server1,zk-server2 \
—port 8080 \
—refresh 10.seconds \
—retain 2.days
The arguments are:

  • zk the ZooKeeper hosts
  • port on what port will the app be available
  • refresh how often should the app refresh and store a point in the DB
  • retain how long should points be kept in the DB
  • dbName where to store the history (default ‘offsetapp’)

[root@server1 local]# mkdir kafka-offset-console
[root@server1 local]# cd kafka-offset-console/
[root@server1 kafka-offset-console]# cat mobile_start_en.sh

!/bin/bash

cd /usr/local/kafka-offset-console
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp ./KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb —zk server1,server2,server3/config/mobile/kafka-offset-console —port 9999 —refresh 10.seconds —retain 7.days 1>./stdout.log 2>./stderr.log &

[root@server1 kafka-offset-console]# chmod +x mobile_start_en.sh

[root@server1 kafka-offset-console]# sh mobile_start_en.sh

[root@server1 kafka-offset-console]# tail -f stdout.log
serving resources from: jar:file:/usr/local/kafka-offset-console/KafkaOffsetMonitor-assembly-0.2.1.jar!/offsetapp


- Kafka监控工具二

https://github.com/yahoo/kafka-manager
[root@server1 kafka-manager]# echo $JAVA_HOME
/usr/local/jdk1.7.0_45
[root@server1 sbt]# sbt -version
Getting org.scala-sbt sbt 0.13.8 …

https://github.com/yahoo/kafka-manager
[root@server1 hsu]# yum install git -y
[root@server1 hsu]# git clone https://github.com/yahoo/kafka-manager.git
[root@server1 hsu]# cd kafka-manager/
[root@server1 kafka-manager]# sbt clean dist #这里生产zip包

Configuration
$ unzip kafka-manager-1.0-SNAPSHOT.zip
在kafka-manager安装包的conf目录下面的application.conf文件中进行配置
kafka-manager.zkhosts=”server1:2181,server2:2181,server3:2181”

Starting the service
$ bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=8081 > manager-ui.log &


![](http://itweet.github.io/screenshots/kafka-manager.png)

这个工具需要自己编译,也可以直接找我获取编译包!

- Kafka监控工具三

https://github.com/shunfei/DCMonitor

druid:
https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_StatViewServlet%E9%85%8D%E7%BD%AE


# 11、kafka和MQ区别

https://github.com/alibaba/RocketMQ/wiki/rmq_vs_kafka

http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines


- 性能对比
•   `Kafka单机写入TPS约在百万条/秒,消息大小10个字节`
•   RocketMQ单机写入TPS单实例约7万条/秒,单机部署3个Broker,可以跑到最高12万条/秒,消息大小10个字节
总结:Kafka的TPS跑到单机百万,主要是由于Producer端将多个小消息合并,批量发向Broker。

# 12、项目分析

数据源:
Oracle订单表

写代码
发送数据给kafka

接收:
[root@server1 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh —zookeeper server1:2181,server2:2181,server3:2181 —topic order —from-beginning
998801026 11118 731 44 0 2015-05-17 00:36:08
46300226 68178 556 14 0 2015-05-17 00:36:08
367575834 42812 820 21 0 2015-05-17 00:36:08
97829386 96289 583 67 1 2015-05-17 00:36:08

需要拷贝kafka的包到storm相应的lib目录下

zk创建节点

[zk: localhost:2181(CONNECTED) 11] create /order 1
Created /order
[zk: localhost:2181(CONNECTED) 12] create /order/id 1
Created /order/id
[zk: localhost:2181(CONNECTED) 13] ls /order
[id]

创建topic

kafka-topics.sh —zookeeper server1:2181,server2:2181,server3:2181 —delete —topic order
bin/kafka-topics.sh —create —zookeeper server1:2181,server2:2181,server3:2181 —replication-factor 3 —partitions 4 —topic order
bin/kafka-topics.sh —describe —zookeeper server1:2181,server2:2181,server3:2181 —topic order

打包测试1

storm-0.9.2/bin/storm jar storm-code-1.0-SNAPSHOT-jar-with-dependencies.jar com.kafka_storm.topology.CounterTopology

打包测试2

$ storm/bin/storm jar storm-test_fat.jar com.kafka_storm.topology.OrderTopology

优化数据结构保证数据不丢失!通过存储到第三方内存系统中如redis/memcached

zk分布式锁保证多线程处理数据,数据一致性,使用第三方封装zkcli包,保证每次只有一个线程去操作mysql,而不是多个线程导致数据混乱!保证同一条数据不会被多个线程同时处理![这样就可以topology开启多线程处理数据!]

http://repo1.maven.org/maven2/com/netflix/curator/

集群模式测试,多线程

storm/bin/storm jar storm-test_fat.jar com.kafka_storm.topology.OrderTopology OrderTopology > order.log


# ambari 自动安装Kafka启动报错
1、由于log.dirs目录下的meta.properties中配置的broker.id和配置目录下的server.properties中的broker.id不一致了,解决问题的方法是将两者修改一致后再重启。

$ cat /var/log/kafka/server.log

[2016-06-02 05:08:59,151] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn’t match stored brokerId 1 in meta.properties
at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630)
at kafka.server.KafkaServer.startup(KafkaServer.scala:175)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
at kafka.Kafka$.main(Kafka.scala:67)
at kafka.Kafka.main(Kafka.scala)
[2016-06-02 05:08:59,152] INFO shutting down (kafka.server.KafkaServer)

解决,如下可以看到broker.id不一致,修改后者重启即可:
[root@bigdata-hadoop-3 kafka-logs]# cat /etc/kafka/conf/server.properties |grep broker.id
broker.id=2

[root@bigdata-hadoop-3 kafka-logs]# cat /kafka-logs/meta.properties |grep id
broker.id=1
```

参考:

原创文章,转载请注明: 转载自Itweet的博客
本博客的文章集合: http://www.itweet.cn/blog/archive/