You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

263 lines
6.4 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  // Process identity captured once at package initialization. Mkdir and MkFile
  // in this file stamp these values into the Uid/Gid attributes of the entries
  // they create.
  13. var (
  // OS_UID is the result of os.Getuid() converted to uint32.
  // NOTE(review): per the os package docs Getuid returns -1 on Windows, which
  // this conversion would turn into the maximum uint32 value; confirm that is acceptable.
  14. OS_UID = uint32(os.Getuid())
  // OS_GID is the result of os.Getgid() converted to uint32 (same Windows caveat as OS_UID).
  15. OS_GID = uint32(os.Getgid())
  16. )
  // FilerClient is the dependency every helper in this file takes in order to
  // reach a filer.
  17. type FilerClient interface {
  // WithFilerClient is called by the helpers in this file with a callback that
  // issues RPCs on the supplied SeaweedFilerClient; the helpers return this
  // method's error result to their own callers.
  // NOTE(review): presumably implementations invoke fn and propagate its
  // error; confirm against the implementations.
  18. WithFilerClient(fn func(SeaweedFilerClient) error) error
  // AdjustedUrl is not called anywhere in this file.
  // NOTE(review): presumably maps a Location to the address a client should
  // use; confirm against the implementations.
  19. AdjustedUrl(location *Location) string
  20. }
  21. func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
  22. dir, name := fullFilePath.DirAndName()
  23. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  24. request := &LookupDirectoryEntryRequest{
  25. Directory: dir,
  26. Name: name,
  27. }
  28. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  29. resp, err := LookupEntry(client, request)
  30. if err != nil {
  31. if err == ErrNotFound {
  32. return nil
  33. }
  34. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  35. return err
  36. }
  37. if resp.Entry == nil {
  38. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  39. return nil
  40. }
  41. entry = resp.Entry
  42. return nil
  43. })
  44. return
  45. }
  // EachEntryFunciton is the per-entry callback taken by ReadDirAllEntries, List
  // and doList. doList calls it once for each entry it receives, passing
  // isLast=true only for the final entry of that call; a non-nil error returned
  // by the callback stops the listing and is returned to the caller.
  // (The spelling of the name is part of the exported API.)
  46. type EachEntryFunciton func(entry *Entry, isLast bool) error
  47. func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
  48. var counter uint32
  49. var startFrom string
  50. var counterFunc = func(entry *Entry, isLast bool) error {
  51. counter++
  52. startFrom = entry.Name
  53. return fn(entry, isLast)
  54. }
  55. var paginationLimit uint32 = 10000
  56. if err = doList(filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil {
  57. return err
  58. }
  59. for counter == paginationLimit {
  60. counter = 0
  61. if err = doList(filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil {
  62. return err
  63. }
  64. }
  65. return nil
  66. }
  67. func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  68. return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  69. }
  70. func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  71. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  72. request := &ListEntriesRequest{
  73. Directory: string(fullDirPath),
  74. Prefix: prefix,
  75. StartFromFileName: startFrom,
  76. Limit: limit,
  77. InclusiveStartFrom: inclusive,
  78. }
  79. glog.V(4).Infof("read directory: %v", request)
  80. ctx, cancel := context.WithCancel(context.Background())
  81. defer cancel()
  82. stream, err := client.ListEntries(ctx, request)
  83. if err != nil {
  84. return fmt.Errorf("list %s: %v", fullDirPath, err)
  85. }
  86. var prevEntry *Entry
  87. for {
  88. resp, recvErr := stream.Recv()
  89. if recvErr != nil {
  90. if recvErr == io.EOF {
  91. if prevEntry != nil {
  92. if err := fn(prevEntry, true); err != nil {
  93. return err
  94. }
  95. }
  96. break
  97. } else {
  98. return recvErr
  99. }
  100. }
  101. if prevEntry != nil {
  102. if err := fn(prevEntry, false); err != nil {
  103. return err
  104. }
  105. }
  106. prevEntry = resp.Entry
  107. }
  108. return nil
  109. })
  110. return
  111. }
  112. func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
  113. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  114. request := &LookupDirectoryEntryRequest{
  115. Directory: parentDirectoryPath,
  116. Name: entryName,
  117. }
  118. glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
  119. resp, err := LookupEntry(client, request)
  120. if err != nil {
  121. if err == ErrNotFound {
  122. exists = false
  123. return nil
  124. }
  125. glog.V(0).Infof("exists entry %v: %v", request, err)
  126. return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  127. }
  128. exists = resp.Entry.IsDirectory == isDirectory
  129. return nil
  130. })
  131. return
  132. }
  133. func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
  134. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  135. entry := &Entry{
  136. Name: dirName,
  137. IsDirectory: true,
  138. Attributes: &FuseAttributes{
  139. Mtime: time.Now().Unix(),
  140. Crtime: time.Now().Unix(),
  141. FileMode: uint32(0777 | os.ModeDir),
  142. Uid: OS_UID,
  143. Gid: OS_GID,
  144. },
  145. }
  146. if fn != nil {
  147. fn(entry)
  148. }
  149. request := &CreateEntryRequest{
  150. Directory: parentDirectoryPath,
  151. Entry: entry,
  152. }
  153. glog.V(1).Infof("mkdir: %v", request)
  154. if err := CreateEntry(client, request); err != nil {
  155. glog.V(0).Infof("mkdir %v: %v", request, err)
  156. return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
  157. }
  158. return nil
  159. })
  160. }
  161. func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
  162. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  163. entry := &Entry{
  164. Name: fileName,
  165. IsDirectory: false,
  166. Attributes: &FuseAttributes{
  167. Mtime: time.Now().Unix(),
  168. Crtime: time.Now().Unix(),
  169. FileMode: uint32(0770),
  170. Uid: OS_UID,
  171. Gid: OS_GID,
  172. },
  173. Chunks: chunks,
  174. }
  175. request := &CreateEntryRequest{
  176. Directory: parentDirectoryPath,
  177. Entry: entry,
  178. }
  179. glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
  180. if err := CreateEntry(client, request); err != nil {
  181. glog.V(0).Infof("create file %v:%v", request, err)
  182. return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
  183. }
  184. return nil
  185. })
  186. }
  187. func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
  188. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  189. deleteEntryRequest := &DeleteEntryRequest{
  190. Directory: parentDirectoryPath,
  191. Name: name,
  192. IsDeleteData: isDeleteData,
  193. IsRecursive: isRecursive,
  194. IgnoreRecursiveError: ignoreRecursiveErr,
  195. IsFromOtherCluster: isFromOtherCluster,
  196. Signatures: signatures,
  197. }
  198. if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil {
  199. if strings.Contains(err.Error(), ErrNotFound.Error()) {
  200. return nil
  201. }
  202. return err
  203. } else {
  204. if resp.Error != "" {
  205. if strings.Contains(resp.Error, ErrNotFound.Error()) {
  206. return nil
  207. }
  208. return errors.New(resp.Error)
  209. }
  210. }
  211. return nil
  212. })
  213. }