Spark中用于对称运算的笛卡尔上三角:`x*(x+1)//2`而不是`x**2`

2024-01-08

我需要计算 Spark 中列表项的成对对称分数。 IE。score(x[i],x[j]) = score(x[j], x[i])。一种解决方案是使用x.cartesian(x)。然而它会执行x**2操作而不是最低限度的必要操作x*(x+1)//2.

Spark 中解决此问题最有效的方法是什么?

附言。在纯 Python 中,我会使用迭代器,例如:

class uptrsq_range(object):

    def __init__(self, n):

        self._n_ = n
        self._length = n*(n+1) // 2

    def __iter__(self):
        for ii in range(self._n_):
            for jj in range(ii+1):
                yield (ii,jj)

    def __len__(self):
        """
        recepe by sleblanc @ stackoverflow
        """
        "This method returns the total number of elements"
        if self._length:
            return self._length
        else:
            raise NotImplementedError("Infinite sequence has no length")
            # or simply return None / 0 depending
            # on implementation

for i,j in uptrsq_range(len(x)):
    score(x[i], x[j])

最通用的方法是遵循cartesian with filter。例如:

rdd = sc.parallelize(range(10))

pairs = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])
pairs.count()

## 45

如果RDD比较小,你可以收集、广播和flatMap:

xs = sc.broadcast(rdd.collect())
pairs = rdd.flatMap(lambda y: [(x, y) for x in xs.value if x < y])
pairs.count()

## 45

如果可以在内部进一步过滤数据,这尤其有用flatMap以减少产生值的数量。

如果数据太大而无法收集/存储在内存中,但可以轻松计算(例如一系列数字)或可以从工作人员(本地可访问的数据库)有效访问,您可以flatMap如上所述或使用mapPartitions例如这样:

def some_function(iter):
    import sqlite3
    conn = sqlite3.connect('example.db')
    c = conn.cursor()
    query = ...  

    for x in iter:
        # fetch some data from a database
        c.execute(query, (x, ))
        for y in c.fetchall():
            yield (x, y)

rdd.mapPartitions(some_function)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark中用于对称运算的笛卡尔上三角:`x*(x+1)//2`而不是`x**2` 的相关文章

随机推荐