You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

166 lines
4.3 KiB

  1. package filesys
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "io"
  7. "os"
  8. "path/filepath"
  9. "sync"
  10. "time"
  11. )
  12. type TempFileDirtyPages struct {
  13. f *File
  14. tf *os.File
  15. writtenIntervals *WrittenContinuousIntervals
  16. writeOnly bool
  17. writeWaitGroup sync.WaitGroup
  18. pageAddLock sync.Mutex
  19. chunkAddLock sync.Mutex
  20. lastErr error
  21. collection string
  22. replication string
  23. }
  24. var (
  25. tmpDir = filepath.Join(os.TempDir(), "sw")
  26. )
  27. func init() {
  28. os.Mkdir(tmpDir, 0755)
  29. }
  30. func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
  31. tempFile := &TempFileDirtyPages{
  32. f: file,
  33. writeOnly: writeOnly,
  34. writtenIntervals: &WrittenContinuousIntervals{},
  35. }
  36. return tempFile
  37. }
  38. func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
  39. pages.pageAddLock.Lock()
  40. defer pages.pageAddLock.Unlock()
  41. if pages.tf == nil {
  42. tf, err := os.CreateTemp(tmpDir, "")
  43. if err != nil {
  44. glog.Errorf("create temp file: %v", err)
  45. pages.lastErr = err
  46. return
  47. }
  48. pages.tf = tf
  49. pages.writtenIntervals.tempFile = tf
  50. pages.writtenIntervals.lastOffset = 0
  51. }
  52. writtenOffset := pages.writtenIntervals.lastOffset
  53. dataSize := int64(len(data))
  54. // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
  55. if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
  56. pages.lastErr = err
  57. } else {
  58. pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
  59. pages.writtenIntervals.lastOffset += dataSize
  60. }
  61. // pages.writtenIntervals.debug()
  62. return
  63. }
  64. func (pages *TempFileDirtyPages) FlushData() error {
  65. pages.saveExistingPagesToStorage()
  66. pages.writeWaitGroup.Wait()
  67. if pages.lastErr != nil {
  68. return fmt.Errorf("flush data: %v", pages.lastErr)
  69. }
  70. pages.pageAddLock.Lock()
  71. defer pages.pageAddLock.Unlock()
  72. if pages.tf != nil {
  73. pages.writtenIntervals.tempFile = nil
  74. pages.writtenIntervals.lists = nil
  75. pages.tf.Close()
  76. os.Remove(pages.tf.Name())
  77. pages.tf = nil
  78. }
  79. return nil
  80. }
  81. func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
  82. pageSize := pages.f.wfs.option.ChunkSizeLimit
  83. // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
  84. for _, list := range pages.writtenIntervals.lists {
  85. listStopOffset := list.Offset() + list.Size()
  86. for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
  87. start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
  88. if start >= stop {
  89. continue
  90. }
  91. // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
  92. pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
  93. }
  94. }
  95. }
  96. func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
  97. mtime := time.Now().UnixNano()
  98. pages.writeWaitGroup.Add(1)
  99. writer := func() {
  100. defer pages.writeWaitGroup.Done()
  101. reader = io.LimitReader(reader, size)
  102. chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
  103. if err != nil {
  104. glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
  105. pages.lastErr = err
  106. return
  107. }
  108. chunk.Mtime = mtime
  109. pages.collection, pages.replication = collection, replication
  110. pages.chunkAddLock.Lock()
  111. defer pages.chunkAddLock.Unlock()
  112. pages.f.addChunks([]*filer_pb.FileChunk{chunk})
  113. glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
  114. }
  115. if pages.f.wfs.concurrentWriters != nil {
  116. pages.f.wfs.concurrentWriters.Execute(writer)
  117. } else {
  118. go writer()
  119. }
  120. }
  121. func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
  122. return pages.writtenIntervals.ReadDataAt(data, startOffset)
  123. }
  124. func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
  125. return pages.collection, pages.replication
  126. }
  127. func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) {
  128. if pages.writeOnly {
  129. pages.writeOnly = writeOnly
  130. }
  131. }
  132. func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) {
  133. return pages.writeOnly
  134. }