You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

128 lines
2.7 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package mount
  2. import (
  3. "sync"
  4. "golang.org/x/exp/slices"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. )
  // FileHandleId is the numeric identifier stored in FileHandle.fh.
  type FileHandleId uint64
  11. type FileHandle struct {
  12. fh FileHandleId
  13. counter int64
  14. entry *filer_pb.Entry
  15. entryLock sync.Mutex
  16. inode uint64
  17. wfs *WFS
  18. // cache file has been written to
  19. dirtyMetadata bool
  20. dirtyPages *PageWriter
  21. entryViewCache []filer.VisibleInterval
  22. reader *filer.ChunkReadAt
  23. contentType string
  24. handle uint64
  25. sync.Mutex
  26. isDeleted bool
  27. }
  28. func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
  29. fh := &FileHandle{
  30. fh: handleId,
  31. counter: 1,
  32. inode: inode,
  33. wfs: wfs,
  34. }
  35. // dirtyPages: newContinuousDirtyPages(file, writeOnly),
  36. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  37. if entry != nil {
  38. entry.Attributes.FileSize = filer.FileSize(entry)
  39. }
  40. return fh
  41. }
  42. func (fh *FileHandle) FullPath() util.FullPath {
  43. fp, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
  44. return fp
  45. }
  46. func (fh *FileHandle) GetEntry() *filer_pb.Entry {
  47. fh.entryLock.Lock()
  48. defer fh.entryLock.Unlock()
  49. return fh.entry
  50. }
  51. func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
  52. fh.entryLock.Lock()
  53. defer fh.entryLock.Unlock()
  54. fh.entry = entry
  55. }
  56. func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
  57. fh.entryLock.Lock()
  58. defer fh.entryLock.Unlock()
  59. fn(fh.entry)
  60. return fh.entry
  61. }
  62. func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
  63. fh.entryLock.Lock()
  64. defer fh.entryLock.Unlock()
  65. // find the earliest incoming chunk
  66. newChunks := chunks
  67. earliestChunk := newChunks[0]
  68. for i := 1; i < len(newChunks); i++ {
  69. if lessThan(earliestChunk, newChunks[i]) {
  70. earliestChunk = newChunks[i]
  71. }
  72. }
  73. if fh.entry == nil {
  74. return
  75. }
  76. // pick out-of-order chunks from existing chunks
  77. for _, chunk := range fh.entry.Chunks {
  78. if lessThan(earliestChunk, chunk) {
  79. chunks = append(chunks, chunk)
  80. }
  81. }
  82. // sort incoming chunks
  83. slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
  84. return lessThan(a, b)
  85. })
  86. glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks))
  87. fh.entry.Chunks = append(fh.entry.Chunks, newChunks...)
  88. fh.entryViewCache = nil
  89. }
  90. func (fh *FileHandle) CloseReader() {
  91. if fh.reader != nil {
  92. _ = fh.reader.Close()
  93. fh.reader = nil
  94. }
  95. }
  96. func (fh *FileHandle) Release() {
  97. fh.dirtyPages.Destroy()
  98. fh.CloseReader()
  99. }
  100. func lessThan(a, b *filer_pb.FileChunk) bool {
  101. if a.Mtime == b.Mtime {
  102. return a.Fid.FileKey < b.Fid.FileKey
  103. }
  104. return a.Mtime < b.Mtime
  105. }