利用质数随机遍历集合的一种实现

ivansli 2022/05/10 308℃ 0

有个随机遍历数据集合的需求,描述如下:
① 从集合的某个随机位置开始遍历
② 随机获取集合中剩余数据
③ 重复步骤②,直到遍历完整个集合

当看到这个需求时,不知道你是否会想到什么实现方式?

最近在追溯go底层源码时发现,在调度器从其他 P 中 steal 可用 goroutine 时利用质数的特性,采用了一种特殊算法随机遍历 allp 数组。觉得很是经典,特此记录一下。

例如:通过下标随机遍历数组中的 8 个元素,则最终遍历数组的下标可能是 [0,1,2,3,4,5,6,7] 中的任意一种随机排列。

遍历步骤:

① 计算小于等于 count (这里是8) 的所有质数,得到数组 [1,3,5,7]
② 从质数数组随机获取一个质数 inc,例如:5
③ 获取一个初识随机数 R,例如:16
④ (R+inc) % count = pos,pos 就是数组下标
⑤ 把 第④步 计算结果 pos 作为第三步中的 R,继续下一次循环,直到遍历完整个数组

按照遍历步骤,进行一轮计算,过程如下:

① R:16 inc:5 count:8    (16+5)%8 = pos:5 // 第一个R是单独获取的随机数,计算结果 pos 作为下一次 R 的输入
② R:5  inc:5 count:8    (5+5)%8  = pos:2
③ R:2  inc:5 count:8    (2+5)%8  = pos:7
④ R:7  inc:5 count:8    (7+5)%8  = pos:4
⑤ R:4  inc:5 count:8    (4+5)%8  = pos:1
⑥ R:1  inc:5 count:8    (1+5)%8  = pos:6
⑦ R:6  inc:5 count:8    (6+5)%8  = pos:3
⑧ R:3  inc:5 count:8    (3+5)%8  = pos:0

通过上面计算,最终获得的遍历下标为 [5,2,7,4,1,6,3,0]。需要注意的是:每一轮遍历,第一次传入的随机数 R 与选取的质数 inc 不同,会得到不同的遍历次序。

在 go 底层源码的 func findrunnable() (gp *g, inheritTime bool) 方法中可以看到实现逻辑,如下所示:

代码位置:src/runtime/proc.go

注意:>=go1.16 findrunnable()代码有所变动,可在 go1.12 ~ go1.15 版本中看到相关逻辑

func findrunnable() (gp *g, inheritTime bool) {
  // ...
  // omit some code

  // 没有获取到可用 goroutine 之前,整个过程会执行4次
    for i := 0; i < 4; i++ {
    // 开始遍历整个数组,这一层的 for 循环就是一次数组遍历
    // stealOrder 是提前准备好的一个数据结构
    // 其包含两个字段:1.待遍历的数组个数count  2.小于等于count的所有质数组成的数组
    // 
    // fastrand() 就是开始遍历数组时第一次需要的随机数 R
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
      // enum.position() 就是遍历数组时获得的数组下标
      // allp[enum.position()]则是数组的具体值
      //
      // runqsteal()含义:从 allp[enum.position()]找到的 P 的本地队列 steal 一批 goroutine 存放到 _p_ 
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

  // ...
  // omit some code
}


var stealOrder randomOrder

//主要用来 存储所有小于等于数组个数 count 的质数数组 coprimes
type randomOrder struct {
    count    uint32    //将要遍历的数组的个数
    coprimes []uint32  //所有小于等于 count 的质数数组
}

//获取小于等于 count 的所有质数组成的数组
func (ord *randomOrder) reset(count uint32) {
    ord.count = count
    ord.coprimes = ord.coprimes[:0]

    for i := uint32(1); i <= count; i++ {
    //判断i是否是质数
        if gcd(i, count) == 1 {
            ord.coprimes = append(ord.coprimes, i)
        }
    }
}

// 判断 a 是否是质数
func gcd(a, b uint32) uint32 {
    for b != 0 {
        a, b = b, a%b
    }
    return a
}

//用于记录数组遍历时的一些信息
type randomEnum struct {
    i     uint32 // 已经遍历的数组个数
    count uint32 // 数组总个数
    pos   uint32 // 当前计算得到的数组下标
    inc   uint32 // 质数数组中选取的一个质数
}

//生成遍历数组时需要的信息
//i就是第一次需要的随机数
func (ord *randomOrder) start(i uint32) randomEnum {
    return randomEnum{
        count: ord.count, //待遍历数组个数
        pos:   i % ord.count,//通过随机数i,第一次获取的数组下标 pos
        inc:   ord.coprimes[i%uint32(len(ord.coprimes))], //选取一个质数
    }
}

//判断数组是否遍历完毕
func (enum *randomEnum) done() bool {
  //已经遍历的数组元素个数 等于 数组个数,则本轮遍历完毕
    return enum.i == enum.count
}

//每经过一次计算,会对已经遍历的数组元素个数i加一
func (enum *randomEnum) next() {
    enum.i++ //已经遍历的数组元素个数i加一
    enum.pos = (enum.pos + enum.inc) % enum.count //计算下一个遍历到的数组下标
}

//获取计算得到的数组下标
func (enum *randomEnum) position() uint32 {
    return enum.pos
}

这种实现方式被视为是一种公平的数据集合遍历实现方式,想必在平时的业务开发过程中有很多地方会用的到。

评论啦~