内容纲要
概要描述
本文描述如何通过java连接开启Kerberos认证的Kafka
详细说明
maven项目添加依赖
在kafka的pod中获取kafka-clients-xxxx.jar与federation-utils-xxxx.jar
kubectl cp :/usr/lib/kafka/libs/kafka-client-xxxxxx.jar ./kafka-client-xxxxxx.jar
kubectl cp :/usr/lib/kafka/libs/federation-utils-xxxx.jar ./federation-utils-xxxx.jar
将上述两个jar包获取到本机,并放在项目根目录的lib文件夹中,并在pom.xml中添加依赖
io.transwarp //随便写
kafka-client //随便写
1.0 // 随便写
system
${project.basedir}/lib/kafka-clients-0.10.2.0-transwarp-6.2.2.jar
io.transwarp //随便写
guardian //随便写
1.0 // 随便写
system
${project.basedir}/lib/federation-utils-guardian-3.1.3.jar
创建生产者示例
package io.transwarp.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class TestKafkaProducer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.setProperty("java.security.krb5.conf", "C:/Users/Transwarp/Desktop/kafka-test/krb5.conf");
//krb5.conf从集群服务器中获取到本机,路径填本机上该文件的绝对路径
Properties properties = new Properties();
String topic = "demoxltest";//此处为topic名称
properties.put("bootstrap.servers", "ip:port");//填写kafka的ip与端口
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.kerberos.service.name","kafka");
properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true debug=true storeKey=true keyTab=\"C:/Users/Transwarp/Desktop/xxll.keytab\" principal=\"xxll@TDH\";");
// keytab去guardian页面下载,用户需要有kafka的admin权限,principal使用"klist -ket xxx.keytab"查看,路径同样为本机上keytab文件的绝对路径。
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++){
String s = UUID.randomUUID().toString() +" " + i + " Test Date: " + new Date();
Future future = producer.send(new ProducerRecord<>(topic,s ));
future.get();
System.out.println("Success producer :" + s);
Thread.sleep(500);
}
}
}
执行成功结果如下图所示
创建消费者示例
package io.transwarp.demo.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
//import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class TestKafkaConsumer {
public static void main(String[] args) {
System.setProperty("java.security.krb5.conf", "C:/Users/Transwarp/Desktop/kafka-test/krb5.conf");
Properties properties = new Properties();
properties.put("bootstrap.servers", "ip:port");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.kerberos.service.name","kafka");
properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true debug=true storeKey=true keyTab=\"C:/Users/Transwarp/Desktop/xxll.keytab\" principal=\"xxll@TDH\";");
properties.put("group.id", "test-consumer-group");
KafkaConsumer consumer = new KafkaConsumer(properties);
Map> topics = consumer.listTopics();
consumer.subscribe(Collections.singleton("demoxltest"));
boolean flag = true;
while(flag) {
ConsumerRecords records = consumer.poll(3000);
System.out.println("Get record size : " + records.count());
for (ConsumerRecord record : records){
// 循环打印消息记录
//处理消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
consumer.commitAsync();
}
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
consumer.close();
}
}
执行成功结果如下图所示