golang 共享内存

golang 中使用共享内存

简介

  • 故事起源于要搭一个高性能的日志中心。当然使用了elk这一套。但是,对于logstash来说,它主要使用的是文件日志的方式了捕捉log。而写文件日志的话会非常慢。对于实时日志要处理滚动的日志更是这样,每次检查是否需要流动日志,然后打开日志,然后写入,然后关闭,当然这中间可以优化。这一切都是那么慢,发起了n个系统调用,硬盘寻道等。这时候想到了用共享内存来通信。

共享内存的基本知识

  • 要使用共享内存要执行以下几步:
    • 1.发起一个系统调用,让系统帮你生产一块内存,或者取得一块已经存在的内存来使用。
    • 2.把内存attach到当前进程,让当前进程可以使用。大家都知道,我们在进程中访问的是虚拟内存地址,系统会把它映射到物理内存中。如果没有这一步,第1步创建的内存就不能在当前进程访问。
    • 3.这时就可以对内存进程读写操作了。
    • 4.进程结束的时候要把上面attach的内存给释放。

系统调用的基础知识

  • 什么是系统调用?

    1
    系统调用(英语:system call),又称为系统呼叫,指运行在使用者空间的程序向操作系统内核请求需要更高权限运行的服务。系统调用提供用户程序与操作系统之间的接口。
  • 以上引自维基百科。

  • 对于每个系统调用,都一个编号。内核收到编号后,就根据编号去找到对应的内核函数函数来执行。然后返回给应用程序。

  • 系统调用是怎么发起的?以下以linux为例。

    • 1.应用程序以系统调用号和对应的参数传给系统调用api
    • 2.系统调用api将系统调用号存到eax中,然后发起0x80的中断号进行中断
    • 3.内核中的中断处理函数根据系统调用号,调用对应的内核函数(系统调用)
    • 4.系统调用完成相应功能,将返回值存入 eax,返回到中断处理函数
    • 5.中断处理函数返回到 API 中
    • 6.API 将 eax 返回给应用程序
    • 7.以上就完成了系统调用。

在golang中使用共享内存

  • 了解了系统调用之后,下面就开始使用了。第一步当然是去找golang有没有直接提供共享内存的api了。几经折腾后,发现它并没有提供直接的api。而其他很多系统调用都提供了直接的api。究其原因,我想应该是因为这句话吧:

    1
    “不要通过共享内存来通信,而应该通过通信来共享内存”
  • golang不提供使用共享内存来通信。所以直接不提供了,折腾死你们,让你们用不了。

  • 于是乎,google一下解决方案,都是通过cgo来调c语言来实现的。stackoverflow的答案也都是这样。

  • 回来再来看一下golang的syscall的文档。它提供了Syscall函数。声明如下:

    1
    func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
  • 很显示trap是中断号,a1, a2, a3是系统调用相关的参数。

  • 对于中断号,在文档中可以看到,所有的系统都已经定义了常量了。而我们要用到的系统调用有:

    • SYS_SHMGET: 创建或者取得共享内存。
    • SYS_SHMAT: 将共享内存attach到当前进程空间。
    • SYS_SHMDT: 将共享内存从当前进程中deattach。
  • 具体这三个函数的使用,我们可以参考linux的shmget, shmat, shmdt函数。可以看到这三个函数跟上面三个系统调用号的常量名字一样的。 以下是这三个函数的声明:

    1
    2
    3
    int shmget(key_t key, size_t size, int shmflg);  
    void *shmat(int shm_id, const void *shm_addr, int shmflg);
    int shmdt(const void *shmaddr);
  • 以下简单介绍一下这三个函数,具体可以直接去linux上man对应的文档。

shmget函数

  • key,这个参数的类型key_y其实只是一个数字类型。这个参数命名了这一块内存。不要提供0值就行了,0值是private的,不能在进程间共享。

  • size,提供了共享内存的大小。

  • shmflg,权限标志,它的作用与open函数的mode参数一样。如果需要在内存不存在时创建它,则需要指定IPC_CREAT。

  • 在golang的文档中可以看到,它并没定义IPC_CREATE的值。所以我们只能去找到它的值了。在linux的man文档中,它也没有说明。于是乎,直接把linux的代码clone下来进行了grep(我用ag,速度非常快的文档查找工具)。从结果中找到了IPC_CREATE是一个宏,它的值定义成了00001000。一个8进制的数字。低三位都是0,因为低三位是用来标志权限位的。

  • 下面我们直接来发起这个系统调用看一下效果,把调用c的参数一一对应到a1, a2, a3中:

    1
    shmid, _, err := syscall.Syscall(syscall.SYS_SHMGET, 2, 4, IpcCreate|0600)
  • Syscall函数返回了两个值和一个error字段。而c的shmget只返回了一个int值,因为这个函数把结果错误和结果都通过返回值来承载了,如果是小于0的,则是错误,这时对应到go中应该是err的值,没有错误的时候,我们只需要一个返回值,第二个返回值会一直是0。第一个返回值就是给shmat调用的第一个参数。

shmat函数

  • shm_id, 这是shmget返回id,以标志了要attach的是这一块内存

  • shm_addr,这个标志需要把它attach到的内存地址,通常填0,让系统去选择地址来attach

  • shmflg,这个可以值SHM_RDONLY表示只读,其他值为可以读写,我们直接传0就好。

    1
    shmaddr, _, err := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0)
  • c函数返回了进程空间地址,这个调用也是只返回了一个值,所我们只接收第一个值。在c中,如果调用失败,会返回-1。在go中,我们只要直接处理err的值就好了。

shmdt函数

  • shmaddr, 这个参数表示deattach的地址值,是从shmat中返回的。 我们在go中直接用defer来调用就好了:
    1
    defer syscall.Syscall(syscall.SYS_SHMDT, shmaddr, 0, 0)

Golang 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package main

import (
"errors"
"flag"
"fmt"
"os"
"sync"
"syscall"
"time"
"unsafe"
)

const (
// IpcCreate create if key is nonexistent
IpcCreate = 00001000
capacity = 1000000
)

type SHM struct {
mutex sync.Mutex
head int
count int
queue [capacity]int
}

func (s *SHM) Append(item int) error {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.count >= capacity {
return errors.New("the capacity is full")
}

s.queue[(s.head+s.count)%capacity] = item
s.count++

return nil
}

func (s *SHM) Consume() []int {
s.mutex.Lock()
defer s.mutex.Unlock()
var ret []int
for i := 0; i < s.count; i++ {
ret = append(ret, s.queue[(s.head+i)%capacity])
}
s.count = 0
s.head = 0
return ret
}

var (
mode = flag.Int("mode", 0, "0:write 1:read")
)

func main() {
flag.Parse()
shmid, _, err := syscall.Syscall(syscall.SYS_SHMGET, 2, 10000, IpcCreate|0600)
if err != 0 {
fmt.Printf("syscall error, err: %v\n", err)
os.Exit(-1)
}
fmt.Printf("shmid: %v\n", shmid)

shmaddr, _, err := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0)
if err != 0 {
fmt.Printf("syscall error, err: %v\n", err)
os.Exit(-2)
}
fmt.Printf("shmaddr: %v\n", shmaddr)

defer syscall.Syscall(syscall.SYS_SHMDT, shmaddr, 0, 0)

shm := (*SHM)(unsafe.Pointer(uintptr(shmaddr)))

if *mode == 0 {
fmt.Println("write mode")
i := 0
for {
fmt.Println("增加数据:", i)
shm.Append(i)
i++
time.Sleep(1 * time.Second)
}
} else {
fmt.Println("read mode")
for {
fmt.Println("消费数据:", shm.Consume())
time.Sleep(2 * time.Second)

}
}
}
  • 需要注意的一点就是:自定义类型的数据中不可以使用 slice,因为slice进行数据append的时候会涉及到扩容,会出现一些问题,后续有待调研

mac 上的查看共享内存

1
2
3
4
# 查看 shm
ipcs -m
# 删除 shm
ipcrm -m pid