You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

234 lines
5.6 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. var (
  14. OS_UID = uint32(os.Getuid())
  15. OS_GID = uint32(os.Getgid())
  16. )
  17. type FilerClient interface {
  18. WithFilerClient(fn func(SeaweedFilerClient) error) error
  19. AdjustedUrl(hostAndPort string) string
  20. }
  21. func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
  22. dir, name := fullFilePath.DirAndName()
  23. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  24. request := &LookupDirectoryEntryRequest{
  25. Directory: dir,
  26. Name: name,
  27. }
  28. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  29. resp, err := LookupEntry(client, request)
  30. if err != nil {
  31. if err == ErrNotFound {
  32. return nil
  33. }
  34. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  35. return err
  36. }
  37. if resp.Entry == nil {
  38. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  39. return nil
  40. }
  41. entry = resp.Entry
  42. return nil
  43. })
  44. return
  45. }
  46. type EachEntryFunciton func(entry *Entry, isLast bool) error
  47. func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
  48. return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
  49. }
  50. func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  51. return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  52. }
  53. func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  54. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  55. request := &ListEntriesRequest{
  56. Directory: string(fullDirPath),
  57. Prefix: prefix,
  58. StartFromFileName: startFrom,
  59. Limit: limit,
  60. InclusiveStartFrom: inclusive,
  61. }
  62. glog.V(3).Infof("read directory: %v", request)
  63. stream, err := client.ListEntries(context.Background(), request)
  64. if err != nil {
  65. return fmt.Errorf("list %s: %v", fullDirPath, err)
  66. }
  67. var prevEntry *Entry
  68. for {
  69. resp, recvErr := stream.Recv()
  70. if recvErr != nil {
  71. if recvErr == io.EOF {
  72. if prevEntry != nil {
  73. if err := fn(prevEntry, true); err != nil {
  74. return err
  75. }
  76. }
  77. break
  78. } else {
  79. return recvErr
  80. }
  81. }
  82. if prevEntry != nil {
  83. if err := fn(prevEntry, false); err != nil {
  84. return err
  85. }
  86. }
  87. prevEntry = resp.Entry
  88. }
  89. return nil
  90. })
  91. return
  92. }
  93. func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
  94. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  95. request := &LookupDirectoryEntryRequest{
  96. Directory: parentDirectoryPath,
  97. Name: entryName,
  98. }
  99. glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
  100. resp, err := LookupEntry(client, request)
  101. if err != nil {
  102. if err == ErrNotFound {
  103. exists = false
  104. return nil
  105. }
  106. glog.V(0).Infof("exists entry %v: %v", request, err)
  107. return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  108. }
  109. exists = resp.Entry.IsDirectory == isDirectory
  110. return nil
  111. })
  112. return
  113. }
  114. func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
  115. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  116. entry := &Entry{
  117. Name: dirName,
  118. IsDirectory: true,
  119. Attributes: &FuseAttributes{
  120. Mtime: time.Now().Unix(),
  121. Crtime: time.Now().Unix(),
  122. FileMode: uint32(0777 | os.ModeDir),
  123. Uid: OS_UID,
  124. Gid: OS_GID,
  125. },
  126. }
  127. if fn != nil {
  128. fn(entry)
  129. }
  130. request := &CreateEntryRequest{
  131. Directory: parentDirectoryPath,
  132. Entry: entry,
  133. }
  134. glog.V(1).Infof("mkdir: %v", request)
  135. if err := CreateEntry(client, request); err != nil {
  136. glog.V(0).Infof("mkdir %v: %v", request, err)
  137. return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
  138. }
  139. return nil
  140. })
  141. }
  142. func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
  143. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  144. entry := &Entry{
  145. Name: fileName,
  146. IsDirectory: false,
  147. Attributes: &FuseAttributes{
  148. Mtime: time.Now().Unix(),
  149. Crtime: time.Now().Unix(),
  150. FileMode: uint32(0770),
  151. Uid: OS_UID,
  152. Gid: OS_GID,
  153. },
  154. Chunks: chunks,
  155. }
  156. request := &CreateEntryRequest{
  157. Directory: parentDirectoryPath,
  158. Entry: entry,
  159. }
  160. glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
  161. if err := CreateEntry(client, request); err != nil {
  162. glog.V(0).Infof("create file %v:%v", request, err)
  163. return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
  164. }
  165. return nil
  166. })
  167. }
  168. func Remove(filerClient FilerClient, parentDirectoryPath string, name string, isDeleteData, isRecursive, ignoreRecursiveErr bool) error {
  169. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  170. if resp, err := client.DeleteEntry(context.Background(), &DeleteEntryRequest{
  171. Directory: parentDirectoryPath,
  172. Name: name,
  173. IsDeleteData: isDeleteData,
  174. IsRecursive: isRecursive,
  175. IgnoreRecursiveError: ignoreRecursiveErr,
  176. }); err != nil {
  177. return err
  178. } else {
  179. if resp.Error != "" {
  180. return errors.New(resp.Error)
  181. }
  182. }
  183. return nil
  184. })
  185. }