You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

125 lines
2.8 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package mount
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "golang.org/x/sync/semaphore"
  8. "math"
  9. "os"
  10. "sync"
  11. )
  // FileHandleId is the numeric identifier type stored in FileHandle.fh.
  type FileHandleId uint64

  // IsDebugFileReadWrite switches on the debugging mirror file: when true,
  // newFileHandle opens a file under /tmp/sw/ and Release closes it.
  // It is off by default.
  var IsDebugFileReadWrite bool
  14. type FileHandle struct {
  15. fh FileHandleId
  16. counter int64
  17. entry *LockedEntry
  18. entryLock sync.Mutex
  19. entryChunkGroup *filer.ChunkGroup
  20. inode uint64
  21. wfs *WFS
  22. // cache file has been written to
  23. dirtyMetadata bool
  24. dirtyPages *PageWriter
  25. reader *filer.ChunkReadAt
  26. contentType string
  27. handle uint64
  28. orderedMutex *semaphore.Weighted
  29. isDeleted bool
  30. // for debugging
  31. mirrorFile *os.File
  32. }
  33. func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
  34. fh := &FileHandle{
  35. fh: handleId,
  36. counter: 1,
  37. inode: inode,
  38. wfs: wfs,
  39. orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
  40. }
  41. // dirtyPages: newContinuousDirtyPages(file, writeOnly),
  42. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  43. fh.entry = &LockedEntry{
  44. Entry: entry,
  45. }
  46. if entry != nil {
  47. fh.SetEntry(entry)
  48. }
  49. if IsDebugFileReadWrite {
  50. var err error
  51. fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600)
  52. if err != nil {
  53. println("failed to create mirror:", err.Error())
  54. }
  55. }
  56. return fh
  57. }
  58. func (fh *FileHandle) FullPath() util.FullPath {
  59. fp, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
  60. return fp
  61. }
  62. func (fh *FileHandle) GetEntry() *filer_pb.Entry {
  63. return fh.entry.GetEntry()
  64. }
  65. func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
  66. if entry != nil {
  67. fileSize := filer.FileSize(entry)
  68. entry.Attributes.FileSize = fileSize
  69. var resolveManifestErr error
  70. fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
  71. if resolveManifestErr != nil {
  72. glog.Warningf("failed to resolve manifest chunks in %+v", entry)
  73. }
  74. } else {
  75. glog.Fatalf("setting file handle entry to nil")
  76. }
  77. fh.entry.SetEntry(entry)
  78. }
  79. func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
  80. return fh.entry.UpdateEntry(fn)
  81. }
  82. func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
  83. fh.entryLock.Lock()
  84. defer fh.entryLock.Unlock()
  85. if fh.entry == nil {
  86. return
  87. }
  88. fh.entry.AppendChunks(chunks)
  89. }
  90. func (fh *FileHandle) Release() {
  91. fh.entryLock.Lock()
  92. defer fh.entryLock.Unlock()
  93. glog.V(4).Infof("Release %s fh %d", fh.entry.Name, fh.handle)
  94. fh.dirtyPages.Destroy()
  95. if IsDebugFileReadWrite {
  96. fh.mirrorFile.Close()
  97. }
  98. }
  99. func lessThan(a, b *filer_pb.FileChunk) bool {
  100. if a.ModifiedTsNs == b.ModifiedTsNs {
  101. return a.Fid.FileKey < b.Fid.FileKey
  102. }
  103. return a.ModifiedTsNs < b.ModifiedTsNs
  104. }