我想用类似 SQL 的方法过滤 Pyspark DataFrameIN
子句,如
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
where a
是元组(1, 2, 3)
。我收到此错误:
java.lang.RuntimeException: [1.67] 失败: ``('' 预期但找到了标识符
这基本上是说它期待类似的事情'(1,2,3)'代替。
问题是我无法手动写入 a 中的值,因为它是从另一个作业中提取的。
在这种情况下我将如何过滤?
您传递给的字符串SQLContext
它在 SQL 环境的范围内进行评估。它没有捕获闭包。如果您想传递变量,则必须使用字符串格式显式执行此操作:
df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()
## 2
显然,出于安全考虑,这不是您在“真实”SQL 环境中使用的内容,但在这里应该不重要。
在实践中DataFrame
当您想要创建动态查询时,DSL 是更好的选择:
from pyspark.sql.functions import col
df.where(col("v").isin({"foo", "bar"})).count()
## 2
它很容易构建和编写,并为您处理 HiveQL / Spark SQL 的所有细节。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)