package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/cluster"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
func init() {
	Commands = append(Commands, &commandClusterPs{})
}

type commandClusterPs struct {
}

func (c *commandClusterPs) Name() string {
	return "cluster.ps"
}

func (c *commandClusterPs) Help() string {
	return `check current cluster process status
cluster.ps
`
}
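
// Do lists the filers, message queue brokers, and volume servers registered in the
// cluster, probing each filer for its configuration and metadata sync progress and
// each volume server for its status.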
func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	if err = clusterPsCommand.Parse(args); err != nil {
		return nil
	}

	var filerNodes []*master_pb.ListClusterNodesResponse_ClusterNode
	var mqBrokerNodes []*master_pb.ListClusterNodesResponse_ClusterNode

	// get the list of filers
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.FilerType,
			FilerGroup: *commandEnv.option.FilerGroup,
		})
		if err != nil {
			return err
		}
		filerNodes = resp.ClusterNodes
		return err
	})
	if err != nil {
		return
	}

	// get the list of message queue brokers
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.BrokerType,
			FilerGroup: *commandEnv.option.FilerGroup,
		})
		if err != nil {
			return err
		}
		mqBrokerNodes = resp.ClusterNodes
		return err
	})
	if err != nil {
		return
	}

	if len(mqBrokerNodes) > 0 {
		fmt.Fprintf(writer, "* message queue brokers %d\n", len(mqBrokerNodes))
		for _, node := range mqBrokerNodes {
			fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version)
			if node.DataCenter != "" {
				fmt.Fprintf(writer, " DataCenter: %v\n", node.DataCenter)
			}
			if node.Rack != "" {
				fmt.Fprintf(writer, " Rack: %v\n", node.Rack)
			}
			if node.IsLeader {
				fmt.Fprintf(writer, " IsLeader: %v\n", true)
			}
		}
	}
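
	// list the filers and remember each filer's metadata signature, keyed by node,
	// so peer sync offsets can be resolved below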
	filerSignatures := make(map[*master_pb.ListClusterNodesResponse_ClusterNode]int32)
	fmt.Fprintf(writer, "* filers %d\n", len(filerNodes))
	for _, node := range filerNodes {
		fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version)
		if node.DataCenter != "" {
			fmt.Fprintf(writer, " DataCenter: %v\n", node.DataCenter)
		}
		if node.Rack != "" {
			fmt.Fprintf(writer, " Rack: %v\n", node.Rack)
		}
		pb.WithFilerClient(false, 0, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
			if err == nil {
				if resp.FilerGroup != "" {
					fmt.Fprintf(writer, " filer group: %s\n", resp.FilerGroup)
				}
				fmt.Fprintf(writer, " signature: %d\n", resp.Signature)
				filerSignatures[node] = resp.Signature
			} else {
				fmt.Fprintf(writer, " failed to connect: %v\n", err)
			}
			return err
		})
	}
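
	// for each filer, report how far it has caught up with metadata changes from every other filer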
	for _, node := range filerNodes {
		pb.WithFilerClient(false, 0, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			fmt.Fprintf(writer, "* filer %s metadata sync time\n", node.Address)
			selfSignature := filerSignatures[node]
			for peer, peerSignature := range filerSignatures {
				if selfSignature == peerSignature {
					continue
				}
				if resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: filer.GetPeerMetaOffsetKey(peerSignature)}); err == nil && len(resp.Value) == 8 {
					lastTsNs := int64(util.BytesToUint64(resp.Value))
					fmt.Fprintf(writer, " %s: %v\n", peer.Address, time.Unix(0, lastTsNs).UTC())
				}
			}
			return nil
		})
	}

	// collect volume servers
	var volumeServers []pb.ServerAddress
	t, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}
	for _, dc := range t.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn))
			}
		}
	}
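
	// print volume servers grouped by data center and rack, probing each one for its reported version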
	fmt.Fprintf(writer, "* volume servers %d\n", len(volumeServers))
	for _, dc := range t.DataCenterInfos {
		fmt.Fprintf(writer, " * data center: %s\n", dc.Id)
		for _, r := range dc.RackInfos {
			fmt.Fprintf(writer, " * rack: %s\n", r.Id)
			for _, dn := range r.DataNodeInfos {
				pb.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dn), commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
					resp, err := client.VolumeServerStatus(context.Background(), &volume_server_pb.VolumeServerStatusRequest{})
					if err == nil {
						fmt.Fprintf(writer, " * %s (%v)\n", dn.Id, resp.Version)
					}
					return err
				})
			}
		}
	}

	return nil
}