libchan是docker衍生出来的子模块,用来在goroutine间、进程间、机器间提供相同的类似go channel的通信方式,每个channal是单工的,方便程序从多goroutine到多进程再到多机的扩展。使用上跟linux pipe类似。
项目地址: https://github.com/docker/libchan
代码版本:commit 1e141b35ee
这是项目里提到的功能:
- Simple message passing
- Synchronization for concurrent programming
- Nesting: channels can send channels
1/3是可以理解的,2有一点疑问,可以参考项目issue里的讨论。
使用方法参考项目里的demo: https://github.com/docker/libchan/tree/master/examples/rexec
接口定义
项目里目前提供了基于tcp/spdy和inmem两种实现,每种实现只需实现以下接口即可:1
2
3
4
5
6
7
8
9
10
11type Transport interface {
NewSendChannel() (Sender, error)
WaitReceiveChannel() (Receiver, error)
}
type Sender interface {
Send(message interface{}) error
Close() error
}
type Receiver interface {
Receive(message interface{}) error
}
inmem没有提供Transport,是通过Pipe()函数成生的sender & receiver,因此spdy/inmen的实现在channel初始化的时候稍有不同。
可打包传输的数据类型
数据传输使用github.com/dmcgowan/go/codec进行编码,打包格式用的msgpack。
codec默认支持以下数据类型:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// file: https://github.com/dmcgowan/go/blob/master/codec/encode.go
// encDriver abstracts the actual codec (binc vs msgpack, etc)
type encDriver interface {
isBuiltinType(rt uintptr) bool
encodeBuiltin(rt uintptr, v interface{})
encodeNil()
encodeInt(i int64)
encodeUint(i uint64)
encodeBool(b bool)
encodeFloat32(f float32)
encodeFloat64(f float64)
encodeExtPreamble(xtag byte, length int)
encodeArrayPreamble(length int)
encodeMapPreamble(length int)
encodeString(c charEncoding, v string)
encodeSymbol(v string)
encodeStringBytes(c charEncoding, v []byte)
//TODO
//encBignum(f *big.Int)
//encStringRunes(c charEncoding, v []rune)
}
codec支持扩展,可以注册其他数据类型的编解码句柄。libchan扩展支持Sender/Receiver/io.ReadWriteCloser/io.ReadCloser/io.WriteCloser的打包传输。
TCPConn & UDPConn属于io.ReadWriteCloser类型,spdy实现只能传输建立在sender&receiver所在主机之间的连接。
io对象传输的实现流程是:
- sender端需要打包发送io句柄A
- 在sender&receiver之间建立通信管道P(P-Sender, P-Receiver),inmem、spdy分别使用net.Pipe和spdy.stream建立。
- sender起goroutine在A & P-Sender间进行数据拷贝,发送管道P的ID,receiver端对P-receiver进行读写。
操作完成后的情况如下图:
###代码分析
####inmem
每一对sender&receiver都有与之关联的一个session数据结构,用来管理通过该chan传输的sender、receiver、io对象等。1
2
3
4
5
6
7
8
9
10
11
12type streamSession struct {
pipeLock sync.Mutex
pipeCount uint64
pipeReaders map[uint64]*io.PipeReader
pipeWriters map[uint64]*io.PipeWriter
handler codec.Handle
referenceLock sync.Mutex
referenceID uint64
byteStreams map[uint64]*byteStream
}
handler是一个codec编解码句柄。
io对象的传输是通过net.Pipe()创建的管道和sender端的proxy实现,IO通道存储在byteStreams字段里,net.Pipe()返回两个句柄,对应两个byteStream对象,分别存储在map[ref_ID] map[ref_ID+1]里,本地使用map[ref_ID]进行读写,传输ref_ID+1给对端,对端使用map[ref_ID+1]的对象读写。
io.ReadClose & io.WriteClose & io.ReadWriteCloser实现方式相同,只是在sender端起的proxy routine不同。byteStream是可读写的,根据存储byteStream对象的字段类型的不同会表现出只读、只写、读写的不同特征。
Sender & Receiver通过io.Pipe()传输数据,返回的通道是单工的,读写两端分别存储在pipeReaders & pipeWriters字段里。S & R传输处理流程类似,比如senderA.send(otherB),如果senderB属于senderA的session,则直接传输对应的map-key,如果不是则新建一个io.Pipe,通过goroutine io.copy实现转发。
接下来分析下io.ReadWriteCloser、Sender对象编码传输、解码使用的流程。
#####临时对象构造
首先对要传输的对象构造出来一个临时的拷贝,其中的io.ReadWriteCloser/Sender转换成内部对应的bytesStream对象和同session的Sender,如有需要,转发的proxy都已启动。1
2
3
4
5
6
7
8
9
10
11
12
13
14func (w *pipeSender) copyValue(v interface{}) (interface{}, error) {
switch val := v.(type) {
...
case *pipeSender:
if val.session != w.session {
return w.copySender(val)
}
...
case io.ReadWriteCloser:
return w.copyByteStream(val)
...
}
return v, nil
}
copyByteStream行为是在该session内新建一个net.Pipe,然后启动转发的proxy-routine。newByteStream()创建一对byteStream对象代表net.Pipe的两端,存储在byteStreams字段里,id分别为referenceID和referenceID+1,referenceID+1给对端使用。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35func (w *pipeSender) copyByteStream(stream io.ReadWriteCloser) (io.ReadWriteCloser, error) {
streamCopy, err := w.session.newByteStream()
if err != nil {
return nil, err
}
go func() {
io.Copy(streamCopy, stream)
streamCopy.Close()
}()
go func() {
io.Copy(stream, streamCopy)
stream.Close()
}()
return streamCopy, nil
}
func (s *streamSession) newByteStream() (io.ReadWriteCloser, error) {
c1, c2 := net.Pipe()
bs := &byteStream{
Conn: c1,
referenceID: s.referenceID,
session: s,
}
s.referenceLock.Lock()
s.byteStreams[s.referenceID] = bs
s.byteStreams[s.referenceID+1] = &byteStream{
Conn: c2,
referenceID: s.referenceID + 1,
session: s,
}
s.referenceID = s.referenceID + 2
s.referenceLock.Unlock()
return bs, nil
}
copySender()行为类似:1
2
3
4
5
6
7
8
9
10
11func (w *pipeSender) copySender(val Sender) (Sender, error) {
recv, send, err := w.CreateNestedReceiver()
if err != nil {
return nil, err
}
go func() {
Copy(val, recv)
val.Close()
}()
return send, nil
}
#####序列化
byteStream和Sender使用注册的codec扩展句柄编码,byteStream编码函数:1
2
3
4
5
6
7
8
9
10func (s *streamSession) encodeStream(v reflect.Value) ([]byte, error) {
bs := v.Interface().(byteStream)
if bs.referenceID == 0 {
return nil, errors.New("bad type")
}
var buf [8]byte
written := binary.PutUvarint(buf[:], uint64(bs.referenceID)^0x01)
return buf[:written], nil
}
byteStream每次都创建一对,并且referenceID从2顺序使用,因此本端对象的referenceID总是偶数,对端使用referenceID+1存储的byteStream,uint64(bs.referenceID)^0x01等价于bs.referenceID+1,这个地方太绕了。
encodeSender()编码Sender对象,行为类似。
#####反序列化
对于一个整数,反序列化后的类型取决于存储他的对象类型,所以收发两端的数据结构不对应就会出错。byteStream反序列化逻辑如下,Sender处理逻辑类似。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17func (s *streamSession) decodeStream(v reflect.Value, b []byte) error {
referenceID, readN := binary.Uvarint(b)
if readN == 0 {
return errors.New("bad reference id")
}
bs, ok := s.byteStreams[referenceID]
if !ok {
return errors.New("byte stream does not exist")
}
if bs != nil {
v.Set(reflect.ValueOf(*bs))
}
return nil
}
byteStream结构体嵌入了net.Conn,因此是一个io.ReadWriteClose对象。1
2
3
4
5type byteStream struct {
net.Conn
referenceID uint64
session *streamSession
}
####spdy
这是一个基于tcp+spdy作为传输层的实现,每一对sender&receiver或io对象都对应spdy的一个stream。spdy实现了Transport,也使用该数据结构管理在一个chan间传输的sub-chan、io对象的信息。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25type Transport struct {
conn *spdystream.Connection
handler codec.Handle
receiverChan chan *channel
channelC *sync.Cond
channels map[uint64]*channel
referenceLock sync.Mutex
referenceCounter uint64
byteStreamC *sync.Cond
byteStreams map[uint64]*byteStream
netConnC *sync.Cond
netConns map[byte]map[string]net.Conn
networks map[string]byte
}
type channel struct {
referenceID uint64
parentID uint64
stream *spdystream.Stream
session *Transport
direction direction
}
byteStreams存储io对象转发的通道,使用TCPConn传输,因此是双工的。怎么使用依赖于持有他的对象。
channels存储两机间的chan通道。channel封装了一个byteStream对象和direction字段,因此只能单向使用。
因为Sender&Receiver和io对象都通过spdy的stream实现,所以stream建立的时候就需要进行区分:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54func (s *Transport) newStreamHandler(stream *spdystream.Stream) {
referenceIDString := stream.Headers().Get("libchan-ref")
parentIDString := stream.Headers().Get("libchan-parent-ref")
returnHeaders := http.Header{}
finish := false
referenceID, parseErr := strconv.ParseUint(referenceIDString, 10, 64)
if parseErr != nil {
returnHeaders.Set("status", "400")
finish = true
} else {
// parentIDString区分了channel和stream
if parentIDString == "" {
byteStream := &byteStream{
referenceID: referenceID,
stream: stream,
session: s,
}
s.byteStreamC.L.Lock()
s.byteStreams[referenceID] = byteStream
s.byteStreamC.Broadcast()
s.byteStreamC.L.Unlock()
returnHeaders.Set("status", "200")
} else {
parentID, parseErr := strconv.ParseUint(parentIDString, 10, 64)
if parseErr != nil {
returnHeaders.Set("status", "400")
finish = true
} else {
c := &channel{
referenceID: referenceID,
parentID: parentID,
stream: stream,
session: s,
}
s.channelC.L.Lock()
s.channels[referenceID] = c
s.channelC.Broadcast()
s.channelC.L.Unlock()
// subchannel是没有方向概念的
if parentID == 0 {
c.direction = inbound
s.receiverChan <- c
}
returnHeaders.Set("status", "200")
}
}
}
stream.SendReply(returnHeaders, finish)
}
如果parentIDString为空,则是一个byteStream,该stream只读、只写、读写是由持有他的对象决定的。parentIDString非空且不等于0,则是在两机间新建一个管道,创建总是由sender端发起;如果不等于,则是一个subchannel,该chan用来发送还是接收是由持有他的对象决定的。
spdy实现在函数initializeHandler()注册的扩展类型编解码句柄:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24func (s *Transport) initializeHandler() *codec.MsgpackHandle {
mh := &codec.MsgpackHandle{WriteExt: true}
...
// Register networks
s.networks["tcp"] = 0x04
s.netConns[0x04] = make(map[string]net.Conn)
err = mh.AddExt(reflect.TypeOf(net.TCPConn{}), 0x04, s.encodeNetConn, s.decodeNetConn)
if err != nil {
panic(err)
}
s.networks["udp"] = 0x05
s.netConns[0x05] = make(map[string]net.Conn)
err = mh.AddExt(reflect.TypeOf(net.UDPConn{}), 0x05, s.encodeNetConn, s.decodeNetConn)
if err != nil {
panic(err)
}
// TODO add unix network as 0x06
return mh
}
相比inmem增加了TCPConn和UDPConn的额外处理。因此这也限制了spdy只能传输在两机间建立的连接,且接收端已经将该conn注册到这个session内。
这是代码中的注释:
// RegisterConn registers a network connection to be used
// by inbound messages referring to the connection
// with the registered connection's local and remote address.
// Note: a connection does not need to be registered before
// being sent in a message, but does need to be registered
// to by the receiver of a message. If registration should be
// automatic, register a listener instead.
注册的连接保存在netConns字段内,使用二维map存储:map[network-type][localaddr<>remoteaddr]。传输时编码的是一个三元组(network-type, local-addr, remote-addr) 。实现函数是encodeNetConn(), decodeNetConn(),注意解码时的字段顺序。
网络连接的传输可以参考这个测试用例:https://github.com/docker/libchan/blob/master/spdy/conn_test.go
###summary
总体来说channel通信方式使用很直观方便,屏蔽了底层的通信建立细节。但是实现方式不容易理解,看了spdy代码也不放心,出了问题debug有困难,比如:1
2
3
4
5
6
7
8
9
10func (s *Transport) getByteStream(referenceID uint64) *byteStream {
s.byteStreamC.L.Lock()
bs, ok := s.byteStreams[referenceID]
if !ok {
s.byteStreamC.Wait()
bs, ok = s.byteStreams[referenceID]
}
s.byteStreamC.L.Unlock()
return bs
}
if是不是应该换成while判断。