You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

194 lines
4.9 KiB

5 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
2 years ago
5 years ago
2 years ago
2 years ago
2 years ago
2 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/operation"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  8. "io"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  17. "github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks"
  18. )
  19. type ShellOptions struct {
  20. Masters *string
  21. GrpcDialOption grpc.DialOption
  22. // shell transient context
  23. FilerHost string
  24. FilerPort int64
  25. FilerGroup *string
  26. FilerAddress pb.ServerAddress
  27. Directory string
  28. }
  29. type CommandEnv struct {
  30. env map[string]string
  31. MasterClient *wdclient.MasterClient
  32. option *ShellOptions
  33. locker *exclusive_locks.ExclusiveLocker
  34. }
  35. type command interface {
  36. Name() string
  37. Help() string
  38. Do([]string, *CommandEnv, io.Writer) error
  39. IsResourceHeavy() bool
  40. }
  41. var (
  42. Commands = []command{}
  43. )
  44. func NewCommandEnv(options *ShellOptions) *CommandEnv {
  45. ce := &CommandEnv{
  46. env: make(map[string]string),
  47. MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", *pb.ServerAddresses(*options.Masters).ToServiceDiscovery()),
  48. option: options,
  49. }
  50. ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "shell")
  51. return ce
  52. }
  53. func (ce *CommandEnv) parseUrl(input string) (path string, err error) {
  54. if strings.HasPrefix(input, "http") {
  55. err = fmt.Errorf("http://<filer>:<port> prefix is not supported any more")
  56. return
  57. }
  58. if !strings.HasPrefix(input, "/") {
  59. input = util.Join(ce.option.Directory, input)
  60. }
  61. return input, err
  62. }
  63. func (ce *CommandEnv) isDirectory(path string) bool {
  64. return ce.checkDirectory(path) == nil
  65. }
  66. func (ce *CommandEnv) confirmIsLocked(args []string) error {
  67. if ce.locker.IsLocked() {
  68. return nil
  69. }
  70. ce.locker.SetMessage(fmt.Sprintf("%v", args))
  71. return fmt.Errorf("need to run \"lock\" first to continue")
  72. }
  73. func (ce *CommandEnv) isLocked() bool {
  74. if ce == nil {
  75. return true
  76. }
  77. return ce.locker.IsLocked()
  78. }
  79. func (ce *CommandEnv) checkDirectory(path string) error {
  80. dir, name := util.FullPath(path).DirAndName()
  81. exists, err := filer_pb.Exists(ce, dir, name, true)
  82. if !exists {
  83. return fmt.Errorf("%s is not a directory", path)
  84. }
  85. return err
  86. }
  87. var _ = filer_pb.FilerClient(&CommandEnv{})
  88. func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  89. return pb.WithGrpcFilerClient(streamingMode, 0, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
  90. }
  91. func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
  92. return location.Url
  93. }
  94. func (ce *CommandEnv) GetDataCenter() string {
  95. return ce.MasterClient.DataCenter
  96. }
  // parseFilerUrl splits a full filer URL such as
  // "http://localhost:8888/path/to/dirOrFile" into host, port and path.
  //
  // entryPath must start with "http://" or "https://"; anything else,
  // including bare words that merely begin with "http", is rejected. When the
  // URL has no explicit port, filerPort is 0. The port must fit in 16 bits, so
  // values above 65535 are reported as errors.
  //
  // On any error all other results are zero values; nothing is partially
  // populated.
  func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
  	if !strings.HasPrefix(entryPath, "http://") && !strings.HasPrefix(entryPath, "https://") {
  		return "", 0, "", fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath)
  	}

  	u, err := url.Parse(entryPath)
  	if err != nil {
  		return "", 0, "", err
  	}

  	if portString := u.Port(); portString != "" {
  		port, parseErr := strconv.ParseUint(portString, 10, 16)
  		if parseErr != nil {
  			return "", 0, "", fmt.Errorf("invalid port in %s: %w", entryPath, parseErr)
  		}
  		filerPort = int64(port)
  	}

  	return u.Hostname(), filerPort, u.Path, nil
  }
  // findInputDirectory picks the directory argument from a command's args.
  //
  // The last argument is used unless it starts with "-" (i.e. looks like a
  // flag). With no arguments, or a trailing flag, the result is ".".
  func findInputDirectory(args []string) (input string) {
  	if n := len(args); n > 0 {
  		if last := args[n-1]; !strings.HasPrefix(last, "-") {
  			return last
  		}
  	}
  	return "."
  }
  125. func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.ReadNeedleMetaResponse, err error) {
  126. err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption,
  127. func(client volume_server_pb.VolumeServerClient) error {
  128. if resp, err = client.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
  129. VolumeId: volumeId,
  130. NeedleId: uint64(needleValue.Key),
  131. Offset: needleValue.Offset.ToActualOffset(),
  132. Size: int32(needleValue.Size),
  133. }); err != nil {
  134. return err
  135. }
  136. return nil
  137. },
  138. )
  139. return
  140. }
  141. func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) {
  142. err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption,
  143. func(client volume_server_pb.VolumeServerClient) error {
  144. if resp, err = client.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
  145. VolumeId: volumeId,
  146. NeedleId: uint64(needleValue.Key),
  147. }); err != nil {
  148. return err
  149. }
  150. return nil
  151. },
  152. )
  153. return
  154. }
  155. func getCollectionName(commandEnv *CommandEnv, bucket string) string {
  156. if *commandEnv.option.FilerGroup != "" {
  157. return fmt.Sprintf("%s_%s", *commandEnv.option.FilerGroup, bucket)
  158. }
  159. return bucket
  160. }