|
|
package penet
import ( "container/list" crand "crypto/rand" "encoding/binary" "errors" "github.com/chrislusf/seaweedfs/weed/glog" mrand "math/rand" "net" "runtime" "runtime/debug" "sync" "time" )
type DataSend struct { Seq uint32 Acked bool Resend byte Fast bool Code byte Time uint32 Data []byte }
type DataRecv struct { Seq uint32 AckCnt byte Code byte Data []byte }
type UdpSend struct { Id uint64 sock *net.UDPConn remote *net.UDPAddr seq uint32 rtt float64 rttMax float64 rttMin float64 rate uint32 mss uint32 interval uint32 data []DataSend dataBegin int dataLen int sendRnd int sendList *list.List sendListLock sync.Mutex writable chan bool writeMax int isClose bool closing bool opening byte conn *Conn resendCnt int name string acked uint32 recvWnd uint32 }
const ( TypeData uint8 = 1 TypeAck uint8 = 2 TypeClose uint8 = 3 TypeSYN uint8 = 8 )
var ( ErrClose = errors.New("conn close") ErrTimeout = errors.New("conn timeout")
mss uint32 = 1200 defaultRate = mss * 3000 dropRate = 0.0 writeMaxSep = 5 resendLimit = true )
func NewUdpSend(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpSend { u := &UdpSend{ conn: conn, Id: id, sock: sock, remote: remote, mss: mss, interval: 20, data: make([]DataSend, 4), seq: 1, rate: defaultRate, // 修复bug: 太高导致压测的时候内存爆炸,速度慢
rtt: 200, rttMax: 200, rttMin: 200, sendList: list.New(), writable: make(chan bool, 1), name: name, } u.writeMax = int(u.rate/u.mss) / writeMaxSep // fixed bug: 初始化 修复bug: 写入太多,应该一点点写
u.recvWnd = uint32(u.writeMax) return u }
func (u *UdpSend) send(nowTime time.Time, buf []byte) { var sendMax = int(u.rate / u.mss) u.writeMax = sendMax / writeMaxSep // glog.V(4).Info("send", u.dataLen, u.name, sendMax, uint32(u.rtt))
var sendCount = uint32(u.rate / u.mss / (1000 / u.interval)) var sendTotal = sendCount if sendCount > u.recvWnd/5 { sendCount = u.recvWnd / 5 } if sendCount <= 0 { sendCount = 1 }
now := uint32(nowTime.UnixNano() / int64(time.Millisecond)) resendCnt := 0 for i := 0; i < u.dataLen; i++ { if sendCount <= 0 { break } index := (i + u.dataBegin) % len(u.data) d := &u.data[index] if d.Acked == false && now-d.Time >= uint32(u.rtt*2.0) { glog.V(4).Infof("resend, seq:%v id:%v rtt:%v name:%v", d.Seq, u.Id, uint32(u.rtt), u.name) // data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6
// 发送窗口就是配置值,其实发送窗口不需要发送出去
// 接收端能发送了,要告诉对面开始发送。这个逻辑可以通过接受端的发送通道来发送。
headLen := structPack(buf, "BBQHIII", uint8(TypeData), d.Code, uint64(u.Id), uint16(len(d.Data)+4+4+4), d.Seq, uint32(now), uint32(sendMax)) copy(buf[headLen:], d.Data) u.sock.WriteToUDP(buf[:headLen+len(d.Data)], u.remote) d.Time = now d.Resend++ d.Fast = false sendCount-- resendCnt++ u.resendCnt++ }
}
u.sendRnd++ if u.sendRnd > 100 && u.dataLen > 0 { u.sendRnd = 0
var resendMax = uint32(float64(u.dataLen) * 0.6) if u.resendCnt > 2 { glog.V(0).Infof("resendCnt:%v resendMax:%v remain:%v rtt:%v id:%v", u.resendCnt, resendMax, u.dataLen, uint32(u.rtt), u.Id) u.resendCnt = 0 } // 修复bug: 之前使用lastrecvtime,存在新增发送数据数据那瞬间,出现超时情况
if u.data[u.dataBegin].Resend > 10 { glog.V(0).Infof("resend too much, close, id:%v name:%v drop seq:%v remain:%v", u.Id, u.name, u.data[u.dataBegin].Seq, u.dataLen) u.conn.close(true, true) } }
var resendToomuch bool if uint32(resendCnt) > sendTotal/3 && resendLimit { // 修复bug: 限速
resendToomuch = true glog.V(4).Infof("resend too much, slow, %v %v", resendCnt, sendTotal) }
u.sendListLock.Lock() for ; sendCount > 0; sendCount-- { sendData := u.sendList.Front() if sendData == nil { break }
if sendMax-u.dataLen <= 0 || resendToomuch { break } if u.dataLen >= len(u.data) { newData := make([]DataSend, 2*len(u.data)) copy(newData, u.data[u.dataBegin:]) copy(newData[len(u.data)-u.dataBegin:], u.data[:u.dataBegin]) u.dataBegin = 0 u.data = newData }
u.sendList.Remove(sendData) hType := uint8(0) sdata, ok := sendData.Value.([]byte) if !ok { // 发送控制消息
mesCode, _ := sendData.Value.(byte) hType = mesCode } headLen := structPack(buf, "BBQHIII", uint8(TypeData), hType, uint64(u.Id), uint16(len(sdata)+4+4+4), u.seq, uint32(now), uint32(sendMax)) copy(buf[headLen:], sdata) u.sock.WriteToUDP(buf[:headLen+len(sdata)], u.remote) glog.V(4).Infof("send, seq:%v id:%v %v", u.seq, u.Id, u.name)
index := (u.dataBegin + u.dataLen) % len(u.data) u.data[index] = DataSend{ Seq: u.seq, Time: now, Code: hType, Data: sdata, } u.seq++ u.dataLen++ } listRemain := u.writeMax - u.sendList.Len() u.sendListLock.Unlock()
if listRemain > 0 { select { case u.writable <- !u.isClose: default: } }
if u.isClose && u.dataLen == 0 && u.closing == false { // 修复bug: 之前5秒删除,实际发送没完成
// 修复bug: 接受端write端close为true, 导致5秒后呗删除,来不及接受数据。
glog.V(1).Infof("close and emtpy, rm conn, id:%v name:%v seq:%v listlen:%v", u.Id, u.name, u.seq, u.sendList.Len()) u.closing = true u.conn.close(false, true) } }
func testDrop() bool { if dropRate < 0.01 { return false } var v uint32 var b [4]byte if _, err := crand.Read(b[:]); err != nil { v = mrand.Uint32() } else { v = binary.BigEndian.Uint32(b[:]) } if v%1000 < uint32(dropRate*1000) { return true } return false }
func (u *UdpSend) recv(buf []byte) {
if testDrop() { return }
now := uint32(time.Now().UnixNano() / int64(time.Millisecond))
// ack包: type0 flag1 id2 len3 tm4 rcvwnd5 acked6
head, headLen := structUnPack(buf, "BBQHIII") if head[0] == uint64(TypeAck) && u.dataLen > 0 {
firstSeq := u.data[u.dataBegin].Seq acked := uint32(head[6]) offset := int(int32(acked - firstSeq)) glog.V(4).Infof("id:%v recv ack: %v offset:%v databegin:%v dataLen:%v firstSeq:%v name:%v", u.Id, acked, offset, u.dataBegin, u.dataLen, firstSeq, u.name) if offset >= 0 && offset < u.dataLen { offset++ for i := 0; i < offset; i++ { // 修复bug: 清除引用等
index := (u.dataBegin + i) % len(u.data) // 修复bug,i写成offset
d := &u.data[index] d.Data = nil d.Acked = true } u.acked += uint32(offset) u.dataBegin += offset u.dataBegin = u.dataBegin % len(u.data) u.dataLen -= offset glog.V(4).Infof("2 acked ok:%v, id:%v %v", u.acked, u.Id, u.name) } if u.dataLen > 0 { curSeq := u.data[u.dataBegin].Seq // var ackedSeq = []uint32{}
for i := headLen; i < len(buf); i += 4 { seq := binary.BigEndian.Uint32(buf[i:]) offset := int(int32(seq - curSeq)) if offset < 0 || offset >= u.dataLen { continue } index := (u.dataBegin + offset) % len(u.data) d := &u.data[index] if d.Seq == seq { d.Acked = true d.Data = nil // 修复bug: memory leak
// ackedSeq = append(ackedSeq, seq)
} else { panic("index not correct") } } // if len(ackedSeq) > 0 {
// glog.V(0).Info("seq:", ackedSeq)
// }
}
var i = 0 for ; i < u.dataLen; i++ { index := (i + u.dataBegin) % len(u.data) d := &u.data[index] if d.Acked == false { break } // fmt.Println("acked->", d.Seq)
// if d.Seq == 7 {
// fmt.Println("data:", u.data[u.dataBegin:u.dataBegin+5])
// }
} if i > 0 { u.acked += uint32(i) u.dataBegin += i u.dataBegin = u.dataBegin % len(u.data) u.dataLen -= i glog.V(4).Infof("3 acked ok:%v, id:%v %v %v", u.acked, u.Id, u.dataBegin, u.name) }
sendTime := uint32(head[4]) rtt := now - sendTime if rtt > 0 { if firstSeq < 3 { u.rtt = float64(rtt) // 初始值
} else { u.rtt = u.rtt*0.8 + float64(rtt)*0.2 } if u.rtt < 50.0 { u.rtt = 50 } // glog.V(4).Infof("rtt:%v u.rtt:%v id:%v", rtt, u.rtt, u.Id)
}
u.recvWnd = uint32(head[5]) }
}
func structPack(b []byte, format string, param ...interface{}) int { j := 0 for i, s := range format { switch s { case 'I': p, _ := param[i].(uint32) binary.BigEndian.PutUint32(b[j:], p) j += 4 case 'B': p, _ := param[i].(uint8) b[j] = p j++ case 'H': p, _ := param[i].(uint16) binary.BigEndian.PutUint16(b[j:], p) j += 2 case 'Q': p, _ := param[i].(uint64) binary.BigEndian.PutUint64(b[j:], p) j += 8 default: panic("structPack not found") } } return j }
func structUnPack(b []byte, format string) ([]uint64, int) { var re = make([]uint64, 0, len(format)) defer func() { if err := recover(); err != nil { // log.Error(err)
re[0] = 0 } }() j := 0 for _, s := range format { switch s { case 'I': re = append(re, uint64(binary.BigEndian.Uint32(b[j:]))) j += 4 case 'B': re = append(re, uint64(b[j])) j++ case 'H': re = append(re, uint64(binary.BigEndian.Uint16(b[j:]))) j += 2 case 'Q': re = append(re, uint64(binary.BigEndian.Uint64(b[j:]))) j += 8 default: panic("structUnPack not found") } } return re, j }
type UdpRecv struct { conn *Conn Id uint64 sock *net.UDPConn remote *net.UDPAddr acked uint32 lastTm uint32 sndWnd uint32 recvCnt uint32 isClose bool isNew byte isRecved bool recvList *list.List recvListLock sync.Mutex readable chan byte readDeadline *time.Time seqData map[uint32]*DataRecv dataList *list.List name string }
func NewUdpRecv(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpRecv { return &UdpRecv{ Id: id, conn: conn, sock: sock, remote: remote, acked: 0, recvList: list.New(), dataList: list.New(), seqData: make(map[uint32]*DataRecv), readable: make(chan byte, 1), isNew: 50, name: name, sndWnd: 1000, } }
func (u *UdpRecv) SetReadDeadline(t time.Time) { u.readDeadline = &t }
func (u *UdpRecv) sendAck(nowTime time.Time, buf []byte) {
u.recvListLock.Lock() for { if d, ok := u.seqData[u.acked+1]; ok { u.acked++ if d.Code == TypeClose { if u.isClose == false { glog.V(1).Info("recv close:", u.Id) u.conn.close(false, true) u.isClose = true } } else { u.recvList.PushBack(d.Data) } d.Data = nil delete(u.seqData, u.acked) } else { break } } recvListLen := u.recvList.Len() u.recvListLock.Unlock()
// glog.V(4).Info("acked:", u.acked, u.Id, recvListLen, u.name)
if recvListLen > 0 { // 修复bug,用标志可能会有读不到的数据 修复bug: 去掉 && u.isClose == false,导致读取延迟
select { case u.readable <- 1: default: } } else if u.isClose == true { // 修复bug:快速close
select { case u.readable <- 0: default: } } if u.readDeadline != nil && !u.readDeadline.IsZero() { if u.readDeadline.Before(nowTime) { // 修复bug after
select { case u.readable <- 2: glog.V(0).Info("read dealline: ", u.Id) default: } } }
var b = buf[:mss] buf = buf[mss:] var n = 0 for i := u.dataList.Front(); i != nil; { d := i.Value.(*DataRecv)
next := i.Next() if d.AckCnt > 6 || before(d.Seq, u.acked+1) { // 发几次够了,不反复发
// d.Removed = true
u.dataList.Remove(i) // 修复bug,删除逻辑不对,需要保存next
// delete(u.seqData, d.Seq) // 修复bug: 对面已经确认,这里删了,没有重发,也没有了数据。已经收到的数据并且发了ack的数据,不要删了!
i = next continue } i = next
if d.AckCnt%3 == 0 { binary.BigEndian.PutUint32(b[n:], d.Seq) n += 4 if n >= len(b) { wnd := int(u.sndWnd) - recvListLen - len(u.seqData) if wnd < 0 { wnd = 0 } headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n), u.lastTm, uint32(wnd), u.acked) copy(buf[headLen:], b[:n]) u.sock.WriteToUDP(buf[:headLen+n], u.remote) glog.V(4).Infof("id:%v send ack n:%v", u.Id, n) n = 0 // 修复bug,没有置零
u.isRecved = false // 修复bug: 有时候发多一条数据
} } d.AckCnt++ } if n > 0 || u.isRecved { // 修复bug: 一直发数据 修复bug: 有时候发多一条数据
wnd := int(u.sndWnd) - recvListLen - len(u.seqData) if wnd < 0 { wnd = 0 } headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n), u.lastTm, uint32(wnd), u.acked) copy(buf[headLen:], b[:n]) u.sock.WriteToUDP(buf[:headLen+n], u.remote) glog.V(4).Infof("id:%v send ack n:%v datalen:%v", u.Id, n, u.dataList.Len()) } u.isRecved = false
// 修复bug: 如果自己只是发数据,那么自己的acked通道会没用到。所以要判断自己是否在发送数据。
if u.isNew > 0 { // 完成的优化:超时不确认第一包,就删除链接。防止旧链接不断发包。
u.isNew-- if u.isNew == 0 && u.acked == 0 { glog.V(0).Infof("not recv first packet!!! close, id:%v name:%v", u.Id, u.name) u.conn.Close() } if u.acked >= 1 || u.conn.s.seq > 1 { u.isNew = 0 } }
}
// before seq1比seq2小
func before(seq1, seq2 uint32) bool { return (int32)(seq1-seq2) < 0 }
func after(seq1, seq2 uint32) bool { return (int32)(seq2-seq1) < 0 }
func (u *UdpRecv) recv(buf []byte) {
if testDrop() { return }
u.isRecved = true u.recvCnt++ // data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6
head, headLen := structUnPack(buf, "BBQHIII") if head[0] == uint64(TypeData) {
seq := uint32(head[4]) u.lastTm = uint32(head[5]) u.sndWnd = uint32(head[6])
glog.V(4).Info(u.Id, " recv seq: ", seq, " len: ", u.dataList.Len())
if before(seq, u.acked+1) { // 修复bug: 收到before的数据,seqData不会回收。
// glog.V(0).Info("seq before u.acked:", seq, u.acked, u.Id)
return }
// 修复bug: 修复没重发问题。之前由于一直会重发,这个逻辑有意义。现在只重发几次。
// if d, ok := u.seqData[seq]; ok {
// d.AckCnt = 0
// if d.Removed == false { // 修复bug:可能没ack导致一直重发,要直到acked全部覆盖。
// return
// }
// }
// glog.V(4).Info(u.Id, " recv 2 seq: ", seq, " len: ", u.dataList.Len())
d := &DataRecv{ Seq: seq, Data: buf[headLen:], Code: byte(head[1]), } u.dataList.PushBack(d) u.seqData[seq] = d
}
}
func SetRate(rate uint32) { defaultRate = rate }
func SetDropRate(rate float64) { if rate > 0.001 { dropRate = rate } }
type Conn struct { Id uint64 s *UdpSend r *UdpRecv responsed chan bool isClose bool isSendClose bool isRmConn bool conns *Conns }
func NewConn(conns *Conns, Id uint64, localConn *net.UDPConn, remote *net.UDPAddr, name string) *Conn { conn := &Conn{ Id: Id, conns: conns, responsed: make(chan bool, 1), } conn.s = NewUdpSend(conn, conn.Id, localConn, remote, name) conn.r = NewUdpRecv(conn, conn.Id, localConn, remote, name) return conn }
func (c *Conn) Write(bb []byte) (n int, err error) {
if c.isClose { return 0, ErrClose }
// 1.对方告诉你满了,是要叫你不要发数据了,而不是还发数据。
// 2.没有窗口就没法快速地确定对面有没有满,如果你要等到自己的的buffer满,那么可能会比较慢感知到
// 固定buffer + 停止通知
b := make([]byte, len(bb)) // 修复bug1:没有拷贝
copy(b, bb) for { c.s.sendListLock.Lock() remain := c.s.writeMax - c.s.sendList.Len() for { if len(b) <= 0 { break } if remain <= 0 { break } remain-- var sendLen = int(mss) if len(b) < sendLen { // 实际发送值
sendLen = len(b) } n += sendLen c.s.sendList.PushBack(b[:sendLen]) // fmt.Println("write:", b[:10])
b = b[sendLen:] } c.s.sendListLock.Unlock() if len(b) <= 0 { break } // glog.V(0).Info("wait write: ", c.Id)
w := <-c.s.writable if w == false { return n, ErrClose } }
return n, nil }
func (c *Conn) Read(b []byte) (n int, err error) { for { c.r.recvListLock.Lock() for { f := c.r.recvList.Front() if f != nil { data := f.Value.([]byte) copy(b[n:], data) maxCap := len(b[n:]) if maxCap < len(data) { // b已满
f.Value = data[maxCap:] n += maxCap break } else { // b未满
c.r.recvList.Remove(f) n += len(data) } } else { // 读完数据了
break } } c.r.recvListLock.Unlock() if n <= 0 { // glog.V(4).Info("wait read", c.Id)
// wait for chan
r := <-c.r.readable if r == 0 { // close之后总是返回初始值
c.r.recvListLock.Lock() rlen := c.r.recvList.Len() c.r.recvListLock.Unlock() if rlen <= 0 { // 修复bug: 等到read完所有数据才让read返回错误
return n, ErrClose } } if r == 2 { return n, ErrTimeout } } else { break } }
return }
func (c *Conn) LocalAddr() net.Addr { return c.conns.sock.LocalAddr() }
func (c *Conn) RemoteAddr() net.Addr { return c.conns.sock.RemoteAddr() }
func (c *Conn) SetDeadline(t time.Time) error { c.r.SetReadDeadline(t) return nil }
func (c *Conn) SetReadDeadline(t time.Time) error { c.r.SetReadDeadline(t) return nil }
func (c *Conn) SetWriteDeadline(t time.Time) error { return nil }
func (c *Conn) close(sendClose, rmConn bool) { if c.isClose == false { c.isClose = true c.r.isClose = true } if sendClose && c.isSendClose == false { // 修复bug:
c.isSendClose = true c.s.sendListLock.Lock() c.s.sendList.PushBack(byte(TypeClose)) c.s.sendListLock.Unlock() } if rmConn && c.isRmConn == false { c.isRmConn = true time.AfterFunc(time.Second*5, func() { select { case c.conns.input <- Input{ typ: ActRmConn, param: c, }: default: } }) } }
func (c *Conn) Close() error { if c.isClose == false { c.isClose = true c.s.isClose = true // bug: 接收不能主动关闭
c.isSendClose = true c.s.sendListLock.Lock() c.s.sendList.PushBack(byte(TypeClose)) c.s.sendListLock.Unlock() // bug: close之后,5秒数据可能无法完成发送
} return nil }
type Conns struct { conns map[uint64]*Conn sock *net.UDPConn accept chan *Conn isClose bool isDial bool timerRnd uint32 input chan Input }
func NewConns() *Conns { return &Conns{ conns: make(map[uint64]*Conn), accept: make(chan *Conn, 256), input: make(chan Input, 2048), } }
func Listen(network, address string) (net.Listener, error) { addr, err := net.ResolveUDPAddr("udp", address) if err != nil { return nil, err } conn, err := net.ListenUDP("udp", addr) if err != nil { return nil, err } listener := NewConns() listener.sock = conn go listener.loop() return listener, nil }
var dialConns *Conns var dialConnsLock sync.Mutex
func Dial(network, address string) (net.Conn, error) { return DialTimeout(network, address, time.Second*3) }
func DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) { addr, err := net.ResolveUDPAddr("udp", address) if err != nil { return nil, err } var b [8]byte if _, err := crand.Read(b[:]); err != nil { return nil, err } id := binary.LittleEndian.Uint64(b[:])
glog.V(0).Info("dial new 3:", id)
dialConnsLock.Lock() if dialConns == nil { dialConns = NewConns() } if dialConns.sock == nil { s, err := net.ListenUDP("udp", &net.UDPAddr{}) if err != nil { dialConnsLock.Unlock() return nil, err } dialConns.sock = s dialConns.isDial = true go dialConns.loop() } dialConnsLock.Unlock()
conn := NewConn(dialConns, id, dialConns.sock, addr, "reqer") dialConns.input <- Input{ typ: ActAddConn, param: conn, }
// conn.s.sendListLock.Lock()
// conn.s.sendList.PushBack(byte(TypeSYN))
// conn.s.sendListLock.Unlock()
return conn, nil }
func (c *Conns) Accept() (net.Conn, error) { for { if c.isClose { return nil, errors.New("listener close") } conn := <-c.accept if conn == nil { return nil, errors.New("listener close") } return conn, nil } }
func (c *Conns) Close() error { c.isClose = true c.sock.Close() c.input <- Input{ typ: ActEnd, } return nil }
func (c *Conns) Addr() net.Addr { return c.sock.LocalAddr() }
const ( ActData = 1 ActTimer = 2 ActAddConn = 3 ActRmConn = 4 ActEnd = 5 )
type Input struct { typ uint8 data []byte param interface{} }
func (c *Conns) loop() { // 这里输入时间和数据
// 只起一个timer,给所有conn发
// 之前用setreaddeadline这个不太好,容易出现长时间没超时
runtime.LockOSThread()
glog.V(0).Info("loop: ", c.isDial)
go func() { var buf = make([]byte, 2048) for { n, remote, err := c.sock.ReadFromUDP(buf) if n <= 0 || err != nil { c.Close() return } b := make([]byte, n) copy(b, buf[:n]) c.input <- Input{ typ: ActData, data: b, param: remote, } if c.isClose { return } } }()
var timerRunning bool var releaseMemory uint32 var buf = make([]byte, mss*3)
for { data := <-c.input switch data.typ { case ActData: head, _ := structUnPack(data.data, "BBQ") // fmt.Println(head[0], head[2])
var dataType = head[0] if conn, ok := c.conns[head[2]]; ok { // TODO: 给dial中的链接发送成功 -> 暂时不需要,现在dial不判断这些,默认成功
if dataType == uint64(TypeData) { conn.r.recv(data.data) } else if dataType == uint64(TypeAck) { conn.s.recv(data.data) } } else { if c.isDial == false && dataType == uint64(TypeData) && c.isClose == false { // 不需要TypeSYN
// glog.V(0).Info("create new:", head[2])
// 只有主动listen的,才有新链接,而dial自己就会创建新连接,不用创建
conn := NewConn(c, head[2], c.sock, data.param.(*net.UDPAddr), "rsper") c.conns[conn.Id] = conn conn.r.recv(data.data) select { case c.accept <- conn: default: } if timerRunning == false { timerRunning = true go c.runTimer(c.timerRnd) glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial) } } } case ActTimer: now := time.Now() for _, conn := range c.conns { conn.s.send(now, buf) conn.r.sendAck(now, buf) } if len(c.conns) == 0 && timerRunning { glog.V(0).Info("no conn, stop timer, round:", c.timerRnd, c.isDial) c.timerRnd++ timerRunning = false debug.FreeOSMemory() } releaseMemory++ if releaseMemory > 50*60 { releaseMemory = 0 go func() { debug.FreeOSMemory() // 修复bug: go在windows不回收内存
}() } case ActAddConn: conn := data.param.(*Conn) c.conns[conn.Id] = conn if timerRunning == false { timerRunning = true go c.runTimer(c.timerRnd) glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial) } case ActRmConn: conn := data.param.(*Conn) // fmt.Println("rm conn:", conn.Id)
conn.isClose = true conn.s.isClose = true conn.r.isClose = true if _, ok := c.conns[conn.Id]; ok { glog.V(0).Info("rm conn ok:", conn.Id, c.isDial, len(c.conns)) close(conn.r.readable) close(conn.s.writable) // 修复bug: 没有close
delete(c.conns, conn.Id) } case ActEnd: c.timerRnd++ timerRunning = false c.isClose = true for _, conn := range c.conns { conn.isClose = true conn.s.isClose = true conn.r.isClose = true close(conn.r.readable) close(conn.s.writable) } close(c.accept) c.conns = make(map[uint64]*Conn) return }
}
}
func (c *Conns) runTimer(rnd uint32) { // runtime.LockOSThread()
for { time.Sleep(20 * time.Millisecond) // C.usleep(20 * 1000)
if rnd != c.timerRnd { glog.V(0).Info("timer stop, round:", rnd, c.isDial) } if c.isClose || rnd != c.timerRnd { return } c.input <- Input{typ: ActTimer} } }
|