You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1048 lines
24 KiB

  1. package penet
  2. import (
  3. "container/list"
  4. crand "crypto/rand"
  5. "encoding/binary"
  6. "errors"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. mrand "math/rand"
  9. "net"
  10. "runtime"
  11. "runtime/debug"
  12. "sync"
  13. "time"
  14. )
  15. type DataSend struct {
  16. Seq uint32
  17. Acked bool
  18. Resend byte
  19. Fast bool
  20. Code byte
  21. Time uint32
  22. Data []byte
  23. }
  24. type DataRecv struct {
  25. Seq uint32
  26. AckCnt byte
  27. Code byte
  28. Data []byte
  29. }
  30. type UdpSend struct {
  31. Id uint64
  32. sock *net.UDPConn
  33. remote *net.UDPAddr
  34. seq uint32
  35. rtt float64
  36. rttMax float64
  37. rttMin float64
  38. rate uint32
  39. mss uint32
  40. interval uint32
  41. data []DataSend
  42. dataBegin int
  43. dataLen int
  44. sendRnd int
  45. sendList *list.List
  46. sendListLock sync.Mutex
  47. writable chan bool
  48. writeMax int
  49. isClose bool
  50. closing bool
  51. opening byte
  52. conn *Conn
  53. resendCnt int
  54. name string
  55. acked uint32
  56. recvWnd uint32
  57. }
  58. const (
  59. TypeData uint8 = 1
  60. TypeAck uint8 = 2
  61. TypeClose uint8 = 3
  62. TypeSYN uint8 = 8
  63. )
  64. var (
  65. ErrClose = errors.New("conn close")
  66. ErrTimeout = errors.New("conn timeout")
  67. mss uint32 = 1200
  68. defaultRate = mss * 3000
  69. dropRate = 0.0
  70. writeMaxSep = 5
  71. resendLimit = true
  72. )
  73. func NewUdpSend(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpSend {
  74. u := &UdpSend{
  75. conn: conn,
  76. Id: id,
  77. sock: sock,
  78. remote: remote,
  79. mss: mss,
  80. interval: 20,
  81. data: make([]DataSend, 4),
  82. seq: 1,
  83. rate: defaultRate, // 修复bug: 太高导致压测的时候内存爆炸,速度慢
  84. rtt: 200,
  85. rttMax: 200,
  86. rttMin: 200,
  87. sendList: list.New(),
  88. writable: make(chan bool, 1),
  89. name: name,
  90. }
  91. u.writeMax = int(u.rate/u.mss) / writeMaxSep // fixed bug: 初始化 修复bug: 写入太多,应该一点点写
  92. u.recvWnd = uint32(u.writeMax)
  93. return u
  94. }
  95. func (u *UdpSend) send(nowTime time.Time, buf []byte) {
  96. var sendMax = int(u.rate / u.mss)
  97. u.writeMax = sendMax / writeMaxSep
  98. // glog.V(4).Info("send", u.dataLen, u.name, sendMax, uint32(u.rtt))
  99. var sendCount = uint32(u.rate / u.mss / (1000 / u.interval))
  100. var sendTotal = sendCount
  101. if sendCount > u.recvWnd/5 {
  102. sendCount = u.recvWnd / 5
  103. }
  104. if sendCount <= 0 {
  105. sendCount = 1
  106. }
  107. now := uint32(nowTime.UnixNano() / int64(time.Millisecond))
  108. resendCnt := 0
  109. for i := 0; i < u.dataLen; i++ {
  110. if sendCount <= 0 {
  111. break
  112. }
  113. index := (i + u.dataBegin) % len(u.data)
  114. d := &u.data[index]
  115. if d.Acked == false && now-d.Time >= uint32(u.rtt*2.0) {
  116. glog.V(4).Infof("resend, seq:%v id:%v rtt:%v name:%v", d.Seq, u.Id, uint32(u.rtt), u.name)
  117. // data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6
  118. // 发送窗口就是配置值,其实发送窗口不需要发送出去
  119. // 接收端能发送了,要告诉对面开始发送。这个逻辑可以通过接受端的发送通道来发送。
  120. headLen := structPack(buf, "BBQHIII", uint8(TypeData),
  121. d.Code, uint64(u.Id), uint16(len(d.Data)+4+4+4), d.Seq, uint32(now), uint32(sendMax))
  122. copy(buf[headLen:], d.Data)
  123. u.sock.WriteToUDP(buf[:headLen+len(d.Data)], u.remote)
  124. d.Time = now
  125. d.Resend++
  126. d.Fast = false
  127. sendCount--
  128. resendCnt++
  129. u.resendCnt++
  130. }
  131. }
  132. u.sendRnd++
  133. if u.sendRnd > 100 && u.dataLen > 0 {
  134. u.sendRnd = 0
  135. var resendMax = uint32(float64(u.dataLen) * 0.6)
  136. if u.resendCnt > 2 {
  137. glog.V(0).Infof("resendCnt:%v resendMax:%v remain:%v rtt:%v id:%v", u.resendCnt,
  138. resendMax, u.dataLen, uint32(u.rtt), u.Id)
  139. u.resendCnt = 0
  140. }
  141. // 修复bug: 之前使用lastrecvtime,存在新增发送数据数据那瞬间,出现超时情况
  142. if u.data[u.dataBegin].Resend > 10 {
  143. glog.V(0).Infof("resend too much, close, id:%v name:%v drop seq:%v remain:%v",
  144. u.Id, u.name, u.data[u.dataBegin].Seq, u.dataLen)
  145. u.conn.close(true, true)
  146. }
  147. }
  148. var resendToomuch bool
  149. if uint32(resendCnt) > sendTotal/3 && resendLimit { // 修复bug: 限速
  150. resendToomuch = true
  151. glog.V(4).Infof("resend too much, slow, %v %v", resendCnt, sendTotal)
  152. }
  153. u.sendListLock.Lock()
  154. for ; sendCount > 0; sendCount-- {
  155. sendData := u.sendList.Front()
  156. if sendData == nil {
  157. break
  158. }
  159. if sendMax-u.dataLen <= 0 || resendToomuch {
  160. break
  161. }
  162. if u.dataLen >= len(u.data) {
  163. newData := make([]DataSend, 2*len(u.data))
  164. copy(newData, u.data[u.dataBegin:])
  165. copy(newData[len(u.data)-u.dataBegin:], u.data[:u.dataBegin])
  166. u.dataBegin = 0
  167. u.data = newData
  168. }
  169. u.sendList.Remove(sendData)
  170. hType := uint8(0)
  171. sdata, ok := sendData.Value.([]byte)
  172. if !ok {
  173. // 发送控制消息
  174. mesCode, _ := sendData.Value.(byte)
  175. hType = mesCode
  176. }
  177. headLen := structPack(buf, "BBQHIII", uint8(TypeData), hType,
  178. uint64(u.Id), uint16(len(sdata)+4+4+4), u.seq, uint32(now), uint32(sendMax))
  179. copy(buf[headLen:], sdata)
  180. u.sock.WriteToUDP(buf[:headLen+len(sdata)], u.remote)
  181. glog.V(4).Infof("send, seq:%v id:%v %v", u.seq, u.Id, u.name)
  182. index := (u.dataBegin + u.dataLen) % len(u.data)
  183. u.data[index] = DataSend{
  184. Seq: u.seq,
  185. Time: now,
  186. Code: hType,
  187. Data: sdata,
  188. }
  189. u.seq++
  190. u.dataLen++
  191. }
  192. listRemain := u.writeMax - u.sendList.Len()
  193. u.sendListLock.Unlock()
  194. if listRemain > 0 {
  195. select {
  196. case u.writable <- !u.isClose:
  197. default:
  198. }
  199. }
  200. if u.isClose && u.dataLen == 0 && u.closing == false { // 修复bug: 之前5秒删除,实际发送没完成
  201. // 修复bug: 接受端write端close为true, 导致5秒后呗删除,来不及接受数据。
  202. glog.V(1).Infof("close and emtpy, rm conn, id:%v name:%v seq:%v listlen:%v",
  203. u.Id, u.name, u.seq, u.sendList.Len())
  204. u.closing = true
  205. u.conn.close(false, true)
  206. }
  207. }
  208. func testDrop() bool {
  209. if dropRate < 0.01 {
  210. return false
  211. }
  212. var v uint32
  213. var b [4]byte
  214. if _, err := crand.Read(b[:]); err != nil {
  215. v = mrand.Uint32()
  216. } else {
  217. v = binary.BigEndian.Uint32(b[:])
  218. }
  219. if v%1000 < uint32(dropRate*1000) {
  220. return true
  221. }
  222. return false
  223. }
  224. func (u *UdpSend) recv(buf []byte) {
  225. if testDrop() {
  226. return
  227. }
  228. now := uint32(time.Now().UnixNano() / int64(time.Millisecond))
  229. // ack包: type0 flag1 id2 len3 tm4 rcvwnd5 acked6
  230. head, headLen := structUnPack(buf, "BBQHIII")
  231. if head[0] == uint64(TypeAck) && u.dataLen > 0 {
  232. firstSeq := u.data[u.dataBegin].Seq
  233. acked := uint32(head[6])
  234. offset := int(int32(acked - firstSeq))
  235. glog.V(4).Infof("id:%v recv ack: %v offset:%v databegin:%v dataLen:%v firstSeq:%v name:%v",
  236. u.Id, acked, offset, u.dataBegin, u.dataLen, firstSeq, u.name)
  237. if offset >= 0 && offset < u.dataLen {
  238. offset++
  239. for i := 0; i < offset; i++ { // 修复bug: 清除引用等
  240. index := (u.dataBegin + i) % len(u.data) // 修复bug,i写成offset
  241. d := &u.data[index]
  242. d.Data = nil
  243. d.Acked = true
  244. }
  245. u.acked += uint32(offset)
  246. u.dataBegin += offset
  247. u.dataBegin = u.dataBegin % len(u.data)
  248. u.dataLen -= offset
  249. glog.V(4).Infof("2 acked ok:%v, id:%v %v", u.acked, u.Id, u.name)
  250. }
  251. if u.dataLen > 0 {
  252. curSeq := u.data[u.dataBegin].Seq
  253. // var ackedSeq = []uint32{}
  254. for i := headLen; i < len(buf); i += 4 {
  255. seq := binary.BigEndian.Uint32(buf[i:])
  256. offset := int(int32(seq - curSeq))
  257. if offset < 0 || offset >= u.dataLen {
  258. continue
  259. }
  260. index := (u.dataBegin + offset) % len(u.data)
  261. d := &u.data[index]
  262. if d.Seq == seq {
  263. d.Acked = true
  264. d.Data = nil // 修复bug: memory leak
  265. // ackedSeq = append(ackedSeq, seq)
  266. } else {
  267. panic("index not correct")
  268. }
  269. }
  270. // if len(ackedSeq) > 0 {
  271. // glog.V(0).Info("seq:", ackedSeq)
  272. // }
  273. }
  274. var i = 0
  275. for ; i < u.dataLen; i++ {
  276. index := (i + u.dataBegin) % len(u.data)
  277. d := &u.data[index]
  278. if d.Acked == false {
  279. break
  280. }
  281. // fmt.Println("acked->", d.Seq)
  282. // if d.Seq == 7 {
  283. // fmt.Println("data:", u.data[u.dataBegin:u.dataBegin+5])
  284. // }
  285. }
  286. if i > 0 {
  287. u.acked += uint32(i)
  288. u.dataBegin += i
  289. u.dataBegin = u.dataBegin % len(u.data)
  290. u.dataLen -= i
  291. glog.V(4).Infof("3 acked ok:%v, id:%v %v %v", u.acked, u.Id, u.dataBegin, u.name)
  292. }
  293. sendTime := uint32(head[4])
  294. rtt := now - sendTime
  295. if rtt > 0 {
  296. if firstSeq < 3 {
  297. u.rtt = float64(rtt) // 初始值
  298. } else {
  299. u.rtt = u.rtt*0.8 + float64(rtt)*0.2
  300. }
  301. if u.rtt < 50.0 {
  302. u.rtt = 50
  303. }
  304. // glog.V(4).Infof("rtt:%v u.rtt:%v id:%v", rtt, u.rtt, u.Id)
  305. }
  306. u.recvWnd = uint32(head[5])
  307. }
  308. }
  309. func structPack(b []byte, format string, param ...interface{}) int {
  310. j := 0
  311. for i, s := range format {
  312. switch s {
  313. case 'I':
  314. p, _ := param[i].(uint32)
  315. binary.BigEndian.PutUint32(b[j:], p)
  316. j += 4
  317. case 'B':
  318. p, _ := param[i].(uint8)
  319. b[j] = p
  320. j++
  321. case 'H':
  322. p, _ := param[i].(uint16)
  323. binary.BigEndian.PutUint16(b[j:], p)
  324. j += 2
  325. case 'Q':
  326. p, _ := param[i].(uint64)
  327. binary.BigEndian.PutUint64(b[j:], p)
  328. j += 8
  329. default:
  330. panic("structPack not found")
  331. }
  332. }
  333. return j
  334. }
  335. func structUnPack(b []byte, format string) ([]uint64, int) {
  336. var re = make([]uint64, 0, len(format))
  337. defer func() {
  338. if err := recover(); err != nil {
  339. // log.Error(err)
  340. re[0] = 0
  341. }
  342. }()
  343. j := 0
  344. for _, s := range format {
  345. switch s {
  346. case 'I':
  347. re = append(re, uint64(binary.BigEndian.Uint32(b[j:])))
  348. j += 4
  349. case 'B':
  350. re = append(re, uint64(b[j]))
  351. j++
  352. case 'H':
  353. re = append(re, uint64(binary.BigEndian.Uint16(b[j:])))
  354. j += 2
  355. case 'Q':
  356. re = append(re, uint64(binary.BigEndian.Uint64(b[j:])))
  357. j += 8
  358. default:
  359. panic("structUnPack not found")
  360. }
  361. }
  362. return re, j
  363. }
  364. type UdpRecv struct {
  365. conn *Conn
  366. Id uint64
  367. sock *net.UDPConn
  368. remote *net.UDPAddr
  369. acked uint32
  370. lastTm uint32
  371. sndWnd uint32
  372. recvCnt uint32
  373. isClose bool
  374. isNew byte
  375. isRecved bool
  376. recvList *list.List
  377. recvListLock sync.Mutex
  378. readable chan byte
  379. readDeadline *time.Time
  380. seqData map[uint32]*DataRecv
  381. dataList *list.List
  382. name string
  383. }
  384. func NewUdpRecv(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpRecv {
  385. return &UdpRecv{
  386. Id: id,
  387. conn: conn,
  388. sock: sock,
  389. remote: remote,
  390. acked: 0,
  391. recvList: list.New(),
  392. dataList: list.New(),
  393. seqData: make(map[uint32]*DataRecv),
  394. readable: make(chan byte, 1),
  395. isNew: 50,
  396. name: name,
  397. sndWnd: 1000,
  398. }
  399. }
  400. func (u *UdpRecv) SetReadDeadline(t time.Time) {
  401. u.readDeadline = &t
  402. }
  403. func (u *UdpRecv) sendAck(nowTime time.Time, buf []byte) {
  404. u.recvListLock.Lock()
  405. for {
  406. if d, ok := u.seqData[u.acked+1]; ok {
  407. u.acked++
  408. if d.Code == TypeClose {
  409. if u.isClose == false {
  410. glog.V(1).Info("recv close:", u.Id)
  411. u.conn.close(false, true)
  412. u.isClose = true
  413. }
  414. } else {
  415. u.recvList.PushBack(d.Data)
  416. }
  417. d.Data = nil
  418. delete(u.seqData, u.acked)
  419. } else {
  420. break
  421. }
  422. }
  423. recvListLen := u.recvList.Len()
  424. u.recvListLock.Unlock()
  425. // glog.V(4).Info("acked:", u.acked, u.Id, recvListLen, u.name)
  426. if recvListLen > 0 { // 修复bug,用标志可能会有读不到的数据 修复bug: 去掉 && u.isClose == false,导致读取延迟
  427. select {
  428. case u.readable <- 1:
  429. default:
  430. }
  431. } else if u.isClose == true { // 修复bug:快速close
  432. select {
  433. case u.readable <- 0:
  434. default:
  435. }
  436. }
  437. if u.readDeadline != nil && !u.readDeadline.IsZero() {
  438. if u.readDeadline.Before(nowTime) { // 修复bug after
  439. select {
  440. case u.readable <- 2:
  441. glog.V(0).Info("read dealline: ", u.Id)
  442. default:
  443. }
  444. }
  445. }
  446. var b = buf[:mss]
  447. buf = buf[mss:]
  448. var n = 0
  449. for i := u.dataList.Front(); i != nil; {
  450. d := i.Value.(*DataRecv)
  451. next := i.Next()
  452. if d.AckCnt > 6 || before(d.Seq, u.acked+1) { // 发几次够了,不反复发
  453. // d.Removed = true
  454. u.dataList.Remove(i) // 修复bug,删除逻辑不对,需要保存next
  455. // delete(u.seqData, d.Seq) // 修复bug: 对面已经确认,这里删了,没有重发,也没有了数据。已经收到的数据并且发了ack的数据,不要删了!
  456. i = next
  457. continue
  458. }
  459. i = next
  460. if d.AckCnt%3 == 0 {
  461. binary.BigEndian.PutUint32(b[n:], d.Seq)
  462. n += 4
  463. if n >= len(b) {
  464. wnd := int(u.sndWnd) - recvListLen - len(u.seqData)
  465. if wnd < 0 {
  466. wnd = 0
  467. }
  468. headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n),
  469. u.lastTm, uint32(wnd), u.acked)
  470. copy(buf[headLen:], b[:n])
  471. u.sock.WriteToUDP(buf[:headLen+n], u.remote)
  472. glog.V(4).Infof("id:%v send ack n:%v", u.Id, n)
  473. n = 0 // 修复bug,没有置零
  474. u.isRecved = false // 修复bug: 有时候发多一条数据
  475. }
  476. }
  477. d.AckCnt++
  478. }
  479. if n > 0 || u.isRecved { // 修复bug: 一直发数据 修复bug: 有时候发多一条数据
  480. wnd := int(u.sndWnd) - recvListLen - len(u.seqData)
  481. if wnd < 0 {
  482. wnd = 0
  483. }
  484. headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n),
  485. u.lastTm, uint32(wnd), u.acked)
  486. copy(buf[headLen:], b[:n])
  487. u.sock.WriteToUDP(buf[:headLen+n], u.remote)
  488. glog.V(4).Infof("id:%v send ack n:%v datalen:%v", u.Id, n, u.dataList.Len())
  489. }
  490. u.isRecved = false
  491. // 修复bug: 如果自己只是发数据,那么自己的acked通道会没用到。所以要判断自己是否在发送数据。
  492. if u.isNew > 0 { // 完成的优化:超时不确认第一包,就删除链接。防止旧链接不断发包。
  493. u.isNew--
  494. if u.isNew == 0 && u.acked == 0 {
  495. glog.V(0).Infof("not recv first packet!!! close, id:%v name:%v", u.Id, u.name)
  496. u.conn.Close()
  497. }
  498. if u.acked >= 1 || u.conn.s.seq > 1 {
  499. u.isNew = 0
  500. }
  501. }
  502. }
  503. // before seq1比seq2小
  504. func before(seq1, seq2 uint32) bool {
  505. return (int32)(seq1-seq2) < 0
  506. }
  507. func after(seq1, seq2 uint32) bool {
  508. return (int32)(seq2-seq1) < 0
  509. }
  510. func (u *UdpRecv) recv(buf []byte) {
  511. if testDrop() {
  512. return
  513. }
  514. u.isRecved = true
  515. u.recvCnt++
  516. // data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6
  517. head, headLen := structUnPack(buf, "BBQHIII")
  518. if head[0] == uint64(TypeData) {
  519. seq := uint32(head[4])
  520. u.lastTm = uint32(head[5])
  521. u.sndWnd = uint32(head[6])
  522. glog.V(4).Info(u.Id, " recv seq: ", seq, " len: ", u.dataList.Len())
  523. if before(seq, u.acked+1) { // 修复bug: 收到before的数据,seqData不会回收。
  524. // glog.V(0).Info("seq before u.acked:", seq, u.acked, u.Id)
  525. return
  526. }
  527. // 修复bug: 修复没重发问题。之前由于一直会重发,这个逻辑有意义。现在只重发几次。
  528. // if d, ok := u.seqData[seq]; ok {
  529. // d.AckCnt = 0
  530. // if d.Removed == false { // 修复bug:可能没ack导致一直重发,要直到acked全部覆盖。
  531. // return
  532. // }
  533. // }
  534. // glog.V(4).Info(u.Id, " recv 2 seq: ", seq, " len: ", u.dataList.Len())
  535. d := &DataRecv{
  536. Seq: seq,
  537. Data: buf[headLen:],
  538. Code: byte(head[1]),
  539. }
  540. u.dataList.PushBack(d)
  541. u.seqData[seq] = d
  542. }
  543. }
  544. func SetRate(rate uint32) {
  545. defaultRate = rate
  546. }
  547. func SetDropRate(rate float64) {
  548. if rate > 0.001 {
  549. dropRate = rate
  550. }
  551. }
  552. type Conn struct {
  553. Id uint64
  554. s *UdpSend
  555. r *UdpRecv
  556. responsed chan bool
  557. isClose bool
  558. isSendClose bool
  559. isRmConn bool
  560. conns *Conns
  561. }
  562. func NewConn(conns *Conns, Id uint64, localConn *net.UDPConn, remote *net.UDPAddr, name string) *Conn {
  563. conn := &Conn{
  564. Id: Id,
  565. conns: conns,
  566. responsed: make(chan bool, 1),
  567. }
  568. conn.s = NewUdpSend(conn, conn.Id, localConn, remote, name)
  569. conn.r = NewUdpRecv(conn, conn.Id, localConn, remote, name)
  570. return conn
  571. }
  572. func (c *Conn) Write(bb []byte) (n int, err error) {
  573. if c.isClose {
  574. return 0, ErrClose
  575. }
  576. // 1.对方告诉你满了,是要叫你不要发数据了,而不是还发数据。
  577. // 2.没有窗口就没法快速地确定对面有没有满,如果你要等到自己的的buffer满,那么可能会比较慢感知到
  578. // 固定buffer + 停止通知
  579. b := make([]byte, len(bb)) // 修复bug1:没有拷贝
  580. copy(b, bb)
  581. for {
  582. c.s.sendListLock.Lock()
  583. remain := c.s.writeMax - c.s.sendList.Len()
  584. for {
  585. if len(b) <= 0 {
  586. break
  587. }
  588. if remain <= 0 {
  589. break
  590. }
  591. remain--
  592. var sendLen = int(mss)
  593. if len(b) < sendLen {
  594. // 实际发送值
  595. sendLen = len(b)
  596. }
  597. n += sendLen
  598. c.s.sendList.PushBack(b[:sendLen])
  599. // fmt.Println("write:", b[:10])
  600. b = b[sendLen:]
  601. }
  602. c.s.sendListLock.Unlock()
  603. if len(b) <= 0 {
  604. break
  605. }
  606. // glog.V(0).Info("wait write: ", c.Id)
  607. w := <-c.s.writable
  608. if w == false {
  609. return n, ErrClose
  610. }
  611. }
  612. return n, nil
  613. }
  614. func (c *Conn) Read(b []byte) (n int, err error) {
  615. for {
  616. c.r.recvListLock.Lock()
  617. for {
  618. f := c.r.recvList.Front()
  619. if f != nil {
  620. data := f.Value.([]byte)
  621. copy(b[n:], data)
  622. maxCap := len(b[n:])
  623. if maxCap < len(data) {
  624. // b已满
  625. f.Value = data[maxCap:]
  626. n += maxCap
  627. break
  628. } else {
  629. // b未满
  630. c.r.recvList.Remove(f)
  631. n += len(data)
  632. }
  633. } else {
  634. // 读完数据了
  635. break
  636. }
  637. }
  638. c.r.recvListLock.Unlock()
  639. if n <= 0 {
  640. // glog.V(4).Info("wait read", c.Id)
  641. // wait for chan
  642. r := <-c.r.readable
  643. if r == 0 { // close之后总是返回初始值
  644. c.r.recvListLock.Lock()
  645. rlen := c.r.recvList.Len()
  646. c.r.recvListLock.Unlock()
  647. if rlen <= 0 { // 修复bug: 等到read完所有数据才让read返回错误
  648. return n, ErrClose
  649. }
  650. }
  651. if r == 2 {
  652. return n, ErrTimeout
  653. }
  654. } else {
  655. break
  656. }
  657. }
  658. return
  659. }
  660. func (c *Conn) LocalAddr() net.Addr {
  661. return c.conns.sock.LocalAddr()
  662. }
  663. func (c *Conn) RemoteAddr() net.Addr {
  664. return c.conns.sock.RemoteAddr()
  665. }
  666. func (c *Conn) SetDeadline(t time.Time) error {
  667. c.r.SetReadDeadline(t)
  668. return nil
  669. }
  670. func (c *Conn) SetReadDeadline(t time.Time) error {
  671. c.r.SetReadDeadline(t)
  672. return nil
  673. }
  674. func (c *Conn) SetWriteDeadline(t time.Time) error {
  675. return nil
  676. }
  677. func (c *Conn) close(sendClose, rmConn bool) {
  678. if c.isClose == false {
  679. c.isClose = true
  680. c.r.isClose = true
  681. }
  682. if sendClose && c.isSendClose == false { // 修复bug:
  683. c.isSendClose = true
  684. c.s.sendListLock.Lock()
  685. c.s.sendList.PushBack(byte(TypeClose))
  686. c.s.sendListLock.Unlock()
  687. }
  688. if rmConn && c.isRmConn == false {
  689. c.isRmConn = true
  690. time.AfterFunc(time.Second*5, func() {
  691. select {
  692. case c.conns.input <- Input{
  693. typ: ActRmConn,
  694. param: c,
  695. }:
  696. default:
  697. }
  698. })
  699. }
  700. }
  701. func (c *Conn) Close() error {
  702. if c.isClose == false {
  703. c.isClose = true
  704. c.s.isClose = true
  705. // bug: 接收不能主动关闭
  706. c.isSendClose = true
  707. c.s.sendListLock.Lock()
  708. c.s.sendList.PushBack(byte(TypeClose))
  709. c.s.sendListLock.Unlock()
  710. // bug: close之后,5秒数据可能无法完成发送
  711. }
  712. return nil
  713. }
  714. type Conns struct {
  715. conns map[uint64]*Conn
  716. sock *net.UDPConn
  717. accept chan *Conn
  718. isClose bool
  719. isDial bool
  720. timerRnd uint32
  721. input chan Input
  722. }
  723. func NewConns() *Conns {
  724. return &Conns{
  725. conns: make(map[uint64]*Conn),
  726. accept: make(chan *Conn, 256),
  727. input: make(chan Input, 2048),
  728. }
  729. }
  730. func Listen(network, address string) (net.Listener, error) {
  731. addr, err := net.ResolveUDPAddr("udp", address)
  732. if err != nil {
  733. return nil, err
  734. }
  735. conn, err := net.ListenUDP("udp", addr)
  736. if err != nil {
  737. return nil, err
  738. }
  739. listener := NewConns()
  740. listener.sock = conn
  741. go listener.loop()
  742. return listener, nil
  743. }
  744. var dialConns *Conns
  745. var dialConnsLock sync.Mutex
  746. func Dial(network, address string) (net.Conn, error) {
  747. return DialTimeout(network, address, time.Second*3)
  748. }
  749. func DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
  750. addr, err := net.ResolveUDPAddr("udp", address)
  751. if err != nil {
  752. return nil, err
  753. }
  754. var b [8]byte
  755. if _, err := crand.Read(b[:]); err != nil {
  756. return nil, err
  757. }
  758. id := binary.LittleEndian.Uint64(b[:])
  759. glog.V(0).Info("dial new 3:", id)
  760. dialConnsLock.Lock()
  761. if dialConns == nil {
  762. dialConns = NewConns()
  763. }
  764. if dialConns.sock == nil {
  765. s, err := net.ListenUDP("udp", &net.UDPAddr{})
  766. if err != nil {
  767. dialConnsLock.Unlock()
  768. return nil, err
  769. }
  770. dialConns.sock = s
  771. dialConns.isDial = true
  772. go dialConns.loop()
  773. }
  774. dialConnsLock.Unlock()
  775. conn := NewConn(dialConns, id, dialConns.sock, addr, "reqer")
  776. dialConns.input <- Input{
  777. typ: ActAddConn,
  778. param: conn,
  779. }
  780. // conn.s.sendListLock.Lock()
  781. // conn.s.sendList.PushBack(byte(TypeSYN))
  782. // conn.s.sendListLock.Unlock()
  783. return conn, nil
  784. }
  785. func (c *Conns) Accept() (net.Conn, error) {
  786. for {
  787. if c.isClose {
  788. return nil, errors.New("listener close")
  789. }
  790. conn := <-c.accept
  791. if conn == nil {
  792. return nil, errors.New("listener close")
  793. }
  794. return conn, nil
  795. }
  796. }
  797. func (c *Conns) Close() error {
  798. c.isClose = true
  799. c.sock.Close()
  800. c.input <- Input{
  801. typ: ActEnd,
  802. }
  803. return nil
  804. }
  805. func (c *Conns) Addr() net.Addr {
  806. return c.sock.LocalAddr()
  807. }
  808. const (
  809. ActData = 1
  810. ActTimer = 2
  811. ActAddConn = 3
  812. ActRmConn = 4
  813. ActEnd = 5
  814. )
  815. type Input struct {
  816. typ uint8
  817. data []byte
  818. param interface{}
  819. }
  820. func (c *Conns) loop() {
  821. // 这里输入时间和数据
  822. // 只起一个timer,给所有conn发
  823. // 之前用setreaddeadline这个不太好,容易出现长时间没超时
  824. runtime.LockOSThread()
  825. glog.V(0).Info("loop: ", c.isDial)
  826. go func() {
  827. var buf = make([]byte, 2048)
  828. for {
  829. n, remote, err := c.sock.ReadFromUDP(buf)
  830. if n <= 0 || err != nil {
  831. c.Close()
  832. return
  833. }
  834. b := make([]byte, n)
  835. copy(b, buf[:n])
  836. c.input <- Input{
  837. typ: ActData,
  838. data: b,
  839. param: remote,
  840. }
  841. if c.isClose {
  842. return
  843. }
  844. }
  845. }()
  846. var timerRunning bool
  847. var releaseMemory uint32
  848. var buf = make([]byte, mss*3)
  849. for {
  850. data := <-c.input
  851. switch data.typ {
  852. case ActData:
  853. head, _ := structUnPack(data.data, "BBQ")
  854. // fmt.Println(head[0], head[2])
  855. var dataType = head[0]
  856. if conn, ok := c.conns[head[2]]; ok {
  857. // TODO: 给dial中的链接发送成功 -> 暂时不需要,现在dial不判断这些,默认成功
  858. if dataType == uint64(TypeData) {
  859. conn.r.recv(data.data)
  860. } else if dataType == uint64(TypeAck) {
  861. conn.s.recv(data.data)
  862. }
  863. } else {
  864. if c.isDial == false && dataType == uint64(TypeData) && c.isClose == false { // 不需要TypeSYN
  865. // glog.V(0).Info("create new:", head[2])
  866. // 只有主动listen的,才有新链接,而dial自己就会创建新连接,不用创建
  867. conn := NewConn(c, head[2], c.sock, data.param.(*net.UDPAddr), "rsper")
  868. c.conns[conn.Id] = conn
  869. conn.r.recv(data.data)
  870. select {
  871. case c.accept <- conn:
  872. default:
  873. }
  874. if timerRunning == false {
  875. timerRunning = true
  876. go c.runTimer(c.timerRnd)
  877. glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial)
  878. }
  879. }
  880. }
  881. case ActTimer:
  882. now := time.Now()
  883. for _, conn := range c.conns {
  884. conn.s.send(now, buf)
  885. conn.r.sendAck(now, buf)
  886. }
  887. if len(c.conns) == 0 && timerRunning {
  888. glog.V(0).Info("no conn, stop timer, round:", c.timerRnd, c.isDial)
  889. c.timerRnd++
  890. timerRunning = false
  891. debug.FreeOSMemory()
  892. }
  893. releaseMemory++
  894. if releaseMemory > 50*60 {
  895. releaseMemory = 0
  896. go func() {
  897. debug.FreeOSMemory() // 修复bug: go在windows不回收内存
  898. }()
  899. }
  900. case ActAddConn:
  901. conn := data.param.(*Conn)
  902. c.conns[conn.Id] = conn
  903. if timerRunning == false {
  904. timerRunning = true
  905. go c.runTimer(c.timerRnd)
  906. glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial)
  907. }
  908. case ActRmConn:
  909. conn := data.param.(*Conn)
  910. // fmt.Println("rm conn:", conn.Id)
  911. conn.isClose = true
  912. conn.s.isClose = true
  913. conn.r.isClose = true
  914. if _, ok := c.conns[conn.Id]; ok {
  915. glog.V(0).Info("rm conn ok:", conn.Id, c.isDial, len(c.conns))
  916. close(conn.r.readable)
  917. close(conn.s.writable) // 修复bug: 没有close
  918. delete(c.conns, conn.Id)
  919. }
  920. case ActEnd:
  921. c.timerRnd++
  922. timerRunning = false
  923. c.isClose = true
  924. for _, conn := range c.conns {
  925. conn.isClose = true
  926. conn.s.isClose = true
  927. conn.r.isClose = true
  928. close(conn.r.readable)
  929. close(conn.s.writable)
  930. }
  931. close(c.accept)
  932. c.conns = make(map[uint64]*Conn)
  933. return
  934. }
  935. }
  936. }
  937. func (c *Conns) runTimer(rnd uint32) {
  938. // runtime.LockOSThread()
  939. for {
  940. time.Sleep(20 * time.Millisecond)
  941. // C.usleep(20 * 1000)
  942. if rnd != c.timerRnd {
  943. glog.V(0).Info("timer stop, round:", rnd, c.isDial)
  944. }
  945. if c.isClose || rnd != c.timerRnd {
  946. return
  947. }
  948. c.input <- Input{typ: ActTimer}
  949. }
  950. }