You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

248 lines
6.0 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  // OS_UID is the numeric user id of the current process, read once at
  // package initialization via os.Getuid and converted to uint32.
  var OS_UID = uint32(os.Getuid())

  // OS_GID is the numeric group id of the current process, read once at
  // package initialization via os.Getgid and converted to uint32.
  var OS_GID = uint32(os.Getgid())
  18. type FilerClient interface {
  19. WithFilerClient(fn func(SeaweedFilerClient) error) error
  20. AdjustedUrl(hostAndPort string) string
  21. }
  22. func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
  23. dir, name := fullFilePath.DirAndName()
  24. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  25. request := &LookupDirectoryEntryRequest{
  26. Directory: dir,
  27. Name: name,
  28. }
  29. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  30. resp, err := LookupEntry(client, request)
  31. if err != nil {
  32. if err == ErrNotFound {
  33. return nil
  34. }
  35. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  36. return err
  37. }
  38. if resp.Entry == nil {
  39. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  40. return nil
  41. }
  42. entry = resp.Entry
  43. return nil
  44. })
  45. return
  46. }
  47. type EachEntryFunciton func(entry *Entry, isLast bool) error
  48. func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
  49. return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
  50. }
  51. func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  52. return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  53. }
  54. func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  55. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  56. request := &ListEntriesRequest{
  57. Directory: string(fullDirPath),
  58. Prefix: prefix,
  59. StartFromFileName: startFrom,
  60. Limit: limit,
  61. InclusiveStartFrom: inclusive,
  62. }
  63. glog.V(4).Infof("read directory: %v", request)
  64. ctx, cancel := context.WithCancel(context.Background())
  65. stream, err := client.ListEntries(ctx, request)
  66. if err != nil {
  67. return fmt.Errorf("list %s: %v", fullDirPath, err)
  68. }
  69. defer cancel()
  70. var prevEntry *Entry
  71. for {
  72. resp, recvErr := stream.Recv()
  73. if recvErr != nil {
  74. if recvErr == io.EOF {
  75. if prevEntry != nil {
  76. if err := fn(prevEntry, true); err != nil {
  77. return err
  78. }
  79. }
  80. break
  81. } else {
  82. return recvErr
  83. }
  84. }
  85. if prevEntry != nil {
  86. if err := fn(prevEntry, false); err != nil {
  87. return err
  88. }
  89. }
  90. prevEntry = resp.Entry
  91. }
  92. return nil
  93. })
  94. return
  95. }
  96. func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
  97. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  98. request := &LookupDirectoryEntryRequest{
  99. Directory: parentDirectoryPath,
  100. Name: entryName,
  101. }
  102. glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
  103. resp, err := LookupEntry(client, request)
  104. if err != nil {
  105. if err == ErrNotFound {
  106. exists = false
  107. return nil
  108. }
  109. glog.V(0).Infof("exists entry %v: %v", request, err)
  110. return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  111. }
  112. exists = resp.Entry.IsDirectory == isDirectory
  113. return nil
  114. })
  115. return
  116. }
  117. func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
  118. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  119. entry := &Entry{
  120. Name: dirName,
  121. IsDirectory: true,
  122. Attributes: &FuseAttributes{
  123. Mtime: time.Now().Unix(),
  124. Crtime: time.Now().Unix(),
  125. FileMode: uint32(0777 | os.ModeDir),
  126. Uid: OS_UID,
  127. Gid: OS_GID,
  128. },
  129. }
  130. if fn != nil {
  131. fn(entry)
  132. }
  133. request := &CreateEntryRequest{
  134. Directory: parentDirectoryPath,
  135. Entry: entry,
  136. }
  137. glog.V(1).Infof("mkdir: %v", request)
  138. if err := CreateEntry(client, request); err != nil {
  139. glog.V(0).Infof("mkdir %v: %v", request, err)
  140. return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
  141. }
  142. return nil
  143. })
  144. }
  145. func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
  146. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  147. entry := &Entry{
  148. Name: fileName,
  149. IsDirectory: false,
  150. Attributes: &FuseAttributes{
  151. Mtime: time.Now().Unix(),
  152. Crtime: time.Now().Unix(),
  153. FileMode: uint32(0770),
  154. Uid: OS_UID,
  155. Gid: OS_GID,
  156. },
  157. Chunks: chunks,
  158. }
  159. request := &CreateEntryRequest{
  160. Directory: parentDirectoryPath,
  161. Entry: entry,
  162. }
  163. glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
  164. if err := CreateEntry(client, request); err != nil {
  165. glog.V(0).Infof("create file %v:%v", request, err)
  166. return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
  167. }
  168. return nil
  169. })
  170. }
  171. func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signature int32) error {
  172. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  173. deleteEntryRequest := &DeleteEntryRequest{
  174. Directory: parentDirectoryPath,
  175. Name: name,
  176. IsDeleteData: isDeleteData,
  177. IsRecursive: isRecursive,
  178. IgnoreRecursiveError: ignoreRecursiveErr,
  179. IsFromOtherCluster: isFromOtherCluster,
  180. }
  181. if signature != 0 {
  182. deleteEntryRequest.Signatures = []int32{signature}
  183. }
  184. if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil {
  185. if strings.Contains(err.Error(), ErrNotFound.Error()) {
  186. return nil
  187. }
  188. return err
  189. } else {
  190. if resp.Error != "" {
  191. if strings.Contains(resp.Error, ErrNotFound.Error()) {
  192. return nil
  193. }
  194. return errors.New(resp.Error)
  195. }
  196. }
  197. return nil
  198. })
  199. }