Cannot call methods on a stopped SparkContext

2023-11-26

When I run the following test, it throws "Cannot call methods on a stopped SparkContext". The likely cause is that I am combining TestSuiteBase with a streaming Spark context. On the line val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals) I need a SparkContext, which I access via ssc.sparkContext, and that is where the problem occurs (see the warning and error messages below):

class StreamingTest extends TestSuiteBase with BeforeAndAfter {

  test("Test 1") {
    // ...
    val gridEvals = for (initialWeights <- gridParams("initialWeights");
                         stepSize <- gridParams("stepSize");
                         numIterations <- gridParams("numIterations")) yield {
      val lr = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(initialWeights.asInstanceOf[Vector])
        .setStepSize(stepSize.asInstanceOf[Double])
        .setNumIterations(numIterations.asInstanceOf[Int])

      ssc = setupStreams(inputData, (inputDStream: DStream[LabeledPoint]) => {
        lr.trainOn(inputDStream)
        lr.predictOnValues(inputDStream.map(x => (x.label, x.features)))
      })

      val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
      val cvRMSE = calculateRMSE(output, nPoints)
      println(s"RMSE = $cvRMSE")
      (initialWeights, stepSize, numIterations, cvRMSE)
    }

    val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals)
  }
}

16/04/27 10:40:17 WARN StreamingContext: StreamingContext has already been stopped
16/04/27 10:40:17 INFO SparkContext: SparkContext already stopped.

Cannot call methods on a stopped SparkContext

UPDATE:

Here is the base class TestSuiteBase:

trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {

  // Name of the framework for Spark context
  def framework: String = this.getClass.getSimpleName

  // Master for Spark context
  def master: String = "local[2]"

  // Batch duration
  def batchDuration: Duration = Seconds(1)

  // Directory where the checkpoint data will be saved
  lazy val checkpointDir: String = {
    val dir = Utils.createTempDir()
    logDebug(s"checkpointDir: $dir")
    dir.toString
  }

  // Number of partitions of the input parallel collections created for testing
  def numInputPartitions: Int = 2

  // Maximum time to wait before the test times out
  def maxWaitTimeMillis: Int = 10000

  // Whether to use manual clock or not
  def useManualClock: Boolean = true

  // Whether to actually wait in real time before changing manual clock
  def actuallyWait: Boolean = false

  // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
  val conf = new SparkConf()
    .setMaster(master)
    .setAppName(framework)

  // Timeout for use in ScalaTest `eventually` blocks
  val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))

  // Default before function for any streaming test suite. Override this
  // if you want to add your stuff to "before" (i.e., don't call before { } )
  def beforeFunction() {
    if (useManualClock) {
      logInfo("Using manual clock")
      conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
    } else {
      logInfo("Using real clock")
      conf.set("spark.streaming.clock", "org.apache.spark.util.SystemClock")
    }
  }

  // Default after function for any streaming test suite. Override this
  // if you want to add your stuff to "after" (i.e., don't call after { } )
  def afterFunction() {
    System.clearProperty("spark.streaming.clock")
  }

  before(beforeFunction)
  after(afterFunction)

  /**
   * Run a block of code with the given StreamingContext and automatically
   * stop the context when the block completes or when an exception is thrown.
   */
  def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): R = {
    try {
      block(ssc)
    } finally {
      try {
        ssc.stop(stopSparkContext = true)
      } catch {
        case e: Exception =>
          logError("Error stopping StreamingContext", e)
      }
    }
  }

  /**
   * Run a block of code with the given TestServer and automatically
   * stop the server when the block completes or when an exception is thrown.
   */
  def withTestServer[R](testServer: TestServer)(block: TestServer => R): R = {
    try {
      block(testServer)
    } finally {
      try {
        testServer.stop()
      } catch {
        case e: Exception =>
          logError("Error stopping TestServer", e)
      }
    }
  }

  /**
   * Set up required DStreams to test the DStream operation using the two sequences
   * of input collections.
   */
  def setupStreams[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      numPartitions: Int = numInputPartitions
    ): StreamingContext = {
    // Create StreamingContext
    val ssc = new StreamingContext(conf, batchDuration)
    if (checkpointDir != null) {
      ssc.checkpoint(checkpointDir)
    }

    // Setup the stream computation
    val inputStream = new TestInputStream(ssc, input, numPartitions)
    val operatedStream = operation(inputStream)
    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
    outputStream.register()
    ssc
  }

  /**
   * Set up required DStreams to test the binary operation using the sequence
   * of input collections.
   */
  def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W]
    ): StreamingContext = {
    // Create StreamingContext
    val ssc = new StreamingContext(conf, batchDuration)
    if (checkpointDir != null) {
      ssc.checkpoint(checkpointDir)
    }

    // Setup the stream computation
    val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
    val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
    val operatedStream = operation(inputStream1, inputStream2)
    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
      new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
    outputStream.register()
    ssc
  }

  /**
   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
   * returns the collected output. It will wait until `numExpectedOutput` number of
   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
   *
   * Returns a sequence of items for each RDD.
   */
  def runStreams[V: ClassTag](
      ssc: StreamingContext,
      numBatches: Int,
      numExpectedOutput: Int
    ): Seq[Seq[V]] = {
    // Flatten each RDD into a single Seq
    runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
  }

  /**
   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
   * returns the collected output. It will wait until `numExpectedOutput` number of
   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
   *
   * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
   * representing one partition.
   */
  def runStreamsWithPartitions[V: ClassTag](
      ssc: StreamingContext,
      numBatches: Int,
      numExpectedOutput: Int
    ): Seq[Seq[Seq[V]]] = {
    assert(numBatches > 0, "Number of batches to run stream computation is zero")
    assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
    logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)

    // Get the output buffer
    val outputStream = ssc.graph.getOutputStreams.
      filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).
      head.asInstanceOf[TestOutputStreamWithPartitions[V]]
    val output = outputStream.output

    try {
      // Start computation
      ssc.start()

      // Advance manual clock
      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
      logInfo("Manual clock before advancing = " + clock.getTimeMillis())
      if (actuallyWait) {
        for (i <- 1 to numBatches) {
          logInfo("Actually waiting for " + batchDuration)
          clock.advance(batchDuration.milliseconds)
          Thread.sleep(batchDuration.milliseconds)
        }
      } else {
        clock.advance(numBatches * batchDuration.milliseconds)
      }
      logInfo("Manual clock after advancing = " + clock.getTimeMillis())

      // Wait until expected number of output items have been generated
      val startTime = System.currentTimeMillis()
      while (output.size < numExpectedOutput &&
        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
        logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
        ssc.awaitTerminationOrTimeout(50)
      }
      val timeTaken = System.currentTimeMillis() - startTime
      logInfo("Output generated in " + timeTaken + " milliseconds")
      output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
      assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
      assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")

      Thread.sleep(100) // Give some time for the forgetting of old RDDs to complete
    } finally {
      ssc.stop(stopSparkContext = true)
    }
    output
  }

  /**
   * Verify whether the output values after running a DStream operation
   * are the same as the expected output values, by comparing the output
   * collections either as lists (order matters) or sets (order does not matter)
   */
  def verifyOutput[V: ClassTag](
      output: Seq[Seq[V]],
      expectedOutput: Seq[Seq[V]],
      useSet: Boolean
    ) {
    logInfo("--------------------------------")
    logInfo("output.size = " + output.size)
    logInfo("output")
    output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    logInfo("expected output.size = " + expectedOutput.size)
    logInfo("expected output")
    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    logInfo("--------------------------------")

    // Match the output with the expected output
    for (i <- 0 until output.size) {
      if (useSet) {
        assert(
          output(i).toSet === expectedOutput(i).toSet,
          s"Set comparison failed\n" +
            s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
            s"Generated output (${output.size} items): ${output.mkString("\n")}"
        )
      } else {
        assert(
          output(i).toList === expectedOutput(i).toList,
          s"Ordered list comparison failed\n" +
            s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
            s"Generated output (${output.size} items): ${output.mkString("\n")}"
        )
      }
    }
    logInfo("Output verified successfully")
  }

  /**
   * Test unary DStream operation with a list of inputs, with number of
   * batches to run same as the number of expected output values
   */
  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      useSet: Boolean = false
    ) {
    testOperation[U, V](input, operation, expectedOutput, -1, useSet)
  }

  /**
   * Test unary DStream operation with a list of inputs
   * @param input      Sequence of input collections
   * @param operation  Unary DStream operation to be applied to the input
   * @param expectedOutput Sequence of expected output collections
   * @param numBatches Number of batches to run the operation for
   * @param useSet     Compare the output values with the expected output values
   *                   as sets (order does not matter) or as lists (order matters)
   */
  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    withStreamingContext(setupStreams[U, V](input, operation)) { ssc =>
      val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
      verifyOutput[V](output, expectedOutput, useSet)
    }
  }

  /**
   * Test binary DStream operation with two lists of inputs, with number of
   * batches to run same as the number of expected output values
   */
  def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W],
      expectedOutput: Seq[Seq[W]],
      useSet: Boolean
    ) {
    testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
  }

  /**
   * Test binary DStream operation with two lists of inputs
   * @param input1     First sequence of input collections
   * @param input2     Second sequence of input collections
   * @param operation  Binary DStream operation to be applied to the 2 inputs
   * @param expectedOutput Sequence of expected output collections
   * @param numBatches Number of batches to run the operation for
   * @param useSet     Compare the output values with the expected output values
   *                   as sets (order does not matter) or as lists (order matters)
   */
  def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W],
      expectedOutput: Seq[Seq[W]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    withStreamingContext(setupStreams[U, V, W](input1, input2, operation)) { ssc =>
      val output = runStreams[W](ssc, numBatches_, expectedOutput.size)
      verifyOutput[W](output, expectedOutput, useSet)
    }
  }
}
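
Note how the suite's own entry points tie these helpers together: testOperation wraps setupStreams and runStreams in withStreamingContext, and both withStreamingContext and runStreamsWithPartitions stop the SparkContext when they finish (ssc.stop(stopSparkContext = true)). A minimal sketch of the intended usage, with hypothetical input values:

val input = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
val expected = Seq(Seq(2, 4, 6), Seq(8, 10, 12))

// The StreamingContext (and its SparkContext) is created, run, and
// stopped entirely inside testOperation.
testOperation(input, (s: DStream[Int]) => s.map(_ * 2), expected, useSet = false)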

You should check a few things:

  1. Verify that the resources you specify in your Spark config are actually available.

  2. Search the codebase for the stop() keyword and check where it is called on the SparkContext. In this test, runStreams delegates to runStreamsWithPartitions, whose finally block calls ssc.stop(stopSparkContext = true), so the SparkContext behind ssc.sparkContext is already stopped by the time parallelize(gridEvals) runs (a workaround is sketched after this list).

  3. Spark ships with the Spark UI, where you can inspect the jobs that ran (failed or successful) together with their logs; that will tell you why a job failed.
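
A minimal sketch of a workaround for point 2, assuming the grid results only need driver-side post-processing (the "grid-eval" app name below is made up):

// gridEvals is an ordinary local Seq, so the best configuration can be
// chosen without Spark at all:
val best = gridEvals.minBy { case (_, _, _, rmse) => rmse }

// If an RDD is genuinely required, create a fresh SparkContext after the
// streaming runs have stopped the old one, and stop it yourself:
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("grid-eval"))
try {
  val gridEvalsRDD = sc.parallelize(gridEvals)
  // ... evaluate or save gridEvalsRDD here ...
} finally {
  sc.stop()
}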

