Kafka使用工具封装

2023-11-01

maven依赖

        <!--kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <!-- log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

log4j.properties

log4j.rootLogger=INFO,R,stdout    

log4j.appender.stdout=org.apache.log4j.ConsoleAppender    
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [%t] [%c]-[%p] %m%n

log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=./logs/info.log
#log4j.appender.R.File=./logs/compSearcher.log
log4j.appender.R.DatePattern ='.'yyyy-MM-dd
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [%c]-[%p] %m%n

KafkaUtils封装

package com.wg.utils;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.FileReader;
import java.io.IOException;
import java.util.*;


public class KafkaUtils {

    private static Producer<String,String> producer =null;
    private static String broker_ip="xx.xx.10.182:9092";
    private static String zk_ip="xx.xx.10.182:2181";
    public static String group_id="test";

    static {
        Properties prop = loadOutProp("./conf/kafka.properties");
        broker_ip= prop.getProperty("broker.list");
        zk_ip= prop.getProperty("zookeeper.connect");
        group_id=prop.getProperty("group.id");
    }

    /**
     * 实际上调用send方法并不能保证消息被成功写入到kafka。为了实现同步的发送消息,
     * 并监控每条消息是否发送成功,需要对每次调用send方法后返回的Future对象调用get方法。
     * (get方法的调用会导致当前线程block,直到发送结果返回,不管是成功还是失败)
     * @param topic
     * @param key
     * @param value
     * @return
     */
    public static boolean send(String topic,String key,String value){
        ProducerRecord<String,String> r = new ProducerRecord<String,String>(topic,key,value);
        try {
            if(producer==null){
                producer = new KafkaProducer<String,String>(getProducerProp());
            }
            producer.send(r).get();
            System.out.println("send to topic "+topic);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    private static Properties getProducerProp() {
        // 构造一个java.util.Properties对象
        Properties props = new Properties();
        // 指定bootstrap.servers属性。必填,无默认值。用于创建向kafka broker服务器的连接。
        props.put("bootstrap.servers", broker_ip);
        // 指定key.serializer属性。必填,无默认值。被发送到broker端的任何消息的格式都必须是字节数组。
        // 因此消息的各个组件都必须首先做序列化,然后才能发送到broker。该参数就是为消息的key做序列化只用的。
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 指定value.serializer属性。必填,无默认值。和key.serializer类似。此被用来对消息体即消息value部分做序列化。
        // 将消息value部分转换成字节数组。
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //acks参数用于控制producer生产消息的持久性(durability)。参数可选值,0、1、-1(all)。
        props.put("acks", "-1");
        //props.put(ProducerConfig.ACKS_CONFIG, "1");
        //在producer内部自动实现了消息重新发送。默认值0代表不进行重试。
        props.put("retries", 3);
        //props.put(ProducerConfig.RETRIES_CONFIG, 3);
        //调优producer吞吐量和延时性能指标都有非常重要作用。默认值16384即16KB。
        props.put("batch.size", 16384);
        //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840);
        //控制消息发送延时行为的,该参数默认值是0。表示消息需要被立即发送,无须关系batch是否被填满。
        props.put("linger.ms", 10);
        //props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        //指定了producer端用于缓存消息的缓冲区的大小,单位是字节,默认值是33554432即32M。
        props.put("buffer.memory", 33554432);
        //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put("max.block.ms", 3000);
        //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
        //设置producer段是否压缩消息,默认值是none。即不压缩消息。GZIP、Snappy、LZ4
        //props.put("compression.type", "none");
        //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        //该参数用于控制producer发送请求的大小。producer端能够发送的最大消息大小。
        //props.put("max.request.size", 10485760);
        //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        //producer发送请求给broker后,broker需要在规定时间范围内将处理结果返还给producer。默认30s
        //props.put("request.timeout.ms", 60000);
        //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);

        // 使用上面创建的Properties对象构造KafkaProducer对象
        //如果采用这种方式创建producer,那么就不需要显示的在Properties中指定key和value序列化类了呢。
        // Serializer<String> keySerializer = new StringSerializer();
        // Serializer<String> valueSerializer = new StringSerializer();
        // Producer<String, String> producer = new KafkaProducer<String, String>(props,
        // keySerializer, valueSerializer);
        return props;
    }

    /**
     * 不能容忍回写到kafka的日志丢失,因此必须使用同步的方式发送消息的配置
     * @return
     */
    private Producer<Integer, String> initKafkaProducer(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker_ip);//格式:host1:port1,host2:port2,....
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);//a batch size of zero will disable batching entirely
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);//send message without delay
        props.put(ProducerConfig.ACKS_CONFIG, "1");//对应partition的leader写到本地后即返回成功。极端情况下,可能导致失败
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
        return kafkaProducer;
    }

    private static Properties loadInProp(String fileName) {
        Properties properties = new Properties();
        try {
            properties.load(MyIOUtils.class.getClassLoader().getResourceAsStream(fileName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return properties;
    }

    private static Properties loadOutProp(String fileName) {
        Properties properties = new Properties();
        try {
            properties.load(new FileReader(fileName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return properties;
    }



    private static Properties getConsumerProp(){
        Properties props = new Properties();
        //bootstrap.servers   必要
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,broker_ip);
        //group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        //是否后台自动提交offset 到kafka
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //消费者偏移自动提交到Kafka的频率(以毫秒为单位enable.auto.commit)设置为true
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //故障检测,心跳检测机制 的间隔时间,,在该值范围内,没有接收到心跳,则会删除该消费者
        //并启动再平衡(rebanlance),值必须在group.min.session.timeout 和 group.max.session.timeout.ms之间
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        //key - value 的序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //从头开始消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,  "earliest");
        //从最新数据开始消费
//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,  "latest");


        //默认一次poll 多少条消息,要合理设置这个值
        //kafka如果超过 SESSION_TIMEOUT_MS_CONFIG 没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
        //如果没有设置好,就会出现 消费者不断被剔除,不断重新消费的情况
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, props.getProperty("max.poll.records","20"));
        return props;
    }

    private static kafka.consumer.ConsumerConfig initJavaConsumerConfig(String group_id) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zk_ip);
        props.put("group.id", group_id );
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.session.timeout.ms",  "400000");
        props.put("zookeeper.sync.time.ms",  "200");
        props.put("auto.commit.interval.ms",  "1000");
        props.put("fetch.message.max.bytes",  "100000000");
        props.put("max.poll.records","5000");
        props.put("rebalance.backoff.ms","8000");
        props.put("rebalance.max.retries","7");
        kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
        return consumerConfig;
    }


    public static KafkaConsumer getConsumer(){
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getConsumerProp());
        return consumer;
    }



    /**
     * 适用于多线程消费
     * @param topic
     * @param thread_num 和 topic的 partition 保持一直
     * @return
     */
    public static List<KafkaStream<byte[], byte[]>> getConsumerStreams(String topic,String group_Id,int thread_num) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        ConsumerConnector consumerConnector = getConsumerConnector(group_Id);
        topicCountMap.put(topic,thread_num);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(topic);
        return kafkaStreams;
    }

    /**
     * 单线程消费
     * @param topic
     * @return
     */
    public static  KafkaStream<byte[], byte[]>  getConsumerStream(String topic,String group_id) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        ConsumerConnector consumerConnector = getConsumerConnector(group_id);
        topicCountMap.put(topic,1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        return stream;
    }

    public static void main(String[] args) {
        KafkaStream<byte[], byte[]> stream = getConsumerStream("test", "test0");
        testConsumerSream(stream);
    }


    private static ConsumerConnector getConsumerConnector(String group_id) {
        kafka.consumer.ConsumerConfig consumerProp = initJavaConsumerConfig(group_id);
        ConsumerConnector javaConsumerConnector = Consumer
                .createJavaConsumerConnector(consumerProp);
        return javaConsumerConnector;
    }

    /**
     * 一次取多条 使用样例
     * @param topic
     */
    public static void testConsumer(String topic){
        KafkaConsumer consumer = getConsumer();
        consumer.subscribe(Arrays.asList(topic));
        // consumer.subscribe(Collections.singletonList(this.topic));
        while (true) {
            //consumer.poll()
            long t1 = System.currentTimeMillis();
            ConsumerRecords<String, String> records = consumer.poll(2000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value()
                        + ") offset " + record.offset()
                        + " partition " + record.partition() + ")");
            }
            long t2 = System.currentTimeMillis();
            System.out.println("one poll cost==="+(t2-t1)+" ms ");
        }
    }

    public static void testConsumerSream(KafkaStream<byte[], byte[]> stream){
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String str = new String(it.next().message());
            System.out.println("str======>>>>"+str);
        }
    }

    /** @deprecated */
    private static kafka.javaapi.producer.Producer<String, String> getOldProducer(){
        Properties props = new Properties();
        props.put("metadata.broker.list", broker_ip);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type",  "sync");
        props.put("queue.buffering.max.ms", "10000");
        props.put("request.timeout.ms", "10000");
        // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
        // 值为0,1,-1,可以参考
        props.put("request.required.acks", "1");
        kafka.producer.ProducerConfig producerConfig = new kafka.producer.ProducerConfig(props);
        kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<>(producerConfig);
        return producer;
    }


    /**
     * 向kafka中插入数据
     *@deprecated
     * @param topic
     * @param input
     * @return 这个方法的问题是发送失败时候不会报错
     */
    private static kafka.javaapi.producer.Producer<String, String> oldProducer=null;
    public static void  oldSend(String topic, String input) {
        if(oldProducer==null){
            oldProducer = getOldProducer();
        }
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                topic, input);
        //为了能够随机发送到不同partition
           /* String key = UUID.randomUUID().toString().substring(0, 5);
            KeyedMessage<String, String> message = new KeyedMessage<>(topic, key, input);*/
        oldProducer.send(message);
    }

    public static String getKey(int partionNum){
        return  UUID.randomUUID().toString().substring(0, partionNum);
    }
}

引用

Kafka Producer API的使用

kafka消费者Consumer参数设置及参数调优

基于kafka_2.11-2.1.0实现的生产者和消费者代码样例

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka使用工具封装 的相关文章

  • 计算广告读书笔记

    计算广告 广告主 媒体 用户 用户画像 ROI 进化 合约广告 多个合约在线分配问题 gt 竞价广告 交易终端TD 广告网络ADN gt 实时竞价RTB 广告交易平台ADX 需求方平台DSP 品牌广告 效果广告 点击率CTR 点击价值 到达
  • Zookeeper的常见面试题

    1 Zookeeper 1 1 Zookeeper基本概念 Zookeeper作为一个优秀高效且可靠的分布式协调框架 ZooKeeper 在解决分布式数据一致性问题时并没有直接使用Paxos算法 而是专门定制了一致性协议叫做 ZAB Zoo
  • Kafka面试必问几个概念 与 使用场景

    介绍下我写的这个kafka项目 里面做了详细的配置注释已经代码的demo 可供大家学习 项目 地址 springboot kafka集群项目实战 kafka集群批量消费数据去重和一致性 kafka的几个重要概念 接下来围绕下面几个概念来进行
  • MQ如何保证消息不丢失

    如何保证消息不丢失 哪些环节会造成消息丢失 其实主要就是跨网络的环境中需要考虑消息的丢失 主要是有以下几个方面 生产者往MQ发送消息 MQ的Broker是集群有主从的 主节点把消息同步到从节点时也需要考虑消息丢失问题 消息从内存持久化到硬盘
  • Kafka——javaAPI

    文章目录 Kafka的JavaAPI 1 未整合版的Kafka的API 1 1 Producer 消息发送端代码 1 2 Consumer 消息消费端代码 2 Spring Boot整合Kafka 2 1 发送者代码 Producer 2
  • kafka中partition数量与消费者对应关系

    kafka是由Apache软件基金会开发的一个开源流处理平台 kafka是一种高吞吐量的分布式发布订阅消息系统 它可以处理消费者在网站中的所有动作流数据 kafka中partition类似数据库中的分表数据 可以起到水平扩展数据的目的 比如
  • Kafka原理分析

    在基础篇中我们介绍MQ的一些基础原理 这篇文章 我们针对kafka进行较深入的分析 上篇文章中我们提到了kafka中一个名词broker 其实broker可以理解成为一台kafa服务器 kafka的特性和功能 在kafka设计之初是为了实时
  • Kafka:主题创建、分区修改查看、生产者、消费者

    文章目录 Kafka后台操作 1 主题 2 分区 3 生产者 4 消费者组 Kafka后台操作 1 主题 1 创建主题 bin kafka topics sh create bootstrap server hadoop102 9092 r
  • kafka + zookeeper下载/安装/使用(超详细)

    kafka是需要zk来支持 所以先下载zk 1 下载安装zookeeper 下载地址 选择不带source的 下载下来解压2次 进入到 D zookeeper apache zookeeper 3 6 1 bin conf 目录下 把zoo
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • Kafka入门基础知识学习笔记-Kafka只是消息引擎吗

    学习极客时间 Kafka核心技术与实战 入门 03 05 作者 胡夕 Apache Kafka 的一名代码贡献者 目前在社区的 Patch 提交总数位列第 22 位 应该说算是国内比较活跃的贡献者了 胡夕老师 赠言 聪明人也要下死功夫 最近
  • kafka问题解决:org.apache.kafka.common.errors.TimeoutException

    记录使用kafka遇到的问题 1 Caused by java nio channels UnresolvedAddressException null 2 org apache kafka common errors TimeoutExc
  • kafka 监控工具--CMAK

    CMAK previously known as Kafka Manager is a tool for managing Apache Kafka clusters See below for details about the name
  • [Docker]使用Docker部署Kafka

    Kafka 是一个分布式流处理平台 它依赖于 ZooKeeper 作为其协调服务 在 Kafka 集群中 ZooKeeper 负责管理和协调 Kafka 的各个节点 因此 要在 Docker 容器中启动 Kafka 通常需要同时启动一个 Z
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • Kafka 架构及原理分析

    Kafka 架构及原理分析 文章目录 Kafka 架构及原理分析 简介 使用场景 架构 Broker Topic 副本机制 存储 消费分组 消费编号 数据多写支持 基于 binlog 实现主从复制 Kafka 的进阶功能 消息幂等性 事务
  • kafka系列——KafkaProducer源码分析

    实例化过程 在KafkaProducer的构造方法中 根据配置项主要完成以下对象或数据结构的实例化 配置项中解析出 clientId 用于跟踪程序运行情况 在有多个KafkProducer时 若没有配置 client id则clientId
  • Kafka性能保证和延时队列实现原理

    数据不丢不漏 不重不错 一 不丢 生产写入消息不丢失 数据组织形式 首先 从数据组织形式来说 kafka有三层形式 kafka有多个主题 topic 每个主题有多个分区 分区分为主分区和副本分区 每个分区又有多条消息 而每个分区可以分布到不
  • kafka的新API 得到最新一条数据

    业务的需要 需要得到最新的一条消息从kafka中 但是发现ConsumerRecords 这个对象并没有 get index 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录 但是有一个count 可以得到
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们

随机推荐

  • sqli-labs靶机训练(11-15)

    less 11 POST型 1 寻找漏洞类型 外链图片转存失败 源站可能有防盗链机制 建议将图片保存下来直接上传 img YrAkcwyg 1641296909427 C Users 84305 AppData Roaming Typora
  • 攻防世界web

    攻防世界web 前言 准备ctf比赛 这里把攻防世界分值低于5分的基本刷了一遍 分值再高刷不动了 练习 view source 没难度知识禁用了右键点击 ctrl u查看源码拿到flag get post 这题没什么好说的 按着提示来就能拿
  • 又一个开源工具搞完了,工作效率直接翻倍

    博客首页 派 大 星 欢迎关注 点赞 收藏 留言 本文由派大星原创编撰 系列专栏 开源专栏 本系列主要输出作者自创的开源项目 作品 低代码生成器平台 大家好 我是派大星 距离上一次开发出开源项目的时间已经过去一段时间了 也不知道大家有没有使
  • 使用本地mysql+linux实现mysql主从同步

    1 配置linux 保证linux已经安装好了mysql 1 1修改该linux配置文件 vim etc my cnf 1 2重启linux的mysql systemctl restart mysqld 1 3使用账户密码登录linux中的
  • Arduino小车资料整理

    目录 一 小车简介 二 材料清单 三 Arduino UNO R3简介及使用说明 四 各模块安装接线及测试 1 驱动模块接线及测试 1 减速直流电机 2 L298N电机驱动模块 3 具体接线 4 代码及测试 2 巡线模块接线及测试 1 TC
  • 解决 font-weight 无效的问题

    近期调页面时有几个 font weight 需要修改 无论怎么调整字体粗细都没有变化 深入研究后总结下文 初探 本地写个例子 代码如下 p class thin This is a paragraph p p class normal Th
  • springboot 定时任务@Scheduled 和 异步@Async

    使用 EnableScheduling开启功能 Configuration EnableScheduling public class ScheduleConfig 编写任务 Component public class Scheduled
  • linux下使用动态壁纸(fantascene)

    让你的linux桌面动起来 幻梦动态壁纸 我也是突发奇想 做了这么一个程序 目前在多个linux下可以运行 支持双屏 理论上说支持mpv gt 27 0 qt gt 5 6的系统版本可编译 ubuntu16 04可以自行删减代码和编译 或者
  • java玫瑰花代码_Java版给爱人表白的玫瑰花程序代码

    1 书写表白语句的frame 渐入功能 package com wanju blessing import java awt Color import java awt Container import java awt Dimension
  • 常用眼底图像数据集简介及下载--糖尿病视网膜病变 EyePacs,APTOS2019,STARE数据集

    1 糖尿病视网膜病变图像介绍 1 微动脉瘤通常出现在病变早期 它是由于眼部毛细血管缺氧导致血管壁变薄 从而在视网膜上呈现出深红色的点状物 2 出血点一般出现在血管附近 它是由于血管阻塞导致血液渗出形成的 呈现暗斑状 3 软性和硬性渗出物的形
  • python中变量,python中变量的概念

    python中变量的概念 在python中 变量就是一种标识符 它是数据的名字 更专业的理解 变量是内存中数据的引用 编程语言里的变量和初中学习代数时的方程变量很相似 前面学习数字类型 bool类型时 我们一直在交互式解释器里进行操作 目的
  • java springboot 实现从数据库查询数据下载为md格式文件

    java springboot 实现从数据库查询数据下载为md格式文件 param param response 功能描述 下载文件 标题 byId getTitle 内容 byId getTextContent 格式 response s
  • 增强型PWM(EPWM)如何输出互补功能?

    1 概念 互补 两根线 输出的PWM 只有一端导通 和死区概念类似 死区时间 指在这段时间 上下都没有输出 带死区的PWM波可以防止上下两个器件同时导通 也就是说 当一个器件导通后关闭 再经过一段死区 这时才能让另一个导通 例如 红色线条的
  • nuxt百度收录

    import cheerio from cheerio export default Global page headers https go nuxtjs dev config head mode universal 修改百度收录 hoo
  • 04 ImageView中图片保存到文件

    最近做的一个小App中的一个功能 把ImageView中的图片保存为一个 jpg文件 如果设备上有SDCard 图片会被保存到SD卡上 如果没有则保存在设备的存储空间中 这里主要包含了两个要点 一是 Android文件保存时文件夹的创建 二
  • detectron2概述

    目录 detectron2框架 configs datasets README md prepare for tests sh prepare panoptic fpn py demo demo py predictor py detect
  • 关于Docker如何安装nginx

    目录 1 Nginx 1 2 安装nginx 2 容器之间相互通信 2 1 两个容器在同一网段 2 2 两个容器在不同网段 1 Nginx Nginx也是一款服务器 我们常用它做如 反向代理 负载均衡 动态与静态资源的分离的工作 反向代理
  • C语言-数据结构-栈(静态栈与动态栈)

    一 简介 在哔哩哔哩看视频学的 赫斌老师数据结构入门的内容 b站搜索 av6159200 P33 通过学习 能独立把赫斌老师教的敲出来 由于动态栈 链表阉割版 的功能很少 我并没有增加什么其它功能 但是我自己实现了静态栈 数组阉割版 还有就
  • 卸载联软UniAccess,删除UniAccess Agent记录

    UniAccess 卸载 公司假以安全上网为由 让公司员工安装所谓的 XX上网助手 实则是内嵌了联软的UniAccess监控系统 有关这个软件的用途就不用多介绍了 能找到这里的 我想已经对这个 流氓 软件有了基本的认识 话不多说 赶紧想办法
  • Kafka使用工具封装

    maven依赖