249 lines
7.9 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/cluster"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "io"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandClusterCheck{})
  15. }
  // commandClusterCheck is the receiver type for the "cluster.check" command
  // methods (Name, Help, Do). It has no fields.
  type commandClusterCheck struct{}
  18. func (c *commandClusterCheck) Name() string {
  19. return "cluster.check"
  20. }
  21. func (c *commandClusterCheck) Help() string {
  22. return `check current cluster network connectivity
  23. cluster.check
  24. `
  25. }
  26. func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  27. clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  28. if err = clusterPsCommand.Parse(args); err != nil {
  29. return nil
  30. }
  31. // collect topology information
  32. topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
  33. if err != nil {
  34. return err
  35. }
  36. fmt.Fprintf(writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(topologyInfo.DiskInfos))
  37. emptyDiskTypeDiskInfo, emptyDiskTypeFound := topologyInfo.DiskInfos[""]
  38. hddDiskTypeDiskInfo, hddDiskTypeFound := topologyInfo.DiskInfos["hdd"]
  39. if !emptyDiskTypeFound && !hddDiskTypeFound {
  40. return fmt.Errorf("Need to a hdd disk type!")
  41. }
  42. if emptyDiskTypeFound && emptyDiskTypeDiskInfo.VolumeCount == 0 || hddDiskTypeFound && hddDiskTypeDiskInfo.VolumeCount == 0 {
  43. return fmt.Errorf("Need to a hdd disk type!")
  44. }
  45. // collect filers
  46. var filers []pb.ServerAddress
  47. err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  48. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  49. ClientType: cluster.FilerType,
  50. FilerGroup: *commandEnv.option.FilerGroup,
  51. })
  52. for _, node := range resp.ClusterNodes {
  53. filers = append(filers, pb.ServerAddress(node.Address))
  54. }
  55. return err
  56. })
  57. if err != nil {
  58. return
  59. }
  60. fmt.Fprintf(writer, "the cluster has %d filers: %+v\n", len(filers), filers)
  61. // collect volume servers
  62. var volumeServers []pb.ServerAddress
  63. t, _, err := collectTopologyInfo(commandEnv, 0)
  64. if err != nil {
  65. return err
  66. }
  67. for _, dc := range t.DataCenterInfos {
  68. for _, r := range dc.RackInfos {
  69. for _, dn := range r.DataNodeInfos {
  70. volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn))
  71. }
  72. }
  73. }
  74. fmt.Fprintf(writer, "the cluster has %d volume servers: %+v\n", len(volumeServers), volumeServers)
  75. // collect all masters
  76. var masters []pb.ServerAddress
  77. for _, master := range commandEnv.MasterClient.GetMasters() {
  78. masters = append(masters, master)
  79. }
  80. // check from master to volume servers
  81. for _, master := range masters {
  82. for _, volumeServer := range volumeServers {
  83. fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer))
  84. err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
  85. pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
  86. Target: string(volumeServer),
  87. TargetType: cluster.VolumeServerType,
  88. })
  89. if err == nil {
  90. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  91. }
  92. return err
  93. })
  94. if err != nil {
  95. fmt.Fprintf(writer, "%v\n", err)
  96. }
  97. }
  98. }
  99. // check between masters
  100. for _, sourceMaster := range masters {
  101. for _, targetMaster := range masters {
  102. if sourceMaster == targetMaster {
  103. continue
  104. }
  105. fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster))
  106. err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
  107. pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
  108. Target: string(targetMaster),
  109. TargetType: cluster.MasterType,
  110. })
  111. if err == nil {
  112. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  113. }
  114. return err
  115. })
  116. if err != nil {
  117. fmt.Fprintf(writer, "%v\n", err)
  118. }
  119. }
  120. }
  121. // check from volume servers to masters
  122. for _, volumeServer := range volumeServers {
  123. for _, master := range masters {
  124. fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master))
  125. err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  126. pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
  127. Target: string(master),
  128. TargetType: cluster.MasterType,
  129. })
  130. if err == nil {
  131. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  132. }
  133. return err
  134. })
  135. if err != nil {
  136. fmt.Fprintf(writer, "%v\n", err)
  137. }
  138. }
  139. }
  140. // check from filers to masters
  141. for _, filer := range filers {
  142. for _, master := range masters {
  143. fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master))
  144. err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  145. pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  146. Target: string(master),
  147. TargetType: cluster.MasterType,
  148. })
  149. if err == nil {
  150. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  151. }
  152. return err
  153. })
  154. if err != nil {
  155. fmt.Fprintf(writer, "%v\n", err)
  156. }
  157. }
  158. }
  159. // check from filers to volume servers
  160. for _, filer := range filers {
  161. for _, volumeServer := range volumeServers {
  162. fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer))
  163. err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  164. pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  165. Target: string(volumeServer),
  166. TargetType: cluster.VolumeServerType,
  167. })
  168. if err == nil {
  169. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  170. }
  171. return err
  172. })
  173. if err != nil {
  174. fmt.Fprintf(writer, "%v\n", err)
  175. }
  176. }
  177. }
  178. // check between volume servers
  179. for _, sourceVolumeServer := range volumeServers {
  180. for _, targetVolumeServer := range volumeServers {
  181. if sourceVolumeServer == targetVolumeServer {
  182. continue
  183. }
  184. fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
  185. err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  186. pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
  187. Target: string(targetVolumeServer),
  188. TargetType: cluster.VolumeServerType,
  189. })
  190. if err == nil {
  191. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  192. }
  193. return err
  194. })
  195. if err != nil {
  196. fmt.Fprintf(writer, "%v\n", err)
  197. }
  198. }
  199. }
  200. // check between filers, and need to connect to itself
  201. for _, sourceFiler := range filers {
  202. for _, targetFiler := range filers {
  203. fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler))
  204. err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  205. pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  206. Target: string(targetFiler),
  207. TargetType: cluster.FilerType,
  208. })
  209. if err == nil {
  210. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  211. }
  212. return err
  213. })
  214. if err != nil {
  215. fmt.Fprintf(writer, "%v\n", err)
  216. }
  217. }
  218. }
  219. return nil
  220. }
  221. func printTiming(writer io.Writer, startNs, remoteNs, stopNs int64) {
  222. roundTripTimeMs := float32(stopNs-startNs) / 1000000
  223. deltaTimeMs := float32(remoteNs-(startNs+stopNs)/2) / 1000000
  224. fmt.Fprintf(writer, "ok round trip %.3fms clock delta %.3fms\n", roundTripTimeMs, deltaTimeMs)
  225. }