You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

237 lines
5.7 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  // OS_UID is the numeric user id of the current process, read once via
  // os.Getuid when the package is initialized. Mkdir and MkFile use it as the
  // owner of the entries they create.
  var OS_UID = uint32(os.Getuid())

  // OS_GID is the numeric group id of the current process, read once via
  // os.Getgid when the package is initialized. Mkdir and MkFile use it as the
  // group of the entries they create.
  var OS_GID = uint32(os.Getgid())
  17. type FilerClient interface {
  18. WithFilerClient(fn func(SeaweedFilerClient) error) error
  19. AdjustedUrl(hostAndPort string) string
  20. }
  21. func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
  22. dir, name := fullFilePath.DirAndName()
  23. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  24. request := &LookupDirectoryEntryRequest{
  25. Directory: dir,
  26. Name: name,
  27. }
  28. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  29. resp, err := LookupEntry(client, request)
  30. if err != nil {
  31. if err == ErrNotFound {
  32. return nil
  33. }
  34. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  35. return err
  36. }
  37. if resp.Entry == nil {
  38. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  39. return nil
  40. }
  41. entry = resp.Entry
  42. return nil
  43. })
  44. return
  45. }
  46. type EachEntryFunciton func(entry *Entry, isLast bool) error
  47. func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
  48. return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
  49. }
  50. func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  51. return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  52. }
  53. func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  54. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  55. request := &ListEntriesRequest{
  56. Directory: string(fullDirPath),
  57. Prefix: prefix,
  58. StartFromFileName: startFrom,
  59. Limit: limit,
  60. InclusiveStartFrom: inclusive,
  61. }
  62. glog.V(3).Infof("read directory: %v", request)
  63. ctx, cancel := context.WithCancel(context.Background())
  64. stream, err := client.ListEntries(ctx, request)
  65. if err != nil {
  66. return fmt.Errorf("list %s: %v", fullDirPath, err)
  67. }
  68. defer cancel()
  69. var prevEntry *Entry
  70. for {
  71. resp, recvErr := stream.Recv()
  72. if recvErr != nil {
  73. if recvErr == io.EOF {
  74. if prevEntry != nil {
  75. if err := fn(prevEntry, true); err != nil {
  76. return err
  77. }
  78. }
  79. break
  80. } else {
  81. return recvErr
  82. }
  83. }
  84. if prevEntry != nil {
  85. if err := fn(prevEntry, false); err != nil {
  86. return err
  87. }
  88. }
  89. prevEntry = resp.Entry
  90. }
  91. return nil
  92. })
  93. return
  94. }
  95. func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
  96. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  97. request := &LookupDirectoryEntryRequest{
  98. Directory: parentDirectoryPath,
  99. Name: entryName,
  100. }
  101. glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
  102. resp, err := LookupEntry(client, request)
  103. if err != nil {
  104. if err == ErrNotFound {
  105. exists = false
  106. return nil
  107. }
  108. glog.V(0).Infof("exists entry %v: %v", request, err)
  109. return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  110. }
  111. exists = resp.Entry.IsDirectory == isDirectory
  112. return nil
  113. })
  114. return
  115. }
  116. func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
  117. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  118. entry := &Entry{
  119. Name: dirName,
  120. IsDirectory: true,
  121. Attributes: &FuseAttributes{
  122. Mtime: time.Now().Unix(),
  123. Crtime: time.Now().Unix(),
  124. FileMode: uint32(0777 | os.ModeDir),
  125. Uid: OS_UID,
  126. Gid: OS_GID,
  127. },
  128. }
  129. if fn != nil {
  130. fn(entry)
  131. }
  132. request := &CreateEntryRequest{
  133. Directory: parentDirectoryPath,
  134. Entry: entry,
  135. }
  136. glog.V(1).Infof("mkdir: %v", request)
  137. if err := CreateEntry(client, request); err != nil {
  138. glog.V(0).Infof("mkdir %v: %v", request, err)
  139. return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
  140. }
  141. return nil
  142. })
  143. }
  144. func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
  145. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  146. entry := &Entry{
  147. Name: fileName,
  148. IsDirectory: false,
  149. Attributes: &FuseAttributes{
  150. Mtime: time.Now().Unix(),
  151. Crtime: time.Now().Unix(),
  152. FileMode: uint32(0770),
  153. Uid: OS_UID,
  154. Gid: OS_GID,
  155. },
  156. Chunks: chunks,
  157. }
  158. request := &CreateEntryRequest{
  159. Directory: parentDirectoryPath,
  160. Entry: entry,
  161. }
  162. glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
  163. if err := CreateEntry(client, request); err != nil {
  164. glog.V(0).Infof("create file %v:%v", request, err)
  165. return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
  166. }
  167. return nil
  168. })
  169. }
  170. func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool) error {
  171. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  172. if resp, err := client.DeleteEntry(context.Background(), &DeleteEntryRequest{
  173. Directory: parentDirectoryPath,
  174. Name: name,
  175. IsDeleteData: isDeleteData,
  176. IsRecursive: isRecursive,
  177. IgnoreRecursiveError: ignoreRecursiveErr,
  178. IsFromOtherCluster: isFromOtherCluster,
  179. }); err != nil {
  180. return err
  181. } else {
  182. if resp.Error != "" {
  183. return errors.New(resp.Error)
  184. }
  185. }
  186. return nil
  187. })
  188. }