You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

182 lines
4.9 KiB

  1. package pb
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  5. "github.com/seaweedfs/seaweedfs/weed/util"
  6. "net"
  7. "strconv"
  8. "strings"
  9. )
  10. type ServerAddress string
  11. type ServerAddresses string
  12. type ServerSrvAddress string
  13. func NewServerAddress(host string, port int, grpcPort int) ServerAddress {
  14. if grpcPort == 0 || grpcPort == port+10000 {
  15. return ServerAddress(util.JoinHostPort(host, port))
  16. }
  17. return ServerAddress(util.JoinHostPort(host, port) + "." + strconv.Itoa(grpcPort))
  18. }
  19. func NewServerAddressWithGrpcPort(address string, grpcPort int) ServerAddress {
  20. if grpcPort == 0 {
  21. return ServerAddress(address)
  22. }
  23. _, port, _ := hostAndPort(address)
  24. if uint64(grpcPort) == port+10000 {
  25. return ServerAddress(address)
  26. }
  27. return ServerAddress(address + "." + strconv.Itoa(grpcPort))
  28. }
  29. func NewServerAddressFromDataNode(dn *master_pb.DataNodeInfo) ServerAddress {
  30. return NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort))
  31. }
  32. func NewServerAddressFromLocation(dn *master_pb.Location) ServerAddress {
  33. return NewServerAddressWithGrpcPort(dn.Url, int(dn.GrpcPort))
  34. }
  35. func (sa ServerAddress) String() string {
  36. return sa.ToHttpAddress()
  37. }
  38. func (sa ServerAddress) ToHttpAddress() string {
  39. portsSepIndex := strings.LastIndex(string(sa), ":")
  40. if portsSepIndex < 0 {
  41. return string(sa)
  42. }
  43. if portsSepIndex+1 >= len(sa) {
  44. return string(sa)
  45. }
  46. ports := string(sa[portsSepIndex+1:])
  47. sepIndex := strings.LastIndex(string(ports), ".")
  48. if sepIndex >= 0 {
  49. host := string(sa[0:portsSepIndex])
  50. return net.JoinHostPort(host, ports[0:sepIndex])
  51. }
  52. return string(sa)
  53. }
  54. func (sa ServerAddress) ToGrpcAddress() string {
  55. portsSepIndex := strings.LastIndex(string(sa), ":")
  56. if portsSepIndex < 0 {
  57. return string(sa)
  58. }
  59. if portsSepIndex+1 >= len(sa) {
  60. return string(sa)
  61. }
  62. ports := string(sa[portsSepIndex+1:])
  63. sepIndex := strings.LastIndex(ports, ".")
  64. if sepIndex >= 0 {
  65. host := string(sa[0:portsSepIndex])
  66. return net.JoinHostPort(host, ports[sepIndex+1:])
  67. }
  68. return ServerToGrpcAddress(string(sa))
  69. }
  70. // LookUp may return an error for some records along with successful lookups - make sure you do not
  71. // discard `addresses` even if `err == nil`
  72. func (r ServerSrvAddress) LookUp() (addresses []ServerAddress, err error) {
  73. _, records, lookupErr := net.LookupSRV("", "", string(r))
  74. if lookupErr != nil {
  75. err = fmt.Errorf("lookup SRV address %s: %v", r, lookupErr)
  76. }
  77. for _, srv := range records {
  78. address := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
  79. addresses = append(addresses, ServerAddress(address))
  80. }
  81. return
  82. }
  83. // ToServiceDiscovery expects one of: a comma-separated list of ip:port, like
  84. //
  85. // 10.0.0.1:9999,10.0.0.2:24:9999
  86. //
  87. // OR an SRV Record prepended with 'dnssrv+', like:
  88. //
  89. // dnssrv+_grpc._tcp.master.consul
  90. // dnssrv+_grpc._tcp.headless.default.svc.cluster.local
  91. // dnssrv+seaweed-master.master.consul
  92. func (sa ServerAddresses) ToServiceDiscovery() (sd *ServerDiscovery) {
  93. sd = &ServerDiscovery{}
  94. prefix := "dnssrv+"
  95. if strings.HasPrefix(string(sa), prefix) {
  96. trimmed := strings.TrimPrefix(string(sa), prefix)
  97. srv := ServerSrvAddress(trimmed)
  98. sd.srvRecord = &srv
  99. } else {
  100. sd.list = sa.ToAddresses()
  101. }
  102. return
  103. }
  104. func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) {
  105. parts := strings.Split(string(sa), ",")
  106. for _, address := range parts {
  107. if address != "" {
  108. addresses = append(addresses, ServerAddress(address))
  109. }
  110. }
  111. return
  112. }
  113. func (sa ServerAddresses) ToAddressMap() (addresses map[string]ServerAddress) {
  114. addresses = make(map[string]ServerAddress)
  115. for _, address := range sa.ToAddresses() {
  116. addresses[string(address)] = address
  117. }
  118. return
  119. }
  120. func (sa ServerAddresses) ToAddressStrings() (addresses []string) {
  121. parts := strings.Split(string(sa), ",")
  122. for _, address := range parts {
  123. addresses = append(addresses, address)
  124. }
  125. return
  126. }
  127. func ToAddressStrings(addresses []ServerAddress) []string {
  128. var strings []string
  129. for _, addr := range addresses {
  130. strings = append(strings, string(addr))
  131. }
  132. return strings
  133. }
  134. func ToAddressStringsFromMap(addresses map[string]ServerAddress) []string {
  135. var strings []string
  136. for _, addr := range addresses {
  137. strings = append(strings, string(addr))
  138. }
  139. return strings
  140. }
  141. func FromAddressStrings(strings []string) []ServerAddress {
  142. var addresses []ServerAddress
  143. for _, addr := range strings {
  144. addresses = append(addresses, ServerAddress(addr))
  145. }
  146. return addresses
  147. }
  148. func ParseUrl(input string) (address ServerAddress, path string, err error) {
  149. if !strings.HasPrefix(input, "http://") {
  150. return "", "", fmt.Errorf("url %s needs prefix 'http://'", input)
  151. }
  152. input = input[7:]
  153. pathSeparatorIndex := strings.Index(input, "/")
  154. hostAndPorts := input
  155. if pathSeparatorIndex > 0 {
  156. path = input[pathSeparatorIndex:]
  157. hostAndPorts = input[0:pathSeparatorIndex]
  158. }
  159. commaSeparatorIndex := strings.Index(input, ":")
  160. if commaSeparatorIndex < 0 {
  161. err = fmt.Errorf("port should be specified in %s", input)
  162. return
  163. }
  164. address = ServerAddress(hostAndPorts)
  165. return
  166. }