基于HDP3.1平台Kafka使用上跟早期版本没有太大差异,不过目前HDP3.1 bug还比较多,官方文档在使用上也有些未及时更新的地方,导致初学者无法跑通情况,下面会做基本使用介绍:
- 1. 权限配置
本示例集群使用的是kerberos认证,kafka通过ranger控制权限,这样Kafka本身的acl授权就无需再配置。
使用Ranger授权如下, 这里我们创建3个主题,分别配置不同的权限测试:
- 2. 创建主题topic
创建topic要写zookeeper, 默认kafka就有读写 zookeeper 的权限,另外上面我们已经配置了kafka账号具有kafka服务所有权限, 所以要使用Kafka keytab来创建topic, 如果创建的topic提示没有leader就是没有权限的大原因.
--kafka_jaas.conf 是安装完kafka后自动生成的,这里要先在jvm配置该变量信息
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_jaas.conf"
--进入kafka 客户端命令目录
cd /usr/hdp/current/kafka-broker
$ kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/hadoop3.lts.local@LTS.LOCAL
--创建topic
$ bin/kafka-topics.sh --create --zookeeper hadoop1.lts.local:2181,hadoop2.lts.local:2181,hadoop3.lts.local:2181 --partitions 3 --replication-factor 2 --topic test-topic
--列出所有topic
$ bin/kafka-topics.sh --list --zookeeper hadoop1.lts.local:2181,hadoop2.lts.local:2181,hadoop3.lts.local:2181
--显示指定topic详细信息
$ bin/kafka-topics.sh --describe --zookeeper hadoop1.lts.local:2181,hadoop2.lts.local:2181,hadoop3.lts.local:2181 --topic test-topic
--删除topic
$ bin/kafka-topics.sh --delete --zookeeper hadoop1.lts.local:2181,hadoop2.lts.local:2181,hadoop3.lts.local:2181 --topic test-topic
--如果因错误导致无法删除,可从zookeeper上删除
[root@hadoop3 bin]# cd /usr/hdp/current/zookeeper-client/bin/
[root@hadoop3 bin]# ./zkCli.sh -server hadoop3:2181
[zk: hadoop3:2181(CONNECTED) 1] ls /brokers/topics
[zk: hadoop3:2181(CONNECTED) 2] rmr /brokers/topics/test
--提示:Error while executing topic command : KeeperErrorCode = NoAuth for /config/topics/test3 先删除rmr /config/topics/test3
[zk: hadoop3:2181(CONNECTED) 5] rmr /config/topics/test
- 3. 发送信息到kafka topic
kafka_client_jaas.conf 是安装kafka时自动生成的, 使用kafka cli前要export一下, 以下的producer-property security.protocol=SASL_PLAINTEXT写法跟之前版本不太一样, HDP官方文档还是使用老的写法, 需要更正过来:
--kafka producer
$ cd /usr/hdp/current/kafka-broker
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf"
--输入以下命令后就可以,就可以随意输入一些字符发送的kafka
$ bin/kafka-console-producer.sh --broker-list hadoop3.lts.local:6667,hadoop4.lts.local:6667,hadoop5.lts.local:6667 --producer-property security.protocol=SASL_PLAINTEXT --topic test-topic
- 4. 消费kafka topic信息
--kafka consumer
$ cd /usr/hdp/current/kafka-broker
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf"
--可能有点慢,等一会信息就显示出来了
$ bin/kafka-console-consumer.sh --bootstrap-server hadoop3.lts.local:6667,hadoop4.lts.local:6667,hadoop5.lts.local:6667 --from-beginning --consumer-property security.protocol=SASL_PLAI
- 5. 使用java进行kafka topic生产和消费
因为集群做了kerberos认证, 在代码要配置jaas文件和keytab文件认证.
示例代码已经上传github, 有兴趣可下载查看:https://github.com/d0pang/kafkaDemo
Leave a Reply