You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

230 lines
5.6 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
6 years ago
  1. package s3api
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "strings"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/filer2"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. )
  13. func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
  14. return s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  15. entry := &filer_pb.Entry{
  16. Name: dirName,
  17. IsDirectory: true,
  18. Attributes: &filer_pb.FuseAttributes{
  19. Mtime: time.Now().Unix(),
  20. Crtime: time.Now().Unix(),
  21. FileMode: uint32(0777 | os.ModeDir),
  22. Uid: OS_UID,
  23. Gid: OS_GID,
  24. },
  25. }
  26. if fn != nil {
  27. fn(entry)
  28. }
  29. request := &filer_pb.CreateEntryRequest{
  30. Directory: parentDirectoryPath,
  31. Entry: entry,
  32. }
  33. glog.V(1).Infof("mkdir: %v", request)
  34. if err := filer_pb.CreateEntry(client, request); err != nil {
  35. glog.V(0).Infof("mkdir %v: %v", request, err)
  36. return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
  37. }
  38. return nil
  39. })
  40. }
  41. func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error {
  42. return s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  43. entry := &filer_pb.Entry{
  44. Name: fileName,
  45. IsDirectory: false,
  46. Attributes: &filer_pb.FuseAttributes{
  47. Mtime: time.Now().Unix(),
  48. Crtime: time.Now().Unix(),
  49. FileMode: uint32(0770),
  50. Uid: OS_UID,
  51. Gid: OS_GID,
  52. },
  53. Chunks: chunks,
  54. }
  55. request := &filer_pb.CreateEntryRequest{
  56. Directory: parentDirectoryPath,
  57. Entry: entry,
  58. }
  59. glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
  60. if err := filer_pb.CreateEntry(client, request); err != nil {
  61. glog.V(0).Infof("create file %v:%v", request, err)
  62. return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
  63. }
  64. return nil
  65. })
  66. }
  67. func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit int) (entries []*filer_pb.Entry, err error) {
  68. err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  69. request := &filer_pb.ListEntriesRequest{
  70. Directory: parentDirectoryPath,
  71. Prefix: prefix,
  72. StartFromFileName: startFrom,
  73. InclusiveStartFrom: inclusive,
  74. Limit: uint32(limit),
  75. }
  76. glog.V(4).Infof("read directory: %v", request)
  77. stream, err := client.ListEntries(context.Background(), request)
  78. if err != nil {
  79. glog.V(0).Infof("read directory %v: %v", request, err)
  80. return fmt.Errorf("list dir %v: %v", parentDirectoryPath, err)
  81. }
  82. for {
  83. resp, recvErr := stream.Recv()
  84. if recvErr != nil {
  85. if recvErr == io.EOF {
  86. break
  87. } else {
  88. return recvErr
  89. }
  90. }
  91. entries = append(entries, resp.Entry)
  92. }
  93. return nil
  94. })
  95. return
  96. }
  97. func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDirectory, isDeleteData, isRecursive bool) error {
  98. return s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  99. request := &filer_pb.DeleteEntryRequest{
  100. Directory: parentDirectoryPath,
  101. Name: entryName,
  102. IsDeleteData: isDeleteData,
  103. IsRecursive: isRecursive,
  104. }
  105. glog.V(1).Infof("delete entry %v/%v: %v", parentDirectoryPath, entryName, request)
  106. if _, err := client.DeleteEntry(context.Background(), request); err != nil {
  107. glog.V(0).Infof("delete entry %v: %v", request, err)
  108. return fmt.Errorf("delete entry %s/%s: %v", parentDirectoryPath, entryName, err)
  109. }
  110. return nil
  111. })
  112. }
  113. func (s3a *S3ApiServer) streamRemove(quiet bool, fn func() (finished bool, parentDirectoryPath string, entryName string, isDeleteData, isRecursive bool), respFn func(err string)) error {
  114. return s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  115. stream, err := client.StreamDeleteEntries(context.Background())
  116. if err != nil {
  117. glog.V(0).Infof("stream delete entry: %v", err)
  118. return fmt.Errorf("stream delete entry: %v", err)
  119. }
  120. waitc := make(chan struct{})
  121. go func() {
  122. for {
  123. resp, err := stream.Recv()
  124. if err == io.EOF {
  125. // read done.
  126. close(waitc)
  127. return
  128. }
  129. if err != nil {
  130. glog.V(0).Infof("streamRemove: %v", err)
  131. return
  132. }
  133. respFn(resp.Error)
  134. }
  135. }()
  136. for {
  137. finished, parentDirectoryPath, entryName, isDeleteData, isRecursive := fn()
  138. if finished {
  139. break
  140. }
  141. err = stream.Send(&filer_pb.DeleteEntryRequest{
  142. Directory: parentDirectoryPath,
  143. Name: entryName,
  144. IsDeleteData: isDeleteData,
  145. IsRecursive: isRecursive,
  146. IgnoreRecursiveError: quiet,
  147. })
  148. if err != nil {
  149. glog.V(0).Infof("streamRemove: %v", err)
  150. break
  151. }
  152. }
  153. stream.CloseSend()
  154. <-waitc
  155. return err
  156. })
  157. }
  158. func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
  159. err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  160. request := &filer_pb.LookupDirectoryEntryRequest{
  161. Directory: parentDirectoryPath,
  162. Name: entryName,
  163. }
  164. glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
  165. resp, err := filer_pb.LookupEntry(client, request)
  166. if err != nil {
  167. if err == filer2.ErrNotFound {
  168. exists = false
  169. return nil
  170. }
  171. glog.V(0).Infof("exists entry %v: %v", request, err)
  172. return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  173. }
  174. exists = resp.Entry.IsDirectory == isDirectory
  175. return nil
  176. })
  177. return
  178. }
  179. func objectKey(key *string) *string {
  180. if strings.HasPrefix(*key, "/") {
  181. t := (*key)[1:]
  182. return &t
  183. }
  184. return key
  185. }