您的位置 首页 golang

万字长文深入解析Golang中的map设计(中)

Go Map实现

同python与java一样,Go语言中的map是也基于 哈希表 实现的,它解决哈希冲突的方式是链地址法,即通过使用数组+链表的数据结构来表达map。

注意:本文后续出现的map统一代指Go中实现的map类型。

map数据结构

map中的数据被存放于一个数组中的,数组的元素是桶(bucket),每个桶至多包含8个键值对数据。 哈希值低位(low-order bits)用于选择桶,哈希值高位(high-order bits)用于在一个独立的桶中区别出键。 哈希值高低位示意图如下

本文基于go 1.15.2 darwin/amd64分析,源码位于src/runtime/map.go.

  • map的结构体为hmap
 // A header for a Go map.
type hmap struct {
    count     int // 代表哈希表中的元素个数,调用len(map)时,返回的就是该字段值。
     flags      uint8 // 状态标志,下文常量中会解释四种状态位含义。
    B         uint8  // buckets(桶)的对数log_2(哈希表元素数量最大可达到装载因子*2^B)
    noverflow uint16 // 溢出桶的大概数量。
    hash0     uint32 // 哈希种子。

    buckets    unsafe.Pointer // 指向buckets数组的指针,数组大小为2^B,如果元素个数为0,它为nil。
    oldbuckets unsafe.Pointer // 如果发生扩容,oldbuckets是指向老的buckets数组的指针,老的buckets数组大小是新的buckets的1/2。非扩容状态下,它为nil。
    nevacuate  uintptr        // 表示扩容进度,小于此地址的buckets代表已搬迁完成。

    extra *mapextra // 这个字段是为了优化GC扫描而设计的。当key和value均不包含指针,并且都可以inline时使用。extra是指向mapextra类型的指针。  
  • mapextra的结构体
 // mapextra holds fields that are not present on all maps.
type mapextra struct {
    // 如果 key 和 value 都不包含指针,并且可以被 inline(<=128 字节)
    // 就使用 hmap的extra字段 来存储 overflow buckets,这样可以避免 GC 扫描整个 map
    // 然而 bmap.overflow 也是个指针。这时候我们只能把这些 overflow 的指针
    // 都放在 hmap.extra.overflow 和 hmap.extra.oldoverflow 中了
    // overflow 包含的是 hmap.buckets 的 overflow 的 buckets
    // oldoverflow 包含扩容时的 hmap.oldbuckets 的 overflow 的 bucket
    overflow    *[]*bmap
    oldoverflow *[]*bmap

    // 指向空闲的 overflow bucket 的指针
    nextOverflow *bmap
}  
  • bmap结构体
 // A bucket for a Go map.
type bmap struct {
    // tophash包含此桶中每个键的哈希值最高字节(高8位)信息(也就是前面所述的high-order bits)。
    // 如果tophash[0] < minTopHash,tophash[0]则代表桶的搬迁(evacuation)状态。
    tophash [bucketCnt]uint8
}  

bmap也就是bucket(桶)的内存模型图解如下(相关代码逻辑可查看src/cmd/compile/internal/gc/reflect.go中的bmap()函数)。

在以上图解示例中,该桶的第7位cell和第8位cell还未有对应键值对。需要注意的是,key和value是各自存储起来的,并非想象中的key/value/key/value…的形式。这样做虽然会让代码组织稍显复杂,但是它的好处是能让我们消除例如map[int64]int所需要的填充(padding)。此外,在8个键值对数据后面有一个overflow指针,因为桶中最多只能装8个键值对,如果有多余的键值对落到了当前桶,那么就需要再构建一个桶(称为溢出桶),通过overflow指针链接起来。

  • 重要常量标志
 const (
    // 一个桶中最多能装载的键值对(key-value)的个数为8
    bucketCntBits = 3
    bucketCnt     = 1 << bucketCntBits

  // 触发扩容的装载因子为13/2=6.5
    loadFactorNum = 13
    loadFactorDen = 2

    // 键和值超过128个字节,就会被转换为指针
    maxKeySize  = 128
    maxElemSize = 128

    // 数据偏移量应该是bmap结构体的大小,它需要正确地对齐。
  // 对于amd64p32而言,这意味着:即使指针是32位的,也是64位对齐。
    dataOffset = unsafe.Offsetof(struct {
        b bmap
        v int64
    }{}.v)


  // 每个桶(如果有溢出,则包含它的overflow的链接桶)在搬迁完成状态(evacuated* states)下,要么会包含它所有的键值对,要么一个都不包含(但不包括调用evacuate()方法阶段,该方法调用只会在对map发起write时发生,在该阶段其他goroutine是无法查看该map的)。简单的说,桶里的数据要么一起搬走,要么一个都还未搬。
  // tophash除了放置正常的高8位hash值,还会存储一些特殊状态值(标志该cell的搬迁状态)。正常的tophash值,最小应该是5,以下列出的就是一些特殊状态值。
    emptyRest      = 0 // 表示cell为空,并且比它高索引位的cell或者overflows中的cell都是空的。(初始化bucket时,就是该状态)
    emptyOne       = 1 // 空的cell,cell已经被搬迁到新的bucket
    evacuatedX     = 2 // 键值对已经搬迁完毕,key在新buckets数组的前半部分
    evacuatedY     = 3 // 键值对已经搬迁完毕,key在新buckets数组的后半部分
    evacuatedEmpty = 4 // cell为空,整个bucket已经搬迁完毕
    minTopHash     = 5 // tophash的最小正常值

    // flags
    iterator     = 1 // 可能有迭代器在使用buckets
    oldIterator  = 2 // 可能有迭代器在使用oldbuckets
    hashWriting  = 4 // 有协程正在向map写人key
    sameSizeGrow = 8 // 等量扩容

    // 用于迭代器检查的bucket ID
    noCheck = 1<<(8*sys.PtrSize) - 1
)  

综上,我们以B等于4为例,展示一个完整的map结构图。

创建map

map初始化有以下两种方式

 make(map[k]v)
// 指定初始化map大小为hint
make(map[k]v, hint)  

对于不指定初始化大小,和初始化值hint<=8(bucketCnt)时,go会调用makemap_small函数(源码位置src/runtime/map.go),并直接从堆上进行分配。

 func makemap_small() *hmap {
    h := new(hmap)
    h.hash0 = fastrand()
    return h
}  

当hint>8时,则调用makemap函数

 // 如果编译器认为map和第一个bucket可以直接创建在栈上,h和bucket可能都是非空
// 如果h != nil,那么map可以直接在h中创建
// 如果h.buckets != nil,那么h指向的bucket可以作为map的第一个bucket使用
func makemap(t *maptype, hint int, h *hmap) *hmap {
  // math.MulUintptr返回hint与t.bucket.size的乘积,并判断该乘积是否溢出。
    mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
// maxAlloc的值,根据平台系统的差异而不同,具体计算方式参照src/runtime/malloc.go
    if overflow || mem > maxAlloc {
        hint = 0
    }

// initialize Hmap
    if h == nil {
        h = new(hmap)
    }
  // 通过fastrand得到哈希种子
    h.hash0 = fastrand()

    // 根据输入的元素个数hint,找到能装下这些元素的B值
    B := uint8(0)
    for overLoadFactor(hint, B) {
        B++
    }
    h.B = B

    // 分配初始哈希表
  // 如果B为0,那么buckets字段后续会在mapassign方法中lazily分配
    if h.B != 0 {
        var nextOverflow *bmap
    // makeBucketArray创建一个map的底层保存buckets的数组,它最少会分配h.B^2的大小。
        h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
        if nextOverflow != nil {
    h.extra = new(mapextra)
            h.extra.nextOverflow = nextOverflow
        }
    }

    return h
}  

分配buckets数组的makeBucketArray函数

 // makeBucket为map创建用于保存buckets的数组。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
    base := bucketShift(b)
    nbuckets := base
  // 对于小的b值(小于4),即桶的数量小于16时,使用溢出桶的可能性很小。对于此情况,就避免计算开销。
    if b >= 4 {
    // 当桶的数量大于等于16个时,正常情况下就会额外创建2^(b-4)个溢出桶
        nbuckets += bucketShift(b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }

 // 这里,dirtyalloc分两种情况。如果它为nil,则会分配一个新的底层数组。如果它不为nil,则它指向的是曾经分配过的底层数组,该底层数组是由之前同样的t和b参数通过makeBucketArray分配的,如果数组不为空,需要把该数组之前的数据清空并复用。
    if dirtyalloc == nil {
        buckets = newarray(t.bucket, int(nbuckets))
    } else {
        buckets = dirtyalloc
        size := t.bucket.size * nbuckets
        if t.bucket.ptrdata != 0 {
            memclrHasPointers(buckets, size)
        } else {
            memclrNoHeapPointers(buckets, size)
        }
}

  // 即b大于等于4的情况下,会预分配一些溢出桶。
  // 为了把跟踪这些溢出桶的开销降至最低,使用了以下约定:
  // 如果预分配的溢出桶的overflow指针为nil,那么可以通过指针碰撞(bumping the pointer)获得更多可用桶。
  // (关于指针碰撞:假设内存是绝对规整的,所有用过的内存都放在一边,空闲的内存放在另一边,中间放着一个指针作为分界点的指示器,那所分配内存就仅仅是把那个指针向空闲空间那边挪动一段与对象大小相等的距离,这种分配方式称为“指针碰撞”)
  // 对于最后一个溢出桶,需要一个安全的非nil指针指向它。
    if base != nbuckets {
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}  

根据上述代码,我们能确定在正常情况下,正常桶和溢出桶在内存中的存储空间是连续的,只是被 hmap 中的不同字段引用而已。

哈希函数

在初始化go程序运行环境时(src/runtime/proc.go中的schedinit),就需要通过alginit方法完成对哈希的初始化。

 func schedinit() {
    lockInit(&sched.lock, lockRankSched)

    ...

    tracebackinit()
    moduledataverify()
    stackinit()
    mallocinit()
    fastrandinit() // must run before mcommoninit
    mcommoninit(_g_.m, -1)
    cpuinit()       // must run before alginit
    // 这里调用alginit()
    alginit()       // maps must not be used before this call
    modulesinit()   // provides activeModules
    typelinksinit() // uses maps, activeModules
    itabsinit()     // uses activeModules

    ...

    goargs()
    goenvs()
    parsedebugvars()
    gcinit()

  ...
}  

对于哈希算法的选择,程序会根据当前架构判断是否支持AES,如果支持就使用AES hash,其实现代码位于src/runtime/asm_{386,amd64,arm64}.s中;若不支持,其hash算法则根据xxhash算法()和cityhash算法()启发而来,代码分别对应于32位(src/runtime/hash32.go)和64位机器(src/runtime/hash32.go)中,对这部分内容感兴趣的读者可以深入研究。

 func alginit() {
    // Install AES hash algorithms if the instructions needed are present.
    if (GOARCH == "386" || GOARCH == "amd64") &&
        cpu.X86.HasAES && // AESENC
        cpu.X86.HasSSSE3 && // PSHUFB
        cpu.X86.HasSSE41 { // PINSR{D,Q}
        initAlgAES()
        return
    }
    if GOARCH == "arm64" && cpu.ARM64.HasAES {
        initAlgAES()
        return
    }
    getRandomData((*[len(hashkey) * sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])
    hashkey[0] |= 1 // make sure these numbers are odd
    hashkey[1] |= 1
    hashkey[2] |= 1
    hashkey[3] |= 1
}  

上文在创建map的时候,我们可以知道map的哈希种子是通过h.hash0 = fastrand()得到的。它是在以下maptype中的hasher中被使用到,在下文内容中会看到hash值的生成。

 type maptype struct {
    typ    _type
    key    *_type
    elem   *_type
    bucket *_type
  // hasher的第一个参数就是指向key的指针,h.hash0 = fastrand()得到的hash0,就是hasher方法的第二个参数。
  // hasher方法返回的就是hash值。
    hasher     func(unsafe.Pointer, uintptr) uintptr
    keysize    uint8  // size of key slot
    elemsize   uint8  // size of elem slot
    bucketsize uint16 // size of bucket
    flags      uint32
}  

map操作

假定key经过哈希计算后得到64bit位的哈希值。如果B=5,buckets数组的长度,即桶的数量是32(2的5次方)。

例如,现要置一key于map中,该key经过哈希后,得到的哈希值如下:

前面我们知道,哈希值低位(low-order bits)用于选择桶,哈希值高位(high-order bits)用于在一个独立的桶中区别出键。当B等于5时,那么我们选择的哈希值低位也是5位,即01010,它的十进制值为10,代表10号桶。再用哈希值的高8位,找到此key在桶中的位置。最开始桶中还没有key,那么新加入的key和value就会被放入第一个key空位和value空位。

注意:对于高低位的选择,该操作的实质是取余,但是取余开销很大,在实际代码实现中采用的是位操作。以下是tophash的实现代码。

 func tophash(hash uintptr) uint8 {
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }
    return top
}  

当两个不同的key落在了同一个桶中,这时就发生了哈希冲突。go的解决方式是链地址法(这里为了让读者更好理解,只描述非扩容且该key是第一次添加的情况):在桶中按照顺序寻到第一个空位并记录下来,后续在该桶和它的溢出桶中均未发现存在的该key,将key置于第一个空位;否则,去该桶的溢出桶中寻找空位,如果没有溢出桶,则添加溢出桶,并将其置溢出桶的第一个空位(因为是第一次添加,所以不描述已存在该key的情况)。

上图中的B值为5,所以桶的数量为32。通过哈希函数计算出待插入key的哈希值, 低5 位哈希00110,对应于6号桶;高8位10010111,十进制为151,由于桶中前6个cell已经有正常哈希值填充了(遍历),所以将151对应的高位哈希值放置于第7位cell,对应将key和value分别置于相应的第七个空位。

如果是查找key,那么我们会根据高位哈希值去桶中的每个cell中找,若在桶中没找到,并且overflow不为nil,那么继续去溢出桶中寻找,直至找到,如果所有的cell都找过了,还未找到,则返回key类型的默认值(例如key是int类型,则返回0)。

查找key

对于map的元素查找,其源码实现如下

 func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  // 如果开启了竞态检测 -race
    if raceenabled && h != nil {
        callerpc := getcallerpc()
        pc := funcPC(mapaccess1)
        racereadpc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
  // 如果开启了memory sanitizer -msan
    if msanenabled && h != nil {
        msanread(key, t.key.size)
    }
  // 如果map为空或者元素个数为0,返回零值
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            t.hasher(key, 0) // see issue 23734
        }
        return unsafe.Pointer(&zeroVal[0])
    }
  // 注意,这里是按位与操作
  // 当h.flags对应的值为hashWriting(代表有其他goroutine正在往map中写key)时,那么位计算的结果不为0,因此抛出以下错误。
  // 这也表明,go的map是非并发安全的
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
  // 不同类型的key,会使用不同的hash算法,可详见src/runtime/alg.go中typehash函数中的逻辑
    hash := t.hasher(key, uintptr(h.hash0))
    m := bucketMask(h.B)
  // 按位与操作,找到对应的bucket
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
  // 如果oldbuckets不为空,那么证明map发生了扩容
  // 如果有扩容发生,老的buckets中的数据可能还未搬迁至新的buckets里
  // 所以需要先在老的buckets中找
    if c := h.oldbuckets; c != nil {
        if !h.sameSizeGrow() {
            m >>= 1
        }
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
    // 如果在oldbuckets中tophash[0]的值,为evacuatedX、evacuatedY,evacuatedEmpty其中之一
    // 则evacuated()返回为true,代表搬迁完成。
    // 因此,只有当搬迁未完成时,才会从此oldbucket中遍历
        if !evacuated(oldb) {
            b = oldb
        }
    }
  // 取出当前key值的tophash值
    top := tophash(hash)
  // 以下是查找的核心逻辑
  // 双重循环遍历:外层循环是从桶到溢出桶遍历;内层是桶中的cell遍历
  // 跳出循环的条件有三种:第一种是已经找到key值;第二种是当前桶再无溢出桶;
  // 第三种是当前桶中有cell位的tophash值是emptyRest,这个值在前面解释过,它代表此时的桶后面的cell还未利用,所以无需再继续遍历。
bucketloop:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
      // 判断tophash值是否相等
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
      }
      // 因为在bucket中key是用连续的存储空间存储的,因此可以通过bucket地址+数据偏移量(bmap结构体的大小)+ keysize的大小,得到k的地址
      // 同理,value的地址也是相似的计算方法,只是再要加上8个keysize的内存地址
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
      // 判断key是否相等
            if t.key.equal(key, k) {
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                if t.indirectelem() {
                    e = *((*unsafe.Pointer)(e))
                }
                return e
            }
        }
    }
  // 所有的bucket都未找到,则返回零值
    return unsafe.Pointer(&zeroVal[0])
}  

以下是mapaccess1的查找过程图解

map的元素查找,对应go代码有两种形式

     // 形式一
    v := m[k]
    // 形式二
    v, ok := m[k]  

形式一的代码实现,就是上述的mapaccess1方法。此外,在源码中还有个mapaccess2方法,它的函数签名如下。

 func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {}  

与mapaccess1相比,mapaccess2多了一个bool类型的返回值,它代表的是是否在map中找到了对应的key。因为和mapaccess1基本一致,所以详细代码就不再贴出。

同时,源码中还有mapaccessK方法,它的函数签名如下。

 func mapaccessK(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, unsafe.Pointer) {}  

与mapaccess1相比,mapaccessK同时返回了key和value,其代码逻辑也一致。

赋值key

对于写入key的逻辑,其源码实现如下

 func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  // 如果h是空指针,赋值会引起panic
  // 例如以下语句
  // var m map[string]int
    // m["k"] = 1
    if h == nil {
        panic(plainError("assignment to entry in nil map"))
    }
  // 如果开启了竞态检测 -race
    if raceenabled {
        callerpc := getcallerpc()
        pc := funcPC(mapassign)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
  // 如果开启了memory sanitizer -msan
    if msanenabled {
        msanread(key, t.key.size)
    }
  // 有其他goroutine正在往map中写key,会抛出以下错误
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }
  // 通过key和哈希种子,算出对应哈希值
    hash := t.hasher(key, uintptr(h.hash0))

  // 将flags的值与hashWriting做按位或运算
  // 因为在当前goroutine可能还未完成key的写入,再次调用t.hasher会发生panic。
    h.flags ^= hashWriting

    if h.buckets == nil {
        h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}

again:
  // bucketMask返回值是2的B次方减1
  // 因此,通过hash值与bucketMask返回值做按位与操作,返回的在buckets数组中的第几号桶
    bucket := hash & bucketMask(h.B)
  // 如果map正在搬迁(即h.oldbuckets != nil)中,则先进行搬迁工作。
    if h.growing() {
        growWork(t, h, bucket)
    }
  // 计算出上面求出的第几号bucket的内存位置
  // post = start + bucketNumber * bucketsize
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    top := tophash(hash)

    var inserti *uint8
    var insertk unsafe.Pointer
    var elem unsafe.Pointer
bucketloop:
    for {
    // 遍历桶中的8个cell
        for i := uintptr(0); i < bucketCnt; i++ {
      // 这里分两种情况,第一种情况是cell位的tophash值和当前tophash值不相等
      // 在 b.tophash[i] != top 的情况下
      // 理论上有可能会是一个空槽位
      // 一般情况下 map 的槽位分布是这样的,e 表示 empty:
      // [h0][h1][h2][h3][h4][e][e][e]
      // 但在执行过 delete 操作时,可能会变成这样:
      // [h0][h1][e][e][h5][e][e][e]
      // 所以如果再插入的话,会尽量往前面的位置插
      // [h0][h1][e][e][h5][e][e][e]
      //          ^
      //          ^
      //       这个位置
      // 所以在循环的时候还要顺便把前面的空位置先记下来
      // 因为有可能在后面会找到相等的key,也可能找不到相等的key
            if b.tophash[i] != top {
        // 如果cell位为空,那么就可以在对应位置进行插入
                if isEmpty(b.tophash[i]) && inserti == nil {
                    inserti = &b.tophash[i]
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                }
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
      // 第二种情况是cell位的tophash值和当前的tophash值相等
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
      // 注意,即使当前cell位的tophash值相等,不一定它对应的key也是相等的,所以还要做一个key值判断
            if !t.key.equal(key, k) {
                continue
            }
            // 如果已经有该key了,就更新它
            if t.needkeyupdate() {
                typedmemmove(t.key, k, key)
            }
      // 这里获取到了要插入key对应的value的内存地址
      // pos = start + dataOffset + 8*keysize + i*elemsize
            elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
      // 如果顺利到这,就直接跳到done的结束逻辑中去
            goto done
        }
    // 如果桶中的8个cell遍历完,还未找到对应的空cell或覆盖cell,那么就进入它的溢出桶中去遍历
        ovf := b.overflow(t)
    // 如果连溢出桶中都没有找到合适的cell,跳出循环。
        if ovf == nil {
            break
        }
        b = ovf
    }

    // 在已有的桶和溢出桶中都未找到合适的cell供key写入,那么有可能会触发以下两种情况
  // 情况一:
  // 判断当前map的装载因子是否达到设定的6.5阈值,或者当前map的溢出桶数量是否过多。如果存在这两种情况之一,则进行扩容操作。
  // hashGrow()实际并未完成扩容,对哈希表数据的搬迁(复制)操作是通过growWork()来完成的。
  // 重新跳入again逻辑,在进行完growWork()操作后,再次遍历新的桶。
    if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }

  // 情况二:
// 在不满足情况一的条件下,会为当前桶再新建溢出桶,并将tophash,key插入到新建溢出桶的对应内存的0号位置
    if inserti == nil {
        // all current buckets are full, allocate a new one.
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        elem = add(insertk, bucketCnt*uintptr(t.keysize))
    }

  // 在插入位置存入新的key和value
    if t.indirectkey() {
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectelem() {
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(elem) = vmem
    }
    typedmemmove(t.key, insertk, key)
    *inserti = top
  // map中的key数量+1
    h.count++

done:
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
    if t.indirectelem() {
        elem = *((*unsafe.Pointer)(elem))
    }
    return elem
}  

通过对mapassign的代码分析之后,发现该函数并没有将插入key对应的value写入对应的内存,而是返回了value应该插入的内存地址。为了弄清楚value写入内存的操作是发生在什么时候,分析如下map.go代码。

 package main

func main() {
    m := make(map[int]int)
    for i := 0; i < 100; i++ {
        m[i] = 666
    }
}  

m[i] = 666对应的汇编代码

 $ go tool compile -S map.go
...
        0x0098 00152 (map.go:6) LEAQ    type.map[int]int(SB), CX
        0x009f 00159 (map.go:6) MOVQ    CX, (SP)
        0x00a3 00163 (map.go:6) LEAQ    ""..autotmp_2+184(SP), DX
        0x00ab 00171 (map.go:6) MOVQ    DX, 8(SP)
        0x00b0 00176 (map.go:6) MOVQ    AX, 16(SP)
        0x00b5 00181 (map.go:6) CALL    runtime.mapassign_fast64(SB) // 调用函数runtime.mapassign_fast64,该函数实质就是mapassign(上文示例源代码是该mapassign系列的通用逻辑)
        0x00ba 00186 (map.go:6) MOVQ    24(SP), AX 24(SP), AX // 返回值,即 value 应该存放的内存地址
        0x00bf 00191 (map.go:6) MOVQ    $666, (AX) // 把 666 放入该地址中
...  

赋值的最后一步实际上是编译器额外生成的汇编指令来完成的,可见靠 runtime 有些工作是没有做完的。所以,在go中,编译器和 runtime 配合,才能完成一些复杂的工作。同时说明,在平时学习go的源代码实现时,必要时还需要看一些汇编代码。

删除key

理解了赋值key的逻辑,删除key的逻辑就比较简单了。本文就不再讨论该部分内容了,读者感兴趣可以自行查看src/runtime/map.go的mapdelete方法逻辑。

遍历map

结论:迭代 map 的结果是无序的

     m := make(map[int]int)
    for i := 0; i < 10; i++ {
        m[i] = i
    }
    for k, v := range m {
        fmt.Println(k, v)
    }  

运行以上代码,我们会发现每次输出顺序都是不同的。

map遍历的过程,是按序遍历bucket,同时按需遍历bucket中和其overflow bucket中的cell。但是map在扩容后,会发生key的搬迁,这造成原来落在一个bucket中的key,搬迁后,有可能会落到其他bucket中了,从这个角度看,遍历map的结果就不可能是按照原来的顺序了(详见下文的map扩容内容)。

但其实,go为了保证遍历map的结果是无序的,做了以下事情:map在遍历时,并不是从固定的0号bucket开始遍历的,每次遍历,都会从一个 随机值序号的bucket ,再从其中 随机的cell 开始遍历。然后再按照桶序遍历下去,直到回到起始桶结束。

上图的例子,是遍历一个处于未扩容状态的map。如果map正处于扩容状态时,需要先判断当前遍历bucket是否已经完成搬迁,如果数据还在老的bucket,那么就去老bucket中拿数据。

注意:在下文中会讲解到增量扩容和等量扩容。当发生了增量扩容时,一个老的bucket数据可能会分裂到两个不同的bucket中去,那么此时,如果需要从老的bucket中遍历数据,例如1号,则不能将老1号bucket中的数据全部取出,仅仅只能取出老 1 号 bucket 中那些在裂变之后,分配到新 1 号 bucket 中的那些 key(这个内容,请读者看完下文map扩容的讲解之后再回头理解)。

鉴于篇幅原因,本文不再对map遍历的详细源码进行注释贴出。读者可自行查看源码src/runtime/map.go的 mapiterinit() mapiternext() 方法逻辑。

这里注释一下 mapiterinit() 中随机保证的关键代码

 // 生成随机数
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
   r += uintptr(fastrand()) << 31
}
// 决定了从哪个随机的bucket开始
it.startBucket = r & bucketMask(h.B)
// 决定了每个bucket中随机的cell的位置
it.offset = uint8(r >> h.B & (bucketCnt - 1))  

文章来源:智云一二三科技

文章标题:万字长文深入解析Golang中的map设计(中)

文章地址:https://www.zhihuclub.com/97926.shtml

关于作者: 智云科技

热门文章

网站地图