You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

217 lines
6.4 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/cluster"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  10. "io"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandClusterCheck{})
  15. }
// commandClusterCheck implements the "cluster.check" shell command.
// It is stateless; all data it needs arrives through the Do arguments.
type commandClusterCheck struct {
}
// Name returns the identifier used to invoke this command from the shell.
func (c *commandClusterCheck) Name() string {
	return "cluster.check"
}
// Help returns the usage text shown by the shell's help system.
func (c *commandClusterCheck) Help() string {
	return `check current cluster network connectivity
cluster.check
`
}
  26. func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  27. clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  28. if err = clusterPsCommand.Parse(args); err != nil {
  29. return nil
  30. }
  31. // collect filers
  32. var filers []pb.ServerAddress
  33. err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  34. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  35. ClientType: cluster.FilerType,
  36. })
  37. for _, node := range resp.ClusterNodes {
  38. filers = append(filers, pb.ServerAddress(node.Address))
  39. }
  40. return err
  41. })
  42. if err != nil {
  43. return
  44. }
  45. fmt.Fprintf(writer, "the cluster has %d filers: %+v\n", len(filers), filers)
  46. // collect volume servers
  47. var volumeServers []pb.ServerAddress
  48. t, _, err := collectTopologyInfo(commandEnv, 0)
  49. if err != nil {
  50. return err
  51. }
  52. for _, dc := range t.DataCenterInfos {
  53. for _, r := range dc.RackInfos {
  54. for _, dn := range r.DataNodeInfos {
  55. volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn))
  56. }
  57. }
  58. }
  59. fmt.Fprintf(writer, "the cluster has %d volume servers: %+v\n", len(volumeServers), volumeServers)
  60. // collect all masters
  61. var masters []pb.ServerAddress
  62. for _, master := range commandEnv.MasterClient.GetMasters() {
  63. masters = append(masters, master)
  64. }
  65. // check from master to volume servers
  66. for _, master := range masters {
  67. for _, volumeServer := range volumeServers {
  68. fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer))
  69. err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
  70. _, err := client.Ping(context.Background(), &master_pb.PingRequest{
  71. Target: string(volumeServer),
  72. TargetType: cluster.VolumeServerType,
  73. })
  74. return err
  75. })
  76. if err == nil {
  77. fmt.Fprintf(writer, "ok\n")
  78. } else {
  79. fmt.Fprintf(writer, "%v\n", err)
  80. }
  81. }
  82. }
  83. // check between masters
  84. for _, sourceMaster := range masters {
  85. for _, targetMaster := range masters {
  86. fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster))
  87. err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
  88. _, err := client.Ping(context.Background(), &master_pb.PingRequest{
  89. Target: string(targetMaster),
  90. TargetType: cluster.MasterType,
  91. })
  92. return err
  93. })
  94. if err == nil {
  95. fmt.Fprintf(writer, "ok\n")
  96. } else {
  97. fmt.Fprintf(writer, "%v\n", err)
  98. }
  99. }
  100. }
  101. // check from volume servers to masters
  102. for _, volumeServer := range volumeServers {
  103. for _, master := range masters {
  104. fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master))
  105. err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  106. _, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
  107. Target: string(master),
  108. TargetType: cluster.MasterType,
  109. })
  110. return err
  111. })
  112. if err == nil {
  113. fmt.Fprintf(writer, "ok\n")
  114. } else {
  115. fmt.Fprintf(writer, "%v\n", err)
  116. }
  117. }
  118. }
  119. // check from filers to masters
  120. for _, filer := range filers {
  121. for _, master := range masters {
  122. fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master))
  123. err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  124. _, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  125. Target: string(master),
  126. TargetType: cluster.MasterType,
  127. })
  128. return err
  129. })
  130. if err == nil {
  131. fmt.Fprintf(writer, "ok\n")
  132. } else {
  133. fmt.Fprintf(writer, "%v\n", err)
  134. }
  135. }
  136. }
  137. // check from filers to volume servers
  138. for _, filer := range filers {
  139. for _, volumeServer := range volumeServers {
  140. fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer))
  141. err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  142. _, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  143. Target: string(volumeServer),
  144. TargetType: cluster.VolumeServerType,
  145. })
  146. return err
  147. })
  148. if err == nil {
  149. fmt.Fprintf(writer, "ok\n")
  150. } else {
  151. fmt.Fprintf(writer, "%v\n", err)
  152. }
  153. }
  154. }
  155. // check between volume servers
  156. for _, sourceVolumeServer := range volumeServers {
  157. for _, targetVolumeServer := range volumeServers {
  158. fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
  159. err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  160. _, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
  161. Target: string(targetVolumeServer),
  162. TargetType: cluster.VolumeServerType,
  163. })
  164. return err
  165. })
  166. if err == nil {
  167. fmt.Fprintf(writer, "ok\n")
  168. } else {
  169. fmt.Fprintf(writer, "%v\n", err)
  170. }
  171. }
  172. }
  173. // check between filers
  174. for _, sourceFiler := range filers {
  175. for _, targetFiler := range filers {
  176. if sourceFiler == targetFiler {
  177. continue
  178. }
  179. fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler))
  180. err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  181. _, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  182. Target: string(targetFiler),
  183. TargetType: cluster.FilerType,
  184. })
  185. return err
  186. })
  187. if err == nil {
  188. fmt.Fprintf(writer, "ok\n")
  189. } else {
  190. fmt.Fprintf(writer, "%v\n", err)
  191. }
  192. }
  193. }
  194. return nil
  195. }