火花笛卡尔积

2023-12-04

我必须比较坐标才能获得距离。因此,我使用 sc.textFile() 加载数据并制作笛卡尔积。文本文件中大约有 2.000.000 行,因此需要比较 2.000.000 x 2.000.000 坐标。

我用大约 2000 个坐标测试了代码,几秒钟内就可以正常工作。但使用大文件似乎会在某个点停止,我不知道为什么。代码如下:

def concat(x,y):
    if(isinstance(y, list)&(isinstance(x,list))):
        return x + y
    if(isinstance(x,list)&isinstance(y,tuple)):
        return x + [y]
    if(isinstance(x,tuple)&isinstance(y,list)):
        return [x] + y
    else: return [x,y]

def haversian_dist(tuple):
    lat1 = float(tuple[0][0])
    lat2 = float(tuple[1][0])
    lon1 = float(tuple[0][2])
    lon2 = float(tuple[1][2])
    p = 0.017453292519943295
    a = 0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    print(tuple[0][1])
    return (int(float(tuple[0][1])), (int(float(tuple[1][1])),12742 * asin(sqrt(a))))

def sort_val(tuple):
    dtype = [("globalid", int),("distance",float)]
    a = np.array(tuple[1], dtype=dtype)
    sorted_mins = np.sort(a, order="distance",kind="mergesort")
    return (tuple[0], sorted_mins)


def calc_matrix(sc, path, rangeval, savepath, name):
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))
    data = data.repartition(100).cache()
    data.collect()
    matrix = data.cartesian(data)
    values = matrix.map(haversian_dist)
    values = values.reduceByKey(concat)
    values = values.map(sort_val)
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))
    dicti = values.collectAsMap()
    hp.save_pickle(dicti, savepath, name)

即使包含大约 15,000 个条目的文件也不起作用。我知道笛卡尔导致 O(n^2) 运行时间。但 Spark 不应该处理这个问题吗?或者有什么问题吗?唯一的起点是错误消息,但我不知道它是否与实际问题相关:

16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Daten?bergabe unterbrochen (broken pipe)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15,PROCESS_LOCAL, 2408 bytes)
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

你用过data.collect()在您的代码中,基本上将所有数据调用到一台机器中。根据该机器上的内存,2,000,000 行数据可能不太适合。

另外,我尝试通过连接而不是使用来减少要完成的计算数量cartesian。 (请注意,我只是使用 numpy 生成随机数,这里的格式可能与您的格式不同。不过,主要思想是相同的。)

import numpy as np
from numpy import arcsin, cos, sqrt

# suppose my data consists of latlong pairs
# we will use the indices for pairing up values
data = sc.parallelize(np.random.rand(10,2)).zipWithIndex()
data = data.map(lambda (val, idx): (idx, val))

# generate pairs (e.g. if i have 3 pairs with indices [0,1,2],
# I only have to compute for distances of pairs (0,1), (0,2) & (1,2)
idxs = range(data.count())
indices = sc.parallelize([(i,j) for i in idxs for j in idxs if i < j])

# haversian func (i took the liberty of editing some parts of it)
def haversian_dist(latlong1, latlong2):
    lat1, lon1 = latlong1
    lat2, lon2 = latlong2
    p = 0.017453292519943295
    def hav(theta): return (1 - cos(p * theta))/2
    a = hav(lat2 - lat1) + cos(p * lat1)*cos(p * lat2)*hav(lon2 - lon1)
    return 12742 * arcsin(sqrt(a))

joined1 = indices.join(data).map(lambda (i, (j, val)): (j, (i, val)))
joined2 = joined1.join(data).map(lambda (j, ((i, latlong1), latlong2)): ((i,j), (latlong1, latlong2))
haversianRDD = joined2.mapValues(lambda (x, y): haversian_dist(x, y))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

火花笛卡尔积 的相关文章

  • Python - 比较同一字典中的值

    我有一本字典 d Trump MAGA FollowTheMoney Clinton dems Clinton Stein FollowTheMoney Atlanta 我想删除字符串列表中的重复字符串 该字符串是键的值 对于这个例子 期望
  • pandas DataFrame.join 的运行时间是多少(大“O”顺序)?

    这个问题更具概念性 理论性 与非常大的数据集的运行时间有关 所以我很抱歉没有一个最小的例子来展示 我有一堆来自两个不同传感器的数据帧 我需要最终将它们连接成两个very来自两个不同传感器的大数据帧 df snsr1 and df snsr2
  • 我应该使用 Python 双端队列还是列表作为堆栈? [复制]

    这个问题在这里已经有答案了 我想要一个可以用作堆栈的 Python 对象 使用双端队列还是列表更好 元素数量较少还是数量较多有什么区别 您的情况可能会根据您的应用程序和具体用例而有所不同 但在一般情况下 列表非常适合堆栈 append is
  • 使用主题交换运行多个 Celery 任务

    我正在用 Celery 替换一些自制代码 但很难复制当前的行为 我期望的行为如下 创建新用户时 应向tasks与交换user created路由键 该消息应该触发两个 Celery 任务 即send user activate email
  • 从Django中具有外键关系的两个表中检索数据? [复制]

    这个问题在这里已经有答案了 This is my models py file from django db import models class Author models Model first name models CharFie
  • 更好地相当于这个疯狂的嵌套 python for 循环

    for a in map for b in map a for c in map b for d in map c for e in map d print a b c d e 上面的代码用于创建图中一定长度的所有路径 map a 表示从
  • MongoEngine 查询具有以列表中指定的前缀开头的属性的对象的列表

    我需要在 Mongo 数据库中查询具有以列表中任何前缀开头的特定属性的元素 现在我有一段这样的代码 query mymodel terms term in query terms 并且这会匹配在列表 term 上有一个项目的对象 该列表中的
  • Tensorboard SyntaxError:语法无效

    当我尝试制作张量板时 出现语法错误 尽管开源代码我还是无法理解 我尝试搜索张量板的代码 但不清楚 即使我不擅长Python 我这样写路径C Users jh902 Documents logs因为我正在使用 Windows 10 但我不确定
  • GUI(输入和输出矩阵)?

    我需要创建一个 GUI 将数据输入到矩阵或表格中并读取此表单数据 完美的解决方案是限制输入表单仅允许float 例如 A 1 02 0 25 0 30 0 515 0 41 1 13 0 15 1 555 0 25 0 14 1 21 2
  • 从 Powershell 脚本安装 Python

    当以管理员身份从 PowerShell 命令行运行以下命令时 可以在 Windows 11 上成功安装 Python c temp python 3 11 4 amd64 exe quiet InstallAllUsers 0 Instal
  • 使用 Python Oauthlib 通过服务帐户验证 Google API

    我不想使用适用于 Python 的 Google API 客户端库 但仍想使用 Python 访问 Google APIOauthlib https github com idan oauthlib 创建服务帐户后谷歌开发者控制台 http
  • python的shutil.move()在linux上是原子的吗?

    我想知道python的shutil move在linux上是否是原子的 如果源文件和目标文件位于两个不同的分区上 行为是否不同 或者与它们存在于同一分区上时的行为相同吗 我更关心的是如果源文件和目标文件位于同一分区上 shutil move
  • 将 Matlab 的 datenum 格式转换为 Python

    我刚刚开始从 Matlab 迁移到 Python 2 7 在读取 mat 文件时遇到一些问题 时间信息以 Matlab 的日期数字格式存储 对于那些不熟悉它的人 日期序列号将日历日期表示为自固定基准日期以来已经过去的天数 在 MATLAB
  • 如何使用 Python 3 检查目录是否包含文件

    我到处寻找这个答案但找不到 我正在尝试编写一个脚本来搜索特定的子文件夹 然后检查它是否包含任何文件 如果包含 则写出该文件夹的路径 我已经弄清楚了子文件夹搜索部分 但检查文件却难倒了我 我发现了有关如何检查文件夹是否为空的多个建议 并且我尝
  • 找到一个数字所属的一组范围

    我有一个 200k 行的数字范围列表 例如开始位置 停止位置 该列表包括除了非重叠的重叠之外的所有类型的重叠 列表看起来像这样 3 5 10 30 15 25 5 15 25 35 我需要找到给定数字所属的范围 并对 100k 个数字重复该
  • 带有 LSTM 的 GridSearchCV/RandomizedSearchCV

    我一直在尝试通过 RandomizedSearchCV 调整 LSTM 的超参数 我的代码如下 X train X train reshape X train shape 0 1 X train shape 1 X test X test
  • 如何使用 AWS Lambda Python 读取 AWS S3 存储的 Word 文档(.doc 和 .docx)文件内容?

    我的场景是 我尝试使用 python 实现从 Aws Lambda 读取 AWS 存储的 S3 word 文档 doc 和 docx 文件内容 下面的代码是我使用的 我的问题是我可以获取文件名 但无法读取内容 def lambda hand
  • 将索引与值交换的最快方法

    考虑pd Series s s pd Series list abcdefghij list ABCDEFGHIJ s A a B b C c D d E e F f G g H h I i J j dtype object 交换索引和值并
  • 如何将Python3设置为Mac上的默认Python版本?

    有没有办法将 Python 3 8 3 设置为 macOS Catalina 版本 10 15 2 上的默认 Python 版本 我已经完成的步骤 看看它安装在哪里 ls l usr local bin python 我得到的输出是这样的
  • JSON:TypeError:Decimal('34.3')不是JSON可序列化的[重复]

    这个问题在这里已经有答案了 我正在运行一个 SQL 查询 它返回一个小数列表 当我尝试将其转换为 JSON 时 出现类型错误 查询 res db execute SELECT CAST SUM r SalesVolume 1000 0 AS

随机推荐

  • fpdf“UnicodeEncodeError:'latin-1'编解码器无法对位置 88 中的字符 '\u2013' 进行编码:序数不在范围内(256)”

    我正在尝试在 Python 中将文本文件转换为 pdf 但出现错误 为什么会发生这种情况以及如何解决 这是我的代码 import fpdf from fpdf import FPDF pdf FPDF pdf add page pdf se
  • PHP:将本地时间转换为 UTC

    假设我得到一个像这样的字符串08 22 2015 10 56 PM并且该日期 时间字符串始终仅指一个特定时区 我需要能够将其转换为这种格式 Ymd THis Z 这是 iCal 格式 如何将该字符串转换为祖鲁时间并转换为 Ymd THis
  • 如何在 Xamarin iOS 上执行简单的后台任务

    在我们的应用程序中 用户可以跟踪并提交他们记录的旅程 我需要一种在 iOS 中创建任务的简单方法 我已经在 Android 上创建并测试了它 它的工作原理是 用户选择他们想要提交的旅程 点击同步并创建一个前台服务 将旅程同步到我们的 API
  • 用于仅插入/仅查询应用程序的 ORM 框架

    我已经使用 Hibernate 多年了 从来没有遇到过任何问题 但我刚刚意识到我的大部分工作都涉及 CRUD 方法 其中我需要数据保持持久化并随意修改 这样做的问题是 有人想要制作 2 个独立的应用程序 一个用于批量插入 另一个对插入的数据
  • 格式化斯坦福 Corenlp 的 NER 输出

    我正在与斯坦福 CoreNLP 合作并将其用于 NER 但是当我提取组织名称时 我看到每个单词都标有注释 因此 如果实体是 NEW YORK TIMES 那么它会被记录为三个不同的实体 NEW YORK 和 TIMES 我们是否可以在斯坦福
  • 重用PreparedStatement

    我在我们的代码库上运行了 findbugs 它指出还有两个语句仍然需要关闭 在这部分代码中 我们运行 preparedStatement connection prepareStatement query 对于3个不同的查询 重用prepa
  • 如何使用 Greasemonkey 脚本通过 XSLT 转换 XML 文件?

    我有一个搜索服务器 它提供一个测试页面 我可以在其中输入查询并以 XML 形式返回结果 我希望能够以更加用户友好的方式浏览结果 因此我开始使用 XSLT 现在我有了一个简单的样式表 可以将不知何故臃肿的 XML 转换为仅显示部分数据的简单表
  • 仅获取白色屏幕截图

    我可以读取条形码 但无法获取屏幕快照 getScreenImage 函数获取白屏 如何获取屏幕截图 包括我看到的相机视图的屏幕 谢谢 interface igViewController
  • 处理器如何读取内存?

    我正在尝试重新实现 malloc 我需要了解对齐的目的 据我了解 如果内存对齐 代码将执行得更快 因为处理器不必采取额外的步骤来恢复被剪切的内存位 我想我明白 64 位处理器读取 64 位乘 64 位内存 现在 让我们想象一下我有一个按顺序
  • 使用 BitBlt 进行的屏幕截图会在 Windows 10 上显示黑色图像

    我正在使用下面的代码来捕获当前活动窗口的屏幕截图 这段代码来自捕获屏幕截图 包括 NET 中的半透明窗口 有一些小的添加 即它使用 GetForegroundWindow 和一个计时器 以便我可以选择所需的窗口 在 Windows 10 x
  • 在 Java 8 流中捕获 UncheckedIOException

    编辑 这似乎不可能 请参阅https bugs openjdk java net browse JDK 8039910 我有一个帮助类 它提供了Stream
  • 类型错误:“datetime.date”对象没有属性“__getitem__”

    我在我的 models py 中使用 class Pedido models Model data pedido models DateField Data do pedido cliente models ForeignKey Clien
  • 谷歌地理编码不适用于数据库中带有特殊字符的地址

    我的谷歌地理编码数据库中的地址特殊字符有问题 但如果我对它们进行硬编码则不会 简单的地理编码代码 url http maps googleapis com maps api geocode json address address sens
  • TabControl 处理非活动选项卡上的控件

    我正在为我的应用程序使用 MVVM 模式 主窗口包括一个TabControl与DataContext映射到 ViewModel
  • 如何将 Lua 模块作为字符串而不是文件加载?

    我正在使用 LuaJava 和 Lua 的 C 代码 我想做的是读取在Android应用程序中存储为资源字符串的Lua源代码 以便可以执行读入的Lua源代码 我需要知道如何使用 LuaJava 或 C 语言来做到这一点 我想知道如何使用字符
  • Compact Framework 中的 MAC 地址

    如何仅使用紧凑框架获取 MAC 地址 1 4 的 OpenNETCF 代码从以下 P Invoke 调用中获取信息 DllImport iphlpapi dll SetLastError true public static extern
  • NgAnimate 页面加载 hack

    在更新 1 4 1 中 AngularJs Animate 不再像以前那样在页面加载时触发 我的旧解决方案类似对此 笨蛋 found here并一直工作到 v1 3 9
  • CSS 字体 Unicode 范围

    font face font family Nanum Barun Gothic src url NanumBarunGothic ttf unicode range U AC00 D7A3 U 1100 11FF U 3130 318F
  • 将新的拟合阶段添加到现有 PipelineModel 中,无需再次拟合

    我想将几个经过训练的管道连接到一个 这类似于 Spark 将新的拟合阶段添加到现有 PipelineModel 中 无需再次拟合 但是下面的解决方案适用于 PySpark gt pipe model new PipelineModel st
  • 火花笛卡尔积

    我必须比较坐标才能获得距离 因此 我使用 sc textFile 加载数据并制作笛卡尔积 文本文件中大约有 2 000 000 行 因此需要比较 2 000 000 x 2 000 000 坐标 我用大约 2000 个坐标测试了代码 几秒钟