java连接开启Kerberos认证的Kafka

  使用配置
内容纲要

概要描述


本文描述如何通过java连接开启Kerberos认证的Kafka

详细说明


maven项目添加依赖

在kafka的pod中获取kafka-clients-xxxx.jar与federation-utils-xxxx.jar

kubectl cp <kafka-pod名称>:/usr/lib/kafka/libs/kafka-clients-xxxxxx.jar ./kafka-clients-xxxxxx.jar

kubectl cp <kafka-pod名称>:/usr/lib/kafka/libs/federation-utils-xxxx.jar ./federation-utils-xxxx.jar

将上述两个jar包获取到本机,并放在项目根目录的lib文件夹中,并在pom.xml中添加依赖

<dependency>
    <groupId>io.transwarp</groupId><!-- 随便写 -->
    <artifactId>kafka-client</artifactId><!-- 随便写 -->
    <version>1.0</version><!-- 随便写 -->
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/kafka-clients-0.10.2.0-transwarp-6.2.2.jar</systemPath>
</dependency>
<dependency>
    <groupId>io.transwarp</groupId><!-- 随便写 -->
    <artifactId>guardian</artifactId><!-- 随便写 -->
    <version>1.0</version><!-- 随便写 -->
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/federation-utils-guardian-3.1.3.jar</systemPath>
</dependency>
创建生产者示例

package io.transwarp.demo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Demo producer that sends 10 messages to a Kerberos-secured Kafka topic
 * over SASL_PLAINTEXT, authenticating with a keytab.
 */
public class TestKafkaProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // krb5.conf is copied from the cluster; use its absolute path on the local machine.
        System.setProperty("java.security.krb5.conf", "C:/Users/Transwarp/Desktop/kafka-test/krb5.conf");
        Properties properties = new Properties();
        String topic = "demoxltest";// topic name
        properties.put("bootstrap.servers", "ip:port");// fill in the Kafka broker ip and port
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.kerberos.service.name","kafka");
        // Download the keytab from the Guardian page; the user must have Kafka admin
        // privileges. Find the principal with "klist -ket xxx.keytab"; the keyTab path
        // is likewise the absolute path of the keytab file on the local machine.
        properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true debug=true storeKey=true keyTab=\"C:/Users/Transwarp/Desktop/xxll.keytab\" principal=\"xxll@TDH\";");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees the producer is flushed and closed
        // (the original never closed it, leaking sockets/threads and risking lost batches).
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                String s = UUID.randomUUID().toString() + " " + i + " Test Date: " + new Date();
                Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, s));
                future.get(); // block until the broker acknowledges the record
                System.out.println("Success producer :" + s);
                Thread.sleep(500);
            }
        }
    }
}

执行成功结果如下图所示
file

创建消费者示例

package io.transwarp.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

//import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Demo consumer that polls a Kerberos-secured Kafka topic over SASL_PLAINTEXT,
 * printing each record and committing offsets once per polled batch.
 */
public class TestKafkaConsumer {

    public static void main(String[] args) {

        // krb5.conf is copied from the cluster; use its absolute path on the local machine.
        System.setProperty("java.security.krb5.conf", "C:/Users/Transwarp/Desktop/kafka-test/krb5.conf");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "ip:port");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.kerberos.service.name","kafka");
        // Keytab and principal: same setup as the producer example.
        properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true debug=true storeKey=true keyTab=\"C:/Users/Transwarp/Desktop/xxll.keytab\" principal=\"xxll@TDH\";");
        properties.put("group.id", "test-consumer-group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        try {
            consumer.subscribe(Collections.singleton("demoxltest"));
            boolean flag = true;

            while (flag) {
                ConsumerRecords<String, String> records = consumer.poll(3000);
                System.out.println("Get record size : " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    // print each message record
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // Commit once per batch rather than once per record
                // (the original issued a redundant commitAsync for every record).
                if (!records.isEmpty()) {
                    consumer.commitAsync();
                }
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt status
                    flag = false; // exit the poll loop so close() below is reached
                }
            }
        } finally {
            // Always release the consumer; in the original, close() was unreachable
            // because flag was never set to false.
            consumer.close();
        }
    }
}

执行成功结果如下图所示
file

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。