diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go index 1f89bd902..0a796a92f 100644 --- a/unmaintained/repeated_vacuum/repeated_vacuum.go +++ b/unmaintained/repeated_vacuum/repeated_vacuum.go @@ -1,13 +1,13 @@ package main import ( + "context" "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "log" "math/rand" "time" - "context" "google.golang.org/grpc" @@ -56,7 +56,7 @@ func main() { } func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) { - assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(context.Background(), func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{ Count: 1, Replication: *replication, }) @@ -84,7 +84,7 @@ func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, st log.Fatalf("upload: %v", err) } - _, err = uploader.UploadData(data, uploadOption) + _, err = uploader.UploadData(context.Background(), data, uploadOption) if err != nil { log.Fatalf("upload: %v", err) } diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 08db2ef3d..715ae3d67 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -241,7 +241,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { Replication: *b.replication, DiskType: *b.diskType, } - if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil { + if assignResult, err := operation.Assign(context.Background(), b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil { fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection if !isSecure && assignResult.Auth != "" { isSecure = true @@ -288,7 +288,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { start := time.Now() var bytesRead int var err error - urls, err := b.masterClient.LookupFileId(fid) + urls, err := b.masterClient.LookupFileId(context.Background(), fid) if err != nil { s.failed++ println("!!!! ", fid, " location not found!!!!!") diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index 2c0f84ddc..136440109 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -28,9 +28,9 @@ type FilerCatOptions struct { } func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { - return func(fileId string) (targetUrls []string, err error) { + return func(ctx context.Context, fileId string) (targetUrls []string, err error) { vid := filer.VolumeId(fileId) - resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + resp, err := fco.filerClient.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) if err != nil { diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go index 752238de9..2e61826ca 100644 --- a/weed/filer/filechunk_group.go +++ b/weed/filer/filechunk_group.go @@ -1,6 +1,7 @@ package filer import ( + "context" "io" "sync" @@ -89,7 +90,7 @@ func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { continue } - resolvedChunks, err := ResolveOneChunkManifest(group.lookupFn, chunk) + resolvedChunks, err := ResolveOneChunkManifest(context.Background(), group.lookupFn, chunk) if err != nil { return err } diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 36096d2c1..00e9a73a1 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -2,6 +2,7 @@ package filer import ( "bytes" + "context" "fmt" "io" "math" @@ -48,7 +49,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa return } -func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { +func ResolveChunkManifest(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { // TODO maybe parallel this for _, chunk := range chunks { @@ -61,14 +62,14 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun continue } - resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk) + resolvedChunks, err := ResolveOneChunkManifest(ctx, lookupFileIdFn, chunk) if err != nil { return dataChunks, nil, err } manifestChunks = append(manifestChunks, chunk) // recursive - subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) + subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(ctx, lookupFileIdFn, resolvedChunks, startOffset, stopOffset) if subErr != nil { return dataChunks, nil, subErr } @@ -78,7 +79,7 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun return } -func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) { +func ResolveOneChunkManifest(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) { if !chunk.IsChunkManifest { return } @@ -87,7 +88,7 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer) bytesBuffer.Reset() defer bytesBufferPool.Put(bytesBuffer) - err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + err := fetchWholeChunk(ctx, bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) if err != nil { return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) } @@ -102,13 +103,13 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // TODO fetch from cache for weed mount? -func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error { - urlStrings, err := lookupFileIdFn(fileId) +func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error { + urlStrings, err := lookupFileIdFn(ctx, fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return err } - err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0) + err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0) if err != nil { return err } @@ -116,15 +117,15 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi } func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) { - urlStrings, err := lookupFileIdFn(fileId) + urlStrings, err := lookupFileIdFn(context.Background(), fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return 0, err } - return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) + return util_http.RetriedFetchChunkData(context.Background(), buffer, urlStrings, cipherKey, isGzipped, false, offset) } -func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { +func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { var shouldRetry bool var totalWritten int @@ -135,7 +136,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt stri retriedCnt++ var localProcessed int var writeErr error - shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { if totalWritten > localProcessed { toBeSkipped := totalWritten - localProcessed if len(data) <= toBeSkipped { diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 7c8bb2fe1..7252169d8 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -2,6 +2,7 @@ package filer import ( "bytes" + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/wdclient" "math" @@ -61,9 +62,9 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks)) } -func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { +func CompactFileChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { - visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64) + visibles, _ := NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64) compacted, garbage = SeparateGarbageChunks(visibles, chunks) @@ -98,13 +99,13 @@ func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, st return } -func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) { +func MinusChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) { - aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as, 0, math.MaxInt64) + aData, aMeta, aErr := ResolveChunkManifest(ctx, lookupFileIdFn, as, 0, math.MaxInt64) if aErr != nil { return nil, aErr } - bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs, 0, math.MaxInt64) + bData, bMeta, bErr := ResolveChunkManifest(ctx, lookupFileIdFn, bs, 0, math.MaxInt64) if bErr != nil { return nil, bErr } @@ -180,9 +181,9 @@ func (cv *ChunkView) IsFullChunk() bool { return cv.ViewSize == cv.ChunkSize } -func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) { +func ViewFromChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) { - visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size) + visibles, _ := NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, chunks, offset, offset+size) return ViewFromVisibleIntervals(visibles, offset, size) @@ -264,9 +265,9 @@ func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop // NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory // If the file chunk content is a chunk manifest -func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) { +func NonOverlappingVisibleIntervals(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) { - chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset) + chunks, _, err = ResolveChunkManifest(ctx, lookupFileIdFn, chunks, startOffset, stopOffset) if err != nil { return } diff --git a/weed/filer/filechunks2_test.go b/weed/filer/filechunks2_test.go index dfa971f86..b735b8a27 100644 --- a/weed/filer/filechunks2_test.go +++ b/weed/filer/filechunks2_test.go @@ -1,6 +1,7 @@ package filer import ( + "context" "github.com/stretchr/testify/assert" "log" "slices" @@ -65,7 +66,7 @@ func TestCompactFileChunksRealCase(t *testing.T) { printChunks("before", chunks) - compacted, garbage := CompactFileChunks(nil, chunks) + compacted, garbage := CompactFileChunks(context.Background(), nil, chunks) printChunks("compacted", compacted) printChunks("garbage", garbage) diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go index 7554b0080..4af2af3f6 100644 --- a/weed/filer/filechunks_test.go +++ b/weed/filer/filechunks_test.go @@ -1,6 +1,7 @@ package filer import ( + "context" "fmt" "log" "math" @@ -21,7 +22,7 @@ func TestCompactFileChunks(t *testing.T) { {Offset: 110, Size: 200, FileId: "jkl", ModifiedTsNs: 300}, } - compacted, garbage := CompactFileChunks(nil, chunks) + compacted, garbage := CompactFileChunks(context.Background(), nil, chunks) if len(compacted) != 3 { t.Fatalf("unexpected compacted: %d", len(compacted)) @@ -54,7 +55,7 @@ func TestCompactFileChunks2(t *testing.T) { }) } - compacted, garbage := CompactFileChunks(nil, chunks) + compacted, garbage := CompactFileChunks(context.Background(), nil, chunks) if len(compacted) != 4 { t.Fatalf("unexpected compacted: %d", len(compacted)) @@ -90,7 +91,7 @@ func TestRandomFileChunksCompact(t *testing.T) { } } - visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64) + visibles, _ := NonOverlappingVisibleIntervals(context.Background(), nil, chunks, 0, math.MaxInt64) for visible := visibles.Front(); visible != nil; visible = visible.Next { v := visible.Value @@ -228,7 +229,7 @@ func TestIntervalMerging(t *testing.T) { for i, testcase := range testcases { log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) - intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64) + intervals, _ := NonOverlappingVisibleIntervals(context.Background(), nil, testcase.Chunks, 0, math.MaxInt64) x := -1 for visible := intervals.Front(); visible != nil; visible = visible.Next { x++ @@ -426,7 +427,7 @@ func TestChunksReading(t *testing.T) { // continue } log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i) - chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size) + chunks := ViewFromChunks(context.Background(), nil, testcase.Chunks, testcase.Offset, testcase.Size) x := -1 for c := chunks.Front(); c != nil; c = c.Next { x++ @@ -473,7 +474,7 @@ func BenchmarkCompactFileChunks(b *testing.B) { } for n := 0; n < b.N; n++ { - CompactFileChunks(nil, chunks) + CompactFileChunks(context.Background(), nil, chunks) } } @@ -562,7 +563,7 @@ func TestCompactFileChunks3(t *testing.T) { {Offset: 300, Size: 100, FileId: "def", ModifiedTsNs: 200}, } - compacted, _ := CompactFileChunks(nil, chunks) + compacted, _ := CompactFileChunks(context.Background(), nil, chunks) if len(compacted) != 4 { t.Fatalf("unexpected compacted: %d", len(compacted)) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index acde49d54..829b4c4b4 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -235,7 +235,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures) - f.deleteChunksIfNotNew(oldEntry, entry) + f.deleteChunksIfNotNew(ctx, oldEntry, entry) glog.V(4).Infof("CreateEntry %s: created", entry.FullPath) diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 0ae421981..17ce48143 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -36,7 +36,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR // A case not handled: // what if the chunk is in a different collection? if shouldDeleteChunks { - f.maybeDeleteHardLinks(hardLinkIds) + f.maybeDeleteHardLinks(ctx, hardLinkIds) } return nil }) @@ -53,7 +53,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR } if shouldDeleteChunks && !isDeleteCollection { - f.DeleteChunks(p, entry.GetChunks()) + f.DeleteChunks(ctx, p, entry.GetChunks()) } if isDeleteCollection { @@ -117,7 +117,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry } f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) - f.DeleteChunks(entry.FullPath, chunksToDelete) + f.DeleteChunks(ctx, entry.FullPath, chunksToDelete) return nil } @@ -150,9 +150,9 @@ func (f *Filer) DoDeleteCollection(collectionName string) (err error) { } -func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) { +func (f *Filer) maybeDeleteHardLinks(ctx context.Context, hardLinkIds []HardLinkId) { for _, hardLinkId := range hardLinkIds { - if err := f.Store.DeleteHardLink(context.Background(), hardLinkId); err != nil { + if err := f.Store.DeleteHardLink(ctx, hardLinkId); err != nil { glog.Errorf("delete hard link id %d : %v", hardLinkId, err) } } diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go index 362c7c51b..cc140b3ce 100644 --- a/weed/filer/filer_deletion.go +++ b/weed/filer/filer_deletion.go @@ -1,6 +1,7 @@ package filer import ( + "context" "strings" "time" @@ -72,25 +73,25 @@ func (f *Filer) loopProcessingDeletion() { } } -func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk) { - f.doDeleteChunks(chunks) +func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) { + f.doDeleteChunks(ctx, chunks) } -func (f *Filer) DeleteChunks(fullpath util.FullPath, chunks []*filer_pb.FileChunk) { +func (f *Filer) DeleteChunks(ctx context.Context, fullpath util.FullPath, chunks []*filer_pb.FileChunk) { rule := f.FilerConf.MatchStorageRule(string(fullpath)) if rule.DisableChunkDeletion { return } - f.doDeleteChunks(chunks) + f.doDeleteChunks(ctx, chunks) } -func (f *Filer) doDeleteChunks(chunks []*filer_pb.FileChunk) { +func (f *Filer) doDeleteChunks(ctx context.Context, chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { if !chunk.IsChunkManifest { f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) continue } - dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk) + dataChunks, manifestResolveErr := ResolveOneChunkManifest(ctx, f.MasterClient.LookupFileId, chunk) if manifestResolveErr != nil { glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) } @@ -107,7 +108,7 @@ func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) { } } -func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { +func (f *Filer) deleteChunksIfNotNew(ctx context.Context, oldEntry, newEntry *Entry) { var oldChunks, newChunks []*filer_pb.FileChunk if oldEntry != nil { oldChunks = oldEntry.GetChunks() @@ -116,7 +117,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { newChunks = newEntry.GetChunks() } - toDelete, err := MinusChunks(f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks) + toDelete, err := MinusChunks(ctx, f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks) if err != nil { glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", newChunks, oldChunks) return diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index 3c9a3496c..699a4b70e 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -58,7 +58,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi WritableVolumeCount: rule.VolumeGrowthCount, } - assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest) + assignResult, err := operation.Assign(context.Background(), f.GetMaster, f.GrpcDialOption, assignRequest) if err != nil { return nil, nil, fmt.Errorf("AssignVolume: %v", err) } @@ -83,7 +83,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) } - uploadResult, err := uploader.UploadData(data, uploadOption) + uploadResult, err := uploader.UploadData(context.Background(), data, uploadOption) if err != nil { return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) } diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go index ac2c763e6..e67b283dd 100644 --- a/weed/filer/filer_notify_read.go +++ b/weed/filer/filer_notify_read.go @@ -323,7 +323,7 @@ type LogFileIterator struct { func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator { return &LogFileIterator{ - r: NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks), + r: NewChunkStreamReaderFromFiler(context.Background(), masterClient, fileEntry.Chunks), sizeBuf: make([]byte, 4), startTsNs: startTsNs, stopTsNs: stopTsNs, diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index b87fa0411..fecdc5174 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -29,7 +29,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp vidCache := make(map[string]*filer_pb.Locations) var vicCacheLock sync.RWMutex - return func(fileId string) (targetUrls []string, err error) { + return func(ctx context.Context, fileId string) (targetUrls []string, err error) { vid := VolumeId(fileId) vicCacheLock.RLock() locations, found := vidCache[vid] @@ -38,7 +38,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp if !found { util.Retry("lookup volume "+vid, func() error { err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) if err != nil { diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 2ef81a931..08c59a34d 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -1,6 +1,7 @@ package filer import ( + "context" "fmt" "sync" "sync/atomic" @@ -169,7 +170,7 @@ func (s *SingleChunkCacher) startCaching() { s.cacheStartedCh <- struct{}{} // means this has been started - urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId) + urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId) if err != nil { s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err) return @@ -177,7 +178,7 @@ func (s *SingleChunkCacher) startCaching() { s.data = mem.Allocate(s.chunkSize) - _, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) + _, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) if s.err != nil { mem.Free(s.data) s.data = nil diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 2f55e3e44..bca9c7f6e 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -2,6 +2,7 @@ package filer import ( "bytes" + "context" "fmt" "io" "math" @@ -71,7 +72,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R type DoStreamContent func(writer io.Writer) error func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) { - return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0) + return PrepareStreamContentWithThrottler(context.Background(), masterClient, jwtFunc, chunks, offset, size, 0) } type VolumeServerJwtFunction func(fileId string) string @@ -80,9 +81,9 @@ func noJwtFunc(string) string { return "" } -func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) { +func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) { glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks)) - chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) + chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size) fileId2Url := make(map[string][]string) @@ -91,7 +92,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc var urlStrings []string var err error for _, backoff := range getLookupFileIdBackoffSchedule { - urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId) + urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId) if err == nil && len(urlStrings) > 0 { break } @@ -127,7 +128,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc urlStrings := fileId2Url[chunkView.FileId] start := time.Now() jwt := jwtFunc(chunkView.FileId) - err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) + err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) offset += int64(chunkView.ViewSize) remaining -= int64(chunkView.ViewSize) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) @@ -177,25 +178,25 @@ func writeZero(w io.Writer, size int64) (err error) { return } -func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error { +func ReadAll(ctx context.Context, buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error { - lookupFileIdFn := func(fileId string) (targetUrls []string, err error) { - return masterClient.LookupFileId(fileId) + lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrls []string, err error) { + return masterClient.LookupFileId(ctx, fileId) } - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer))) + chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, int64(len(buffer))) idx := 0 for x := chunkViews.Front(); x != nil; x = x.Next { chunkView := x.Value - urlStrings, err := lookupFileIdFn(chunkView.FileId) + urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err } - n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) + n, err := util_http.RetriedFetchChunkData(ctx, buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) if err != nil { return err } @@ -220,9 +221,9 @@ type ChunkStreamReader struct { var _ = io.ReadSeeker(&ChunkStreamReader{}) var _ = io.ReaderAt(&ChunkStreamReader{}) -func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { +func doNewChunkStreamReader(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64) var totalSize int64 for x := chunkViews.Front(); x != nil; x = x.Next { @@ -238,20 +239,20 @@ func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, ch } } -func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { +func NewChunkStreamReaderFromFiler(ctx context.Context, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - lookupFileIdFn := func(fileId string) (targetUrl []string, err error) { - return masterClient.LookupFileId(fileId) + lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrl []string, err error) { + return masterClient.LookupFileId(ctx, fileId) } - return doNewChunkStreamReader(lookupFileIdFn, chunks) + return doNewChunkStreamReader(ctx, lookupFileIdFn, chunks) } func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { lookupFileIdFn := LookupFn(filerClient) - return doNewChunkStreamReader(lookupFileIdFn, chunks) + return doNewChunkStreamReader(context.Background(), lookupFileIdFn, chunks) } func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { @@ -343,7 +344,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { - urlStrings, err := c.lookupFileId(chunkView.FileId) + urlStrings, err := c.lookupFileId(context.Background(), chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err @@ -351,7 +352,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 77ffb7e77..97e9ca30c 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -212,7 +212,7 @@ func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.St func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { if wfs.option.VolumeServerAccess == "filerProxy" { - return func(fileId string) (targetUrls []string, err error) { + return func(ctx context.Context, fileId string) (targetUrls []string, err error) { return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil } } diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 541270ff5..eda5ad8da 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -148,7 +148,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks()) - chunks, _ := filer.CompactFileChunks(wfs.LookupFn(), nonManifestChunks) + chunks, _ := filer.CompactFileChunks(context.Background(), wfs.LookupFn(), nonManifestChunks) chunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), chunks) if manifestErr != nil { // not good, but should be ok diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go index b2ee2ae00..2a646e4ee 100644 --- a/weed/mq/logstore/log_to_parquet.go +++ b/weed/mq/logstore/log_to_parquet.go @@ -378,7 +378,7 @@ func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry return err } -func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) { +func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(ctx context.Context, fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) { if len(entry.Content) > 0 { // skip .offset files return @@ -392,7 +392,7 @@ func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetU fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name) return } - urlStrings, err = lookupFileIdFn(chunk.FileId) + urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId) if err != nil { err = fmt.Errorf("lookup %s: %v", chunk.FileId, err) return diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go index 2c8a7c1de..12fca1706 100644 --- a/weed/mq/logstore/read_log_from_disk.go +++ b/weed/mq/logstore/read_log_from_disk.go @@ -75,7 +75,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name) return } - urlStrings, err = lookupFileIdFn(chunk.FileId) + urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId) if err != nil { err = fmt.Errorf("lookup %s: %v", chunk.FileId, err) return diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go index 6c69b9f12..3438af61a 100644 --- a/weed/mq/logstore/read_parquet_to_log.go +++ b/weed/mq/logstore/read_parquet_to_log.go @@ -55,7 +55,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { // create readerAt for the parquet file fileSize := filer.FileSize(entry) - visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) + visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize)) diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 13418da1a..eb54c674b 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -139,7 +139,7 @@ func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, pri return } -func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { +func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) @@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest continue } - lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(false, masterFn(ctx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, Replication: request.Replication, @@ -165,7 +165,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest DataNode: request.DataNode, WritableVolumeCount: request.WritableVolumeCount, } - resp, grpcErr := masterClient.Assign(context.Background(), req) + resp, grpcErr := masterClient.Assign(ctx, req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go index ac0f4eee6..b2ec7d92a 100644 --- a/weed/operation/assign_file_id_test.go +++ b/weed/operation/assign_file_id_test.go @@ -60,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) { func BenchmarkUnaryAssign(b *testing.B) { for i := 0; i < b.N; i++ { - Assign(func(_ context.Context) pb.ServerAddress { + Assign(context.Background(), func(_ context.Context) pb.ServerAddress { return pb.ServerAddress("localhost:9333") }, grpc.WithInsecure(), &VolumeAssignRequest{ Count: 1, diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index 7526a6e79..339d4507e 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -2,6 +2,7 @@ package operation import ( "bytes" + "context" "fmt" "io" "net/http" @@ -58,7 +59,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption) + uploadResult, err, data := uploader.Upload(context.Background(), bytes.NewReader([]byte(textContent)), uploadOption) if len(data) != len(textContent) { t.Errorf("data actual %d expected %d", len(data), len(textContent)) } @@ -86,7 +87,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - uploader.Upload(bytes.NewReader(gzippedData), uploadOption) + uploader.Upload(context.Background(), bytes.NewReader(gzippedData), uploadOption) } /* diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 9470afced..1efa42b2f 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -62,7 +62,7 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []* Ttl: pref.Ttl, DiskType: pref.DiskType, } - ret, err := Assign(masterFn, grpcDialOption, ar) + ret, err := Assign(context.Background(), masterFn, grpcDialOption, ar) if err != nil { for index := range files { results[index].Error = err.Error() @@ -155,7 +155,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j Ttl: fi.Pref.Ttl, DiskType: fi.Pref.DiskType, } - ret, err = Assign(masterFn, grpcDialOption, ar) + ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar) if err != nil { return } @@ -169,7 +169,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j Ttl: fi.Pref.Ttl, DiskType: fi.Pref.DiskType, } - ret, err = Assign(masterFn, grpcDialOption, ar) + ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar) if err != nil { // delete all uploaded chunks cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) @@ -223,7 +223,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j return 0, e } - ret, e, _ := uploader.Upload(fi.Reader, uploadOption) + ret, e, _ := uploader.Upload(context.Background(), fi.Reader, uploadOption) if e != nil { return 0, e } @@ -267,7 +267,7 @@ func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn, return 0, uploaderError } - uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption) + uploadResult, uploadError, _ := uploader.Upload(context.Background(), reader, uploadOption) if uploadError != nil { return 0, uploadError } @@ -299,6 +299,6 @@ func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt secu return e } - _, e = uploader.UploadData(buf, uploadOption) + _, e = uploader.UploadData(context.Background(), buf, uploadOption) return e } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 0cf6bf7cf..6d910674b 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -134,7 +134,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi uploadOption.Jwt = auth var uploadErr error - uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption) + uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption) return uploadErr } if uploadOption.RetryForever { @@ -151,18 +151,18 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi } // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { - uploadResult, err = uploader.retriedUploadData(data, option) +func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = uploader.retriedUploadData(ctx, data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = uploader.doUpload(reader, option) +func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = uploader.doUpload(ctx, reader, option) return } -func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes @@ -173,16 +173,16 @@ func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uplo return } } - uploadResult, uploadErr := uploader.retriedUploadData(data, option) + uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option) return uploadResult, uploadErr, data } -func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { if i > 0 { time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } - uploadResult, err = uploader.doUploadData(data, option) + uploadResult, err = uploader.doUploadData(ctx, data, option) if err == nil { uploadResult.RetryCount = i return @@ -192,7 +192,7 @@ func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) ( return } -func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { contentIsGzipped := option.IsInputCompressed shouldGzipNow := false if !option.IsInputCompressed { @@ -248,7 +248,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa } // upload data - uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return }, len(encryptedData), &UploadOption{ @@ -272,7 +272,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa } } else { // upload data - uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) { _, err = w.Write(data) return }, len(data), &UploadOption{ @@ -298,7 +298,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa return uploadResult, err } -func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { +func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { var body_writer *multipart.Writer var reqReader *bytes.Reader var buf *bytebufferpool.ByteBuffer @@ -358,6 +358,9 @@ func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) er if option.Jwt != "" { req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } + + util.ReqWithRequestId(req, ctx) + // print("+") resp, post_err := uploader.httpClient.Do(req) defer util_http.CloseResponse(resp) diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index ba0d7d0cc..f179cc3a7 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -127,8 +127,8 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor { info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { - md, _ := metadata.FromIncomingContext(ctx) - idList := md.Get(util.RequestIDKey) + incomingMd, _ := metadata.FromIncomingContext(ctx) + idList := incomingMd.Get(util.RequestIDKey) var reqID string if len(idList) > 0 { reqID = idList[0] @@ -137,6 +137,11 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor { reqID = uuid.New().String() } + ctx = metadata.NewOutgoingContext(ctx, + metadata.New(map[string]string{ + util.RequestIDKey: reqID, + })) + ctx = util.WithRequestID(ctx, reqID) grpc.SetTrailer(ctx, metadata.Pairs(util.RequestIDKey, reqID)) diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go index 4a77fd04a..57c206e3e 100644 --- a/weed/replication/repl_util/replication_util.go +++ b/weed/replication/repl_util/replication_util.go @@ -1,6 +1,7 @@ package repl_util import ( + "context" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/replication/source" @@ -12,7 +13,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS for x := chunkViews.Front(); x != nil; x = x.Next { chunk := x.Value - fileUrls, err := filerSource.LookupFileId(chunk.FileId) + fileUrls, err := filerSource.LookupFileId(context.Background(), chunk.FileId) if err != nil { return err } @@ -21,7 +22,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS var shouldRetry bool for _, fileUrl := range fileUrls { - shouldRetry, err = util_http.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index fb2f9ff82..fb28355bc 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -105,7 +105,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] } totalSize := filer.FileSize(entry) - chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) + chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) // Create a URL that references a to-be-created blob in your // Azure Storage account's container. diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 28a10b195..90f77f441 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -99,7 +99,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int } totalSize := filer.FileSize(entry) - chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) + chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 2254ca2f7..8b4b0e513 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -198,7 +198,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent glog.V(2).Infof("late updates %s", key) } else { // find out what changed - deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry) + deletedChunks, newChunks, err := compareChunks(context.Background(), filer.LookupFn(fs), oldEntry, newEntry) if err != nil { return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err) } @@ -242,12 +242,12 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent }) } -func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { - aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.GetChunks(), 0, math.MaxInt64) +func compareChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { + aData, aMeta, aErr := filer.ResolveChunkManifest(ctx, lookupFileIdFn, oldEntry.GetChunks(), 0, math.MaxInt64) if aErr != nil { return nil, nil, aErr } - bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.GetChunks(), 0, math.MaxInt64) + bData, bMeta, bErr := filer.ResolveChunkManifest(ctx, lookupFileIdFn, newEntry.GetChunks(), 0, math.MaxInt64) if bErr != nil { return nil, nil, bErr } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index db6ea4aec..6fe78b21b 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -97,7 +97,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in } totalSize := filer.FileSize(entry) - chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) + chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) defer wc.Close() diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go index c6dddb80a..2e962d1d0 100644 --- a/weed/replication/sink/localsink/local_sink.go +++ b/weed/replication/sink/localsink/local_sink.go @@ -1,6 +1,7 @@ package localsink import ( + "context" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -75,7 +76,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa glog.V(4).Infof("Create Entry key: %s", key) totalSize := filer.FileSize(entry) - chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) + chunkViews := filer.ViewFromChunks(context.Background(), localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) dir := filepath.Dir(key) diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 768e251a4..8a63d0c8f 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -55,7 +55,7 @@ func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, rea return nil } -func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) { +func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls []string, err error) { vid2Locations := make(map[string]*filer_pb.Locations) @@ -63,7 +63,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) if err != nil { @@ -110,7 +110,7 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") } - fileUrls, err := fs.LookupFileId(fileId) + fileUrls, err := fs.LookupFileId(context.Background(), fileId) if err != nil { return "", nil, nil, err } diff --git a/weed/server/common.go b/weed/server/common.go index 516f8bf1c..3aeee7752 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -129,6 +129,7 @@ func debug(params ...interface{}) { } func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) { + ctx := r.Context() m := make(map[string]interface{}) if r.Method != http.MethodPost { writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) @@ -163,7 +164,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope Ttl: r.FormValue("ttl"), DiskType: r.FormValue("disk"), } - assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar) + assignResult, ae := operation.Assign(ctx, masterFn, grpcDialOption, ar) if ae != nil { writeJsonError(w, r, http.StatusInternalServerError, ae) return @@ -189,7 +190,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope writeJsonError(w, r, http.StatusInternalServerError, err) return } - uploadResult, err := uploader.UploadData(pu.Data, uploadOption) + uploadResult, err := uploader.UploadData(ctx, pu.Data, uploadOption) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index b1440c94f..7fef61451 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -121,7 +121,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return resp, nil } -func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) { +func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) { fid, err := needle.ParseFileIdFromString(fileId) if err != nil { return nil, err @@ -142,12 +142,12 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr resp = &filer_pb.CreateEntryResponse{} - chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry) + chunks, garbage, err2 := fs.cleanupChunks(ctx, util.Join(req.Directory, req.Entry.Name), nil, req.Entry) if err2 != nil { return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) } - so, err := fs.detectStorageOption(string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "") + so, err := fs.detectStorageOption(ctx, string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "") if err != nil { return nil, err } @@ -177,7 +177,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } - chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry) + chunks, garbage, err2 := fs.cleanupChunks(ctx, fullpath, entry, req.Entry) if err2 != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) } @@ -201,11 +201,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } -func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { +func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { // remove old chunks if not included in the new ones if existingEntry != nil { - garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks()) + garbage, err = filer.MinusChunks(ctx, fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks()) if err != nil { return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %v", err) } @@ -214,11 +214,11 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry // files with manifest chunks are usually large and append only, skip calculating covered chunks manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks()) - chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks) + chunks, coveredChunks := filer.CompactFileChunks(ctx, fs.lookupFileId, nonManifestChunks) garbage = append(garbage, coveredChunks...) if newEntry.Attributes != nil { - so, _ := fs.detectStorageOption(fullpath, + so, _ := fs.detectStorageOption(ctx, fullpath, "", "", newEntry.Attributes.TtlSec, @@ -227,7 +227,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry "", "", ) // ignore readonly error for capacity needed to manifestize - chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks) + chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks) if err != nil { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", err) @@ -271,12 +271,12 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo } entry.Chunks = append(entry.GetChunks(), req.Chunks...) - so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "") + so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "") if err != nil { glog.Warningf("detectStorageOption: %v", err) return &filer_pb.AppendToEntryResponse{}, err } - entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.GetChunks()) + entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks()) if err != nil { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", err) @@ -305,7 +305,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol req.DiskType = fs.option.DiskType } - so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode) + so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil @@ -313,7 +313,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) - assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) + assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 991fff425..081d49ba0 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -64,7 +64,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req } // detect storage option - so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "") + so, err := fs.detectStorageOption(ctx, req.Directory, "", "", 0, "", "", "", "") if err != nil { return resp, err } @@ -97,7 +97,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req } // assign one volume server - assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) + assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) if err != nil { fetchAndWriteErr = err return @@ -184,10 +184,10 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req // this skips meta data log events if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil { - fs.filer.DeleteUncommittedChunks(chunks) + fs.filer.DeleteUncommittedChunks(ctx, chunks) return nil, err } - fs.filer.DeleteChunks(entry.FullPath, garbage) + fs.filer.DeleteChunks(ctx, entry.FullPath, garbage) fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil) diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index ca445ef9a..fd22ccd7f 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -3,6 +3,7 @@ package weed_server import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" @@ -31,8 +32,8 @@ func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrit } func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) { - - urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId) + ctx := r.Context() + urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId) if err != nil { glog.Errorf("locate %s: %v", fileId, err) w.WriteHeader(http.StatusInternalServerError) @@ -53,6 +54,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques proxyReq.Header.Set("Host", r.Host) proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + util.ReqWithRequestId(proxyReq, ctx) for header, values := range r.Header { for _, value := range values { diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 12371a8f6..fda767c2e 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -2,7 +2,6 @@ package weed_server import ( "bytes" - "context" "encoding/base64" "encoding/hex" "errors" @@ -89,14 +88,14 @@ func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Ent } func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { - + ctx := r.Context() path := r.URL.Path isForDirectory := strings.HasSuffix(path, "/") if isForDirectory && len(path) > 1 { path = path[:len(path)-1] } - entry, err := fs.filer.FindEntry(context.Background(), util.FullPath(path)) + entry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) if err != nil { if path == "/" { fs.listDirectoryHandler(w, r) @@ -147,6 +146,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if query.Get("metadata") == "true" { if query.Get("resolveManifest") == "true" { if entry.Chunks, _, err = filer.ResolveChunkManifest( + ctx, fs.filer.MasterClient.GetLookupFileIdFunction(), entry.GetChunks(), 0, math.MaxInt64); err != nil { err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error()) @@ -242,7 +242,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if shouldResize { data := mem.Allocate(int(totalSize)) defer mem.Free(data) - err := filer.ReadAll(data, fs.filer.MasterClient, entry.GetChunks()) + err := filer.ReadAll(ctx, data, fs.filer.MasterClient, entry.GetChunks()) if err != nil { glog.Errorf("failed to read %s: %v", path, err) w.WriteHeader(http.StatusInternalServerError) @@ -268,7 +268,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) chunks := entry.GetChunks() if entry.IsInRemoteOnly() { dir, name := entry.FullPath.DirAndName() - if resp, err := fs.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ + if resp, err := fs.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: dir, Name: name, }); err != nil { @@ -280,7 +280,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs) + streamFn, err := filer.PrepareStreamContentWithThrottler(ctx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs) if err != nil { stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc() glog.Errorf("failed to prepare stream content %s: %v", r.URL, err) diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 56f0f9cb4..1961a2f83 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "errors" "net/http" "strconv" @@ -18,7 +17,7 @@ import ( // sub directories are listed on the first page, when "lastFileName" // is empty. func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { - + ctx := r.Context() if fs.option.ExposeDirectoryData == false { writeJsonError(w, r, http.StatusForbidden, errors.New("ui is disabled")) return @@ -40,7 +39,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque namePattern := r.FormValue("namePattern") namePatternExclude := r.FormValue("namePatternExclude") - entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude) + entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(ctx, util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go index 80ea09d53..5f554de1d 100644 --- a/weed/server/filer_server_handlers_tagging.go +++ b/weed/server/filer_server_handlers_tagging.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "net/http" "strings" @@ -14,7 +13,7 @@ import ( // curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + ctx := r.Context() path := r.URL.Path if strings.HasSuffix(path, "/") { @@ -57,7 +56,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) // curl -X DELETE http://localhost:8888/path/to/a/file?tagging func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + ctx := r.Context() path := r.URL.Path if strings.HasSuffix(path, "/") { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 82880c2ac..b71c8fefd 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -34,7 +34,7 @@ type FilerPostResult struct { Error string `json:"error,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc() start := time.Now() @@ -44,7 +44,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u ar, altRequest := so.ToAssignRequests(1) - assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) + assignResult, ae := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) err = ae @@ -70,7 +70,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) { - ctx := context.Background() + ctx := r.Context() destination := r.RequestURI if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" { @@ -78,7 +78,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte } query := r.URL.Query() - so, err := fs.detectStorageOption0(destination, + so, err := fs.detectStorageOption0(ctx, destination, query.Get("collection"), query.Get("replication"), query.Get("ttl"), @@ -240,7 +240,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) { +func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) { rule := fs.filer.FilerConf.MatchStorageRule(requestURI) @@ -280,14 +280,14 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication }, nil } -func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) { +func (fs *FilerServer) detectStorageOption0(ctx context.Context, requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) { ttl, err := needle.ReadTTL(qTtl) if err != nil { glog.Errorf("fail to parse ttl %s: %v", qTtl, err) } - so, err := fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode) + so, err := fs.detectStorageOption(ctx, requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode) if so != nil { if fsync == "false" { so.Fsync = false diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index b0af7be4b..b16374bea 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite return } - fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so) + fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, part1, chunkSize, fileName, contentType, contentLength, so) if err != nil { return nil, nil, err } @@ -107,12 +107,12 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite md5bytes = md5Hash.Sum(nil) headerMd5 := r.Header.Get("Content-Md5") if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) { - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.") } filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) if replyerr != nil { - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) } return @@ -130,7 +130,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter return nil, nil, err } - fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so) + fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, so) if err != nil { return nil, nil, err @@ -139,12 +139,12 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter md5bytes = md5Hash.Sum(nil) headerMd5 := r.Header.Get("Content-Md5") if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) { - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.") } filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) if replyerr != nil { - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) } return @@ -299,14 +299,14 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } // maybe concatenate small chunks into one whole chunk - mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks) + mergedChunks, replyerr = fs.maybeMergeChunks(ctx, so, newChunks) if replyerr != nil { glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr) mergedChunks = newChunks } // maybe compact entry chunks - mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks) + mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), mergedChunks) if replyerr != nil { glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) return @@ -348,7 +348,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return filerResult, replyerr } -func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { +func (fs *FilerServer) saveAsChunk(ctx context.Context, so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) { var fileId string @@ -356,7 +356,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs err := util.Retry("saveAsChunk", func() error { // assign one file id for one chunk - assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so) + assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so) if assignErr != nil { return assignErr } @@ -380,7 +380,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs } var uploadErr error - uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption) + uploadResult, uploadErr, _ = uploader.Upload(ctx, reader, uploadOption) if uploadErr != nil { return uploadErr } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 9c1628749..fb052b5fa 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -19,7 +19,7 @@ import ( // handling single chunk POST or PUT upload func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) { - fileId, urlLocation, auth, err := fs.assignNewFileInfo(so) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so) if err != nil || fileId == "" || urlLocation == "" { return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter) @@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr) } - uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption) + uploadResult, uploadError := uploader.UploadData(ctx, uncompressedData, uploadOption) if uploadError != nil { return nil, fmt.Errorf("upload to volume server: %v", uploadError) } @@ -97,7 +97,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht } if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil { - fs.filer.DeleteUncommittedChunks(entry.GetChunks()) + fs.filer.DeleteUncommittedChunks(ctx, entry.GetChunks()) err = dbErr filerResult.Error = dbErr.Error() return diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go index 2110f485a..c22aa14c2 100644 --- a/weed/server/filer_server_handlers_write_merge.go +++ b/weed/server/filer_server_handlers_write_merge.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -12,7 +13,7 @@ import ( const MergeChunkMinCount int = 1000 -func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) { // Only merge small chunks more than half of the file var chunkSize = fs.option.MaxMB * 1024 * 1024 var smallChunk, sumChunk int @@ -33,16 +34,16 @@ func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks return inputChunks, nil } - return fs.mergeChunks(so, inputChunks, minOffset) + return fs.mergeChunks(ctx, so, inputChunks, minOffset) } -func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) { - chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks) +func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) { + chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks) _, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent) if mergeErr != nil { return nil, mergeErr } - mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so) + mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so) if mergeErr != nil { return } @@ -54,7 +55,7 @@ func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*f } } - garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks) + garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks) if err != nil { glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", mergedChunks, inputChunks) diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index e34fe27e6..495fae05a 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -2,6 +2,7 @@ package weed_server import ( "bytes" + "context" "crypto/md5" "fmt" "hash" @@ -27,7 +28,7 @@ var bufPool = sync.Pool{ }, } -func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { +func (fs *FilerServer) uploadRequestToChunks(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { query := r.URL.Query() isAppend := isAppend(r) @@ -45,10 +46,10 @@ func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Requ chunkOffset = offsetInt } - return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so) + return fs.uploadReaderToChunks(ctx, reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so) } -func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { +func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { md5Hash = md5.New() chunkOffset = startOffset @@ -117,7 +118,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, wg.Done() }() - chunks, toChunkErr := fs.dataToChunk(fileName, contentType, buf.Bytes(), offset, so) + chunks, toChunkErr := fs.dataToChunk(ctx, fileName, contentType, buf.Bytes(), offset, so) if toChunkErr != nil { uploadErrLock.Lock() if uploadErr == nil { @@ -152,7 +153,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, for _, chunk := range fileChunks { glog.V(4).Infof("purging failed uploaded %s chunk %s [%d,%d)", fileName, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) return nil, md5Hash, 0, uploadErr, nil } slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) int { @@ -161,7 +162,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, return fileChunks, md5Hash, chunkOffset, nil, smallContent } -func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { +func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc() start := time.Now() @@ -184,14 +185,14 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil return nil, err, []byte{} } - uploadResult, err, data := uploader.Upload(limitedReader, uploadOption) + uploadResult, err, data := uploader.Upload(ctx, limitedReader, uploadOption) if uploadResult != nil && uploadResult.RetryCount > 0 { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount)) } return uploadResult, err, data } -func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) { +func (fs *FilerServer) dataToChunk(ctx context.Context, fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) { dataReader := util.NewBytesReader(data) // retry to assign a different file id @@ -203,14 +204,14 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch err := util.Retry("filerDataToChunk", func() error { // assign one file id for one chunk - fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so) + fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so) if uploadErr != nil { glog.V(4).Infof("retry later due to assign error: %v", uploadErr) stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc() return uploadErr } // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth) + uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth) if uploadErr != nil { glog.V(4).Infof("retry later due to upload error: %v", uploadErr) stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc() diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 4452e019b..0b5fa1cfc 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -77,7 +77,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser return } - if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil { + if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil && err == nil { err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr) } }(replica.Url) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 7f0fcc871..30f335f5d 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -16,6 +16,7 @@ import ( ) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() if e := r.ParseForm(); e != nil { glog.V(0).Infoln("form parse error:", e) writeJsonError(w, r, http.StatusBadRequest, e) @@ -45,7 +46,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5) + isUnchanged, writeError := topology.ReplicatedWrite(ctx, vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5) if writeError != nil { writeJsonError(w, r, http.StatusInternalServerError, writeError) return diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 2b7493ee6..47fa055e7 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -556,7 +556,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { return 0, io.EOF } if f.visibleIntervals == nil { - f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize) + f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(context.Background(), filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize) f.reader = nil } if f.reader == nil { diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go index f31ec10f4..e5ae54285 100644 --- a/weed/shell/command_fs_merge_volumes.go +++ b/weed/shell/command_fs_merge_volumes.go @@ -351,7 +351,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie jwt = security.GenJwtForVolumeServer(security.SigningKey(signingKey), expiresAfterSec, toFid.String()) } - _, err, _ = uploader.Upload(reader, &operation.UploadOption{ + _, err, _ = uploader.Upload(context.Background(), reader, &operation.UploadOption{ UploadUrl: uploadURL, Filename: filename, IsInputCompressed: isCompressed, diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index dcac60874..fd46bf591 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -286,7 +286,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC return nil } } - dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64) + dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(context.Background(), filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64) if resolveErr != nil { return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr) } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index dbd814309..8a76722d4 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -240,7 +240,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m if *c.verbose && entry.Entry.IsDirectory { fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name)) } - dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64) + dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(context.Background(), filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64) if resolveErr != nil { return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr) } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index a2be991fa..10fe35f0a 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -1,6 +1,7 @@ package topology import ( + "context" "encoding/json" "errors" "fmt" @@ -23,7 +24,7 @@ import ( util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) -func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) { +func ReplicatedWrite(ctx context.Context, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) @@ -121,7 +122,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String()) return err } - _, err = uploader.UploadData(n.Data, uploadOption) + _, err = uploader.UploadData(ctx, n.Data, uploadOption) if err != nil { glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String()) } diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index 33d978d9e..3cc819a47 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -2,6 +2,7 @@ package http import ( "compress/gzip" + "context" "encoding/json" "errors" "fmt" @@ -214,11 +215,11 @@ func NormalizeUrl(url string) (string, error) { return GetGlobalHttpClient().NormalizeHttpScheme(url) } -func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { +func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { if cipherKey != nil { var n int - _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + _, err := readEncryptedUrl(ctx, fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { n = copy(buf, data) }) return int64(n), err @@ -286,13 +287,13 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC return n, err } -func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { - return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn) +func ReadUrlAsStream(ctx context.Context, fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { + return ReadUrlAsStreamAuthenticated(ctx, fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn) } -func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { +func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { if cipherKey != nil { - return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + return readEncryptedUrl(ctx, fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) } req, err := http.NewRequest(http.MethodGet, fileUrl, nil) @@ -306,6 +307,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte } else { req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } + util.ReqWithRequestId(req, ctx) r, err := GetGlobalHttpClient().Do(req) if err != nil { @@ -351,7 +353,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte } -func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { +func readEncryptedUrl(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt) if err != nil { return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) @@ -447,7 +449,7 @@ func (r *CountingReader) Read(p []byte) (n int, err error) { return n, err } -func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { +func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { var shouldRetry bool @@ -457,7 +459,7 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, if strings.Contains(urlString, "%") { urlString = url.PathEscape(urlString) } - shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { if n < len(buffer) { x := copy(buffer[n:], data) n += x diff --git a/weed/util/request_id.go b/weed/util/request_id.go index 85ec254dc..7945e7cc4 100644 --- a/weed/util/request_id.go +++ b/weed/util/request_id.go @@ -1,6 +1,9 @@ package util -import "context" +import ( + "context" + "net/http" +) const ( RequestIdHttpHeader = "X-Request-ID" @@ -18,3 +21,7 @@ func GetRequestID(ctx context.Context) string { func WithRequestID(ctx context.Context, id string) context.Context { return context.WithValue(ctx, RequestIDKey, id) } + +func ReqWithRequestId(req *http.Request, ctx context.Context) { + req.Header.Set(RequestIdHttpHeader, GetRequestID(ctx)) +} diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index da46a440b..eccf1d14b 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -56,13 +56,14 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { return mc.LookupFileIdWithFallback } -func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) { - fullUrls, err = mc.vidMap.LookupFileId(fileId) +func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) { + fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId) if err == nil && len(fullUrls) > 0 { return } - err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ + + err = pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, }) if err != nil { diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 7a2a5bb92..9d5e5d378 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -1,6 +1,7 @@ package wdclient import ( + "context" "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -21,7 +22,7 @@ type HasLookupFileIdFunction interface { GetLookupFileIdFunction() LookupFileIdFunctionType } -type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error) +type LookupFileIdFunctionType func(ctx context.Context, fileId string) (targetUrls []string, err error) type Location struct { Url string `json:"url,omitempty"` @@ -99,7 +100,7 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er return } -func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) { +func (vc *vidMap) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { return nil, errors.New("Invalid fileId " + fileId)