You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

175 lines
4.6 KiB

7 years ago
7 years ago
  1. package filesys
  2. import (
  3. "sync"
  4. "fmt"
  5. "bytes"
  6. "io"
  7. "time"
  8. "context"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. )
// DirtyPage holds one run of written-but-not-yet-flushed file data.
type DirtyPage struct {
	Offset int64  // byte offset of this page's data within the file
	Data   []byte // the written bytes starting at Offset
}
// ContinuousDirtyPages buffers contiguous dirty pages for a single file and
// flushes them to the volume server as one chunk (see saveToStorage).
type ContinuousDirtyPages struct {
	sync.Mutex               // guards pages across AddPage/FlushToStorage
	pages []*DirtyPage       // buffered dirty pages; AddPage keeps them contiguous
	f     *File              // the file these pages belong to
}
  22. func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunk *filer_pb.FileChunk, err error) {
  23. pages.Lock()
  24. defer pages.Unlock()
  25. isPerfectOverwrite := false
  26. isPerfectAppend := false
  27. if len(pages.pages) > 0 {
  28. lastPage := pages.pages[len(pages.pages)-1]
  29. if lastPage.Offset+int64(len(lastPage.Data)) == offset {
  30. // write continuous pages
  31. glog.V(4).Infof("%s/%s append [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
  32. isPerfectAppend = true
  33. }
  34. if pages.pages[0].Offset == offset && pages.totalSize() == int64(len(data)) {
  35. glog.V(4).Infof("%s/%s overwrite [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
  36. isPerfectOverwrite = true
  37. }
  38. } else {
  39. glog.V(4).Infof("%s/%s append [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
  40. isPerfectAppend = true
  41. }
  42. isPerfectReplace := false
  43. for _, page := range pages.pages {
  44. if page.Offset == offset && len(page.Data) == len(data) {
  45. // perfect replace
  46. glog.V(4).Infof("%s/%s replace [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
  47. page.Data = data
  48. isPerfectReplace = true
  49. }
  50. }
  51. if isPerfectReplace {
  52. return nil, nil
  53. }
  54. if isPerfectAppend || isPerfectOverwrite {
  55. if isPerfectAppend {
  56. glog.V(4).Infof("%s/%s append2 [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
  57. pages.pages = append(pages.pages, &DirtyPage{
  58. Offset: offset,
  59. Data: data,
  60. })
  61. }
  62. if isPerfectOverwrite {
  63. glog.V(4).Infof("%s/%s overwrite2 [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
  64. pages.pages = []*DirtyPage{&DirtyPage{
  65. Offset: offset,
  66. Data: data,
  67. }}
  68. }
  69. if pages.f.wfs.chunkSizeLimit > 0 && pages.totalSize() >= pages.f.wfs.chunkSizeLimit {
  70. chunk, err = pages.saveToStorage(ctx)
  71. pages.pages = nil
  72. glog.V(4).Infof("%s/%s over size limit [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
  73. }
  74. return
  75. }
  76. chunk, err = pages.saveToStorage(ctx)
  77. glog.V(4).Infof("%s/%s saved [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
  78. pages.pages = []*DirtyPage{&DirtyPage{
  79. Offset: offset,
  80. Data: data,
  81. }}
  82. return
  83. }
  84. func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
  85. pages.Lock()
  86. defer pages.Unlock()
  87. if chunk, err = pages.saveToStorage(ctx); err == nil {
  88. pages.pages = nil
  89. if chunk != nil {
  90. glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
  91. }
  92. }
  93. return
  94. }
  95. func (pages *ContinuousDirtyPages) totalSize() (total int64) {
  96. for _, page := range pages.pages {
  97. total += int64(len(page.Data))
  98. }
  99. return
  100. }
  101. func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
  102. if len(pages.pages) == 0 {
  103. return nil, nil
  104. }
  105. var fileId, host string
  106. if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  107. request := &filer_pb.AssignVolumeRequest{
  108. Count: 1,
  109. Replication: pages.f.wfs.replication,
  110. Collection: pages.f.wfs.collection,
  111. }
  112. resp, err := client.AssignVolume(ctx, request)
  113. if err != nil {
  114. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  115. return err
  116. }
  117. fileId, host = resp.FileId, resp.Url
  118. return nil
  119. }); err != nil {
  120. return nil, fmt.Errorf("filer assign volume: %v", err)
  121. }
  122. var readers []io.Reader
  123. for _, page := range pages.pages {
  124. readers = append(readers, bytes.NewReader(page.Data))
  125. }
  126. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  127. bufReader := io.MultiReader(readers...)
  128. uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "")
  129. if err != nil {
  130. glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
  131. return nil, fmt.Errorf("upload data: %v", err)
  132. }
  133. if uploadResult.Error != "" {
  134. glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
  135. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  136. }
  137. return &filer_pb.FileChunk{
  138. FileId: fileId,
  139. Offset: pages.pages[0].Offset,
  140. Size: uint64(pages.totalSize()),
  141. Mtime: time.Now().UnixNano(),
  142. }, nil
  143. }