You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

107 lines
2.3 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package wdclient
  2. import (
  3. "bufio"
  4. "github.com/chrislusf/seaweedfs/weed/pb"
  5. "github.com/chrislusf/seaweedfs/weed/udptransfer"
  6. "github.com/chrislusf/seaweedfs/weed/util"
  7. "github.com/chrislusf/seaweedfs/weed/wdclient/net2"
  8. "io"
  9. "net"
  10. "time"
  11. )
  12. // VolumeUdpClient put/get/delete file chunks directly on volume servers without replication
  13. type VolumeUdpClient struct {
  14. cp net2.ConnectionPool
  15. }
  16. type VolumeUdpConn struct {
  17. net.Conn
  18. bufWriter *bufio.Writer
  19. bufReader *bufio.Reader
  20. }
  21. func NewVolumeUdpClient() *VolumeUdpClient {
  22. MaxIdleTime := 10 * time.Second
  23. return &VolumeUdpClient{
  24. cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{
  25. MaxActiveConnections: 16,
  26. MaxIdleConnections: 1,
  27. MaxIdleTime: &MaxIdleTime,
  28. DialMaxConcurrency: 0,
  29. Dial: func(network string, address string) (net.Conn, error) {
  30. listener, err := udptransfer.NewEndpoint(&udptransfer.Params{
  31. LocalAddr: "",
  32. Bandwidth: 100,
  33. FastRetransmit: true,
  34. FlatTraffic: true,
  35. IsServ: false,
  36. })
  37. if err != nil {
  38. return nil, err
  39. }
  40. conn, err := listener.Dial(address)
  41. if err != nil {
  42. return nil, err
  43. }
  44. return &VolumeUdpConn{
  45. conn,
  46. bufio.NewWriter(conn),
  47. bufio.NewReader(conn),
  48. }, err
  49. },
  50. NowFunc: nil,
  51. ReadTimeout: 0,
  52. WriteTimeout: 0,
  53. }),
  54. }
  55. }
  56. func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
  57. udpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20001)
  58. if parseErr != nil {
  59. return parseErr
  60. }
  61. listener, err := udptransfer.NewEndpoint(&udptransfer.Params{
  62. LocalAddr: "",
  63. Bandwidth: 100,
  64. FastRetransmit: true,
  65. FlatTraffic: true,
  66. IsServ: false,
  67. })
  68. if err != nil {
  69. return err
  70. }
  71. conn, err := listener.Dial(udpAddress)
  72. if err != nil {
  73. return err
  74. }
  75. defer conn.Close()
  76. bufWriter := bufio.NewWriter(conn)
  77. buf := []byte("+" + fileId + "\n")
  78. _, err = bufWriter.Write([]byte(buf))
  79. if err != nil {
  80. return
  81. }
  82. util.Uint32toBytes(buf[0:4], fileSize)
  83. _, err = bufWriter.Write(buf[0:4])
  84. if err != nil {
  85. return
  86. }
  87. _, err = io.Copy(bufWriter, fileReader)
  88. if err != nil {
  89. return
  90. }
  91. bufWriter.Write([]byte("!\n"))
  92. bufWriter.Flush()
  93. return nil
  94. }