具有 kerberos 的 Kafka Java 生产者

2023-12-10

在 kerberos 环境中向 kafka 主题发送消息时出现错误。我们在 hdp 2.3 上有集群

我跟着这个http://henning.kropponline.de/2016/02/21/secure-kafka-java- Producer-with-kerberos/

但是为了发送消息,我必须首先显式执行 kinit,然后只有我才能将消息发送到 kafka 主题。 我尝试通过 java 类进行编织,但这也不起作用。 PFB代码:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

    public static void main(String[] args) {

        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

        if (!authStatus) {
            System.out.println("Authntication fails, try something else  "  + authStatus);
        } else {
            System.out.println("Authntication successfull " + authStatus);
        }

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");

        try {
            long events = Long.parseLong("3");
            Random rnd = new Random();

            Properties props = new Properties();
            System.out.println("After broker list- " + args[0]);

            props.put("metadata.broker.list", args[0]);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");

            //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");


            System.out.println("After config prop -1");

            ProducerConfig config = new ProducerConfig(props);

            System.out.println("After config prop -2 config" + config);

            Producer<String, String> producer = new Producer<String, String>(config);

            System.out.println("After config prop -3");

            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                String ip = "192.168.2" + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);

                System.out.println("After config prop -1 data" + data);

                producer.send(data);
            }
            producer.close();

        } catch (Throwable th) {
            th.printStackTrace();

        }
    }
}

Pom.xml :从 hortonworks 存储库下载的所有依赖项。

        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.jasypt</groupId>
                <artifactId>jasypt-spring31</artifactId>
                <version>1.9.2</version>
                <scope>compile</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.1.2.3.4.0-3485</version>
            </dependency>

        </dependencies>

错误 : Case1:当我指定 myuser kafka_jass.conf 时

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.ProducerConfig@643293ae
java.lang.SecurityException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
        at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
        at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)

MyUser_Kafka_jass.conf

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   doNotPrompt=true
   useTicketCache=true
   renewTicket=true
   principal="ctadmin/[email protected]";
   useKeyTab=true
   serviceName="kafka"
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   client=true;
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   storeKey=true
   useTicketCache=true
   serviceName="zookeeper"
   principal="ctadmin/[email protected]";
};

case2:当我指定Kafkas自己的jaas文件时

Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)

如果我在运行此应用程序之前执行 kinit,则效果很好,否则它将出现上述错误。 我无法在我的生产环境中执行此操作,如果我们的应用程序本身有任何方法可以执行此操作,请帮助我。 如果您需要更多详细信息,请告诉我。

Thanks:)


该错误位于 jaas 文件中的分号中,正如您在这段输出中看到的那样:

Line 6: expected [controlFlag]

该行不能有分号:

principal="ctadmin/[email protected]";

它只能存在于最后一行:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有 kerberos 的 Kafka Java 生产者 的相关文章

  • 如何通过 javaconfig 使用 SchedulerFactoryBean.schedulerContextAsMap

    我使用 Spring 4 0 并将项目从 xml 移至 java config 除了访问 Service scheduleService 带注释的类来自QuartzJobBean executeInternal 我必须让它工作的 xml 位
  • 在内存中使用 byte[] 创建 zip 文件。 Zip 文件总是损坏

    我创建的 zip 文件有问题 我正在使用 Java 7 我尝试从字节数组创建一个 zip 文件 其中包含两个或多个 Excel 文件 应用程序始终完成 没有任何异常 所以 我以为一切都好 当我尝试打开 zip 文件后 Windows 7 出
  • 为什么 JTables 使 TableModel 在呈现时不可序列化?

    所以最近我正在开发一个工具 供我们配置某些应用程序 它不需要是什么真正令人敬畏的东西 只是一个具有一些 SQL 脚本生成功能并创建几个 XML 文件的基本工具 在此期间 我使用自己的 AbstractTableModel 实现创建了一系列
  • 使用 LinkedList 实现下一个和上一个按钮

    这可能是一个愚蠢的问题 但我很难思考清楚 我编写了一个使用 LinkedList 来移动加载的 MIDI 乐器的方法 我想制作一个下一个和一个上一个按钮 以便每次单击该按钮时都会遍历 LinkedList 如果我硬编码itr next or
  • .properties 中的通配符

    是否存在任何方法 我可以将通配符添加到属性文件中 并且具有所有含义 例如a b c d lalalala 或为所有以结尾的内容设置一个正则表达式a b c anything 普通的 Java 属性文件无法处理这个问题 不 请记住 它实际上是
  • 动态选择端口号?

    在 Java 中 我需要获取端口号以在同一程序的多个实例之间进行通信 现在 我可以简单地选择一些固定的数字并使用它 但我想知道是否有一种方法可以动态选择端口号 这样我就不必打扰我的用户设置端口号 这是我的一个想法 其工作原理如下 有一个固定
  • 没有 Spring 的自定义 Prometheus 指标

    我需要为 Web 应用程序提供自定义指标 问题是我不能使用 Spring 但我必须使用 jax rs 端点 要求非常简单 想象一下 您有一个包含键值对的映射 其中键是指标名称 值是一个简单的整数 它是一个计数器 代码会是这样的 public
  • jdbc mysql loginTimeout 不起作用

    有人可以解释一下为什么下面的程序在 3 秒后超时 因为我将其设置为在 3 秒后超时 12秒 我特意关闭了mysql服务器来测试mysql服务器无法访问的这种场景 import java sql Connection import java
  • Hibernate 的 PersistentSet 不使用 hashCode/equals 的自定义实现

    所以我有一本实体书 public class Book private String id private String name private String description private Image coverImage pr
  • logcat 中 mSecurityInputMethodService 为 null

    我写了一点android应显示智能手机当前位置 最后已知位置 的应用程序 尽管我复制了示例代码 并尝试了其他几种解决方案 但似乎每次都有相同的错误 我的应用程序由一个按钮组成 按下按钮应该log经度和纬度 但仅对数 mSecurityInp
  • java for windows 中的文件图标叠加

    我正在尝试像 Tortoise SVN 或 Dropbox 一样在文件和文件夹上实现图标叠加 我在网上查了很多资料 但没有找到Java的解决方案 Can anyone help me with this 很抱歉确认您的担忧 但这无法在 Ja
  • 不接受任何内容也不返回任何内容的函数接口[重复]

    这个问题在这里已经有答案了 JDK中是否有一个标准的函数式接口 不接受也不返回任何内容 我找不到一个 像下面这样 FunctionalInterface interface Action void execute 可运行怎么样 Functi
  • 专门针对 JSP 的测试驱动开发

    在理解 TDD 到底是什么之前 我就已经开始编写测试驱动的代码了 在没有实现的情况下调用函数和类可以帮助我以更快 更有效的方式理解和构建我的应用程序 所以我非常习惯编写代码 gt 编译它 gt 看到它失败 gt 通过构建其实现来修复它的过程
  • 最新的 Hibernate 和 Derby:无法建立 JDBC 连接

    我正在尝试创建一个使用 Hibernate 连接到 Derby 数据库的准系统项目 我正在使用 Hibernate 和 Derby 的最新版本 但我得到的是通用的Unable to make JDBC Connection error 这是
  • 干净构建 Java 命令行

    我正在使用命令行编译使用 eclipse 编写的项目 如下所示 javac file java 然后运行 java file args here 我将如何运行干净的构建或编译 每当我重新编译时 除非删除所有内容 否则更改不会受到影响 cla
  • 如何使用mockito模拟构建器

    我有一个建造者 class Builder private String name private String address public Builder setName String name this name name retur
  • 使用反射覆盖最终静态字段是否有限制?

    在我的一些单元测试中 我在最终静态字段上的反射中遇到了奇怪的行为 下面是说明我的问题的示例 我有一个基本的 Singleton 类 其中包含一个 Integer public class BasicHolder private static
  • 使用 CXF-RS 组件时,为什么我们使用 而不是普通的

    作为后续这个问题 https stackoverflow com questions 20598199 对于如何正确使用CXF RS组件我还是有点困惑 我很困惑为什么我们需要
  • 使用 svn 1.8.x、subclise 1.10 的 m2e-subclipse 连接器在哪里?

    我读到 m2e 的生产商已经停止生产 svn 1 7 以外的任何版本的 m2e 连接器 Tigris 显然已经填补了维护 m2e subclipse 连接器的空缺 Q1 我的问题是 使用 svn 1 8 x 的 eclipse 更新 url
  • Spring Boot 无法更新 azure cosmos db(MongoDb) 上的分片集合

    我的数据库中存在一个集合 documentDev 其分片键为 dNumber 样本文件 id 12831221wadaee23 dNumber 115 processed false 如果我尝试使用以下命令通过任何查询工具更新此文档 db

随机推荐

  • 使用自定义属性有效吗?

    我想取消任何链接并为每个链接添加额外的属性 下面是我是如何实现这一目标的 function anularEnlaces nav a each function var href this attr href var id this attr
  • C# 按字母顺序和长度对 Arraylist 字符串进行排序

    我正在尝试排序ArrayList of String Given A C AA B CC BB Arraylist Sort gives A AA B BB C CC 我需要的是 A B C AA BB CC ArrayList list
  • +0和-0一样吗?

    阅读通过ECMAScript 5 1 规范 0 and 0是杰出的 那么为什么呢 0 0评估为true JavaScript 使用IEEE 754 标准来表示数字 从维基百科 签名零为零并带有相关符号 在普通算术中 0 0 0 然而 在计算
  • Android 应用程序 - 如何获取联系人的生日

    我正在开发一个 Android 应用程序 我需要将每个联系人的生日与当前日期进行匹配 如果是的话 则处理一些业务逻辑 这需要完整的联系人详细信息 我找到了分别读取联系人生日或联系人本身的方法 但对如何将两者结合起来感到困惑 有人可以提供一些
  • 如何用段落标签包围所有文本片段? [关闭]

    Closed 这个问题需要多问focused 目前不接受答案 我想在任何文本项周围放置段落标签 因此应该避免表格和其他元素 我怎么做 我想它可以用某种方式制成preg replace 以下是一些可以帮助您完成您想做的事情的函数 nl2p T
  • Yii2 gridview - 模式仅在单击第一行时显示

    我正在使用 kartik grid GridView 和 kartik grid ExpandRowColumn 来显示摘要信息 在 ExpandRowColumn 上 我有另一个 gridview 来列出详细信息 这部分工作正常 在 Ex
  • swift 3 Playground 中的回调[重复]

    这个问题在这里已经有答案了 您好 我正在尝试在操场上执行这行代码 但得到任何响应输出 我的代码如下 func testCallbackEmpty callback escaping gt Void DispatchQueue main as
  • 如何在 WordPress 插件中使用多媒体上传器?

    我尝试在 WordPress 插件中添加多重上传选项 我在插件中重复了此代码 两次 仅更改了 id 名称
  • 我可以使用反射访问 ItemsControl 的 ItemsHost 吗?

    我正在创建自定义ItemsControl这是源自DataGrid 我需要访问 ItemsHost 这是Panel实际上包含行DataGrid 我见过一些丑陋的技巧来做到这一点 但我认为它们比使用反射更糟糕 那么我可以使用反射访问 Items
  • 如何使用/导入http模块?

    我一直在玩Angular 2 快速入门 如何在 Angular 2 中使用 导入 http 模块 我看过Angular 2 Todo s js 但它不使用 http 模块 我已经添加 ngHttp angular http to depen
  • to_number Oracle SQL 中数字格式的动态长度

    我有一个表 其中的数字存储为varchar2和 作为小数点分隔符 例如 5 92843 我想使用 来计算这些数字 因为这是系统默认值 并且使用了以下内容to number去做这个 TO NUMBER number 99999D9999 NL
  • Neo4j 桌面打开时出错 - “无法读取未定义的属性“名称”

    我有一个运行 Windows Server 2016 的盒子 我在这里找到了有关 MAC 的问题的答案 neo4j 初始化错误 TypeError 无法读取未定义的属性 名称 这个答案似乎在 Windows 中不起作用 因为删除 AppDa
  • PHP 通过键和值创建数组

    我在下面的 PHP 代码中有一个数组 我想将该数组转换为按数据值分组 简化数组总是很困难 Array 0 gt Array video id gt 14 video title gt test1 video category name gt
  • 如何从聚合物组件访问父模型

    我将 my app 作为 index html 文件中的主要应用程序组件 并使用 model dart 作为其模型 这是我的应用程序模型 my app 以 my component 作为其内容 当用户与 my component 交互时 我
  • 如何在Python中获取字符串的大小(长度)

    例如 我得到一个字符串 str please answer my question 我想把它写入一个文件 但在将字符串写入文件之前我需要知道字符串的大小 我可以使用什么函数来计算字符串的大小 如果你正在谈论字符串的长度 你可以使用len g
  • 如何自动将当前路由中的特定值添加到所有生成的链接?

    我的 URL 中有网站文化 如下所示 routes MapRoute Default language controller action id languageDefaults languageConstraints 它的工作方式就像一个
  • 结合 lapply、svyby、svyratio 计算许多具有置信区间的比率

    我正在使用surveyR 中的包可与美国人口普查局的 PUMS 人口数据集配合使用 我为每个广泛的行业创建了一个布尔值和一个字符变量MigrationStatus具有三个值 Stayed Left Entered 我想按移民身份检查每个行业
  • 将数据从 Rails 视图传递到 webpacker 中的 VueJS 组件

    我正在尝试摆弄 Rails 5 1 的新 webpacker gem 以及 VueJS 但无法让我的 erb 视图将数据传递给 VueJS 组件 假设我有一个用户显示视图 view users show html erb 还有我的 Java
  • 使用VBA生成短哈希字符串

    我正在寻找一个 VBA 函数 它可以根据字符串内容生成非常短的哈希码 例如 3 个字符 From http www lammertbies nl forum viewtopic php t 302 Sub CRC16 Dim x As Lo
  • 具有 kerberos 的 Kafka Java 生产者

    在 kerberos 环境中向 kafka 主题发送消息时出现错误 我们在 hdp 2 3 上有集群 我跟着这个http henning kropponline de 2016 02 21 secure kafka java Produce