You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

152 lines
3.5 KiB

  1. package filesys
  2. import (
  3. "sync"
  4. "sort"
  5. "fmt"
  6. "bytes"
  7. "io"
  8. "time"
  9. "context"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. )
// DirtyPage is one contiguous run of written-but-not-yet-flushed file data.
type DirtyPage struct {
	Offset int64  // byte offset of this page within the file
	Data   []byte // the written bytes
}
// ContinuousDirtyPages buffers in-memory writes for a single file until they
// are uploaded to a volume server as one chunk. The embedded Mutex guards
// pages; AddPage and FlushToStorage lock it themselves.
type ContinuousDirtyPages struct {
	sync.Mutex
	pages []*DirtyPage // buffered writes, kept contiguous or flushed on a random write
	f     *File        // owning file; supplies filer client, replication/collection settings, and names for logging
}
  23. func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunk *filer_pb.FileChunk, err error) {
  24. pages.Lock()
  25. defer pages.Unlock()
  26. isPerfectAppend := len(pages.pages) == 0
  27. if len(pages.pages) > 0 {
  28. lastPage := pages.pages[len(pages.pages)-1]
  29. if lastPage.Offset+int64(len(lastPage.Data)) == offset {
  30. // write continuous pages
  31. glog.V(3).Infof("%s/%s append [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
  32. isPerfectAppend = true
  33. }
  34. }
  35. isPerfectReplace := false
  36. for _, page := range pages.pages {
  37. if page.Offset == offset && len(page.Data) == len(data) {
  38. // perfect replace
  39. glog.V(3).Infof("%s/%s replace [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
  40. page.Data = data
  41. isPerfectReplace = true
  42. }
  43. }
  44. if isPerfectReplace {
  45. return nil, nil
  46. }
  47. if isPerfectAppend {
  48. pages.pages = append(pages.pages, &DirtyPage{
  49. Offset: offset,
  50. Data: data,
  51. })
  52. return nil, nil
  53. }
  54. chunk, err = pages.saveToStorage(ctx)
  55. glog.V(3).Infof("%s/%s saved [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
  56. pages.pages = []*DirtyPage{&DirtyPage{
  57. Offset: offset,
  58. Data: data,
  59. }}
  60. return
  61. }
  62. func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
  63. pages.Lock()
  64. defer pages.Unlock()
  65. if chunk, err = pages.saveToStorage(ctx); err == nil {
  66. pages.pages = nil
  67. }
  68. return
  69. }
  70. func (pages *ContinuousDirtyPages) totalSize() (total int64) {
  71. for _, page := range pages.pages {
  72. total += int64(len(page.Data))
  73. }
  74. return
  75. }
  76. func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
  77. if len(pages.pages) == 0 {
  78. return nil, nil
  79. }
  80. sort.Slice(pages.pages, func(i, j int) bool {
  81. return pages.pages[i].Offset < pages.pages[j].Offset
  82. })
  83. var fileId, host string
  84. if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  85. request := &filer_pb.AssignVolumeRequest{
  86. Count: 1,
  87. Replication: pages.f.wfs.replication,
  88. Collection: pages.f.wfs.collection,
  89. }
  90. resp, err := client.AssignVolume(ctx, request)
  91. if err != nil {
  92. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  93. return err
  94. }
  95. fileId, host = resp.FileId, resp.Url
  96. return nil
  97. }); err != nil {
  98. return nil, fmt.Errorf("filer assign volume: %v", err)
  99. }
  100. var readers []io.Reader
  101. for _, page := range pages.pages {
  102. readers = append(readers, bytes.NewReader(page.Data))
  103. }
  104. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  105. bufReader := io.MultiReader(readers...)
  106. uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "")
  107. if err != nil {
  108. glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
  109. return nil, fmt.Errorf("upload data: %v", err)
  110. }
  111. if uploadResult.Error != "" {
  112. glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
  113. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  114. }
  115. return &filer_pb.FileChunk{
  116. FileId: fileId,
  117. Offset: pages.pages[0].Offset,
  118. Size: uint64(pages.totalSize()),
  119. Mtime: time.Now().UnixNano(),
  120. }, nil
  121. }