技术标签: kakfa Kerberos 安全认证 生成者与消费者 Kafka java编程
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\test.service.keytab"
useTicketCache=false
principal="test/[email protected]"
serviceName=kafka;
};
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
dns_lookup_realm = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
rdns = false
default_realm = HENGHE.COM
# default_ccache_name = KEYRING:persistent:%{uid}
[realms]
HENGHE.COM = {
kdc = henghe-047
admin_server = henghe-047
#kdc = ${kerberosSlaves}
}
[domain_realm]
.henghe.com = HENGHE.COM
henghe.com = HENGHE.COM
package com.yss.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Scanner;
/**
* @description:
* @author:
* @create: 2020-07-06 11:44
**/
public class Producer {
public static void main(String[] args) {
//在windows中设置JAAS,也可以通过-D方式传入
// System.setProperty("java.security.auth.login.config", "D:\\resources\\kafka_client_jaas.conf");
// System.setProperty("java.security.krb5.conf", "D:\\resources\\krb5.conf");
//在Linux中设置JAAS,也可以通过-D方式传入
System.setProperty("java.security.auth.login.config", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\kafka_client_jaas.conf");
System.setProperty("java.security.krb5.conf", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\krb5.conf");
Properties props = new Properties();
props.put("bootstrap.servers", "henghe-020:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.mechanism", "GSSAPI");
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
String topic = "ws_kerberos";
Scanner scan = new Scanner(System.in);
while (true){
System.out.print(">>");
String message = scan.nextLine();
producer.send(new ProducerRecord<String, String>(topic, message));
System.out.println(message);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.yss.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @description:
* @author:
* @create: 2020-07-06 12:24
**/
public class Consumer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
System.setProperty("java.security.auth.login.config", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\kafka_client_jaas.conf");
System.setProperty("java.security.krb5.conf", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\krb5.conf");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.mechanism", "GSSAPI");
props.put("bootstrap.servers", "henghe-020:9092");
props.put("group.id", "test132");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ws_kerberos"));
long timeMillis = System.currentTimeMillis();
long count = 0;
try {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println("\n======================================我是分割符==============================================\n");
byte[] value = record.value();
String s = new String(value,"utf8");
System.out.println(s);
}
}
} finally {
consumer.close();
System.out.println("总记录数:" + count + " 耗时:" + (System.currentTimeMillis() - timeMillis) / 1000);
}
}
}
简单的demo示例,具体根据业务实现。
public class KerberosTest {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
// System.setProperty("java.security.auth.login.config", "D:\\ysstech\\Medusa\\runtime\\src\\main\\resources\\kafka_client_jaas.conf");
System.setProperty("java.security.krb5.conf", "D:\\ysstech\\Medusa\\runtime\\src\\main\\resources\\krb5.conf");
props.put("bootstrap.servers", "henghe-37:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "qwe");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.mechanism", "GSSAPI");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"D:/ysstech/Medusa/runtime/src/main/resources/henghe.user.keytab\" principal=\"[email protected]\";");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
目录四种选择器类选择器基本语法ID选择器基本语法HTML元素选择器基本语法通配符选择器基本语法如何插入样式表外部样式表内部样式表内联样式行内元素和块元素CSS特点:实现网页内容与样式的分离CSS 规则由两个主要的部分构成:选择器,以及一条或多条声明:四种选择器类选择器(class选择器) id选择器 html元素选择器 通配符选择器CSS选择器优先级:ID选择器 > 类选择器 > HTML元素选择器body>通配符选择器
问题描述:在项目中引用Socket.Io,在项目部署后报错,本地运行不报错错误原因:需要在配置文件nginx.conf中配置相关信息解决方案:在nginx文件的location中添加proxy_http_version 1.1;proxy_set_...
转载https://blog.csdn.net/iEearth/article/details/49952047还有一篇博客也可以看看https://blog.csdn.net/xp5xp6/article/details/52513428https://www.cnblogs.com/openix/p/3521166.html/etc/ld.so.conf详解 :https://www.cnblogs.com/chris-cp/p/3591306.htmlLD_PRELOAD,..
本文主要介绍Python中,使用TensorFlow时,执行import Keras报错:AttributeError: module 'tensorflow.compat.v2.__internal__' has no attribute 'tf2'解决方法。原文地址:Python TensorFlow 报错'tensorflow.compat.v2.__internal__' has no attribute 'tf2'解决方法...
目前的git存在一个情况就是,在A分支有改动后,但是不方便提交,此时切换到B分支会把工作区带过来污染B分支可以通过强制拉取远程仓库代码覆盖本地分支去除污染git fetch --all //从另一个存储库下载对象和引用git reset --hard origin/master //放弃本地修改git pull //开始更新另一种解决办法是在A切换B之前,git stash一下...
实验简介实现自己的动态内存分配器(malloc、free、realloc)。预备知识阅读《CSAPP原书第3版》 9.9小节 —— 动态内存分配。阅读writeup的全部内容。分配器的设计要求处理任意请求序列,分配器不可以假设分配和释放请求的顺序。立即响应请求, 不允许分配器为了提高性能重新排列或缓冲请求。只使用堆。对齐块,以保存任何类型的数据对象。不修改已分配的块,分配器只能操作和改变空闲块。分配器的设计目标最大化吞吐率 —— 每个malloc, free执行的指令越少,
tracepath tracepath指令可以追踪数据到达目标主机的路由信息,同时还能够发现MTU值。它跟踪路径到目的地,沿着这条路径发现MTU。它使用UDP端口或一些随机端口。它类似于Traceroute,只是不需要超级用户特权,并且没有花哨的选项。tracepath 6很好地替代了tracerout 6和Linux错误队列应用程序的典型示例。tracepath的情况更糟,因为商用IP路由器在ICMP错误消息中没有返回足够的信息。很可能,当它们被更新的时候,它会改变 此命令的适用...
转换(transform)是CSS3中具有颠覆性的特征之一,可以实现元素的位移、旋转、变形、缩放缩放:scale:缩放,顾名思义,可以放大和缩小。 只要给元素添加上了这个属性就能控制它放大还是缩小。1.语法:transform:scale(x,y);2.总结:宽和高都放大一倍,相对于没有放大transform:scale(2,2) :宽和高都放大了2倍transform:scale(2) :只写一个参数,第二个参数则和第一个参数一样,相当于 scale(2,2)transform:s
环境:安装anaconda,创建虚拟环境,使用的python版本为python3.7;安装tensorflow-gpu版本(版本为2.1.0,能够正常进行import tensorflow)使用pycharm进行出现错误:这里提示在tensorflow包下并没有keras的文件可以进行引用,但实际上代码还是可以继续运行的。个人推测是遍历了tensorflow下各个文件进行搜索。去F:...
!【注意】博主原创,转载请注明出处,纯属个人经验,大佬路过勿喷一、前言不知不觉,现在已经研二了!!!在最近的2019年“华为杯”全国研究生数学建模竞赛里博主喜获二等奖,虽然按照我以往经验获奖是必然的。但是出成绩,还是小激动了一下。这也应该是我最后一次参加数学建模竞赛,所以一直想着写个竞赛经验分享给大家。很多人参加这个比赛就是为了获奖,尤其对于想在上海落户的同学。而且这个比赛也是众多官方认证比............
一、CDC 概述CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:数据同步:用于备份,容灾; 数据分发:一个数据源分发给多个下游系统; 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种: 基于查询的 C_1671465600
Could not start SASL: Error in sasl_client_start (-4) SASL(-4): no mechanism available: No worthy mechs found (code THRIFTTRANSPORT): TTransportException('Could not start SASL: Error in sasl_client_st...