doris stream load

2023-05-16

package uhp;


import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.*;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;




public class HttpClientUtilStreamLoadDoris {


    private final static String DORIS_HOST_DEV = "xxx";

    //public static Logger logger = LoggerFactory.getLogger(HttpClientUtilStreamLoadDoris.class);

    //    private final static String DORIS_TABLE = "join_test";
    private final static String DORIS_USER = "xxx";
    private final static String DORIS_PASSWORD = "xxx";
    private final static int DORIS_HTTP_PORT = 8030;

    public void sendData(String content, String db, String table) throws Exception {

        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                DORIS_HOST_DEV,
                DORIS_HTTP_PORT,
                db,
                table);
        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom().addInterceptorFirst(new ContentLengthHeaderRemover())
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });
        CloseableHttpClient client = httpClientBuilder.build();


        HttpPut put = new HttpPut(loadUrl);
        StringEntity entity = new StringEntity(content, "UTF-8");
        put.setHeader(HttpHeaders.EXPECT, "100-continue");
        put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(DORIS_USER, DORIS_PASSWORD));

        //put.setHeader("strip_outer_array", "true");
        put.setHeader("format", "json");
        //put.setHeader("merge_type", "MERGE");
        //put.setHeader("delete", "canal_type=\"DELETE\"");
        put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81dd");
        put.setEntity(entity);
        put.removeHeaders("Content-Length");


        reConnect(client, put);

    }

    private void reConnect(CloseableHttpClient client, HttpPut put) throws Exception {


        String loadResult = "";
        CloseableHttpResponse response = client.execute(put);
        //todo 调用方法
        if (response.getEntity() != null) {

            loadResult = EntityUtils.toString(response.getEntity());
        }
        final int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200) {
            System.out.println("写入失败");
        } else {
            if (loadResult.contains("OK") && loadResult.contains("Success")) {
                System.out.println(loadResult);
            } else if (loadResult.contains("Fail")) {
                throw new Exception(loadResult + ",抛出异常,任务失败,当前时间: " + System.currentTimeMillis());
            } else {
                throw new Exception(loadResult + ",抛出异常,任务失败,当前时间: " + System.currentTimeMillis());
            }

        }


    }

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpClientUtilStreamLoadDoris httpClientUtilStreamLoadDoris = new HttpClientUtilStreamLoadDoris();
        httpClientUtilStreamLoadDoris.sendData("{\"event_day\":\"2021-03-24\",\"siteid\":1,\"citycode\":1,\"username\":\"杰a森\",\"pv\":23,\"uv\":66}","aac_test","aac_test_table2");

    }

    private static class ContentLengthHeaderRemover implements HttpRequestInterceptor{
        @Override
        public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
            request.removeHeaders("Content-Length");// fighting org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present");
        }
    }
}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

doris stream load 的相关文章

  • 在 HTML5 websocket 服务器中切割媒体流,用于基于网络的聊天/视频会议应用程序

    我们目前正在开发一个聊天 文件共享 视频会议应用程序使用 HTML5 websocket https stackoverflow com questions 4220672 implementing webbased real time v
  • 确定 std::istream 长度的更好方法?

    有没有比以下更好的方法来确定 std istream 的长度 std istream pcStream GetSomeStream pcStream gt seekg 0 ios end unsigned int uiLength pcSt
  • 将 ionic Zip 读取为内存流 C#

    我正在使用 Ionic Zip 通过以下方法将 ZipFile 提取到内存流 private MemoryStream GetReplayZipMemoryStream MemoryStream zipMs new MemoryStream
  • 获取 Youtube 上的游戏直播列表

    我正在尝试使用 Youtube 数据 API 来获取当前与游戏相关的直播流列表 但我找不到任何符合我需要的端点并返回每个频道的观看者数量 你们知道我该如何做到这一点吗 Thanks 游戏直播列表 videoCategoryId 20 是 游
  • 如何将整个流读入 std::string ?

    我正在尝试将整个流 多行 读入字符串中 我正在使用这段代码 它有效 但它冒犯了我的风格感 当然有更简单的方法吗 也许使用字符串流 void Obj loadFromStream std istream stream std string s
  • nginx server_name 在流块内可能吗?

    目前设置如下 stream server listen 9987 udp server name subdomain EXAMPLE com this line is resulting in an error proxy pass loc
  • LC3 LEA指令和存储的值

    我对这个问题感到困惑 指令后寄存器0中存储的值是多少 LEA R0 A 被处决了吗 为什么答案是x370C 我认为应该将A的地址加载到R0中 如果是这样我们怎么知道地址 有人可以帮忙吗 非常感谢 ORIG X3700 LEA R0 A LD
  • 加载数据infile,Windows和Linux的区别

    我有一个需要导入到 MySQL 表的文件 这是我的命令 LOAD DATA LOCAL INFILE C test csv INTO TABLE logs fields terminated by LINES terminated BY n
  • 如何从 C++ std::basic_ostream 派生并使 << 运算符虚拟?

    我正在编写一个具有各种消息输出的类 我想让这个类变得通用并且独立于平台 所以我正在考虑通过一个基本流引用它 它可以将所有消息转储到流中 通过这样做 如果该类在控制台程序中使用 我可以通过std cout并显示在控制台窗口中 或者我可以将派生
  • 如何使用 ffmpeg 设置默认流

    我有一些 m4v 文件 我想用 ffmpeg 添加字幕 我知道我需要映射流以将它们放入输出文件中 但如何确保此字幕流将是默认流 字幕是 srt 人们似乎说它们与 mp4 容器不兼容 我需要先将字幕转换为什么 另外 各种流的顺序重要吗 视频流
  • Java 中序列化的目的是什么?

    我读过很多关于序列化的文章 以及它如何如此美好和伟大 但没有一个论点足够令人信服 我想知道是否有人能真正告诉我通过序列化一个类我们真正可以实现什么 让我们先定义序列化 然后我们才能讨论它为什么如此有用 序列化只是将现有对象转换为字节数组 该
  • 将网站加载到 DIV 中

    当我在文本框中写入 URL 然后单击提交按钮时 如何实际从网站检索数据 我希望将数据放入我拥有的 div 中 这可能吗 我已经尝试过这个 但它不起作用
  • Cypher Neo4j 无法加载外部资源

    在 Windows 环境中 我尝试加载带有以下语句的 csv 文件 LOAD CSV WITH HEADERS FROM file E Neo4j customers csv AS row 它似乎无法正常工作并返回 无法加载外部资源 文件
  • 使用任何节点模块在内存中创建 ZIP 文件

    是否有任何节点模块可以在内存中创建 zip 我不想将 zip 文件保存在磁盘上 以便我们可以将这个创建的 zip 文件发送到其他服务器 从内存 做这个的最好方式是什么 这是我的例子 var file system require fs va
  • 如何使用 Spring Boot 传输音频

    我想让用户能够播放声音 我的实现在 Firefox 上运行良好 在 Safari 上 不播放声音 我验证了音频控制可以在 Safari 中与其他网站一起使用 所以 我认为我必须更改控制器中的某些内容 控制器 RequestMapping v
  • 在 C# 中使用流读取大文本文件

    我有一项可爱的任务 就是研究如何处理加载到我们应用程序的脚本编辑器中的大文件 就像VBA http en wikipedia org wiki Visual Basic for Applications用于我们内部产品的快速宏 大多数文件约
  • 如何在 Java 中读取/转换 InputStream 为字符串?

    如果你有一个java io InputStream对象 您应该如何处理该对象并生成一个String 假设我有一个InputStream包含文本数据 我想将其转换为String 例如我可以将其写入日志文件 最简单的方法是什么InputStre
  • C# List 处置/关闭

    我正在设置订阅服务 以便按计划向我们公司的各个人员发送报告 我计划通过电子邮件发送报告 我使用的报告系统能够导出为 PDF 流 而不是写入临时文件 大多数人会收到不止一份报告 因此我尝试将它们全部附加到一封电子邮件中 执行以下操作 List
  • 在 Python 中使用音频流 RTMP 通过管道和 OpenCV 到 FFmpeg

    我正在尝试使用音频流式传输 FFmpeg 我将在下面展示我的代码 导入模块 import subprocess as sp 创建变量 rtmpUrl rtmp a rtmp youtube com live2 key camera path
  • 如何将 MemoryStream 写入 byte[] [重复]

    这个问题在这里已经有答案了 可能的重复 从流创建字节数组 https stackoverflow com questions 221925 creating a byte array from a stream 我正在尝试在内存中创建文本文

随机推荐

  • Python中str与bytes互相转换

    快速转换方式 str to bytes my str 61 34 hello world 34 my str as bytes 61 str encode my str type my str as bytes ensure it is b
  • Python关于%matplotlib inline

    在github代码中经常会看到这样的代码 xff1a import numpy import matplotlib pyplot as plt from pandas import read csv import math from ker
  • Jupyter Notebook介绍、安装及使用教程

    目录 一 什么是Jupyter Notebook xff1f 1 简介 Jupyter Notebook是基于网页的用于交互计算的应用程序 其可被应用于全过程计算 xff1a 开发 文档编写 运行代码和展示结果 Jupyter Notebo
  • Python读取XML

    From http www cnblogs com fnng p 3581433 html 关于python读取xml文章很多 xff0c 但大多文章都是贴一个xml文件 xff0c 然后再贴个处理文件的代码 这样并不利于初学者的学习 xf
  • matlab解决中文显示乱码

    matlab很多函数在读取中文后显示乱码 xff0c 为了显示中文 xff0c 应改为UTF 8方式或其他支持中文的编码方式 xff0c 这在Matlab中的操作为 xff1a slCharacterEncoding 39 UTF 8 39
  • Matlab写TIFF格式文件(多于3波段)

    1 起因 通常情况下 xff0c 使用MATLAB做图像处理后 xff0c 使用下面的命令就可以保存处理结果为图片 imwrite im 39 im bmp 39 而如果需要保存的图像为single或者double类型 xff0c 或保存的
  • Python包设置清华源(pip, anaconda等)

    pip设置清华源 pypi 镜像每 5 分钟同步一次 临时使用 pip install i https pypi tuna tsinghua edu cn simple some package 注意 xff0c simple 不能少 是
  • shapefile字符集编码设置

    http zhihu esrichina com cn article 3 在 ArcGIS Desktop ArcMap ArcCatalog and ArcToolbox 中 xff0c 有编码页转换功能 xff08 CODE PAGE
  • pyhton 遍历文件夹,筛选文件

    如果我们需要遍历一个文件夹下的所有文件 xff0c 子文件夹里的内容 xff0c 用Python来实现 xff0c 很方便 xff0c 主要使用os walk folder xff0c 其中folder 是文件夹的路径 xff1a 先看代码
  • VINS 详解

    VINS是视觉与IMU融合SLAM的代表 xff0c 其实现了一个较为完整的SLAM工作 xff0c 开源地址为 xff1a GitHub HKUST Aerial Robotics VINS Mono A Robust and Versa
  • Python OS 文件/目录方法

    From http www runoob com python os file methods html os 模块提供了非常丰富的方法用来处理文件和目录 常用的方法如下表所示 xff1a 序号方法及描述1 os access path m
  • Python 异常处理

    From http www runoob com python python exceptions html python提供了两个非常重要的功能来处理python程序在运行中出现的异常和错误 你可以使用该功能来调试python程序 异常处
  • deeplabV3+源码分解学习

    From horsetif https www jianshu com p d0cc35b3f100 github上deeplabV3 43 的源码是基于tensorflow xff08 slim xff09 简化的代码 xff0c 是一款
  • 常用颜色名称与RGB数值对照表

    From http xh 5156edu com page z1015m9220j18754 html 颜色名 中文名称 Hex RGB 十进制 Decimal LightPink 浅粉红 FFB6C1 255 182 193 Pink 粉
  • c#调用C++DLL EntryPointNotFoundException 找不到入口点

    From http www voidcn com article p kqogmify rh html c 程序调用C 43 43 的dll的时候 xff0c 经常出现这样的问题 xff1a System EntryPointNotFoun
  • 混洗numpy.random.shuffle()与numpy.random.permutation()的区别

    参考API xff1a https docs scipy org doc numpy reference routines random html 1 numpy random shuffle API中关于该函数是这样描述的 xff1a M
  • 以time/gettimeofday系统调用为例分析ARM64 Linux 5.4.34

    目录 1 准备工作 2 触发系统调用 2 1依据 amp 分析 2 2构造代码 2 3触发系统调用 3 分析系统调用 3 1中断处理分析 xff08 保存现场 xff09 3 2内核堆栈pt regs xff08 保存现场 xff09 3
  • Kettle连接Access抽取数据到MS SQLServer

    软件准备 xff1a kettle5 1 access xff08 32位 xff09 jdk1 7 xff08 32位 xff09 软件位数需要一致 xff0c 不要求操作系统位数 搭建流程 xff1a 1 access新建表 2 准备a
  • python2 linux 解析文本乱码或UnicodeDecodeError: ‘ascii’ codec can’t decode byte

    linux乱码 xff0c 加下面两行 reload sys sys setdefaultencoding 39 utf 8 39 open 加参数errors 61 39 ignore 39 file init 61 io open fi
  • doris stream load

    package uhp import java io IOException import java nio charset StandardCharsets import org apache commons codec binary B