310 lines
8.4 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. )
var (
	// OS_UID is the numeric user ID of the current process, read once from
	// os.Getuid at package initialization. It is the default owner UID for
	// entries created by DoMkdir and MkFile.
	// NOTE(review): os.Getuid returns -1 on Windows, which converts to the
	// maximum uint32 value here — confirm that is acceptable for callers.
	OS_UID = uint32(os.Getuid())
	// OS_GID is the numeric group ID of the current process, read once from
	// os.Getgid at package initialization. It is the default owner GID for
	// entries created by DoMkdir and MkFile.
	OS_GID = uint32(os.Getgid())
)
// FilerClient is the handle the helpers in this file use to talk to a filer.
type FilerClient interface {
	// WithFilerClient is given a callback that receives a SeaweedFilerClient.
	// The helpers in this file always pass streamingMode=false and rely on
	// the callback's error being returned to them.
	// NOTE(review): connection handling and the exact meaning of
	// streamingMode depend on the implementation — not visible here.
	WithFilerClient(streamingMode bool, fn func(SeaweedFilerClient) error) error
	// AdjustedUrl returns a URL string for the given location.
	// NOTE(review): what "adjusted" means is implementation-defined — confirm.
	AdjustedUrl(location *Location) string
	// GetDataCenter returns a data center name; presumably the one this
	// client is associated with — verify against implementations.
	GetDataCenter() string
}
  23. func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
  24. dir, name := fullFilePath.DirAndName()
  25. err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
  26. request := &LookupDirectoryEntryRequest{
  27. Directory: dir,
  28. Name: name,
  29. }
  30. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  31. resp, err := LookupEntry(client, request)
  32. if err != nil {
  33. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  34. return err
  35. }
  36. if resp.Entry == nil {
  37. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  38. return nil
  39. }
  40. entry = resp.Entry
  41. return nil
  42. })
  43. return
  44. }
// EachEntryFunction is the per-entry callback used by the listing helpers in
// this file. isLast is true when the entry is the final one delivered by a
// single listing call. A non-nil return value stops the listing and is
// returned to the caller of the listing helper.
type EachEntryFunction func(entry *Entry, isLast bool) error
  46. func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction) (err error) {
  47. var counter uint32
  48. var startFrom string
  49. var counterFunc = func(entry *Entry, isLast bool) error {
  50. counter++
  51. startFrom = entry.Name
  52. return fn(entry, isLast)
  53. }
  54. var paginationLimit uint32 = 10000
  55. if err = doList(filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil {
  56. return err
  57. }
  58. for counter == paginationLimit {
  59. counter = 0
  60. if err = doList(filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil {
  61. return err
  62. }
  63. }
  64. return nil
  65. }
  66. func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
  67. return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
  68. return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  69. })
  70. }
  71. func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
  72. return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
  73. return doSeaweedList(client, fullDirPath, prefix, fn, startFrom, inclusive, limit)
  74. })
  75. }
  76. func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
  77. return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  78. }
  79. func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
  80. // Redundancy limit to make it correctly judge whether it is the last file.
  81. redLimit := limit
  82. if limit < math.MaxInt32 && limit != 0 {
  83. redLimit = limit + 1
  84. }
  85. if redLimit > math.MaxInt32 {
  86. redLimit = math.MaxInt32
  87. }
  88. request := &ListEntriesRequest{
  89. Directory: string(fullDirPath),
  90. Prefix: prefix,
  91. StartFromFileName: startFrom,
  92. Limit: redLimit,
  93. InclusiveStartFrom: inclusive,
  94. }
  95. glog.V(4).Infof("read directory: %v", request)
  96. ctx, cancel := context.WithCancel(context.Background())
  97. defer cancel()
  98. stream, err := client.ListEntries(ctx, request)
  99. if err != nil {
  100. return fmt.Errorf("list %s: %v", fullDirPath, err)
  101. }
  102. var prevEntry *Entry
  103. count := 0
  104. for {
  105. resp, recvErr := stream.Recv()
  106. if recvErr != nil {
  107. if recvErr == io.EOF {
  108. if prevEntry != nil {
  109. if err := fn(prevEntry, true); err != nil {
  110. return err
  111. }
  112. }
  113. break
  114. } else {
  115. return recvErr
  116. }
  117. }
  118. if prevEntry != nil {
  119. if err := fn(prevEntry, false); err != nil {
  120. return err
  121. }
  122. }
  123. prevEntry = resp.Entry
  124. count++
  125. if count > int(limit) && limit != 0 {
  126. prevEntry = nil
  127. }
  128. }
  129. return nil
  130. }
  131. func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
  132. err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
  133. request := &LookupDirectoryEntryRequest{
  134. Directory: parentDirectoryPath,
  135. Name: entryName,
  136. }
  137. glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
  138. resp, err := LookupEntry(client, request)
  139. if err != nil {
  140. if err == ErrNotFound {
  141. exists = false
  142. return nil
  143. }
  144. glog.V(0).Infof("exists entry %v: %v", request, err)
  145. return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  146. }
  147. exists = resp.Entry.IsDirectory == isDirectory
  148. return nil
  149. })
  150. return
  151. }
  152. func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string, entry *Entry) (err error) {
  153. return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
  154. request := &UpdateEntryRequest{
  155. Directory: parentDirectoryPath,
  156. Entry: entry,
  157. }
  158. glog.V(4).Infof("touch entry %v/%v: %v", parentDirectoryPath, entryName, request)
  159. if err := UpdateEntry(client, request); err != nil {
  160. glog.V(0).Infof("touch exists entry %v: %v", request, err)
  161. return fmt.Errorf("touch exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  162. }
  163. return nil
  164. })
  165. }
  166. func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
  167. return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
  168. return DoMkdir(client, parentDirectoryPath, dirName, fn)
  169. })
  170. }
  171. func DoMkdir(client SeaweedFilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
  172. entry := &Entry{
  173. Name: dirName,
  174. IsDirectory: true,
  175. Attributes: &FuseAttributes{
  176. Mtime: time.Now().Unix(),
  177. Crtime: time.Now().Unix(),
  178. FileMode: uint32(0777 | os.ModeDir),
  179. Uid: OS_UID,
  180. Gid: OS_GID,
  181. },
  182. }
  183. if fn != nil {
  184. fn(entry)
  185. }
  186. request := &CreateEntryRequest{
  187. Directory: parentDirectoryPath,
  188. Entry: entry,
  189. }
  190. glog.V(1).Infof("mkdir: %v", request)
  191. if err := CreateEntry(client, request); err != nil {
  192. glog.V(0).Infof("mkdir %v: %v", request, err)
  193. return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
  194. }
  195. return nil
  196. }
  197. func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error {
  198. return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
  199. entry := &Entry{
  200. Name: fileName,
  201. IsDirectory: false,
  202. Attributes: &FuseAttributes{
  203. Mtime: time.Now().Unix(),
  204. Crtime: time.Now().Unix(),
  205. FileMode: uint32(0770),
  206. Uid: OS_UID,
  207. Gid: OS_GID,
  208. },
  209. Chunks: chunks,
  210. }
  211. if fn != nil {
  212. fn(entry)
  213. }
  214. request := &CreateEntryRequest{
  215. Directory: parentDirectoryPath,
  216. Entry: entry,
  217. }
  218. glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
  219. if err := CreateEntry(client, request); err != nil {
  220. glog.V(0).Infof("create file %v:%v", request, err)
  221. return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
  222. }
  223. return nil
  224. })
  225. }
  226. func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
  227. return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
  228. return DoRemove(client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures)
  229. })
  230. }
  231. func DoRemove(client SeaweedFilerClient, parentDirectoryPath string, name string, isDeleteData bool, isRecursive bool, ignoreRecursiveErr bool, isFromOtherCluster bool, signatures []int32) error {
  232. deleteEntryRequest := &DeleteEntryRequest{
  233. Directory: parentDirectoryPath,
  234. Name: name,
  235. IsDeleteData: isDeleteData,
  236. IsRecursive: isRecursive,
  237. IgnoreRecursiveError: ignoreRecursiveErr,
  238. IsFromOtherCluster: isFromOtherCluster,
  239. Signatures: signatures,
  240. }
  241. if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil {
  242. if strings.Contains(err.Error(), ErrNotFound.Error()) {
  243. return nil
  244. }
  245. return err
  246. } else {
  247. if resp.Error != "" {
  248. if strings.Contains(resp.Error, ErrNotFound.Error()) {
  249. return nil
  250. }
  251. return errors.New(resp.Error)
  252. }
  253. }
  254. return nil
  255. }