In general, for static binning you can use org.apache.spark.ml.feature.Bucketizer:
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.functions.count

val df = Seq(
  (1, 118.0), (2, 109.0), (3, 113.0), (4, 82.0), (5, 60.0),
  (6, 111.0), (7, 107.0), (8, 84.0), (9, 91.0), (10, 118.0)
).toDF("id", "value")

// Regular splits 0.0, 10.0, ..., 120.0 -> 12 buckets of width 10
val splits = (0 to 12).map(_ * 10.0).toArray

val bucketizer = new Bucketizer()
  .setInputCol("value")
  .setOutputCol("bucket")
  .setSplits(splits)

val bucketed = bucketizer.transform(df)
val solution = bucketed.groupBy($"bucket").agg(count($"id") as "count")
Result:
scala> solution.show
+------+-----+
|bucket|count|
+------+-----+
| 8.0| 2|
| 11.0| 4|
| 10.0| 2|
| 6.0| 1|
| 9.0| 1|
+------+-----+
The bucketizer throws an error when a value lies outside the defined bins. Split points can be defined as Double.NegativeInfinity or Double.PositiveInfinity to capture outliers.
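A minimal sketch of extending the splits this way (plain Scala, no Spark session needed; the `splits` array matches the one defined above):

```scala
// The regular splits 0.0, 10.0, ..., 120.0 from the example above.
val splits: Array[Double] = (0 to 12).map(_ * 10.0).toArray

// Prepend -Infinity and append +Infinity so any out-of-range value
// falls into the lowest or highest bucket instead of raising an error.
val safeSplits: Array[Double] =
  Double.NegativeInfinity +: splits :+ Double.PositiveInfinity
```

Passing `safeSplits` to `setSplits` adds one open-ended bucket on each side of the original range.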
Bucketizer is designed to handle arbitrary splits efficiently by performing a binary search for the correct bucket. For regular bins like yours, one can simply do:
val binned = df.withColumn("bucket", (($"value" - bin_min) / bin_width) cast "int")
where bin_min and bin_width are the left endpoint of the lowest bin and the bin width, respectively.
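The same arithmetic can be checked in plain Scala; here bin_min = 0.0 and bin_width = 10.0 are the values implied by the splits above (bins [0, 10), [10, 20), ...):

```scala
// Assumed values matching the regular splits 0.0, 10.0, ..., 120.0
val bin_min = 0.0
val bin_width = 10.0

// Same computation as the withColumn expression: integer division
// by the bin width after shifting by the lowest bin's left endpoint.
def bucketOf(value: Double): Int =
  ((value - bin_min) / bin_width).toInt
```

For example, bucketOf(118.0) yields bucket 11, matching the Bucketizer output shown earlier.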