kafka开启Kerberos安全认证Java编程生成者与消费组示例_聆听金生的博客-程序员信息网

技术标签: kakfa  Kerberos  安全认证  生成者与消费者  Kafka  java编程  

一、相关配置文件

  1. kafka_client_jaas.conf配置项
KafkaClient {
    
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\test.service.keytab"
    useTicketCache=false
    principal="test/[email protected]"
    serviceName=kafka;
};
  1. krb5.conf配置项
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 dns_lookup_realm = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 rdns = false
 default_realm = HENGHE.COM
# default_ccache_name = KEYRING:persistent:%{uid}

[realms]
 HENGHE.COM = {
    
  kdc = henghe-047
  admin_server = henghe-047
  #kdc = ${kerberosSlaves}
 }

[domain_realm]
 .henghe.com = HENGHE.COM
 henghe.com = HENGHE.COM

  1. 密钥配置项相关导出的keytab文件

二、生成者代码示例

package com.yss.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Scanner;

/**
 * @description:
 * @author: 
 * @create: 2020-07-06 11:44
 **/
public class Producer {
    
    public static void main(String[] args) {
    

        //在windows中设置JAAS,也可以通过-D方式传入
//        System.setProperty("java.security.auth.login.config", "D:\\resources\\kafka_client_jaas.conf");
//        System.setProperty("java.security.krb5.conf", "D:\\resources\\krb5.conf");
        //在Linux中设置JAAS,也可以通过-D方式传入
        System.setProperty("java.security.auth.login.config", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\krb5.conf");


        Properties props = new Properties();
        props.put("bootstrap.servers", "henghe-020:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");




        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        String topic = "ws_kerberos";

        Scanner scan  = new Scanner(System.in);

        while (true){
    
            System.out.print(">>");
            String message = scan.nextLine();
            producer.send(new ProducerRecord<String, String>(topic, message));
            System.out.println(message);
            try {
    
                Thread.sleep(200);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        }

    }
}

三、消费者代码示例

package com.yss.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @description:
 * @author: 
 * @create: 2020-07-06 12:24
 **/
public class Consumer {
    
    public static void main(String[] args) throws Exception {
    

        Properties props = new Properties();

        System.setProperty("java.security.auth.login.config", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\krb5.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");


        props.put("bootstrap.servers", "henghe-020:9092");
        props.put("group.id", "test132");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("ws_kerberos"));
        long timeMillis = System.currentTimeMillis();
        long count = 0;
        try {
    
            while (true) {
    
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records) {
    
                    System.out.println("\n======================================我是分割符==============================================\n");
                    byte[] value = record.value();
                    String s     = new String(value,"utf8");
                    System.out.println(s);
                }
            }
        } finally {
    
            consumer.close();
            System.out.println("总记录数:" + count + "   耗时:" + (System.currentTimeMillis() - timeMillis) / 1000);
        }
    }
}

简单的demo示例,具体根据业务实现。

四、动态参数示例

public class KerberosTest {
    
    public static void main(String[] args) throws InterruptedException {
    
        Properties props = new Properties();
//        System.setProperty("java.security.auth.login.config", "D:\\ysstech\\Medusa\\runtime\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\ysstech\\Medusa\\runtime\\src\\main\\resources\\krb5.conf");
        props.put("bootstrap.servers", "henghe-37:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "qwe");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"D:/ysstech/Medusa/runtime/src/main/resources/henghe.user.keytab\" principal=\"[email protected]\";");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Collections.singletonList("test-topic"));
        while (true) {
    
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
    
                System.out.println(record.value());
            }
        }
    }
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_41609807/article/details/107155530

智能推荐

div+css的入门知识_小小张自由—>张有博的博客-程序员信息网_div+css入门

目录四种选择器类选择器基本语法ID选择器基本语法HTML元素选择器基本语法通配符选择器基本语法如何插入样式表外部样式表内部样式表内联样式行内元素和块元素CSS特点:实现网页内容与样式的分离CSS 规则由两个主要的部分构成:选择器,以及一条或多条声明:四种选择器类选择器(class选择器) id选择器 html元素选择器 通配符选择器CSS选择器优先级:ID选择器 &gt; 类选择器 &gt; HTML元素选择器body&gt;通配符选择器

failed: Error during WebSocket handshake: Unexpected response code: 400_qq_36759433的博客-程序员信息网

问题描述:在项目中引用Socket.Io,在项目部署后报错,本地运行不报错错误原因:需要在配置文件nginx.conf中配置相关信息解决方案:在nginx文件的location中添加proxy_http_version 1.1;proxy_set_...

Linux动态库加载之LD_PRELOAD用法_dr.xun的博客-程序员信息网_ld_preload

转载https://blog.csdn.net/iEearth/article/details/49952047还有一篇博客也可以看看https://blog.csdn.net/xp5xp6/article/details/52513428https://www.cnblogs.com/openix/p/3521166.html/etc/ld.so.conf详解 :https://www.cnblogs.com/chris-cp/p/3591306.htmlLD_PRELOAD,..

Python TensorFlow 报错‘tensorflow.compat.v2.__internal__‘ has no attribute ‘tf2‘解决方法_m0_60105488的博客-程序员信息网

本文主要介绍Python中,使用TensorFlow时,执行import Keras报错:AttributeError: module 'tensorflow.compat.v2.__internal__' has no attribute 'tf2'解决方法。原文地址:Python TensorFlow 报错'tensorflow.compat.v2.__internal__' has no attribute 'tf2'解决方法...

Git强制使用远程仓库代码覆盖本地代码_Debug陈缘圈的博客-程序员信息网_git强制使用远程仓库代码

目前的git存在一个情况就是,在A分支有改动后,但是不方便提交,此时切换到B分支会把工作区带过来污染B分支可以通过强制拉取远程仓库代码覆盖本地分支去除污染git fetch --all //从另一个存储库下载对象和引用git reset --hard origin/master //放弃本地修改git pull //开始更新另一种解决办法是在A切换B之前,git stash一下...

CSAPP malloc实验_pcj_888的博客-程序员信息网_malloc 实验

实验简介实现自己的动态内存分配器(malloc、free、realloc)。预备知识阅读《CSAPP原书第3版》 9.9小节 —— 动态内存分配。阅读writeup的全部内容。分配器的设计要求处理任意请求序列,分配器不可以假设分配和释放请求的顺序。立即响应请求, 不允许分配器为了提高性能重新排列或缓冲请求。只使用堆。对齐块,以保存任何类型的数据对象。不修改已分配的块,分配器只能操作和改变空闲块。分配器的设计目标最大化吞吐率 —— 每个malloc, free执行的指令越少,

随便推点

tracepath 追踪路由信息 linux 命令_DemonHunter211的博客-程序员信息网

tracepath tracepath指令可以追踪数据到达目标主机的路由信息,同时还能够发现MTU值。它跟踪路径到目的地,沿着这条路径发现MTU。它使用UDP端口或一些随机端口。它类似于Traceroute,只是不需要超级用户特权,并且没有花哨的选项。tracepath 6很好地替代了tracerout 6和Linux错误队列应用程序的典型示例。tracepath的情况更糟,因为商用IP路由器在ICMP错误消息中没有返回足够的信息。很可能,当它们被更新的时候,它会改变 此命令的适用...

不会做动画的程序猿不是好的动画师(如何用css3动画做动画)_代森森的博客-程序员信息网_css动画师

转换(transform)是CSS3中具有颠覆性的特征之一,可以实现元素的位移、旋转、变形、缩放缩放:scale:缩放,顾名思义,可以放大和缩小。 只要给元素添加上了这个属性就能控制它放大还是缩小。1.语法:transform:scale(x,y);2.总结:宽和高都放大一倍,相对于没有放大transform:scale(2,2) :宽和高都放大了2倍transform:scale(2) :只写一个参数,第二个参数则和第一个参数一样,相当于 scale(2,2)transform:s

from tensorflow.keras import无法引用_张=小红=的博客-程序员信息网

环境:安装anaconda,创建虚拟环境,使用的python版本为python3.7;安装tensorflow-gpu版本(版本为2.1.0,能够正常进行import tensorflow)使用pycharm进行出现错误:这里提示在tensorflow包下并没有keras的文件可以进行引用,但实际上代码还是可以继续运行的。个人推测是遍历了tensorflow下各个文件进行搜索。去F:...

华为杯数学建模竞赛百分百获奖经验分享(获奖 == 四分经验,三分运气,三分实力)_maligebilaowang的博客-程序员信息网_华为杯数学建模成绩

!【注意】博主原创,转载请注明出处,纯属个人经验,大佬路过勿喷一、前言不知不觉,现在已经研二了!!!在最近的2019年“华为杯”全国研究生数学建模竞赛里博主喜获二等奖,虽然按照我以往经验获奖是必然的。但是出成绩,还是小激动了一下。这也应该是我最后一次参加数学建模竞赛,所以一直想着写个竞赛经验分享给大家。很多人参加这个比赛就是为了获奖,尤其对于想在上海落户的同学。而且这个比赛也是众多官方认证比............

Flink CDC 2.0 详解_000X000的博客-程序员信息网_flinkcdc安装详解

一、CDC 概述CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:数据同步:用于备份,容灾; 数据分发:一个数据源分发给多个下游系统; 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种: 基于查询的 C_1671465600

Python连接hive报错Could not start SASL: Error in sasl_client_start (-4) SASL(-4): no mechanism available_醉糊涂仙的博客-程序员信息网_could not start sasl

Could not start SASL: Error in sasl_client_start (-4) SASL(-4): no mechanism available: No worthy mechs found (code THRIFTTRANSPORT): TTransportException('Could not start SASL: Error in sasl_client_st...

推荐文章

热门文章

相关标签