关闭 x
IT技术网
    技 采 号
    ITJS.cn - 技术改变世界
    • 实用工具
    • 菜鸟教程
    IT采购网 中国存储网 科技号 CIO智库

    IT技术网

    IT采购网
    • 首页
    • 行业资讯
    • 系统运维
      • 操作系统
        • Windows
        • Linux
        • Mac OS
      • 数据库
        • MySQL
        • Oracle
        • SQL Server
      • 网站建设
    • 人工智能
    • 半导体芯片
    • 笔记本电脑
    • 智能手机
    • 智能汽车
    • 编程语言
    IT技术网 - ITJS.CN
    首页 » UI前端 »Go 语言的分布式读写互斥

    Go 语言的分布式读写互斥

    2015-05-05 00:00:00 出处:未来往事
    分享

    Go语言默认的 sync.RWMutex 实现在多核环境中表现并不佳,因为所有的读者在进行原子增量操作时,会抢占相同的内存地址。该文探讨了一种 n-way RWMutex,也可以称为“大读者(big reader)”锁,它可以为每个 CPU 内核分配独立的 RWMutex。读者仅需在其核心中处理读锁,而写者则须依次处理所有锁。

    查找当前 CPU

    读者使用 CPUID 指令来决定使用何种锁,该指令仅需返回当前活动 CPU 的 APICID,而不需要发出系统调用指令抑或改变运行时。这在 Intel 或 AMD 处理器上均是可以的;ARM 处理器则需要使用 CPU ID 寄存器 。 对于超过 256 个处理器的系统,必须使用 x2APIC, 另外除了 CPUID 还要用到带有EAX=0xb 的 EDX 寄存器。程序启动时,会构建(通过 CPU 亲和力系统调用) APICID 到 CPU 索引的映射, 该映射在处理器的整个生命周期中静态存在。由于 CPUID 指令的开销可能相当昂贵,goroutine 将只在其运行的内核中定期地更新状态结果。频繁更新可以减少内核锁阻塞,但同时也会导致花在加锁过程中的 CPUID 指令时间增加。

    陈旧的 CPU 信息。假如加上锁运行 goroutine 的 CPU 信息可能会是过时的 (goroutine 会转移到另一个核心)。在 reader 记住哪个是上锁的前提下,这只会影响性能,而不会影响准确性,当然,这样的转移也是不太可能的,就像操作系统内核尝试在同一个核心保持线程来改进缓存命中率一样。

    性能

    这个模式的性能特征会被大量的参数所影响。特别是 CPUID 检测频率,readers 的数量,readers 和 writers 的比率,还有 readers 持有锁的时间,这些因素都非常重要。当在这个时间有且仅有一个 writer 活跃的时候,这个 writer 持有锁的时期不会影响 sync.RWMutex 和 DRWMutex 之间的性能差异。

    实验证明DRWMutex表现胜过多核系统,特别writer小于1%的时候,CPUID会在最多每10个锁之间被调用(这种变化取决于锁被持有的持续时间)。甚至在少核的情况下,DRWMutex也在普遍选择通过sync.Mutex使用sync.RWMutex的应用程序的情况下表现好过sync.RWMutex.

    下图显示核数量使用增加每10个的平均性能:

    drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100

    Go 语言的分布式读写互斥

    错误条表示第25和第75个百分位。注意每第10核的下降;这是因为10个核组成一个运行标准检查系统的机器上的NUMA节点, 所以一旦增加一个NUMA节点,跨线程通信量变得更加宝贵。对于DRWMutex来说,由于对比sync.RWMutex更多的reader能够并行工作,所以性能也随之提升。

    查看go-nuts tread进一步讨论

    cpu_amd64.s

    #include "textflag.h"
    
    // func cpu() uint64
    TEXT 路cpu(SB),NOSPLIT,$0-8
    	MOVL	$0x01, AX // version information
    	MOVL	$0x00, BX // any leaf will do
    	MOVL	$0x00, CX // any subleaf will do
    
    	// call CPUID
    	BYTE $0x0f
    	BYTE $0xa2
    
    	SHRQ	$24, BX // logical cpu id is put in EBX[31-24]
    	MOVQ	BX, ret+0(FP)
    	RET

    main.go

    package main
    
    import (
    	"flag"
    	"fmt"
    	"math/rand"
    	"os"
    	"runtime"
    	"runtime/pprof"
    	"sync"
    	"syscall"
    	"time"
    	"unsafe"
    )
    
    func cpu() uint64 // implemented in cpu_amd64.s
    
    var cpus map[uint64]int
    
    // determine mapping from APIC ID to CPU index by pinning the entire process to
    // one core at the time, and seeing that its APIC ID is.
    func init() {
    	cpus = make(map[uint64]int)
    
    	var aff uint64
    	syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
    
    	n := 0
    	start := time.Now()
    	var mask uint64 = 1
    Outer:
    	for {
    		for (aff & mask) == 0 {
    			mask <<= 1
    			if mask == 0 || mask > aff {
    				break Outer
    			}
    		}
    
    		ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))
    		if ret != 0 {
    			panic(err.Error())
    		}
    
    		// what CPU do we have 
    		<-time.After(1 * time.Millisecond)
    		c := cpu()
    
    		if oldn, ok := cpus[c]; ok {
    			fmt.Println("cpu", n, "==", oldn, "-- both have CPUID", c)
    		}
    
    		cpus[c] = n
    		mask <<= 1
    		n++
    	}
    
    	fmt.Printf("%d/%d cpus found in %v: %v/n", len(cpus), runtime.NumCPU(), time.Now().Sub(start), cpus)
    
    	ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
    	if ret != 0 {
    		panic(err.Error())
    	}
    }
    
    type RWMutex2 []sync.RWMutex
    
    func (mx RWMutex2) Lock() {
    	for core := range mx {
    		mx[core].Lock()
    	}
    }
    
    func (mx RWMutex2) Unlock() {
    	for core := range mx {
    		mx[core].Unlock()
    	}
    }
    
    func main() {
    	cpuprofile := flag.Bool("cpuprofile", false, "enable CPU profiling")
    	locks := flag.Uint64("i", 10000, "Number of iterations to perform")
    	write := flag.Float64("p", 0.0001, "Probability of write locks")
    	wwork := flag.Int("w", 1, "Amount of work for each writer")
    	rwork := flag.Int("r", 100, "Amount of work for each reader")
    	readers := flag.Int("n", runtime.GOMAXPROCS(0), "Total number of readers")
    	checkcpu := flag.Uint64("c", 100, "Update CPU estimate every n iterations")
    	flag.Parse()
    
    	var o *os.File
    	if *cpuprofile {
    		o, _ := os.Create("rw.out")
    		pprof.StartCPUProfile(o)
    	}
    
    	readers_per_core := *readers / runtime.GOMAXPROCS(0)
    
    	var wg sync.WaitGroup
    
    	var mx1 sync.RWMutex
    
    	start1 := time.Now()
    	for n := 0; n < runtime.GOMAXPROCS(0); n++ {
    		for r := 0; r < readers_per_core; r++ {
    			wg.Add(1)
    			go func() {
    				defer wg.Done()
    				r := rand.New(rand.NewSource(rand.Int63()))
    				for n := uint64(0); n < *locks; n++ {
    					if r.Float64() < *write {
    						mx1.Lock()
    						x := 0
    						for i := 0; i < *wwork; i++ {
    							x++
    						}
    						_ = x
    						mx1.Unlock()
    					} else {
    						mx1.RLock()
    						x := 0
    						for i := 0; i < *rwork; i++ {
    							x++
    						}
    						_ = x
    						mx1.RUnlock()
    					}
    				}
    			}()
    		}
    	}
    	wg.Wait()
    	end1 := time.Now()
    
    	t1 := end1.Sub(start1)
    	fmt.Println("mx1", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)
    
    	if *cpuprofile {
    		pprof.StopCPUProfile()
    		o.Close()
    
    		o, _ = os.Create("rw2.out")
    		pprof.StartCPUProfile(o)
    	}
    
    	mx2 := make(RWMutex2, len(cpus))
    
    	start2 := time.Now()
    	for n := 0; n < runtime.GOMAXPROCS(0); n++ {
    		for r := 0; r < readers_per_core; r++ {
    			wg.Add(1)
    			go func() {
    				defer wg.Done()
    				c := cpus[cpu()]
    				r := rand.New(rand.NewSource(rand.Int63()))
    				for n := uint64(0); n < *locks; n++ {
    					if *checkcpu != 0 && n%*checkcpu == 0 {
    						c = cpus[cpu()]
    					}
    
    					if r.Float64() < *write {
    						mx2.Lock()
    						x := 0
    						for i := 0; i < *wwork; i++ {
    							x++
    						}
    						_ = x
    						mx2.Unlock()
    					} else {
    						mx2[c].RLock()
    						x := 0
    						for i := 0; i < *rwork; i++ {
    							x++
    						}
    						_ = x
    						mx2[c].RUnlock()
    					}
    				}
    			}()
    		}
    	}
    	wg.Wait()
    	end2 := time.Now()
    
    	pprof.StopCPUProfile()
    	o.Close()
    
    	t2 := end2.Sub(start2)
    	fmt.Println("mx2", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)
    }
    上一篇返回首页 下一篇

    声明: 此文观点不代表本站立场;转载务必保留本文链接;版权疑问请联系我们。

    别人在看

    正版 Windows 11产品密钥怎么查找/查看?

    还有3个月,微软将停止 Windows 10 的更新

    Windows 10 终止支持后,企业为何要立即升级?

    Windows 10 将于 2025年10 月终止技术支持,建议迁移到 Windows 11

    Windows 12 发布推迟,微软正全力筹备Windows 11 25H2更新

    Linux 退出 mail的命令是什么

    Linux 提醒 No space left on device,但我的空间看起来还有不少空余呢

    hiberfil.sys文件可以删除吗?了解该文件并手把手教你删除C盘的hiberfil.sys文件

    Window 10和 Windows 11哪个好?答案是:看你自己的需求

    盗版软件成公司里的“隐形炸弹”?老板们的“法务噩梦” 有救了!

    IT头条

    公安部:我国在售汽车搭载的“智驾”系统都不具备“自动驾驶”功能

    02:03

    液冷服务器概念股走强,博汇、润泽等液冷概念股票大涨

    01:17

    亚太地区的 AI 驱动型医疗保健:2025 年及以后的下一步是什么?

    16:30

    智能手机市场风云:iPhone领跑销量榜,华为缺席引争议

    15:43

    大数据算法和“老师傅”经验叠加 智慧化收储粮食尽显“科技范”

    15:17

    技术热点

    商业智能成CIO优先关注点 技术落地方显成效(1)

    用linux安装MySQL时产生问题破解

    JAVA中关于Map的九大问题

    windows 7旗舰版无法使用远程登录如何开启telnet服务

    Android View 事件分发机制详解

    MySQL用户变量的用法

      友情链接:
    • IT采购网
    • 科技号
    • 中国存储网
    • 存储网
    • 半导体联盟
    • 医疗软件网
    • 软件中国
    • ITbrand
    • 采购中国
    • CIO智库
    • 考研题库
    • 法务网
    • AI工具网
    • 电子芯片网
    • 安全库
    • 隐私保护
    • 版权申明
    • 联系我们
    IT技术网 版权所有 © 2020-2025,京ICP备14047533号-20,Power by OK设计网

    在上方输入关键词后,回车键 开始搜索。Esc键 取消该搜索窗口。