You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
2.7 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package mount
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "golang.org/x/exp/slices"
  8. "golang.org/x/sync/semaphore"
  9. "math"
  10. )
  11. type FileHandleId uint64
  12. type FileHandle struct {
  13. fh FileHandleId
  14. counter int64
  15. entry *LockedEntry
  16. inode uint64
  17. wfs *WFS
  18. // cache file has been written to
  19. dirtyMetadata bool
  20. dirtyPages *PageWriter
  21. entryViewCache []filer.VisibleInterval
  22. reader *filer.ChunkReadAt
  23. contentType string
  24. handle uint64
  25. orderedMutex *semaphore.Weighted
  26. isDeleted bool
  27. }
  28. func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
  29. fh := &FileHandle{
  30. fh: handleId,
  31. counter: 1,
  32. inode: inode,
  33. wfs: wfs,
  34. orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
  35. }
  36. // dirtyPages: newContinuousDirtyPages(file, writeOnly),
  37. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  38. if entry != nil {
  39. entry.Attributes.FileSize = filer.FileSize(entry)
  40. }
  41. fh.entry = &LockedEntry{
  42. Entry: entry,
  43. }
  44. return fh
  45. }
  46. func (fh *FileHandle) FullPath() util.FullPath {
  47. fp, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
  48. return fp
  49. }
  50. func (fh *FileHandle) GetEntry() *filer_pb.Entry {
  51. return fh.entry.GetEntry()
  52. }
  53. func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
  54. fh.entry.SetEntry(entry)
  55. }
  56. func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
  57. return fh.entry.UpdateEntry(fn)
  58. }
  59. func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
  60. if fh.entry == nil {
  61. return
  62. }
  63. // find the earliest incoming chunk
  64. newChunks := chunks
  65. earliestChunk := newChunks[0]
  66. for i := 1; i < len(newChunks); i++ {
  67. if lessThan(earliestChunk, newChunks[i]) {
  68. earliestChunk = newChunks[i]
  69. }
  70. }
  71. // pick out-of-order chunks from existing chunks
  72. for _, chunk := range fh.entry.GetChunks() {
  73. if lessThan(earliestChunk, chunk) {
  74. chunks = append(chunks, chunk)
  75. }
  76. }
  77. // sort incoming chunks
  78. slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
  79. return lessThan(a, b)
  80. })
  81. glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks))
  82. fh.entry.AppendChunks(newChunks)
  83. fh.entryViewCache = nil
  84. }
  85. func (fh *FileHandle) CloseReader() {
  86. if fh.reader != nil {
  87. _ = fh.reader.Close()
  88. fh.reader = nil
  89. }
  90. }
  91. func (fh *FileHandle) Release() {
  92. fh.dirtyPages.Destroy()
  93. fh.CloseReader()
  94. }
  95. func lessThan(a, b *filer_pb.FileChunk) bool {
  96. if a.ModifiedTsNs == b.ModifiedTsNs {
  97. return a.Fid.FileKey < b.Fid.FileKey
  98. }
  99. return a.ModifiedTsNs < b.ModifiedTsNs
  100. }