You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

92 lines
2.6 KiB

4 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/operation"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. )
  11. func (f *Filer) appendToFile(targetFile string, data []byte) error {
  12. assignResult, uploadResult, err2 := f.assignAndUpload(targetFile, data)
  13. if err2 != nil {
  14. return err2
  15. }
  16. // find out existing entry
  17. fullpath := util.FullPath(targetFile)
  18. entry, err := f.FindEntry(context.Background(), fullpath)
  19. var offset int64 = 0
  20. if err == filer_pb.ErrNotFound {
  21. entry = &Entry{
  22. FullPath: fullpath,
  23. Attr: Attr{
  24. Crtime: time.Now(),
  25. Mtime: time.Now(),
  26. Mode: os.FileMode(0644),
  27. Uid: OS_UID,
  28. Gid: OS_GID,
  29. },
  30. }
  31. } else if err != nil {
  32. return fmt.Errorf("find %s: %v", fullpath, err)
  33. } else {
  34. offset = int64(TotalSize(entry.GetChunks()))
  35. }
  36. // append to existing chunks
  37. entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset, time.Now().UnixNano()))
  38. // update the entry
  39. err = f.CreateEntry(context.Background(), entry, false, false, nil, false, f.MaxFilenameLength)
  40. return err
  41. }
  42. func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
  43. // assign a volume location
  44. rule := f.FilerConf.MatchStorageRule(targetFile)
  45. assignRequest := &operation.VolumeAssignRequest{
  46. Count: 1,
  47. Collection: util.Nvl(f.metaLogCollection, rule.Collection),
  48. Replication: util.Nvl(f.metaLogReplication, rule.Replication),
  49. WritableVolumeCount: rule.VolumeGrowthCount,
  50. }
  51. assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
  52. if err != nil {
  53. return nil, nil, fmt.Errorf("AssignVolume: %v", err)
  54. }
  55. if assignResult.Error != "" {
  56. return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
  57. }
  58. // upload data
  59. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  60. uploadOption := &operation.UploadOption{
  61. UploadUrl: targetUrl,
  62. Filename: "",
  63. Cipher: f.Cipher,
  64. IsInputCompressed: false,
  65. MimeType: "",
  66. PairMap: nil,
  67. Jwt: assignResult.Auth,
  68. }
  69. uploader, err := operation.NewUploader()
  70. if err != nil {
  71. return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
  72. }
  73. uploadResult, err := uploader.UploadData(data, uploadOption)
  74. if err != nil {
  75. return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
  76. }
  77. // println("uploaded to", targetUrl)
  78. return assignResult, uploadResult, nil
  79. }