PySpark 中的随机数生成 |
您所在的位置:网站首页 › 利用函数随机产生10个数 › PySpark 中的随机数生成 |
回答问题
让我们从一个总是返回随机整数的简单函数开始: import numpy as np def f(x): return np.random.randint(1000)和一个用零填充并使用f映射的 RDD: rdd = sc.parallelize([0] * 10).map(f)由于上述 RDD 没有持久化,我希望每次收集时都会得到不同的输出: > rdd.collect() [255, 512, 512, 512, 255, 512, 255, 512, 512, 255]如果我们忽略值的分布看起来并不真正随机的事实,那么或多或少会发生这种情况。当我们只取第一个元素时,问题就开始了: assert len(set(rdd.first() for _ in xrange(100))) == 1或者 assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1似乎每次都返回相同的数字。我已经能够使用 Spark 1.2、1.3 和 1.4 在两台不同的机器上重现这种行为。这里我使用的是np.random.randint,但它与random.randint的行为方式相同。 这个问题与collect的非完全随机结果相同,似乎是 Python 特有的,我无法使用 Scala 重现它: def f(x: Int) = scala.util.Random.nextInt(1000) val rdd = sc.parallelize(List.fill(10)(0)).map(f) (1 to 100).map(x => rdd.first).toSet.size rdd.collect()我在这里错过了什么明显的东西吗? 编辑: 原来问题的根源是 Python RNG 实现。引用官方文档: 此模块提供的函数实际上是 random.Random 类的隐藏实例的绑定方法。您可以实例化您自己的 Random 实例以获取不共享状态的生成器。 我假设 NumPy 的工作方式相同,并使用RandomState实例重写f如下 import os import binascii def f(x, seed=None): seed = ( seed if seed is not None else int(binascii.hexlify(os.urandom(4)), 16)) rs = np.random.RandomState(seed) return rs.randint(1000)使它变慢但解决了问题。 虽然上面解释了收集的不是随机结果,但我仍然不明白它如何影响first/take(1)在多个操作之间。 Answers所以这里的实际问题比较简单。 Python 中的每个子进程都从其父进程继承其状态: len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect())) # 1由于在这种特定情况下父状态没有理由改变并且工人的寿命有限,因此每个孩子的状态在每次运行时都将完全相同。 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |