flink学习day05:checkpoint 原理与实践

2023-11-02

flink checkpoint

checkpointe是什么?

基于state出发,flink基于与state可以做非常多复杂的事情,但是state是存储在内存中,内存中的数据是不安全的易丢失的,所以flink为了解决这个问题就引入了checkpointed机制,所谓的checkpointe就是把整个flink job的某一瞬间的状态数据进行快照(拍照),后续可以从这个快照(持久到外部存储)。checkponite是可以周期性执行。

state 与checkpoint?

state就是某个一个task/operator的状态数据,存储在内存中

checkpoint是把state数据持久化存储的机制,可以周期性的执行。

2 checkpoint工作流程

就是希望把flinkjob 某一瞬间的所有状态数据全部持久化出来,存储到hdfs,

困难:flink job是一个分布式的程序,是不是可以考虑每个节点的task自己先拍照,然后最终合并到一起,这样也是一个大合照。

如果是多个人如何要求大家是同一瞬间和同一刻进行的这个动作,可以从时间点上控制,

但是对于程序我们如何实现每个task拍快照时所谓的时刻(不是时间)相同的呢?而是要求大家处理的数据是同一个时把状态记录下来。

因此flink的checkpoint是向数据流中加入一个barrier,每个operator收到这个barrier时把自己的state持久化出去,等到每个operator都进行这个checkpoint动作那就可以认为在这个barrier之前所有的数据都已经被所有operator处理过了。

所以从checkpoint恢复数据一定是不会丢失和重复的!!
在这里插入图片描述

一次checkpointe涉及的角色:

jobmanager–>checkpoint coordinator

所有的operator

persistent storage

checkpoint可选外部存储介质

memorystatebackend
在这里插入图片描述

基于文件系统的statebackend 推荐使用hdfsstatebackend

在这里插入图片描述

基于RocksDB的statebackend 建议如果是有超大状态数据以及希望进行增量checkpoint时使用这种方式的statebanckend

在这里插入图片描述

总结:

flink checkpoint可选的外部持久化介质有三种

memory statebackend:不建议,开发测试

fsstatebackend:生产建议使用,而且使用的hdfs

rocksdbstatebackend:生产可以使用,需要taskmanager装有rocksdb;有超大状态的时候可以考虑使用,以及需要增量checkpoint时使用。

checkpoint设置方式

1 修改flink-conf.yaml:全局修改,影响所有的程序

2 通过程序env.setstatebackend指定持久化介质类型,如果设置是rocksdb需要我们添加rocksdb的依赖。

具体的案例:

需求:自定义发送字符串数据,开启ck相关属性,查看确认是否有数据在指定目录下生成。

代码:

package cn.itcast.ck

import java.util.concurrent.TimeUnit

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/*
演示ck相关配置
 */
object CheckPonitDemo {

  def main(args: Array[String]): Unit = {
    /*
    1 获取运行环境
    2 设置ck相关属性
    3 自定义数据源source,发送字符串数据
    4 打印
    5 启动
     */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2 设置ck属性
    // 2.1开启ck  //指定使用hdfsstatebackend
    env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink/ck_demo"))
    //2.2 设置checkpoint的周期间隔  默认是没有开启ck,需要通过指定间隔来开启ck
    env.enableCheckpointing(1000)
    //2.3 设置ck的执行语义,最多一次,至少一次,精确一次
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //2.4 设置两次ck之间的最小时间间隔,两次ck之间时间最少差500ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // 2.5 设置ck的超时时间,如果超时则认为本次ck失败,继续一下一次ck即可,超时60s
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    //2.6 设置ck出现问题,是否让程序报错还是继续任务进行下一次ck,true:让程序报错,false:不报错进行下次ck
    //如果是false就是ck出现问题我们允许程序继续执行,如果下次ck成功则没有问题,但是如果程序下次ck也没有成功,
    //此时程序挂掉需要从ck中恢复数据时可能导致程序计算错误,或者是重复计算数据。
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // 2.7设置任务取消时是否保留检查点  retain:则保存检查点数据,delete:删除ck作业数据
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //2.8 设置程序中同时允许几个ck任务同时进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // 3自定义source
    val strDs: DataStream[String] = env.addSource(
      new RichSourceFunction[String] {
        var flag = true

        override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
          while (flag) {
            ctx.collect("hello flink")
            TimeUnit.SECONDS.sleep(1)
          }
        }

        override def cancel(): Unit = {
          flag = false
        }
      }
    )
    //4 打印
    strDs.print()
    // 4启动
    env.execute()

  }
}

总结:

相关属性需要设置如上所示,如果生产中需要结合数据量以及你对数据精确性要求,只要ck不过于频繁降低程序的实时性即可。建议使用hdfsstatebackend保证ck的稳定性。

flink重启策略

基于flink的ck进行容错处理之后,如果程序出现异常或者崩溃我们考虑可以让程序自动重启并恢复到犯错之前的状态(借助ck持久化的state)继续运行。

flink重启策略如何设置:

1 flink-conf.yaml:全局有效

2 env.setrestartStrategy:指定当前程序的重启策略

一般是与ck结合使用:

1 如果没有ck则没有重启,

2 如果设置ck,但是不设置重启策略,会进行无限的重启

重启策略介绍:三种

固定延迟重启策略

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 重启次数
  Time.of(10, TimeUnit.SECONDS) // 重启时间间隔
))

job会重启三次,每次间隔是10s

失败率重启:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // 每个测量时间间隔最大失败次数
  Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
  Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔
))

5分钟内重启三次,每次间隔是10s;如果5分钟内超过三次则程序退出,不再重启。

无重启策略

程序出现错误直接退出不会重启。

演示案例代码:

package cn.itcast.ck

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.ListState
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/*
演示flink重启策略
 */

object RestartDemo {
  def main(args: Array[String]): Unit = {
    /*
     * 1.获取执行环境

      2.设置检查点机制:路径,重启策略

      3.自定义数据源

        (1)需要继承SourceFunction

        (2)让程序报错,抛出异常

       4.数据打印

       5.触发执行

     */
    //1 创建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //方便观察数据,设置并行度为1
    env.setParallelism(1)
    // 2 设置检查点相关属性,重启策略
    env.enableCheckpointing(1000) //开启ck,每秒执行一次
    //设置检查点存储数据路径
    env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink/ck"))
    //任务取消时,保存检查点数据(后续可以从检查点中恢复数据)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // 设置可以同时进行几个ck任务
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    //固定延迟重启策略: 程序出现异常的时候,重启3次,每次延迟5秒钟重启,超过3次,程序退出
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000))
    // 2 自定义source
    val sourceDs: DataStream[Long] = env.addSource(new MySource())

    // 3 打印数据
    sourceDs.print()
    // 4 启动
    env.execute()
  }
}

// 2 自定义source 实现sourcefunction,以及checkpointedfunction
class MySource extends SourceFunction[Long] {
  var flag = true
  //定义发送数据的初始值
  var offset = 0L
  //定义Liststate
  var offsetListState: ListState[Long] = null

  //2.1 生成数据的方法
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (flag) {
      //发送数据,实现发送一个数值,每次增加1 ,来表是所谓消费者的偏移量数据,offset应该从offsetListstate中获取,获取不到再从0开始

      offset += 1
      ctx.collect(offset)
      println("发送数据 offset>>" + offset)
      TimeUnit.SECONDS.sleep(1)
      //设置故障,程序遇到异常
      if (offset % 5 == 0) {
        println("程序遇到异常。。。,将要重启。。")
        throw new RuntimeException("程序遇到异常。。。,将要重启。。")
      }
    }
  }

  override def cancel(): Unit = {
    flag = false
  }

}

flink savepoint

savepoint:手动触发的checkpoint,全量持久化,操作比较重,由开发人员手动触发手动恢复,集群升级,任务代码升级等

checkpoint:程序自动管理,依据我们配置的ck的属性和重启策略来恢复task,小的错误例如网络抖动导致的超时

在这里插入图片描述

savepoint案例

需求:

我们实现一个wordcount案例,我们手动触发savepoint,然后停止程序,紧接着手动恢复程序到savepoint状态,我们判断单词是从0开始还是从savepoint中开始

代码:

package cn.itcast.sp

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/*
演示flink中savepoint使用
 */

object SavepointDemo {
  def main(args: Array[String]): Unit = {
    /*
     * 1.获取执行环境

      2.设置检查点机制:路径,重启策略

      3.自定义数据源

        (1)需要继承SourceFunction

        (2)发送字符串


       4. 单词统计 数据打印

       5.触发执行

     */
    //1 创建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //方便观察数据,设置并行度为1
    env.setParallelism(1)
    // 2 设置检查点相关属性,重启策略
    env.enableCheckpointing(1000) //开启ck,每秒执行一次
    //设置检查点存储数据路径
    env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink/ck"))
    //任务取消时,保存检查点数据(后续可以从检查点中恢复数据)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // 设置可以同时进行几个ck任务
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    //固定延迟重启策略: 程序出现异常的时候,重启3次,每次延迟5秒钟重启,超过3次,程序退出
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000))
    // 2 自定义source
    val sourceDs: DataStream[String] = env.addSource(new MySource())

    // 3 打印数据
    sourceDs.flatMap(_.split(" ")).map(_ -> 1).keyBy(0).sum(1).print()
    // 4 启动
    env.execute()
  }
}

// 2 自定义source 实现sourcefunction,以及checkpointedfunction
class MySource extends SourceFunction[String] {
  var flag = true


  //2.1 生成数据的方法
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (flag) {
      ctx.collect("hello world")
      TimeUnit.SECONDS.sleep(1)
    }
  }

  override def cancel(): Unit = {
    flag = false
  }

}

总结savepoint

1 手动触发state持久化到具体指定的路径

执行savepoint

bin/flink savepoint jobid savepoint路径

恢复数据到指定的savepoint

bin/flink run -s 之前savepoint路径 --class xxx /root/sp.jar

savepoint就像是手动checkpoint!!

flink中一致性语义

讨论分布式计算程序入spark,flink的一致性语义到底在说啥呢?

通俗理解一致性语义:指的是一条数据进入处理程序之后被处理几次,程序会不会丢失或者重复计算数据。

总结

flink中端到端一致性语义

如果flink程序在两次ck之间失败了,如何恢复?因为程序重启之后是恢复到上次成功的ck出,又因为现在程序是在两次ck之间失败了,从上一次ck成功处程序又处理了部分数据并且输出到外部系统了。因此如果只是简单的把程序恢复到上次成功的ck处,会导致程序重复计算部分数据。

所以flink引出了两阶段提交:

1 上一次ck成功之后,开启一个pre-cmmit阶段,(到下一次ck成功,这期间所有的操作都是预提交,可撤回的)

2 只有等到所有operator都完成了ck,这时在进行第二阶段任务,真正提交commit.

source:如kafka可以重放数据

flink程序:通过ck机制保证一致性语义

sink:输出系统要有事务机制,可以pre-cmmit。

在这里插入图片描述

flink kafka 端到端一致性语义案例

需求:

使用socket发送数据,flink接收消息之后发送到kafka中,使用flinkkafkaproducer011(两阶段提交协议)。

两阶段提交协议是依赖ck

代码:

使用 flinkkafkaproducer生产数据(默认实现了两阶段提交的方法),直接使用即可。

package cn.itcast.twophase

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.kafka.clients.producer.ProducerConfig

//使用socket发送数据,flink接收消息之后发送到kafka中,使用flinkkafkaproducer011(两阶段提交协议)。
object SocketToKafkaTwoPhaseDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2 设置ck属性
    // 2.1开启ck  //指定使用本地fsstatebackend
    env.setStateBackend(new FsStateBackend("file:///e://data//checkpoint-demo//"))
    //2.2 设置checkpoint的周期间隔  默认是没有开启ck,需要通过指定间隔来开启ck
    env.enableCheckpointing(10000)
    //2.3 设置ck的执行语义,最多一次,至少一次,精确一次
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //2.4 设置两次ck之间的最小时间间隔,两次ck之间时间最少差500ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // 2.5 设置ck的超时时间,如果超时则认为本次ck失败,继续一下一次ck即可,超时60s
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    //2.6 设置ck出现问题,是否让程序报错还是继续任务进行下一次ck,true:让程序报错,false:不报错进行下次ck
    //如果是false就是ck出现问题我们允许程序继续执行,如果下次ck成功则没有问题,但是如果程序下次ck也没有成功,
    //此时程序挂掉需要从ck中恢复数据时可能导致程序计算错误,或者是重复计算数据。
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // 2.7设置任务取消时是否保留检查点  retain:则保存检查点数据,delete:删除ck作业数据
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //2.8 设置程序中同时允许几个ck任务同时进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // 3 读取socket数据
    val socketDs = env.socketTextStream("node1", 9999)
    // 4 打印
    socketDs.print()
    // 5 输出到kafka
    var topic = "test"
    val prop = new Properties()
    prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092")
    //设置连接kafka事务的超时时间,flinkproducer默认事务超时时间是1h,kafka中默认事务超时时间是15分钟
    prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000 * 15 + "") //设置客户端事务超时时间与kafka保持一致
    socketDs.addSink(new FlinkKafkaProducer011[String](
      topic,
      new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
      prop,
      //设置producer的语义,默认是at-least-once
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
    ))

    // 6 启动
    env.execute()
  }
}

总结:

(1)写入kafka保证exactly-once,可以使用flinkkafkaproducer011,

(2) 指定producer的一致性语义为exactly-once

注意:

1 指定flinkproducer作为kafkaclinet的事务超时时间不能大于kafka的事务超时时间!!

使用flink消费kafka数据,默认读取数据是read_uncommitted,可以读取提交和未提交的数据,我们验证效果需要指定读取已提交的数据。

代码:

package cn.itcast.twophase

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig

//测试flink+kafka 一致性语义效果
// 使用flink消费kafka中数据
object KafkaExactlyoneceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var topic = "test"
    val prop = new Properties()
    prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092")
    //设置消费者的隔离级别,默认是读取未提交数据 read_uncommitted
    prop.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed")
    val kafkaDs: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String](
      topic,
      new SimpleStringSchema(),
      prop
    ))
    //打印
    kafkaDs.print("测试kafka 一致性结果数据>>")
    //启动
    env.execute()

  }
}

flink mysql端到端一致性语义案例

//需求:

从kafka中消费数据然后写入mysql,但是要求使用两阶段提交协议保证一致性语义。自定义mysqlsink并且实现两阶段提交的相关方法

代码:

package cn.itcast.twophase

import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeutils.base.VoidSerializer
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig

//测试flink+mysql 一致性语义效果 ,两阶段要结合ck才能实现,业务:单词计数,然后写入mysql指定表中
// 使用flink消费kafka中数据
object KafkaToMsqlDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2 设置ck属性
    // 2.1开启ck  //指定使用本地fsstatebackend
    env.setStateBackend(new FsStateBackend("file:///e://data//checkpoint-demo2//"))
    //2.2 设置checkpoint的周期间隔  默认是没有开启ck,需要通过指定间隔来开启ck
    env.enableCheckpointing(10000)
    //2.3 设置ck的执行语义,最多一次,至少一次,精确一次
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //2.4 设置两次ck之间的最小时间间隔,两次ck之间时间最少差500ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // 2.5 设置ck的超时时间,如果超时则认为本次ck失败,继续一下一次ck即可,超时60s
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    //2.6 设置ck出现问题,是否让程序报错还是继续任务进行下一次ck,true:让程序报错,false:不报错进行下次ck
    //如果是false就是ck出现问题我们允许程序继续执行,如果下次ck成功则没有问题,但是如果程序下次ck也没有成功,
    //此时程序挂掉需要从ck中恢复数据时可能导致程序计算错误,或者是重复计算数据。
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // 2.7设置任务取消时是否保留检查点  retain:则保存检查点数据,delete:删除ck作业数据
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //2.8 设置程序中同时允许几个ck任务同时进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    var topic = "test"
    val prop = new Properties()
    prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092")
    //设置消费者的隔离级别,默认是读取未提交数据 read_uncommitted
    prop.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    //设置程序消费kafka的便宜量提交策略
    prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    val kafkaConsumer = new FlinkKafkaConsumer011[String](
      topic,
      new SimpleStringSchema(),
      prop
    )
    //设置kafka消费者偏移量是基于ck成功时提交
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
    val kafkaDs: DataStream[String] = env.addSource(kafkaConsumer)
    //打印
    kafkaDs.print("读取到的kafka中的数据>>")
    //实现单词统计逻辑
    val wcDs = kafkaDs.flatMap(_.split(" ")).map(_ -> 1).keyBy(0).sum(1)
    //写入mysql,要求使用两阶段协议保证一致性语义为exactly-once
    wcDs.addSink(new MysqlTwoPhaseCommit)
    //启动
    env.execute()

  }
}

//自定义 sinkfunciton实现TwoPhaseCommitSinkFunction in:(单词,数量) txn:transaction对象(自定义), context:void
class MysqlTwoPhaseCommit extends TwoPhaseCommitSinkFunction[(String, Int), MysqlConnectionSate, Void](
  //自定义对象的序列化类型
  new KryoSerializer[MysqlConnectionSate](classOf[MysqlConnectionSate], new ExecutionConfig),
  //上下文件对象序列化类型
  VoidSerializer.INSTANCE
) {

  //开启事务 ,pre-commit开始阶段, 每次开始ck
  override def beginTransaction(): MysqlConnectionSate = {
    // 获取一个连接
    val connection = DriverManager.getConnection("jdbc:mysql://node1:3306/test", "root", "123456")
    //执行 ,关闭mysql的事务自动提交动作
    connection.setAutoCommit(false) //mysql中默认提交事务是自动提交,改为手动提交,ck成功时flink自动提交
    new MysqlConnectionSate(connection)
  }

  //执行你的动作
  override def invoke(transaction: MysqlConnectionSate, value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    //读取数据并写入mysql中
    val connection = transaction.connection
    //sql语句  word是主键,可以根据主键更新,duplicate key update:主键存在则更新指定字段的值,否则插入新数据
    var sql = "insert into t_wordcount(word,count) values(?,?) on duplicate key update count= ? "
    //获取preparestatement
    val ps = connection.prepareStatement(sql)
    //赋值
    ps.setString(1, value._1)
    ps.setInt(2, value._2)
    ps.setInt(3, value._2)

    ps.executeUpdate() //不是预提交,
    // 关闭
    ps.close()
  }
  //预提交
  override def preCommit(transaction: MysqlConnectionSate): Unit = {
    //invoke中完成
  }

  //真正提交
  override def commit(transaction: MysqlConnectionSate): Unit = {
    transaction.connection.commit()
    transaction.connection.close()
  }

  //回滚
  override def abort(transaction: MysqlConnectionSate): Unit = {
    transaction.connection.rollback()
    transaction.connection.close()
  }


}

//要求 该connect无需被序列化,加上transient关键字可以保证该属性不会随着对象序列化
class MysqlConnectionSate(@transient val connection: Connection) {
}

总结

1 实现twophasecommitsinkfunction,需要注意数据的泛型,以及序列化类型,我们自定义的mysqlconnectionstate的主要作用其实就是存放一个connection对象,供其他方法使用

2 mysqlconnectionstate对象中的connection属性需要添加transient关键字,保证不被序列化

一致性总结:

1 flink程序结合自身ck机制加上外部输出系统和数据源具有重放功能实现了端到端的一致性,但是这种一致性带来的问题就是程序的实时性下降明显,除非对数据的精确性要求很高才使用,一般我们使用语义就是at-least-once即可(这种语义效率最高但是有可能丢失数据)。

flink 异步IO

什么场景使用异步IO?

我们flink在流式计算过程需要与外部存储系统进行交互,默认是同步操作,就是发送请求等待响应,等待过程什么也不做,单纯等待,因此会造成吞吐量下降。同步IO的弊端就是如此。

为了解决同步io,与外部系统交互吞吐量下降明显的问题,出现异步IO,异步io无需单纯等待响应,发送请求之后可以继续发送请求,如果那个请求由返回则处理哪个结果,大大提高吞吐能力。

使用前提

1 要求数据库支持异步请求的api,

2 如果没有异步请求的api,可以变相通过线程池来实现,但是这种方式效率不如上一种。

在这里插入图片描述

异步IO的简单步骤

在这里插入图片描述

关于异步IO 请求发送顺序

在这里插入图片描述

flink中两种顺序

无序:完全不关心发送和响应的顺序 unorderedwatit

有序: orderwait api;processtime 其实是无无序,eventime保证watermark与消息之间不乱序,但是watermark与另一个watermark之间的消息是无序。

异步IO案例

需求

读取集合中的城市id使用异步IO去redis中读取城市名称

redis 准备数据

hset AsyncReadRedis  1 beijing
 hset AsyncReadRedis  2 shanghai
 hset AsyncReadRedis  3 guangzhou
 hset AsyncReadRedis  4 shenzhen
 hset AsyncReadRedis  5 hangzhou
 hset AsyncReadRedis  6 wuhan
 hset AsyncReadRedis  7 chengdu
 hset AsyncReadRedis  8 tianjin
 hset AsyncReadRedis  9 chongqing

代码:

package cn.itcast.async


import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.concurrent.Executors
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}

//读取集合中的城市id使用异步IO去redis中读取城市名称
object AsyncReadRedisDemo {
  def main(args: Array[String]): Unit = {
    /*
    1 创建运行环境
    2 加载集合中的城市id数据
    3 使用异步io读取redis
    4 打印数据
    5 启动
     */
    //    1 创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    2 加载集合中的城市id数据
    val cityIdDs = env.fromCollection(List("1", "2", "3", "4", "5", "6", "7", "8", "9"))
    // 3 使用异步io
    val resDs: DataStream[String] = AsyncDataStream.unorderedWait(
      cityIdDs,
      new MyAsyncFunciton,
      1000,
      TimeUnit.MILLISECONDS
    )
    // 4 打印结果
    resDs.print()
    // 5启动
    env.execute()

  }
}

//使用 异步IO的步骤
/*
1 准备一个类实现asyncrichfunction
 1.1 open 创建或者打开数据连接,一般使用连接池获取连接
 1.2  重写timeout方法,处理异步请求超时的问题
 1.3 发送异步请求的方法 asyncinvoke
2 使用AsyncDataStream工具类,调用orderedwait等方法实现异步操作流
 */

//@param <IN> The type of the input elements.
//*@ param <OUT>
//The type of the returned elements.  修改flink版本,改为flink.1.8即可
class MyAsyncFunciton extends RichAsyncFunction[String, String] {
  var jedis: Jedis = null

  //创建redis连接池
  override def open(parameters: Configuration): Unit = {
    val config = new JedisPoolConfig
    config.setLifo(true)
    config.setMaxTotal(10)
    config.setMaxIdle(10)
    //获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间,  默认-1
    config.setMaxWaitMillis(-1)
    //逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
    config.setMinEvictableIdleTimeMillis(1800000)
    //最小空闲连接数, 默认0
    config.setMinIdle(0)
    //每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
    config.setNumTestsPerEvictionRun(3)
    //对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断  (默认逐出策略)
    config.setSoftMinEvictableIdleTimeMillis(1800000)
    //在获取连接的时候检查有效性, 默认false
    config.setTestOnBorrow(false)
    //在空闲时检查有效性, 默认false
    config.setTestWhileIdle(false)
    //初始化连接池对象
    val pool = new JedisPool(config, "node2", 6379)
    jedis = pool.getResource
  }

  //关闭连接
  override def close(): Unit = {
    jedis.close()
  }

  //处理异步请求超时的方法,否则默认超时会报出异常
  override def timeout(input: String, resultFuture: ResultFuture[String]): Unit = {
    println("input >>" + input + ",,,请求超时。。。")
  }

  //定义一个异步回调的上下文件
  implicit lazy val executor: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.directExecutor())

  //异步请求方法:input:输入数据, resultfutrue:异步请求结果对象
  def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    //执行异步请求,请求结果封装在resultfuture中
    Future {
      val result = jedis.hget("AsyncReadRedis", input)
      //异步回调返回
      resultFuture.complete(Array(input+">>>"+result))
    }
  }
}

总结:

异步IO适用于我们流式程序需要访问外部存储的场景,mysql,redis,hbase等,

都可以考虑使用异步IO,即使不支持异步请求,也可以变相使用线程池来模拟实现异步的效果。

异步io 原理分析

在这里插入图片描述

flink 的背压

什么是背压?

就是在流式计算中,一个任务是分多个阶段每个阶段处理的内容以及所在节点都是不一样的,那么可能就会有处理快慢之分,如果处理慢的task其它task不管不顾那最终程序一定会崩溃。

flink如何解决背压?

flink通过上下游节点的资源使用情况来找到程序中处理速度最慢的组件,然后按照这个组件的速度来处理数据,

对于背压机制我们无需做任何处理时flink程序自动实现

参考意义就是尽量保证程序中没有明显处理速度过慢的节点,否则会拖慢整个程序。

flink的内存管理

flink程序也是运行jvm之上

大数据程序运行在jvm之上有哪些问题?

1 使用java对象存储,资源利用率不高

2 jvm会有GC的问题,影响程序性能

3 oom,内存使用超过堆内存

flink针对这些问题从以下几个方面解决:

1 自己定制内存管理,memorysegment结构,并且对taskmanager堆内存进行划分,保证大数据量的对象以及消耗资源的算法等都是运行在memorymanger管理的memorysegment上,

2 flink直接把对象序列化存储,并且提供了高效访问二进制数据方法,减少了序列化和发序列的开销

3 flink 提供了自己一套序列化框架,针对自己程序的特点,

4 flink扩展使用对外内存,来扩展内存,自己直接来管理内存,大大扩充taskmanager的内存空间。

flink sql

批处理案例

package cn.itcast.sql

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.types.Row

//演示flinksql批处理
//3)创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
case class Order(id: Int, userName: String, createTime: String, money: Double)

object BatchSql {
  def main(args: Array[String]): Unit = {
    /*
    1)获取一个批处理运行环境
    2)获取一个Table运行环境
    3)创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
    4)基于本地 Order 集合创建一个DataSet source
    5)使用Table运行环境将DataSet注册为一张表
    6)使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
    7)使用TableEnv.toDataSet将Table转换为DataSet
    8)打印测试
     */
    //    1)获取一个批处理运行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //    2)获取一个Table运行环境
    val tenv = TableEnvironment.getTableEnvironment(env)
    //    4)基于本地 Order 集合创建一个DataSet source
    val orderDataSet: DataSet[Order] = env.fromElements(
      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
      Order(3, "lisi", "2018-10-20 16:30", 127.5),
      Order(4, "lisi", "2018-10-20 16:30", 328.5),
      Order(5, "lisi", "2018-10-20 16:30", 432.5),
      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
    )

    //    5)使用Table运行环境将DataSet注册为一张表
    tenv.registerDataSet("t_order", orderDataSet)
    //    6)使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
    //    用户消费订单的总金额、最大金额、最小金额、订单总数。
    val sql =
    """
      | select
      | sum(money) totalMoney,
      | max(money) maxMoney,
      | min(money) minMoney,
      | count(1) totalCount
      | from t_order
      | group by
      |  userName
    """.stripMargin
    //    7)使用TableEnv.toDataSet将Table转换为DataSet
    val table: Table = tenv.sqlQuery(sql)
    table.printSchema()

    tenv.toDataSet[Row](table).print()
  }
}

总结:

flink 使用sql进行batch开发

1 通过env获取一个tableenv对象

2 数据对象最好是case class类型,这样天生就有字段信息

3 如果对某个dataset想要执行sql查询,需要注册成为一张表,

tenv.regeisterDataSet(表名,datastream)

4 执行查询 tenv.sqlquery(sql),最后返回的是table对象

5 tenv.toDataSetRow,可以获取到dataset

flink 流处理sql 案例

注意地方:

在这里插入图片描述

需求:

使用Flink SQL来统计5秒内 用户的 订单总数、订单的最大金额、订单的最小金额。

代码

package cn.itcast.sql


import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row

import scala.util.Random

//    4)创建一个订单样例类 Order ,包含四个字段(订单ID、用户ID、订单金额、时间戳)
case class Order(orderId: String, userId: Int, money: Long, createTime: Long)
//使用Flink SQL来统计5秒内 用户的 订单总数、订单的最大金额、订单的最小金额。
object StreamSql {
  def main(args: Array[String]): Unit = {
    /*
    1)获取流处理运行环境
    2)获取Table运行环境
    3)设置处理时间为 EventTime
    4)创建一个订单样例类 Order ,包含四个字段(订单ID、用户ID、订单金额、时间戳)
    5)创建一个自定义数据源
    a.使用for循环生成1000个订单
    b.随机生成订单ID(UUID)
    c.随机生成用户ID(0-2)
    d.随机生成订单金额(0-100)
    e.时间戳为当前系统时间
    f.每隔1秒生成一个订单
    6)添加水印,允许延迟2秒
    7)导入 import org.apache.flink.table.api.scala._ 隐式参数
    8)使用 registerDataStream 注册表,并分别指定字段,还要指定rowtime字段
    9)编写SQL语句统计用户订单总数、最大金额、最小金额
    分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口
    10)使用 tableEnv.sqlQuery 执行sql语句
    11)将SQL的执行结果转换成DataStream再打印出来
    12)启动流处理程序
     */
    //    1)获取流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    2)获取Table运行环境
    val tenv = TableEnvironment.getTableEnvironment(env)
    //    3)设置处理时间为 EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //    5)创建一个自定义数据源
    val orderDs: DataStream[Order] = env.addSource(new RichSourceFunction[Order] {
      var flag = true

      override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
        while(flag){
          ctx.collect(Order(UUID.randomUUID().toString, Random.nextInt(3),
            Random.nextInt(101), System.currentTimeMillis()))
          TimeUnit.SECONDS.sleep(1)
        }
      }

      override def cancel(): Unit = {
        flag = false
      }
    })
    //    6)添加水印,允许延迟2秒
    val wmDs: DataStream[Order] = orderDs.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
        override def extractTimestamp(element: Order): Long = {
          element.createTime
        }
      }
    )
    //7)导入 import org.apache.flink.table.api.scala._ 隐式参数
    //    8)使用 registerDataStream 注册表,并分别指定字段,还要指定rowtime字段
    import org.apache.flink.table.api.scala._
    tenv.registerDataStream(
      "t_order",
      wmDs,
      'orderId, 'userId, 'money, 'createTime.rowtime
    )
    //    9)编写SQL语句统计用户订单总数、最大金额、最小金额
    val sql =
      """
        |select
        | userId,
        | count(1) totalCount,
        |max(money) maxMoney,
        |min(money) minMoney
        |from t_order
        |group by
        |tumble(createTime, interval '5' second),
        |userId
        |
      """.stripMargin

    //    10)使用 tableEnv.sqlQuery 执行sql语句
    val table = tenv.sqlQuery(sql)

    // 11 转为datastream打印
    tenv.toRetractStream[Row](table).print()

    // 12 启动
    env.execute()

  }
}

总结:

1 *timestamp是关键字不能作为字段的名字(关键字不能作为字段名字)*

2 使用流处理sql如果使用窗口一定要指定基于eventime,还要指定rowtime

3 注册为表的时候,通过’指定字段信息,指定rowtime

new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
    override def extractTimestamp(element: Order): Long = {
      element.createTime
    }
  }
)
//7)导入 import org.apache.flink.table.api.scala._ 隐式参数
//    8)使用 registerDataStream 注册表,并分别指定字段,还要指定rowtime字段
import org.apache.flink.table.api.scala._
tenv.registerDataStream(
  "t_order",
  wmDs,
  'orderId, 'userId, 'money, 'createTime.rowtime
)
//    9)编写SQL语句统计用户订单总数、最大金额、最小金额
val sql =
  """
    |select
    | userId,
    | count(1) totalCount,
    |max(money) maxMoney,
    |min(money) minMoney
    |from t_order
    |group by
    |tumble(createTime, interval '5' second),
    |userId
    |
  """.stripMargin

//    10)使用 tableEnv.sqlQuery 执行sql语句
val table = tenv.sqlQuery(sql)

// 11 转为datastream打印
tenv.toRetractStream[Row](table).print()

// 12 启动
env.execute()

}
}


总结:

1 ***\*timestamp是关键字不能作为字段的名字(关键字不能作为字段名字)\****

2 使用流处理sql如果使用窗口一定要指定基于eventime,还要指定rowtime

3 注册为表的时候,通过'指定字段信息,指定rowtime

4 使用window操作,本案例使用滚动窗口,放置在group by之后。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink学习day05:checkpoint 原理与实践 的相关文章

  • 小学二三年级入门信奥赛,如何从Scratch进入C++的学习

    小学生几年级适宜开始学习C 这是讨论的比较热烈 也是比较热门的话题 小学生适宜几年级开始学C 小学生适宜几年级开始学C CSDN博客 simple happiness 信息学规划 北京二年级学生图形化过二级想往信奥靠拢如何准备 信息学规划
  • 1.常用单词学习

    1 1 听力练习 第一课 Av264771740 P1 Av736460000 P1 哔哩哔哩 bilibili 有推荐的吗 这个和这个都很推荐 这个多少钱 请给我这个 全部 这些一共多少钱 卫生间在哪呢 一度 願 麻烦再来一次 英語話 会
  • 通过一个寒假能学会黑客技术吗?看完你就知道了

    一个寒假能成为黑客吗 资深白帽子来告诉你 如果你想的是学完去美国五角大楼内网随意溜达几圈 想顺走一点机密文件的话 劝你还是趁早放弃 但是成为一名初级黑客还是绰绰有余的 你只需要掌握好渗透测试 Web安全 数据库 搞懂web安全防护 SQL注
  • 【OpenCV学习笔记02】- 图像入门

    内容 这里介绍了图像处理的入门操作 你将学习如何读取图像 如何显示图像以及如何将其保存回去 你将学习以下功能 cv imread cv imshow cv imwrite 简单使用OpenCV 读取图像 使用 cv imread 函数读取图
  • 机器学习与人类智能的融合:未来趋势与挑战

    1 背景介绍 人工智能 Artificial Intelligence AI 是指一种以计算机程序为代表的智能方法 可以理解 学习和应用人类智能的某些方面 机器学习 Machine Learning ML 是人工智能的一个子领域 它涉及到计
  • 慢思维大脑:SOP流程的心理学背景

    1 背景介绍 慢思维大脑 SOP流程的心理学背景 慢思维是指人类大脑在处理复杂问题 做出重要决策时所采用的思考方式 它与快速 自动的快思维相对 主要通过以下几种方式表现 深入思考 慢思维会让人类大脑深入思考问题的本质 从而找出更深层次的解决
  • 线性代数在深度学习中的角色

    1 背景介绍 深度学习是一种人工智能技术 它主要通过神经网络来学习和模拟人类大脑的思维过程 线性代数是一门数学分支 它研究的是向量和矩阵的运算 在深度学习中 线性代数起着非常重要的作用 因为它为神经网络提供了数学模型和计算方法 在这篇文章中
  • 慢思维的力量:如何解决复杂问题

    1 背景介绍 在当今的快速发展和竞争激烈的环境中 我们需要更有效地解决复杂问题 这需要我们具备一种称为慢思维的思考方式 它可以帮助我们更好地理解问题 制定更好的解决方案 本文将介绍慢思维的核心概念 算法原理 具体操作步骤以及数学模型公式 并
  • 如何成为一名数据科学家:必须掌握的技能和知识

    1 背景介绍 数据科学家是一种新兴的职业 它结合了计算机科学 统计学 数学和领域知识等多个领域的知识和技能 以解决实际问题 数据科学家的主要任务是收集 清洗 分析和解释大量数据 从中挖掘有价值的信息和知识 并将其应用于决策和预测 数据科学家
  • 用CHAT如何写大学生会计综合模拟实训报告

    CHAT回复 标题 大学生会计综合模拟实训报告 一 前言 随着信息化时代的发展 现代会计工作不再只依赖手动运算和记录 而是更加倚重电脑软件系统的配合运用 因此 对我们大学生来说 把握会计理论知识的同时 积极掌握相关的实践应用技能变得非常重要
  • 扬帆证券:产业化破题在即 人形机器人超预期演进

    大模型助力下的拐点 特斯拉A股产业链上 两笔重磅出资几乎一起现身 总规划超百亿元 1月4日 拓普集团公告 与宁波经济技能开发区办理委员会签署了 机器人电驱系统研发生产基地项目出资协议书 公司拟出资50亿元 建设机器人核心部件生产基地 此次出
  • 【VUE毕业设计】基于SSM的在线课堂学习设计与实现(含源码+论文)

    文章目录 1 项目简介 2 实现效果 2 1 界面展示 3 设计方案 3 1 概述 3 2 系统流程 3 3 系统结构设计 4 项目获取
  • 用CHAT写一份标题为职业教育教师教学能力提升培训总结

    CHAT回复 标题 职业教育教师教学能力提升培训总结 一 活动概述 本次由学校组织的职业教育教师教学能力提升培训于8月15日至8月20日顺利进行 来自全校的60位职业教育教师参与了此次培训 主讲人为享有盛名的教育专家马丁先生 二 培训内容与
  • 电商数据api拼多多接口获取商品实时数据价格比价api代码演示案例

    拼多多商品详情接口 接口接入入口 它的主要功能是允许卖家从自己的系统中快速获取商品详细信息 通过这个接口 卖家可以提取到商品的各类数据 包括但不限于商品标题 价格 优惠价 收藏数 下单人数 月销售量等 此外 还可以获取到商品的SKU图 详情
  • 跨平台UI自动化框架:Airtest,游戏开发和应用测试的利器

    2024软件测试面试刷题 这个小程序 永久刷题 靠它快速找到工作了 刷题APP的天花板 CSDN博客 文章浏览阅读2 3k次 点赞85次 收藏11次 你知不知道有这么一个软件测试面试的刷题小程序 里面包含了面试常问的软件测试基础题 web自
  • 渗透测试常用工具汇总_渗透测试实战

    1 Wireshark Wireshark 前称Ethereal 是一个网络分包分析软件 是世界上使用最多的网络协议分析器 Wireshark 兼容所有主要的操作系统 如 Windows Linux macOS 和 Solaris kali
  • 【计算机毕业设计】OA公文发文管理系统_xtv98

    近年来 人们的生活方式以网络为主题不断进化 OA公文发文管理就是其中的一部分 现在 无论是大型的还是小型的网站 都随处可见 不知不觉中已经成为我们生活中不可或缺的存在 随着社会的发展 除了对系统的需求外 我们还要促进经济发展 提高工作效率
  • 【GRNN-RBFNN-ILC算法】【轨迹跟踪】基于神经网络的迭代学习控制用于未知SISO非线性系统的轨迹跟踪(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 2 1 第1部分 2 2 第2部分
  • 两个月进口猛增10倍,买近百台光刻机,难怪ASML不舍中国市场

    据统计数据显示 2023年11月和12月 中国从荷兰进口的光刻机设备同比猛增10倍 进口金额超过19亿美元 让ASML赚得盆满钵满 ASML早前表示中国客户在2023年订购的光刻机全数交付 2023年11月中国进口的光刻机达到42台 进口金
  • 项目文章 | IF=8.4&转录因子Egr-1是脑膜炎型大肠杆菌引起的血脑屏障损伤的关键调节因子

    2024年1月17日华中农业大学动科动医学院陈焕春院士 王湘如教授团队在期刊 Cell Communication and Signaling IF 8 4 发表了题为 Egr 1 is a key regulator of the blo

随机推荐

  • Vue 将数组的字段取出组成key:value形式的对象+key:value 形式的对象组成数组

    1 将数组的字段取出组成key value形式的对象 let list key php value 1 key asp value 2 key aspx value 3 key jsp value 4 let header list for
  • jdk11生成jre

    1 在jdk11的解压目录中没有jre 2 执行 cmd D java jdk 11 0 17 3 执行 bin jlink exe module path jmods add modules java desktop output jre
  • Apollo代码学习(一)—控制模块概述

    Apollo代码学习 控制模块概述 补充 2018 11 08更新 2018 11 15更新 2018 11 20更新 前言 控制 纵向控制 标定表的生成 横向控制 控制信号 仿真 仿真平台及工具 Apollo 阿波罗 是一个开放的 完整的
  • spring 整合 mybatis 中数据源的几种配置总结

    spring 整合 mybatis 中数据源的几种配置方式 因为spring 整合mybatis的过程中 有好几种整合方式 尤其是数据源那块 经常看到不一样的配置方式 总感觉有点乱 所以今天有空总结下 一 采用org mybatis spr
  • 图数据库Neo4j在GIS系统的应用

    1 概述 1 1 图数据库简介 图数据库 Graph Database 是基于图论实现的一种新型的NoSQL数据库 他的数据存储结构和数据的查询方式都是以图论为基础的 图论中图的基本元素为节点和边 在图数据库中对应的就是节点和关系 在图数据
  • layui table is not a valid module

    table is not a valid module 后来发现 是因为layui是个库 引用的时候需要引用整个库 而不是只引用单个文件
  • 在MaxOSX系统上安装gdb并给gdb制作证书

    在MacOSX系统上是默认没有安装gdb的 这给MacOSX应用开发人员调试OSX应用程序带来一定困难 因此在MacOSX上安装gdb对应用程序进行调试是一个很好的选择 但是在MacOSX上安装gdb并不想在Linux上那么容易安装 他有一
  • GD32_时钟配置解析

    GD32 时钟配置解析 本文以GD32F303型号为基础 依照标准库GD32F30x Firmware Library V2 1 5为例 作为笔记简单记录个人对其时钟配置的理解 后续会持续更新本篇笔记内容 文章目录 GD32 时钟配置解析
  • Source insight 4.0 暗色主题,模仿Atom one-darkv配色方案

    我是在MAC OS 10 12下使用crossover安装的 在wine环境下装4 0有个无法解决的bug是toolbar非常的宽 所以我取消了 反正用快捷键可以代替 关于wine安装之后界面模糊的问题请参考我这个帖子http blog c
  • consul安装

    consul安装 1 什么是consul 2 安装过程 2 1 下载 2 2 解压 使用unzip命令解压 在linux上这个时候就代表已经安装成功了 2 3 启动consul 2 4 访问consul的后台管理页面 其端口默认是8500
  • 【PTA】A-B

    本题要求你计算A B 不过麻烦的是 A和B都是字符串 即从字符串A中把字符串B所包含的字符全删掉 剩下的字符组成的就是字符串A B 输入格式 输入在2行中先后给出字符串A和B 两字符串的长度都不超过104 并且保证每个字符串都是由可见的AS
  • spring工程的单元测试用例加载配置方法

    spring开发时可以使用基于java注解的配置 也可以使用基于xml配置文件的配置方法 大多数情况下在开发过程中需要编写单元测试用例针对不同的模块进行独立测试用于验证独立的功能 本文介绍在基于不同的配置项目中单元测试加载配置的方法 1 基
  • springboot 使用 logback 进行日志记录、并对文件日志以日期和大小进行拆分的 demo 示例

    1 描述 一个springboot项目使用 logback 记录程序运行过程中的日志 配置 logback 生成控制台日志和文件日志记录 以及对文件日志以日期和大小进行拆分的 demo示例 环境 IDE idea 2021 3 JDK 1
  • 电源纹波的测量

    测试手法 A 使用纹波探头 DC 50ohm 用弹簧地连接到地上 然后将探头点到测试电源上 示波器打开20MHz的带宽限制 打开余晖模式 将测试通道的输出电压值移到示波器屏幕中间 让让示波器采样1 3分钟 最后看示波器上的峰峰值即可 B 使
  • CCSpriteFrameCache的用法

    转自 https www cnblogs com pengyingh articles 2436648 html 让我们首先创建一个工程骨架 使用cocos2d工程模板创建一个新的项目并取名为AnimBear 接下来 下载一些由我的老婆制作
  • qt day 5

    1 gt 实现闹钟功能 pro QT core gui texttospeech greaterThan QT MAJOR VERSION 4 QT widgets CONFIG c 11 The following define make
  • C# USB通讯

    关注 星标公众号 及时获取更多技术分享 作者 冰茶奥利奥 微信公众号 嵌入式电子创客街 项目工程文件下载 工程文件下载地址 看了很多网上的博客 讲述如何用C 进行USB设备操作 很多都是不对的 以至于南辕北辙 我们可以使用usb库 在c下有
  • 打开方式中无法添加程序,无法用指定程序打开

    用T32打开 ts2文件时 右击 打开方式 中 浏览 到t32start exe的安装目录 点击 确定 可是在 打开方式 中找不到t32start exe程序 可能是因为注册表中t32start exe程序的路径指定错误 原因 解压安装包
  • 通过自定义 Vue 指令实现前端曝光埋点

    前言 互联网发展至今 数据的重要性已经不言而喻 尤其是在电商公司 数据的统计分析尤为重要 通过数据分析可以提升用户的购买体验 方便运营和产品调整销售策略等等 埋点就是网站分析的一种常用的数据采集方法 埋点按照获取数据的方式一般可以分为以下
  • flink学习day05:checkpoint 原理与实践

    flink checkpoint checkpointe是什么 基于state出发 flink基于与state可以做非常多复杂的事情 但是state是存储在内存中 内存中的数据是不安全的易丢失的 所以flink为了解决这个问题就引入了che