关闭 x
IT技术网
    技 采 号
    ITJS.cn - 技术改变世界
    • 实用工具
    • 菜鸟教程
    IT采购网 中国存储网 科技号 CIO智库

    IT技术网

    IT采购网
    • 首页
    • 行业资讯
    • 系统运维
      • 操作系统
        • Windows
        • Linux
        • Mac OS
      • 数据库
        • MySQL
        • Oracle
        • SQL Server
      • 网站建设
    • 人工智能
    • 半导体芯片
    • 笔记本电脑
    • 智能手机
    • 智能汽车
    • 编程语言
    IT技术网 - ITJS.CN
    首页 » UI前端 »Go 语言的分布式读写互斥

    Go 语言的分布式读写互斥

    2015-05-05 00:00:00 出处:未来往事
    分享

    Go语言默认的 sync.RWMutex 实现在多核环境中表现并不佳,因为所有的读者在进行原子增量操作时,会抢占相同的内存地址。该文探讨了一种 n-way RWMutex,也可以称为“大读者(big reader)”锁,它可以为每个 CPU 内核分配独立的 RWMutex。读者仅需在其核心中处理读锁,而写者则须依次处理所有锁。

    查找当前 CPU

    读者使用 CPUID 指令来决定使用何种锁,该指令仅需返回当前活动 CPU 的 APICID,而不需要发出系统调用指令抑或改变运行时。这在 Intel 或 AMD 处理器上均是可以的;ARM 处理器则需要使用 CPU ID 寄存器 。 对于超过 256 个处理器的系统,必须使用 x2APIC, 另外除了 CPUID 还要用到带有EAX=0xb 的 EDX 寄存器。程序启动时,会构建(通过 CPU 亲和力系统调用) APICID 到 CPU 索引的映射, 该映射在处理器的整个生命周期中静态存在。由于 CPUID 指令的开销可能相当昂贵,goroutine 将只在其运行的内核中定期地更新状态结果。频繁更新可以减少内核锁阻塞,但同时也会导致花在加锁过程中的 CPUID 指令时间增加。

    陈旧的 CPU 信息。假如加上锁运行 goroutine 的 CPU 信息可能会是过时的 (goroutine 会转移到另一个核心)。在 reader 记住哪个是上锁的前提下,这只会影响性能,而不会影响准确性,当然,这样的转移也是不太可能的,就像操作系统内核尝试在同一个核心保持线程来改进缓存命中率一样。

    性能

    这个模式的性能特征会被大量的参数所影响。特别是 CPUID 检测频率,readers 的数量,readers 和 writers 的比率,还有 readers 持有锁的时间,这些因素都非常重要。当在这个时间有且仅有一个 writer 活跃的时候,这个 writer 持有锁的时期不会影响 sync.RWMutex 和 DRWMutex 之间的性能差异。

    实验证明DRWMutex表现胜过多核系统,特别writer小于1%的时候,CPUID会在最多每10个锁之间被调用(这种变化取决于锁被持有的持续时间)。甚至在少核的情况下,DRWMutex也在普遍选择通过sync.Mutex使用sync.RWMutex的应用程序的情况下表现好过sync.RWMutex.

    下图显示核数量使用增加每10个的平均性能:

    drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100

    Go 语言的分布式读写互斥

    错误条表示第25和第75个百分位。注意每第10核的下降;这是因为10个核组成一个运行标准检查系统的机器上的NUMA节点, 所以一旦增加一个NUMA节点,跨线程通信量变得更加宝贵。对于DRWMutex来说,由于对比sync.RWMutex更多的reader能够并行工作,所以性能也随之提升。

    查看go-nuts tread进一步讨论

    cpu_amd64.s

    #include "textflag.h"
    
    // func cpu() uint64
    TEXT 路cpu(SB),NOSPLIT,$0-8
    	MOVL	$0x01, AX // version information
    	MOVL	$0x00, BX // any leaf will do
    	MOVL	$0x00, CX // any subleaf will do
    
    	// call CPUID
    	BYTE $0x0f
    	BYTE $0xa2
    
    	SHRQ	$24, BX // logical cpu id is put in EBX[31-24]
    	MOVQ	BX, ret+0(FP)
    	RET

    main.go

    package main
    
    import (
    	"flag"
    	"fmt"
    	"math/rand"
    	"os"
    	"runtime"
    	"runtime/pprof"
    	"sync"
    	"syscall"
    	"time"
    	"unsafe"
    )
    
    func cpu() uint64 // implemented in cpu_amd64.s
    
    var cpus map[uint64]int
    
    // determine mapping from APIC ID to CPU index by pinning the entire process to
    // one core at the time, and seeing that its APIC ID is.
    func init() {
    	cpus = make(map[uint64]int)
    
    	var aff uint64
    	syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
    
    	n := 0
    	start := time.Now()
    	var mask uint64 = 1
    Outer:
    	for {
    		for (aff & mask) == 0 {
    			mask <<= 1
    			if mask == 0 || mask > aff {
    				break Outer
    			}
    		}
    
    		ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))
    		if ret != 0 {
    			panic(err.Error())
    		}
    
    		// what CPU do we have 
    		<-time.After(1 * time.Millisecond)
    		c := cpu()
    
    		if oldn, ok := cpus[c]; ok {
    			fmt.Println("cpu", n, "==", oldn, "-- both have CPUID", c)
    		}
    
    		cpus[c] = n
    		mask <<= 1
    		n++
    	}
    
    	fmt.Printf("%d/%d cpus found in %v: %v/n", len(cpus), runtime.NumCPU(), time.Now().Sub(start), cpus)
    
    	ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
    	if ret != 0 {
    		panic(err.Error())
    	}
    }
    
    type RWMutex2 []sync.RWMutex
    
    func (mx RWMutex2) Lock() {
    	for core := range mx {
    		mx[core].Lock()
    	}
    }
    
    func (mx RWMutex2) Unlock() {
    	for core := range mx {
    		mx[core].Unlock()
    	}
    }
    
    func main() {
    	cpuprofile := flag.Bool("cpuprofile", false, "enable CPU profiling")
    	locks := flag.Uint64("i", 10000, "Number of iterations to perform")
    	write := flag.Float64("p", 0.0001, "Probability of write locks")
    	wwork := flag.Int("w", 1, "Amount of work for each writer")
    	rwork := flag.Int("r", 100, "Amount of work for each reader")
    	readers := flag.Int("n", runtime.GOMAXPROCS(0), "Total number of readers")
    	checkcpu := flag.Uint64("c", 100, "Update CPU estimate every n iterations")
    	flag.Parse()
    
    	var o *os.File
    	if *cpuprofile {
    		o, _ := os.Create("rw.out")
    		pprof.StartCPUProfile(o)
    	}
    
    	readers_per_core := *readers / runtime.GOMAXPROCS(0)
    
    	var wg sync.WaitGroup
    
    	var mx1 sync.RWMutex
    
    	start1 := time.Now()
    	for n := 0; n < runtime.GOMAXPROCS(0); n++ {
    		for r := 0; r < readers_per_core; r++ {
    			wg.Add(1)
    			go func() {
    				defer wg.Done()
    				r := rand.New(rand.NewSource(rand.Int63()))
    				for n := uint64(0); n < *locks; n++ {
    					if r.Float64() < *write {
    						mx1.Lock()
    						x := 0
    						for i := 0; i < *wwork; i++ {
    							x++
    						}
    						_ = x
    						mx1.Unlock()
    					} else {
    						mx1.RLock()
    						x := 0
    						for i := 0; i < *rwork; i++ {
    							x++
    						}
    						_ = x
    						mx1.RUnlock()
    					}
    				}
    			}()
    		}
    	}
    	wg.Wait()
    	end1 := time.Now()
    
    	t1 := end1.Sub(start1)
    	fmt.Println("mx1", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)
    
    	if *cpuprofile {
    		pprof.StopCPUProfile()
    		o.Close()
    
    		o, _ = os.Create("rw2.out")
    		pprof.StartCPUProfile(o)
    	}
    
    	mx2 := make(RWMutex2, len(cpus))
    
    	start2 := time.Now()
    	for n := 0; n < runtime.GOMAXPROCS(0); n++ {
    		for r := 0; r < readers_per_core; r++ {
    			wg.Add(1)
    			go func() {
    				defer wg.Done()
    				c := cpus[cpu()]
    				r := rand.New(rand.NewSource(rand.Int63()))
    				for n := uint64(0); n < *locks; n++ {
    					if *checkcpu != 0 && n%*checkcpu == 0 {
    						c = cpus[cpu()]
    					}
    
    					if r.Float64() < *write {
    						mx2.Lock()
    						x := 0
    						for i := 0; i < *wwork; i++ {
    							x++
    						}
    						_ = x
    						mx2.Unlock()
    					} else {
    						mx2[c].RLock()
    						x := 0
    						for i := 0; i < *rwork; i++ {
    							x++
    						}
    						_ = x
    						mx2[c].RUnlock()
    					}
    				}
    			}()
    		}
    	}
    	wg.Wait()
    	end2 := time.Now()
    
    	pprof.StopCPUProfile()
    	o.Close()
    
    	t2 := end2.Sub(start2)
    	fmt.Println("mx2", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)
    }
    上一篇返回首页 下一篇

    声明: 此文观点不代表本站立场;转载务必保留本文链接;版权疑问请联系我们。

    别人在看

    hiberfil.sys文件可以删除吗?了解该文件并手把手教你删除C盘的hiberfil.sys文件

    Window 10和 Windows 11哪个好?答案是:看你自己的需求

    盗版软件成公司里的“隐形炸弹”?老板们的“法务噩梦” 有救了!

    帝国CMS7.5编辑器上传图片取消宽高的三种方法

    帝国cms如何自动生成缩略图的实现方法

    Windows 12即将到来,将彻底改变人机交互

    帝国CMS 7.5忘记登陆账号密码怎么办?可以phpmyadmin中重置管理员密码

    帝国CMS 7.5 后台编辑器换行,修改回车键br换行为p标签

    Windows 11 版本与 Windows 10比较,新功能一览

    Windows 11激活产品密钥收集及专业版激活方法

    IT头条

    无线路由大厂 TP-Link突然大裁员:补偿N+3

    02:39

    Meta 千万美金招募AI高级人才

    00:22

    更容易爆炸?罗马仕充电宝被北京多所高校禁用,公司紧急回应

    17:19

    天衍”量子计算云平台,“超算+量算” 告别“算力孤岛时代”

    18:18

    华为Pura80系列新机预热,余承东力赞其复杂光线下的视频拍摄实力

    01:28

    技术热点

    MySQL基本调度策略浅析

    MySQL使用INSERT插入多条记录

    SQL Server高可用的常见问题

    3D立体图片展示幻灯片JS特效

    windows 7上网看视频出现绿屏的原因及解决方法

    windows 7 64位系统的HOSTS文件在哪里?想用它加快域名解析

      友情链接:
    • IT采购网
    • 科技号
    • 中国存储网
    • 存储网
    • 半导体联盟
    • 医疗软件网
    • 软件中国
    • ITbrand
    • 采购中国
    • CIO智库
    • 考研题库
    • 法务网
    • AI工具网
    • 电子芯片网
    • 安全库
    • 隐私保护
    • 版权申明
    • 联系我们
    IT技术网 版权所有 © 2020-2025,京ICP备14047533号-20,Power by OK设计网

    在上方输入关键词后,回车键 开始搜索。Esc键 取消该搜索窗口。