PySpark 中的随机数生成

您所在的位置:网站首页 利用函数随机产生10个数 PySpark 中的随机数生成

PySpark 中的随机数生成

2023-03-24 10:28| 来源: 网络整理| 查看: 265

回答问题

让我们从一个总是返回随机整数的简单函数开始:

import numpy as np def f(x): return np.random.randint(1000)

和一个用零填充并使用f映射的 RDD:

rdd = sc.parallelize([0] * 10).map(f)

由于上述 RDD 没有持久化,我希望每次收集时都会得到不同的输出:

> rdd.collect() [255, 512, 512, 512, 255, 512, 255, 512, 512, 255]

如果我们忽略值的分布看起来并不真正随机的事实,那么或多或少会发生这种情况。当我们只取第一个元素时,问题就开始了:

assert len(set(rdd.first() for _ in xrange(100))) == 1

或者

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1

似乎每次都返回相同的数字。我已经能够使用 Spark 1.2、1.3 和 1.4 在两台不同的机器上重现这种行为。这里我使用的是np.random.randint,但它与random.randint的行为方式相同。

这个问题与collect的非完全随机结果相同,似乎是 Python 特有的,我无法使用 Scala 重现它:

def f(x: Int) = scala.util.Random.nextInt(1000) val rdd = sc.parallelize(List.fill(10)(0)).map(f) (1 to 100).map(x => rdd.first).toSet.size rdd.collect()

我在这里错过了什么明显的东西吗?

编辑:

原来问题的根源是 Python RNG 实现。引用官方文档:

此模块提供的函数实际上是 random.Random 类的隐藏实例的绑定方法。您可以实例化您自己的 Random 实例以获取不共享状态的生成器。

我假设 NumPy 的工作方式相同,并使用RandomState实例重写f如下

import os import binascii def f(x, seed=None): seed = ( seed if seed is not None else int(binascii.hexlify(os.urandom(4)), 16)) rs = np.random.RandomState(seed) return rs.randint(1000)

使它变慢但解决了问题。

虽然上面解释了收集的不是随机结果,但我仍然不明白它如何影响first/take(1)在多个操作之间。

Answers

所以这里的实际问题比较简单。 Python 中的每个子进程都从其父进程继承其状态:

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect())) # 1

由于在这种特定情况下父状态没有理由改变并且工人的寿命有限,因此每个孩子的状态在每次运行时都将完全相同。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3