You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

167 lines
4.7 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package filesys
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. )
  11. type ContinuousDirtyPages struct {
  12. hasData bool
  13. Offset int64
  14. Size int64
  15. Data []byte
  16. f *File
  17. }
  18. func newDirtyPages(file *File) *ContinuousDirtyPages {
  19. return &ContinuousDirtyPages{
  20. Data: make([]byte, file.wfs.option.ChunkSizeLimit),
  21. f: file,
  22. }
  23. }
  24. func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
  25. var chunk *filer_pb.FileChunk
  26. if len(data) > len(pages.Data) {
  27. // this is more than what buffer can hold.
  28. // flush existing
  29. if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
  30. if chunk != nil {
  31. glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
  32. }
  33. chunks = append(chunks, chunk)
  34. } else {
  35. glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
  36. return
  37. }
  38. pages.Size = 0
  39. // flush the big page
  40. if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
  41. if chunk != nil {
  42. glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
  43. chunks = append(chunks, chunk)
  44. }
  45. } else {
  46. glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
  47. return
  48. }
  49. return
  50. }
  51. if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
  52. pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) {
  53. // if the data is out of range,
  54. // or buffer is full if adding new data,
  55. // flush current buffer and add new data
  56. // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size)
  57. if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
  58. if chunk != nil {
  59. glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
  60. chunks = append(chunks, chunk)
  61. }
  62. } else {
  63. glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
  64. return
  65. }
  66. pages.Offset = offset
  67. pages.Size = int64(len(data))
  68. copy(pages.Data, data)
  69. return
  70. }
  71. copy(pages.Data[offset-pages.Offset:], data)
  72. pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
  73. return
  74. }
  75. func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
  76. if pages.Size == 0 {
  77. return nil, nil
  78. }
  79. if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
  80. pages.Size = 0
  81. if chunk != nil {
  82. glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
  83. }
  84. }
  85. return
  86. }
  87. func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
  88. return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
  89. }
  90. func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
  91. if pages.Size == 0 {
  92. return nil, nil
  93. }
  94. var fileId, host string
  95. if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  96. request := &filer_pb.AssignVolumeRequest{
  97. Count: 1,
  98. Replication: pages.f.wfs.option.Replication,
  99. Collection: pages.f.wfs.option.Collection,
  100. TtlSec: pages.f.wfs.option.TtlSec,
  101. DataCenter: pages.f.wfs.option.DataCenter,
  102. }
  103. resp, err := client.AssignVolume(ctx, request)
  104. if err != nil {
  105. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  106. return err
  107. }
  108. fileId, host = resp.FileId, resp.Url
  109. return nil
  110. }); err != nil {
  111. return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  112. }
  113. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  114. bufReader := bytes.NewReader(pages.Data[:pages.Size])
  115. uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "")
  116. if err != nil {
  117. glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
  118. return nil, fmt.Errorf("upload data: %v", err)
  119. }
  120. if uploadResult.Error != "" {
  121. glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
  122. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  123. }
  124. return &filer_pb.FileChunk{
  125. FileId: fileId,
  126. Offset: offset,
  127. Size: uint64(len(buf)),
  128. Mtime: time.Now().UnixNano(),
  129. }, nil
  130. }
  131. func max(x, y int64) int64 {
  132. if x > y {
  133. return x
  134. }
  135. return y
  136. }