Browse Source

Add context with request (#6824)

pull/5490/merge
Aleksey Kosov 2 weeks ago
committed by GitHub
parent
commit
283d9e0079
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 6
      unmaintained/repeated_vacuum/repeated_vacuum.go
  2. 4
      weed/command/benchmark.go
  3. 4
      weed/command/filer_cat.go
  4. 3
      weed/filer/filechunk_group.go
  5. 25
      weed/filer/filechunk_manifest.go
  6. 19
      weed/filer/filechunks.go
  7. 3
      weed/filer/filechunks2_test.go
  8. 15
      weed/filer/filechunks_test.go
  9. 2
      weed/filer/filer.go
  10. 10
      weed/filer/filer_delete_entry.go
  11. 17
      weed/filer/filer_deletion.go
  12. 4
      weed/filer/filer_notify_append.go
  13. 2
      weed/filer/filer_notify_read.go
  14. 4
      weed/filer/reader_at.go
  15. 5
      weed/filer/reader_cache.go
  16. 41
      weed/filer/stream.go
  17. 2
      weed/mount/weedfs.go
  18. 2
      weed/mount/weedfs_file_sync.go
  19. 4
      weed/mq/logstore/log_to_parquet.go
  20. 2
      weed/mq/logstore/read_log_from_disk.go
  21. 2
      weed/mq/logstore/read_parquet_to_log.go
  22. 6
      weed/operation/assign_file_id.go
  23. 2
      weed/operation/assign_file_id_test.go
  24. 5
      weed/operation/needle_parse_test.go
  25. 12
      weed/operation/submit.go
  26. 29
      weed/operation/upload_content.go
  27. 9
      weed/pb/grpc_client_server.go
  28. 5
      weed/replication/repl_util/replication_util.go
  29. 2
      weed/replication/sink/azuresink/azure_sink.go
  30. 2
      weed/replication/sink/b2sink/b2_sink.go
  31. 8
      weed/replication/sink/filersink/filer_sink.go
  32. 2
      weed/replication/sink/gcssink/gcs_sink.go
  33. 3
      weed/replication/sink/localsink/local_sink.go
  34. 6
      weed/replication/source/filer_source.go
  35. 5
      weed/server/common.go
  36. 26
      weed/server/filer_grpc_server.go
  37. 8
      weed/server/filer_grpc_server_remote.go
  38. 6
      weed/server/filer_server_handlers_proxy.go
  39. 12
      weed/server/filer_server_handlers_read.go
  40. 5
      weed/server/filer_server_handlers_read_dir.go
  41. 5
      weed/server/filer_server_handlers_tagging.go
  42. 14
      weed/server/filer_server_handlers_write.go
  43. 22
      weed/server/filer_server_handlers_write_autochunk.go
  44. 6
      weed/server/filer_server_handlers_write_cipher.go
  45. 13
      weed/server/filer_server_handlers_write_merge.go
  46. 21
      weed/server/filer_server_handlers_write_upload.go
  47. 2
      weed/server/volume_grpc_remote.go
  48. 3
      weed/server/volume_server_handlers_write.go
  49. 2
      weed/server/webdav_server.go
  50. 2
      weed/shell/command_fs_merge_volumes.go
  51. 2
      weed/shell/command_fs_verify.go
  52. 2
      weed/shell/command_volume_fsck.go
  53. 5
      weed/topology/store_replicate.go
  54. 20
      weed/util/http/http_global_client_util.go
  55. 9
      weed/util/request_id.go
  56. 9
      weed/wdclient/masterclient.go
  57. 5
      weed/wdclient/vid_map.go

6
unmaintained/repeated_vacuum/repeated_vacuum.go

@ -1,13 +1,13 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"log" "log"
"math/rand" "math/rand"
"time" "time"
"context"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -56,7 +56,7 @@ func main() {
} }
func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) { func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) {
assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{
assignResult, err := operation.Assign(context.Background(), func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{
Count: 1, Count: 1,
Replication: *replication, Replication: *replication,
}) })
@ -84,7 +84,7 @@ func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, st
log.Fatalf("upload: %v", err) log.Fatalf("upload: %v", err)
} }
_, err = uploader.UploadData(data, uploadOption)
_, err = uploader.UploadData(context.Background(), data, uploadOption)
if err != nil { if err != nil {
log.Fatalf("upload: %v", err) log.Fatalf("upload: %v", err)
} }

4
weed/command/benchmark.go

@ -241,7 +241,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Replication: *b.replication, Replication: *b.replication,
DiskType: *b.diskType, DiskType: *b.diskType,
} }
if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
if assignResult, err := operation.Assign(context.Background(), b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection
if !isSecure && assignResult.Auth != "" { if !isSecure && assignResult.Auth != "" {
isSecure = true isSecure = true
@ -288,7 +288,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
start := time.Now() start := time.Now()
var bytesRead int var bytesRead int
var err error var err error
urls, err := b.masterClient.LookupFileId(fid)
urls, err := b.masterClient.LookupFileId(context.Background(), fid)
if err != nil { if err != nil {
s.failed++ s.failed++
println("!!!! ", fid, " location not found!!!!!") println("!!!! ", fid, " location not found!!!!!")

4
weed/command/filer_cat.go

@ -28,9 +28,9 @@ type FilerCatOptions struct {
} }
func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
return func(fileId string) (targetUrls []string, err error) {
return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
vid := filer.VolumeId(fileId) vid := filer.VolumeId(fileId)
resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
resp, err := fco.filerClient.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid}, VolumeIds: []string{vid},
}) })
if err != nil { if err != nil {

3
weed/filer/filechunk_group.go

@ -1,6 +1,7 @@
package filer package filer
import ( import (
"context"
"io" "io"
"sync" "sync"
@ -89,7 +90,7 @@ func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
continue continue
} }
resolvedChunks, err := ResolveOneChunkManifest(group.lookupFn, chunk)
resolvedChunks, err := ResolveOneChunkManifest(context.Background(), group.lookupFn, chunk)
if err != nil { if err != nil {
return err return err
} }

25
weed/filer/filechunk_manifest.go

@ -2,6 +2,7 @@ package filer
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"math" "math"
@ -48,7 +49,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return return
} }
func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
func ResolveChunkManifest(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this // TODO maybe parallel this
for _, chunk := range chunks { for _, chunk := range chunks {
@ -61,14 +62,14 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
continue continue
} }
resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
resolvedChunks, err := ResolveOneChunkManifest(ctx, lookupFileIdFn, chunk)
if err != nil { if err != nil {
return dataChunks, nil, err return dataChunks, nil, err
} }
manifestChunks = append(manifestChunks, chunk) manifestChunks = append(manifestChunks, chunk)
// recursive // recursive
subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(ctx, lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
if subErr != nil { if subErr != nil {
return dataChunks, nil, subErr return dataChunks, nil, subErr
} }
@ -78,7 +79,7 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
return return
} }
func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
func ResolveOneChunkManifest(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
if !chunk.IsChunkManifest { if !chunk.IsChunkManifest {
return return
} }
@ -87,7 +88,7 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer) bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
bytesBuffer.Reset() bytesBuffer.Reset()
defer bytesBufferPool.Put(bytesBuffer) defer bytesBufferPool.Put(bytesBuffer)
err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
err := fetchWholeChunk(ctx, bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
if err != nil { if err != nil {
return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
} }
@ -102,13 +103,13 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
} }
// TODO fetch from cache for weed mount? // TODO fetch from cache for weed mount?
func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
urlStrings, err := lookupFileIdFn(fileId)
func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
urlStrings, err := lookupFileIdFn(ctx, fileId)
if err != nil { if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return err return err
} }
err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
if err != nil { if err != nil {
return err return err
} }
@ -116,15 +117,15 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi
} }
func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) { func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
urlStrings, err := lookupFileIdFn(fileId)
urlStrings, err := lookupFileIdFn(context.Background(), fileId)
if err != nil { if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return 0, err return 0, err
} }
return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
return util_http.RetriedFetchChunkData(context.Background(), buffer, urlStrings, cipherKey, isGzipped, false, offset)
} }
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
var shouldRetry bool var shouldRetry bool
var totalWritten int var totalWritten int
@ -135,7 +136,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt stri
retriedCnt++ retriedCnt++
var localProcessed int var localProcessed int
var writeErr error var writeErr error
shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
if totalWritten > localProcessed { if totalWritten > localProcessed {
toBeSkipped := totalWritten - localProcessed toBeSkipped := totalWritten - localProcessed
if len(data) <= toBeSkipped { if len(data) <= toBeSkipped {

19
weed/filer/filechunks.go

@ -2,6 +2,7 @@ package filer
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/wdclient"
"math" "math"
@ -61,9 +62,9 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks)) return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks))
} }
func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
func CompactFileChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64)
visibles, _ := NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64)
compacted, garbage = SeparateGarbageChunks(visibles, chunks) compacted, garbage = SeparateGarbageChunks(visibles, chunks)
@ -98,13 +99,13 @@ func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, st
return return
} }
func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
func MinusChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as, 0, math.MaxInt64)
aData, aMeta, aErr := ResolveChunkManifest(ctx, lookupFileIdFn, as, 0, math.MaxInt64)
if aErr != nil { if aErr != nil {
return nil, aErr return nil, aErr
} }
bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs, 0, math.MaxInt64)
bData, bMeta, bErr := ResolveChunkManifest(ctx, lookupFileIdFn, bs, 0, math.MaxInt64)
if bErr != nil { if bErr != nil {
return nil, bErr return nil, bErr
} }
@ -180,9 +181,9 @@ func (cv *ChunkView) IsFullChunk() bool {
return cv.ViewSize == cv.ChunkSize return cv.ViewSize == cv.ChunkSize
} }
func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
func ViewFromChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size)
visibles, _ := NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, chunks, offset, offset+size)
return ViewFromVisibleIntervals(visibles, offset, size) return ViewFromVisibleIntervals(visibles, offset, size)
@ -264,9 +265,9 @@ func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory // NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest // If the file chunk content is a chunk manifest
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) {
func NonOverlappingVisibleIntervals(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
chunks, _, err = ResolveChunkManifest(ctx, lookupFileIdFn, chunks, startOffset, stopOffset)
if err != nil { if err != nil {
return return
} }

3
weed/filer/filechunks2_test.go

@ -1,6 +1,7 @@
package filer package filer
import ( import (
"context"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"log" "log"
"slices" "slices"
@ -65,7 +66,7 @@ func TestCompactFileChunksRealCase(t *testing.T) {
printChunks("before", chunks) printChunks("before", chunks)
compacted, garbage := CompactFileChunks(nil, chunks)
compacted, garbage := CompactFileChunks(context.Background(), nil, chunks)
printChunks("compacted", compacted) printChunks("compacted", compacted)
printChunks("garbage", garbage) printChunks("garbage", garbage)

15
weed/filer/filechunks_test.go

@ -1,6 +1,7 @@
package filer package filer
import ( import (
"context"
"fmt" "fmt"
"log" "log"
"math" "math"
@ -21,7 +22,7 @@ func TestCompactFileChunks(t *testing.T) {
{Offset: 110, Size: 200, FileId: "jkl", ModifiedTsNs: 300}, {Offset: 110, Size: 200, FileId: "jkl", ModifiedTsNs: 300},
} }
compacted, garbage := CompactFileChunks(nil, chunks)
compacted, garbage := CompactFileChunks(context.Background(), nil, chunks)
if len(compacted) != 3 { if len(compacted) != 3 {
t.Fatalf("unexpected compacted: %d", len(compacted)) t.Fatalf("unexpected compacted: %d", len(compacted))
@ -54,7 +55,7 @@ func TestCompactFileChunks2(t *testing.T) {
}) })
} }
compacted, garbage := CompactFileChunks(nil, chunks)
compacted, garbage := CompactFileChunks(context.Background(), nil, chunks)
if len(compacted) != 4 { if len(compacted) != 4 {
t.Fatalf("unexpected compacted: %d", len(compacted)) t.Fatalf("unexpected compacted: %d", len(compacted))
@ -90,7 +91,7 @@ func TestRandomFileChunksCompact(t *testing.T) {
} }
} }
visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64)
visibles, _ := NonOverlappingVisibleIntervals(context.Background(), nil, chunks, 0, math.MaxInt64)
for visible := visibles.Front(); visible != nil; visible = visible.Next { for visible := visibles.Front(); visible != nil; visible = visible.Next {
v := visible.Value v := visible.Value
@ -228,7 +229,7 @@ func TestIntervalMerging(t *testing.T) {
for i, testcase := range testcases { for i, testcase := range testcases {
log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64)
intervals, _ := NonOverlappingVisibleIntervals(context.Background(), nil, testcase.Chunks, 0, math.MaxInt64)
x := -1 x := -1
for visible := intervals.Front(); visible != nil; visible = visible.Next { for visible := intervals.Front(); visible != nil; visible = visible.Next {
x++ x++
@ -426,7 +427,7 @@ func TestChunksReading(t *testing.T) {
// continue // continue
} }
log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i) log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
chunks := ViewFromChunks(context.Background(), nil, testcase.Chunks, testcase.Offset, testcase.Size)
x := -1 x := -1
for c := chunks.Front(); c != nil; c = c.Next { for c := chunks.Front(); c != nil; c = c.Next {
x++ x++
@ -473,7 +474,7 @@ func BenchmarkCompactFileChunks(b *testing.B) {
} }
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
CompactFileChunks(nil, chunks)
CompactFileChunks(context.Background(), nil, chunks)
} }
} }
@ -562,7 +563,7 @@ func TestCompactFileChunks3(t *testing.T) {
{Offset: 300, Size: 100, FileId: "def", ModifiedTsNs: 200}, {Offset: 300, Size: 100, FileId: "def", ModifiedTsNs: 200},
} }
compacted, _ := CompactFileChunks(nil, chunks)
compacted, _ := CompactFileChunks(context.Background(), nil, chunks)
if len(compacted) != 4 { if len(compacted) != 4 {
t.Fatalf("unexpected compacted: %d", len(compacted)) t.Fatalf("unexpected compacted: %d", len(compacted))

2
weed/filer/filer.go

@ -235,7 +235,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures) f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
f.deleteChunksIfNotNew(oldEntry, entry)
f.deleteChunksIfNotNew(ctx, oldEntry, entry)
glog.V(4).Infof("CreateEntry %s: created", entry.FullPath) glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)

10
weed/filer/filer_delete_entry.go

@ -36,7 +36,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
// A case not handled: // A case not handled:
// what if the chunk is in a different collection? // what if the chunk is in a different collection?
if shouldDeleteChunks { if shouldDeleteChunks {
f.maybeDeleteHardLinks(hardLinkIds)
f.maybeDeleteHardLinks(ctx, hardLinkIds)
} }
return nil return nil
}) })
@ -53,7 +53,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
} }
if shouldDeleteChunks && !isDeleteCollection { if shouldDeleteChunks && !isDeleteCollection {
f.DeleteChunks(p, entry.GetChunks())
f.DeleteChunks(ctx, p, entry.GetChunks())
} }
if isDeleteCollection { if isDeleteCollection {
@ -117,7 +117,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
} }
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
f.DeleteChunks(entry.FullPath, chunksToDelete)
f.DeleteChunks(ctx, entry.FullPath, chunksToDelete)
return nil return nil
} }
@ -150,9 +150,9 @@ func (f *Filer) DoDeleteCollection(collectionName string) (err error) {
} }
func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) {
func (f *Filer) maybeDeleteHardLinks(ctx context.Context, hardLinkIds []HardLinkId) {
for _, hardLinkId := range hardLinkIds { for _, hardLinkId := range hardLinkIds {
if err := f.Store.DeleteHardLink(context.Background(), hardLinkId); err != nil {
if err := f.Store.DeleteHardLink(ctx, hardLinkId); err != nil {
glog.Errorf("delete hard link id %d : %v", hardLinkId, err) glog.Errorf("delete hard link id %d : %v", hardLinkId, err)
} }
} }

17
weed/filer/filer_deletion.go

@ -1,6 +1,7 @@
package filer package filer
import ( import (
"context"
"strings" "strings"
"time" "time"
@ -72,25 +73,25 @@ func (f *Filer) loopProcessingDeletion() {
} }
} }
func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk) {
f.doDeleteChunks(chunks)
func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
f.doDeleteChunks(ctx, chunks)
} }
func (f *Filer) DeleteChunks(fullpath util.FullPath, chunks []*filer_pb.FileChunk) {
func (f *Filer) DeleteChunks(ctx context.Context, fullpath util.FullPath, chunks []*filer_pb.FileChunk) {
rule := f.FilerConf.MatchStorageRule(string(fullpath)) rule := f.FilerConf.MatchStorageRule(string(fullpath))
if rule.DisableChunkDeletion { if rule.DisableChunkDeletion {
return return
} }
f.doDeleteChunks(chunks)
f.doDeleteChunks(ctx, chunks)
} }
func (f *Filer) doDeleteChunks(chunks []*filer_pb.FileChunk) {
func (f *Filer) doDeleteChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks { for _, chunk := range chunks {
if !chunk.IsChunkManifest { if !chunk.IsChunkManifest {
f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
continue continue
} }
dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
dataChunks, manifestResolveErr := ResolveOneChunkManifest(ctx, f.MasterClient.LookupFileId, chunk)
if manifestResolveErr != nil { if manifestResolveErr != nil {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
} }
@ -107,7 +108,7 @@ func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
} }
} }
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
func (f *Filer) deleteChunksIfNotNew(ctx context.Context, oldEntry, newEntry *Entry) {
var oldChunks, newChunks []*filer_pb.FileChunk var oldChunks, newChunks []*filer_pb.FileChunk
if oldEntry != nil { if oldEntry != nil {
oldChunks = oldEntry.GetChunks() oldChunks = oldEntry.GetChunks()
@ -116,7 +117,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
newChunks = newEntry.GetChunks() newChunks = newEntry.GetChunks()
} }
toDelete, err := MinusChunks(f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks)
toDelete, err := MinusChunks(ctx, f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks)
if err != nil { if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", newChunks, oldChunks) glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", newChunks, oldChunks)
return return

4
weed/filer/filer_notify_append.go

@ -58,7 +58,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
WritableVolumeCount: rule.VolumeGrowthCount, WritableVolumeCount: rule.VolumeGrowthCount,
} }
assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
assignResult, err := operation.Assign(context.Background(), f.GetMaster, f.GrpcDialOption, assignRequest)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("AssignVolume: %v", err) return nil, nil, fmt.Errorf("AssignVolume: %v", err)
} }
@ -83,7 +83,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
} }
uploadResult, err := uploader.UploadData(data, uploadOption)
uploadResult, err := uploader.UploadData(context.Background(), data, uploadOption)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
} }

2
weed/filer/filer_notify_read.go

@ -323,7 +323,7 @@ type LogFileIterator struct {
func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator { func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
return &LogFileIterator{ return &LogFileIterator{
r: NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks),
r: NewChunkStreamReaderFromFiler(context.Background(), masterClient, fileEntry.Chunks),
sizeBuf: make([]byte, 4), sizeBuf: make([]byte, 4),
startTsNs: startTsNs, startTsNs: startTsNs,
stopTsNs: stopTsNs, stopTsNs: stopTsNs,

4
weed/filer/reader_at.go

@ -29,7 +29,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
vidCache := make(map[string]*filer_pb.Locations) vidCache := make(map[string]*filer_pb.Locations)
var vicCacheLock sync.RWMutex var vicCacheLock sync.RWMutex
return func(fileId string) (targetUrls []string, err error) {
return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
vid := VolumeId(fileId) vid := VolumeId(fileId)
vicCacheLock.RLock() vicCacheLock.RLock()
locations, found := vidCache[vid] locations, found := vidCache[vid]
@ -38,7 +38,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
if !found { if !found {
util.Retry("lookup volume "+vid, func() error { util.Retry("lookup volume "+vid, func() error {
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid}, VolumeIds: []string{vid},
}) })
if err != nil { if err != nil {

5
weed/filer/reader_cache.go

@ -1,6 +1,7 @@
package filer package filer
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -169,7 +170,7 @@ func (s *SingleChunkCacher) startCaching() {
s.cacheStartedCh <- struct{}{} // means this has been started s.cacheStartedCh <- struct{}{} // means this has been started
urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
if err != nil { if err != nil {
s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err) s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
return return
@ -177,7 +178,7 @@ func (s *SingleChunkCacher) startCaching() {
s.data = mem.Allocate(s.chunkSize) s.data = mem.Allocate(s.chunkSize)
_, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
_, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
if s.err != nil { if s.err != nil {
mem.Free(s.data) mem.Free(s.data)
s.data = nil s.data = nil

41
weed/filer/stream.go

@ -2,6 +2,7 @@ package filer
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"math" "math"
@ -71,7 +72,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
type DoStreamContent func(writer io.Writer) error type DoStreamContent func(writer io.Writer) error
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) { func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0)
return PrepareStreamContentWithThrottler(context.Background(), masterClient, jwtFunc, chunks, offset, size, 0)
} }
type VolumeServerJwtFunction func(fileId string) string type VolumeServerJwtFunction func(fileId string) string
@ -80,9 +81,9 @@ func noJwtFunc(string) string {
return "" return ""
} }
func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks)) glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string) fileId2Url := make(map[string][]string)
@ -91,7 +92,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
var urlStrings []string var urlStrings []string
var err error var err error
for _, backoff := range getLookupFileIdBackoffSchedule { for _, backoff := range getLookupFileIdBackoffSchedule {
urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId)
if err == nil && len(urlStrings) > 0 { if err == nil && len(urlStrings) > 0 {
break break
} }
@ -127,7 +128,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
urlStrings := fileId2Url[chunkView.FileId] urlStrings := fileId2Url[chunkView.FileId]
start := time.Now() start := time.Now()
jwt := jwtFunc(chunkView.FileId) jwt := jwtFunc(chunkView.FileId)
err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
offset += int64(chunkView.ViewSize) offset += int64(chunkView.ViewSize)
remaining -= int64(chunkView.ViewSize) remaining -= int64(chunkView.ViewSize)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
@ -177,25 +178,25 @@ func writeZero(w io.Writer, size int64) (err error) {
return return
} }
func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
func ReadAll(ctx context.Context, buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(fileId)
lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(ctx, fileId)
} }
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, int64(len(buffer)))
idx := 0 idx := 0
for x := chunkViews.Front(); x != nil; x = x.Next { for x := chunkViews.Front(); x != nil; x = x.Next {
chunkView := x.Value chunkView := x.Value
urlStrings, err := lookupFileIdFn(chunkView.FileId)
urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err return err
} }
n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
n, err := util_http.RetriedFetchChunkData(ctx, buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil { if err != nil {
return err return err
} }
@ -220,9 +221,9 @@ type ChunkStreamReader struct {
var _ = io.ReadSeeker(&ChunkStreamReader{}) var _ = io.ReadSeeker(&ChunkStreamReader{})
var _ = io.ReaderAt(&ChunkStreamReader{}) var _ = io.ReaderAt(&ChunkStreamReader{})
func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
func doNewChunkStreamReader(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64)
var totalSize int64 var totalSize int64
for x := chunkViews.Front(); x != nil; x = x.Next { for x := chunkViews.Front(); x != nil; x = x.Next {
@ -238,20 +239,20 @@ func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, ch
} }
} }
func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
func NewChunkStreamReaderFromFiler(ctx context.Context, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
return masterClient.LookupFileId(fileId)
lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrl []string, err error) {
return masterClient.LookupFileId(ctx, fileId)
} }
return doNewChunkStreamReader(lookupFileIdFn, chunks)
return doNewChunkStreamReader(ctx, lookupFileIdFn, chunks)
} }
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
lookupFileIdFn := LookupFn(filerClient) lookupFileIdFn := LookupFn(filerClient)
return doNewChunkStreamReader(lookupFileIdFn, chunks)
return doNewChunkStreamReader(context.Background(), lookupFileIdFn, chunks)
} }
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
@ -343,7 +344,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
} }
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
urlStrings, err := c.lookupFileId(chunkView.FileId)
urlStrings, err := c.lookupFileId(context.Background(), chunkView.FileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err return err
@ -351,7 +352,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer var buffer bytes.Buffer
var shouldRetry bool var shouldRetry bool
for _, urlString := range urlStrings { for _, urlString := range urlStrings {
shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data) buffer.Write(data)
}) })
if !shouldRetry { if !shouldRetry {

2
weed/mount/weedfs.go

@ -212,7 +212,7 @@ func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.St
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.VolumeServerAccess == "filerProxy" { if wfs.option.VolumeServerAccess == "filerProxy" {
return func(fileId string) (targetUrls []string, err error) {
return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
} }
} }

2
weed/mount/weedfs_file_sync.go

@ -148,7 +148,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks()) manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks())
chunks, _ := filer.CompactFileChunks(wfs.LookupFn(), nonManifestChunks)
chunks, _ := filer.CompactFileChunks(context.Background(), wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), chunks) chunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), chunks)
if manifestErr != nil { if manifestErr != nil {
// not good, but should be ok // not good, but should be ok

4
weed/mq/logstore/log_to_parquet.go

@ -378,7 +378,7 @@ func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry
return err return err
} }
func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(ctx context.Context, fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
if len(entry.Content) > 0 { if len(entry.Content) > 0 {
// skip .offset files // skip .offset files
return return
@ -392,7 +392,7 @@ func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetU
fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name) fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
return return
} }
urlStrings, err = lookupFileIdFn(chunk.FileId)
urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
if err != nil { if err != nil {
err = fmt.Errorf("lookup %s: %v", chunk.FileId, err) err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
return return

2
weed/mq/logstore/read_log_from_disk.go

@ -75,7 +75,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name) glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
return return
} }
urlStrings, err = lookupFileIdFn(chunk.FileId)
urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
if err != nil { if err != nil {
err = fmt.Errorf("lookup %s: %v", chunk.FileId, err) err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
return return

2
weed/mq/logstore/read_parquet_to_log.go

@ -55,7 +55,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
// create readerAt for the parquet file // create readerAt for the parquet file
fileSize := filer.FileSize(entry) fileSize := filer.FileSize(entry)
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize)) readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize))

6
weed/operation/assign_file_id.go

@ -139,7 +139,7 @@ func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, pri
return return
} }
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest) requests = append(requests, primaryRequest)
@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue continue
} }
lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
lastError = WithMasterServerClient(false, masterFn(ctx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{ req := &master_pb.AssignRequest{
Count: request.Count, Count: request.Count,
Replication: request.Replication, Replication: request.Replication,
@ -165,7 +165,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
DataNode: request.DataNode, DataNode: request.DataNode,
WritableVolumeCount: request.WritableVolumeCount, WritableVolumeCount: request.WritableVolumeCount,
} }
resp, grpcErr := masterClient.Assign(context.Background(), req)
resp, grpcErr := masterClient.Assign(ctx, req)
if grpcErr != nil { if grpcErr != nil {
return grpcErr return grpcErr
} }

2
weed/operation/assign_file_id_test.go

@ -60,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) {
func BenchmarkUnaryAssign(b *testing.B) { func BenchmarkUnaryAssign(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
Assign(func(_ context.Context) pb.ServerAddress {
Assign(context.Background(), func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333") return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), &VolumeAssignRequest{ }, grpc.WithInsecure(), &VolumeAssignRequest{
Count: 1, Count: 1,

5
weed/operation/needle_parse_test.go

@ -2,6 +2,7 @@ package operation
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -58,7 +59,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil, PairMap: nil,
Jwt: "", Jwt: "",
} }
uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption)
uploadResult, err, data := uploader.Upload(context.Background(), bytes.NewReader([]byte(textContent)), uploadOption)
if len(data) != len(textContent) { if len(data) != len(textContent) {
t.Errorf("data actual %d expected %d", len(data), len(textContent)) t.Errorf("data actual %d expected %d", len(data), len(textContent))
} }
@ -86,7 +87,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil, PairMap: nil,
Jwt: "", Jwt: "",
} }
uploader.Upload(bytes.NewReader(gzippedData), uploadOption)
uploader.Upload(context.Background(), bytes.NewReader(gzippedData), uploadOption)
} }
/* /*

12
weed/operation/submit.go

@ -62,7 +62,7 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*
Ttl: pref.Ttl, Ttl: pref.Ttl,
DiskType: pref.DiskType, DiskType: pref.DiskType,
} }
ret, err := Assign(masterFn, grpcDialOption, ar)
ret, err := Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil { if err != nil {
for index := range files { for index := range files {
results[index].Error = err.Error() results[index].Error = err.Error()
@ -155,7 +155,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
Ttl: fi.Pref.Ttl, Ttl: fi.Pref.Ttl,
DiskType: fi.Pref.DiskType, DiskType: fi.Pref.DiskType,
} }
ret, err = Assign(masterFn, grpcDialOption, ar)
ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil { if err != nil {
return return
} }
@ -169,7 +169,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
Ttl: fi.Pref.Ttl, Ttl: fi.Pref.Ttl,
DiskType: fi.Pref.DiskType, DiskType: fi.Pref.DiskType,
} }
ret, err = Assign(masterFn, grpcDialOption, ar)
ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil { if err != nil {
// delete all uploaded chunks // delete all uploaded chunks
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
@ -223,7 +223,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
return 0, e return 0, e
} }
ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
ret, e, _ := uploader.Upload(context.Background(), fi.Reader, uploadOption)
if e != nil { if e != nil {
return 0, e return 0, e
} }
@ -267,7 +267,7 @@ func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn,
return 0, uploaderError return 0, uploaderError
} }
uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
uploadResult, uploadError, _ := uploader.Upload(context.Background(), reader, uploadOption)
if uploadError != nil { if uploadError != nil {
return 0, uploadError return 0, uploadError
} }
@ -299,6 +299,6 @@ func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt secu
return e return e
} }
_, e = uploader.UploadData(buf, uploadOption)
_, e = uploader.UploadData(context.Background(), buf, uploadOption)
return e return e
} }

29
weed/operation/upload_content.go

@ -134,7 +134,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
uploadOption.Jwt = auth uploadOption.Jwt = auth
var uploadErr error var uploadErr error
uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption)
uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
return uploadErr return uploadErr
} }
if uploadOption.RetryForever { if uploadOption.RetryForever {
@ -151,18 +151,18 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
} }
// Upload sends a POST request to a volume server to upload the content with adjustable compression level // Upload sends a POST request to a volume server to upload the content with adjustable compression level
func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
uploadResult, err = uploader.retriedUploadData(data, option)
func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
uploadResult, err = uploader.retriedUploadData(ctx, data, option)
return return
} }
// Upload sends a POST request to a volume server to upload the content with fast compression // Upload sends a POST request to a volume server to upload the content with fast compression
func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
uploadResult, err, data = uploader.doUpload(reader, option)
func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
uploadResult, err, data = uploader.doUpload(ctx, reader, option)
return return
} }
func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader) bytesReader, ok := reader.(*util.BytesReader)
if ok { if ok {
data = bytesReader.Bytes data = bytesReader.Bytes
@ -173,16 +173,16 @@ func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uplo
return return
} }
} }
uploadResult, uploadErr := uploader.retriedUploadData(data, option)
uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option)
return uploadResult, uploadErr, data return uploadResult, uploadErr, data
} }
func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
if i > 0 { if i > 0 {
time.Sleep(time.Millisecond * time.Duration(237*(i+1))) time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
} }
uploadResult, err = uploader.doUploadData(data, option)
uploadResult, err = uploader.doUploadData(ctx, data, option)
if err == nil { if err == nil {
uploadResult.RetryCount = i uploadResult.RetryCount = i
return return
@ -192,7 +192,7 @@ func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (
return return
} }
func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
contentIsGzipped := option.IsInputCompressed contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false shouldGzipNow := false
if !option.IsInputCompressed { if !option.IsInputCompressed {
@ -248,7 +248,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
} }
// upload data // upload data
uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(encryptedData) _, err = w.Write(encryptedData)
return return
}, len(encryptedData), &UploadOption{ }, len(encryptedData), &UploadOption{
@ -272,7 +272,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
} }
} else { } else {
// upload data // upload data
uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(data) _, err = w.Write(data)
return return
}, len(data), &UploadOption{ }, len(data), &UploadOption{
@ -298,7 +298,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
return uploadResult, err return uploadResult, err
} }
func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
var body_writer *multipart.Writer var body_writer *multipart.Writer
var reqReader *bytes.Reader var reqReader *bytes.Reader
var buf *bytebufferpool.ByteBuffer var buf *bytebufferpool.ByteBuffer
@ -358,6 +358,9 @@ func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) er
if option.Jwt != "" { if option.Jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
} }
util.ReqWithRequestId(req, ctx)
// print("+") // print("+")
resp, post_err := uploader.httpClient.Do(req) resp, post_err := uploader.httpClient.Do(req)
defer util_http.CloseResponse(resp) defer util_http.CloseResponse(resp)

9
weed/pb/grpc_client_server.go

@ -127,8 +127,8 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
info *grpc.UnaryServerInfo, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler, handler grpc.UnaryHandler,
) (interface{}, error) { ) (interface{}, error) {
md, _ := metadata.FromIncomingContext(ctx)
idList := md.Get(util.RequestIDKey)
incomingMd, _ := metadata.FromIncomingContext(ctx)
idList := incomingMd.Get(util.RequestIDKey)
var reqID string var reqID string
if len(idList) > 0 { if len(idList) > 0 {
reqID = idList[0] reqID = idList[0]
@ -137,6 +137,11 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
reqID = uuid.New().String() reqID = uuid.New().String()
} }
ctx = metadata.NewOutgoingContext(ctx,
metadata.New(map[string]string{
util.RequestIDKey: reqID,
}))
ctx = util.WithRequestID(ctx, reqID) ctx = util.WithRequestID(ctx, reqID)
grpc.SetTrailer(ctx, metadata.Pairs(util.RequestIDKey, reqID)) grpc.SetTrailer(ctx, metadata.Pairs(util.RequestIDKey, reqID))

5
weed/replication/repl_util/replication_util.go

@ -1,6 +1,7 @@
package repl_util package repl_util
import ( import (
"context"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/replication/source" "github.com/seaweedfs/seaweedfs/weed/replication/source"
@ -12,7 +13,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS
for x := chunkViews.Front(); x != nil; x = x.Next { for x := chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value chunk := x.Value
fileUrls, err := filerSource.LookupFileId(chunk.FileId)
fileUrls, err := filerSource.LookupFileId(context.Background(), chunk.FileId)
if err != nil { if err != nil {
return err return err
} }
@ -21,7 +22,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS
var shouldRetry bool var shouldRetry bool
for _, fileUrl := range fileUrls { for _, fileUrl := range fileUrls {
shouldRetry, err = util_http.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
writeErr = writeFunc(data) writeErr = writeFunc(data)
}) })
if err != nil { if err != nil {

2
weed/replication/sink/azuresink/azure_sink.go

@ -105,7 +105,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
} }
totalSize := filer.FileSize(entry) totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
// Create a URL that references a to-be-created blob in your // Create a URL that references a to-be-created blob in your
// Azure Storage account's container. // Azure Storage account's container.

2
weed/replication/sink/b2sink/b2_sink.go

@ -99,7 +99,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
} }
totalSize := filer.FileSize(entry) totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
bucket, err := g.client.Bucket(context.Background(), g.bucket) bucket, err := g.client.Bucket(context.Background(), g.bucket)
if err != nil { if err != nil {

8
weed/replication/sink/filersink/filer_sink.go

@ -198,7 +198,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
glog.V(2).Infof("late updates %s", key) glog.V(2).Infof("late updates %s", key)
} else { } else {
// find out what changed // find out what changed
deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
deletedChunks, newChunks, err := compareChunks(context.Background(), filer.LookupFn(fs), oldEntry, newEntry)
if err != nil { if err != nil {
return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err) return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err)
} }
@ -242,12 +242,12 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
}) })
} }
func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.GetChunks(), 0, math.MaxInt64)
func compareChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := filer.ResolveChunkManifest(ctx, lookupFileIdFn, oldEntry.GetChunks(), 0, math.MaxInt64)
if aErr != nil { if aErr != nil {
return nil, nil, aErr return nil, nil, aErr
} }
bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.GetChunks(), 0, math.MaxInt64)
bData, bMeta, bErr := filer.ResolveChunkManifest(ctx, lookupFileIdFn, newEntry.GetChunks(), 0, math.MaxInt64)
if bErr != nil { if bErr != nil {
return nil, nil, bErr return nil, nil, bErr
} }

2
weed/replication/sink/gcssink/gcs_sink.go

@ -97,7 +97,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
} }
totalSize := filer.FileSize(entry) totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
defer wc.Close() defer wc.Close()

3
weed/replication/sink/localsink/local_sink.go

@ -1,6 +1,7 @@
package localsink package localsink
import ( import (
"context"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -75,7 +76,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
glog.V(4).Infof("Create Entry key: %s", key) glog.V(4).Infof("Create Entry key: %s", key)
totalSize := filer.FileSize(entry) totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
chunkViews := filer.ViewFromChunks(context.Background(), localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
dir := filepath.Dir(key) dir := filepath.Dir(key)

6
weed/replication/source/filer_source.go

@ -55,7 +55,7 @@ func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, rea
return nil return nil
} }
func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) {
func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls []string, err error) {
vid2Locations := make(map[string]*filer_pb.Locations) vid2Locations := make(map[string]*filer_pb.Locations)
@ -63,7 +63,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid}, VolumeIds: []string{vid},
}) })
if err != nil { if err != nil {
@ -110,7 +110,7 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea
return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
} }
fileUrls, err := fs.LookupFileId(fileId)
fileUrls, err := fs.LookupFileId(context.Background(), fileId)
if err != nil { if err != nil {
return "", nil, nil, err return "", nil, nil, err
} }

5
weed/server/common.go

@ -129,6 +129,7 @@ func debug(params ...interface{}) {
} }
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) { func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) {
ctx := r.Context()
m := make(map[string]interface{}) m := make(map[string]interface{})
if r.Method != http.MethodPost { if r.Method != http.MethodPost {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@ -163,7 +164,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
Ttl: r.FormValue("ttl"), Ttl: r.FormValue("ttl"),
DiskType: r.FormValue("disk"), DiskType: r.FormValue("disk"),
} }
assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar)
assignResult, ae := operation.Assign(ctx, masterFn, grpcDialOption, ar)
if ae != nil { if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae) writeJsonError(w, r, http.StatusInternalServerError, ae)
return return
@ -189,7 +190,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return return
} }
uploadResult, err := uploader.UploadData(pu.Data, uploadOption)
uploadResult, err := uploader.UploadData(ctx, pu.Data, uploadOption)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return return

26
weed/server/filer_grpc_server.go

@ -121,7 +121,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil return resp, nil
} }
func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) {
func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
fid, err := needle.ParseFileIdFromString(fileId) fid, err := needle.ParseFileIdFromString(fileId)
if err != nil { if err != nil {
return nil, err return nil, err
@ -142,12 +142,12 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
resp = &filer_pb.CreateEntryResponse{} resp = &filer_pb.CreateEntryResponse{}
chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
chunks, garbage, err2 := fs.cleanupChunks(ctx, util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
if err2 != nil { if err2 != nil {
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
} }
so, err := fs.detectStorageOption(string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
so, err := fs.detectStorageOption(ctx, string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -177,7 +177,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
} }
chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry)
chunks, garbage, err2 := fs.cleanupChunks(ctx, fullpath, entry, req.Entry)
if err2 != nil { if err2 != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
} }
@ -201,11 +201,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, err return &filer_pb.UpdateEntryResponse{}, err
} }
func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
// remove old chunks if not included in the new ones // remove old chunks if not included in the new ones
if existingEntry != nil { if existingEntry != nil {
garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
garbage, err = filer.MinusChunks(ctx, fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
if err != nil { if err != nil {
return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %v", err) return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %v", err)
} }
@ -214,11 +214,11 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
// files with manifest chunks are usually large and append only, skip calculating covered chunks // files with manifest chunks are usually large and append only, skip calculating covered chunks
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks()) manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks())
chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks)
chunks, coveredChunks := filer.CompactFileChunks(ctx, fs.lookupFileId, nonManifestChunks)
garbage = append(garbage, coveredChunks...) garbage = append(garbage, coveredChunks...)
if newEntry.Attributes != nil { if newEntry.Attributes != nil {
so, _ := fs.detectStorageOption(fullpath,
so, _ := fs.detectStorageOption(ctx, fullpath,
"", "",
"", "",
newEntry.Attributes.TtlSec, newEntry.Attributes.TtlSec,
@ -227,7 +227,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
"", "",
"", "",
) // ignore readonly error for capacity needed to manifestize ) // ignore readonly error for capacity needed to manifestize
chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks)
if err != nil { if err != nil {
// not good, but should be ok // not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", err) glog.V(0).Infof("MaybeManifestize: %v", err)
@ -271,12 +271,12 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
} }
entry.Chunks = append(entry.GetChunks(), req.Chunks...) entry.Chunks = append(entry.GetChunks(), req.Chunks...)
so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "")
so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "")
if err != nil { if err != nil {
glog.Warningf("detectStorageOption: %v", err) glog.Warningf("detectStorageOption: %v", err)
return &filer_pb.AppendToEntryResponse{}, err return &filer_pb.AppendToEntryResponse{}, err
} }
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.GetChunks())
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks())
if err != nil { if err != nil {
// not good, but should be ok // not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", err) glog.V(0).Infof("MaybeManifestize: %v", err)
@ -305,7 +305,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
req.DiskType = fs.option.DiskType req.DiskType = fs.option.DiskType
} }
so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
if err != nil { if err != nil {
glog.V(3).Infof("AssignVolume: %v", err) glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
@ -313,7 +313,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil { if err != nil {
glog.V(3).Infof("AssignVolume: %v", err) glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil

8
weed/server/filer_grpc_server_remote.go

@ -64,7 +64,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
} }
// detect storage option // detect storage option
so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
so, err := fs.detectStorageOption(ctx, req.Directory, "", "", 0, "", "", "", "")
if err != nil { if err != nil {
return resp, err return resp, err
} }
@ -97,7 +97,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
} }
// assign one volume server // assign one volume server
assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil { if err != nil {
fetchAndWriteErr = err fetchAndWriteErr = err
return return
@ -184,10 +184,10 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
// this skips meta data log events // this skips meta data log events
if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil { if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
fs.filer.DeleteUncommittedChunks(chunks)
fs.filer.DeleteUncommittedChunks(ctx, chunks)
return nil, err return nil, err
} }
fs.filer.DeleteChunks(entry.FullPath, garbage)
fs.filer.DeleteChunks(ctx, entry.FullPath, garbage)
fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil) fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)

6
weed/server/filer_server_handlers_proxy.go

@ -3,6 +3,7 @@ package weed_server
import ( import (
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/util/mem"
"io" "io"
@ -31,8 +32,8 @@ func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrit
} }
func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) { func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId)
ctx := r.Context()
urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
if err != nil { if err != nil {
glog.Errorf("locate %s: %v", fileId, err) glog.Errorf("locate %s: %v", fileId, err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
@ -53,6 +54,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
proxyReq.Header.Set("Host", r.Host) proxyReq.Header.Set("Host", r.Host)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
util.ReqWithRequestId(proxyReq, ctx)
for header, values := range r.Header { for header, values := range r.Header {
for _, value := range values { for _, value := range values {

12
weed/server/filer_server_handlers_read.go

@ -2,7 +2,6 @@ package weed_server
import ( import (
"bytes" "bytes"
"context"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"errors" "errors"
@ -89,14 +88,14 @@ func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Ent
} }
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
path := r.URL.Path path := r.URL.Path
isForDirectory := strings.HasSuffix(path, "/") isForDirectory := strings.HasSuffix(path, "/")
if isForDirectory && len(path) > 1 { if isForDirectory && len(path) > 1 {
path = path[:len(path)-1] path = path[:len(path)-1]
} }
entry, err := fs.filer.FindEntry(context.Background(), util.FullPath(path))
entry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
if err != nil { if err != nil {
if path == "/" { if path == "/" {
fs.listDirectoryHandler(w, r) fs.listDirectoryHandler(w, r)
@ -147,6 +146,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if query.Get("metadata") == "true" { if query.Get("metadata") == "true" {
if query.Get("resolveManifest") == "true" { if query.Get("resolveManifest") == "true" {
if entry.Chunks, _, err = filer.ResolveChunkManifest( if entry.Chunks, _, err = filer.ResolveChunkManifest(
ctx,
fs.filer.MasterClient.GetLookupFileIdFunction(), fs.filer.MasterClient.GetLookupFileIdFunction(),
entry.GetChunks(), 0, math.MaxInt64); err != nil { entry.GetChunks(), 0, math.MaxInt64); err != nil {
err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error()) err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error())
@ -242,7 +242,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if shouldResize { if shouldResize {
data := mem.Allocate(int(totalSize)) data := mem.Allocate(int(totalSize))
defer mem.Free(data) defer mem.Free(data)
err := filer.ReadAll(data, fs.filer.MasterClient, entry.GetChunks())
err := filer.ReadAll(ctx, data, fs.filer.MasterClient, entry.GetChunks())
if err != nil { if err != nil {
glog.Errorf("failed to read %s: %v", path, err) glog.Errorf("failed to read %s: %v", path, err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
@ -268,7 +268,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
chunks := entry.GetChunks() chunks := entry.GetChunks()
if entry.IsInRemoteOnly() { if entry.IsInRemoteOnly() {
dir, name := entry.FullPath.DirAndName() dir, name := entry.FullPath.DirAndName()
if resp, err := fs.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
if resp, err := fs.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: dir, Directory: dir,
Name: name, Name: name,
}); err != nil { }); err != nil {
@ -280,7 +280,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
} }
} }
streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
streamFn, err := filer.PrepareStreamContentWithThrottler(ctx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
if err != nil { if err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc() stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to prepare stream content %s: %v", r.URL, err) glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)

5
weed/server/filer_server_handlers_read_dir.go

@ -1,7 +1,6 @@
package weed_server package weed_server
import ( import (
"context"
"errors" "errors"
"net/http" "net/http"
"strconv" "strconv"
@ -18,7 +17,7 @@ import (
// sub directories are listed on the first page, when "lastFileName" // sub directories are listed on the first page, when "lastFileName"
// is empty. // is empty.
func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if fs.option.ExposeDirectoryData == false { if fs.option.ExposeDirectoryData == false {
writeJsonError(w, r, http.StatusForbidden, errors.New("ui is disabled")) writeJsonError(w, r, http.StatusForbidden, errors.New("ui is disabled"))
return return
@ -40,7 +39,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
namePattern := r.FormValue("namePattern") namePattern := r.FormValue("namePattern")
namePatternExclude := r.FormValue("namePatternExclude") namePatternExclude := r.FormValue("namePatternExclude")
entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(ctx, util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
if err != nil { if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)

5
weed/server/filer_server_handlers_tagging.go

@ -1,7 +1,6 @@
package weed_server package weed_server
import ( import (
"context"
"net/http" "net/http"
"strings" "strings"
@ -14,7 +13,7 @@ import (
// curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging // curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging
func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
ctx := r.Context()
path := r.URL.Path path := r.URL.Path
if strings.HasSuffix(path, "/") { if strings.HasSuffix(path, "/") {
@ -57,7 +56,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
// curl -X DELETE http://localhost:8888/path/to/a/file?tagging // curl -X DELETE http://localhost:8888/path/to/a/file?tagging
func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
ctx := r.Context()
path := r.URL.Path path := r.URL.Path
if strings.HasSuffix(path, "/") { if strings.HasSuffix(path, "/") {

14
weed/server/filer_server_handlers_write.go

@ -34,7 +34,7 @@ type FilerPostResult struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc() stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc()
start := time.Now() start := time.Now()
@ -44,7 +44,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
ar, altRequest := so.ToAssignRequests(1) ar, altRequest := so.ToAssignRequests(1)
assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
assignResult, ae := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
if ae != nil { if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae) glog.Errorf("failing to assign a file id: %v", ae)
err = ae err = ae
@ -70,7 +70,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
} }
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) { func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
ctx := context.Background()
ctx := r.Context()
destination := r.RequestURI destination := r.RequestURI
if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" { if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" {
@ -78,7 +78,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
} }
query := r.URL.Query() query := r.URL.Query()
so, err := fs.detectStorageOption0(destination,
so, err := fs.detectStorageOption0(ctx, destination,
query.Get("collection"), query.Get("collection"),
query.Get("replication"), query.Get("replication"),
query.Get("ttl"), query.Get("ttl"),
@ -240,7 +240,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
rule := fs.filer.FilerConf.MatchStorageRule(requestURI) rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
@ -280,14 +280,14 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
}, nil }, nil
} }
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) {
func (fs *FilerServer) detectStorageOption0(ctx context.Context, requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) {
ttl, err := needle.ReadTTL(qTtl) ttl, err := needle.ReadTTL(qTtl)
if err != nil { if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err) glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
} }
so, err := fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
so, err := fs.detectStorageOption(ctx, requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
if so != nil { if so != nil {
if fsync == "false" { if fsync == "false" {
so.Fsync = false so.Fsync = false

22
weed/server/filer_server_handlers_write_autochunk.go

@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return return
} }
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -107,12 +107,12 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
md5bytes = md5Hash.Sum(nil) md5bytes = md5Hash.Sum(nil)
headerMd5 := r.Header.Get("Content-Md5") headerMd5 := r.Header.Get("Content-Md5")
if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) { if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) {
fs.filer.DeleteUncommittedChunks(fileChunks)
fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.") return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.")
} }
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil { if replyerr != nil {
fs.filer.DeleteUncommittedChunks(fileChunks)
fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
} }
return return
@ -130,7 +130,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
return nil, nil, err return nil, nil, err
} }
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -139,12 +139,12 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
md5bytes = md5Hash.Sum(nil) md5bytes = md5Hash.Sum(nil)
headerMd5 := r.Header.Get("Content-Md5") headerMd5 := r.Header.Get("Content-Md5")
if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) { if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) {
fs.filer.DeleteUncommittedChunks(fileChunks)
fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.") return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.")
} }
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil { if replyerr != nil {
fs.filer.DeleteUncommittedChunks(fileChunks)
fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
} }
return return
@ -299,14 +299,14 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
} }
// maybe concatenate small chunks into one whole chunk // maybe concatenate small chunks into one whole chunk
mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks)
mergedChunks, replyerr = fs.maybeMergeChunks(ctx, so, newChunks)
if replyerr != nil { if replyerr != nil {
glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr) glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr)
mergedChunks = newChunks mergedChunks = newChunks
} }
// maybe compact entry chunks // maybe compact entry chunks
mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), mergedChunks)
if replyerr != nil { if replyerr != nil {
glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
return return
@ -348,7 +348,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return filerResult, replyerr return filerResult, replyerr
} }
func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
func (fs *FilerServer) saveAsChunk(ctx context.Context, so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) { return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
var fileId string var fileId string
@ -356,7 +356,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
err := util.Retry("saveAsChunk", func() error { err := util.Retry("saveAsChunk", func() error {
// assign one file id for one chunk // assign one file id for one chunk
assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
if assignErr != nil { if assignErr != nil {
return assignErr return assignErr
} }
@ -380,7 +380,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
} }
var uploadErr error var uploadErr error
uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption)
uploadResult, uploadErr, _ = uploader.Upload(ctx, reader, uploadOption)
if uploadErr != nil { if uploadErr != nil {
return uploadErr return uploadErr
} }

6
weed/server/filer_server_handlers_write_cipher.go

@ -19,7 +19,7 @@ import (
// handling single chunk POST or PUT upload // handling single chunk POST or PUT upload
func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) { func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) {
fileId, urlLocation, auth, err := fs.assignNewFileInfo(so)
fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
if err != nil || fileId == "" || urlLocation == "" { if err != nil || fileId == "" || urlLocation == "" {
return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter) return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter)
@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr) return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr)
} }
uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption)
uploadResult, uploadError := uploader.UploadData(ctx, uncompressedData, uploadOption)
if uploadError != nil { if uploadError != nil {
return nil, fmt.Errorf("upload to volume server: %v", uploadError) return nil, fmt.Errorf("upload to volume server: %v", uploadError)
} }
@ -97,7 +97,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
} }
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil { if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil {
fs.filer.DeleteUncommittedChunks(entry.GetChunks())
fs.filer.DeleteUncommittedChunks(ctx, entry.GetChunks())
err = dbErr err = dbErr
filerResult.Error = dbErr.Error() filerResult.Error = dbErr.Error()
return return

13
weed/server/filer_server_handlers_write_merge.go

@ -1,6 +1,7 @@
package weed_server package weed_server
import ( import (
"context"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
@ -12,7 +13,7 @@ import (
const MergeChunkMinCount int = 1000 const MergeChunkMinCount int = 1000
func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
// Only merge small chunks more than half of the file // Only merge small chunks more than half of the file
var chunkSize = fs.option.MaxMB * 1024 * 1024 var chunkSize = fs.option.MaxMB * 1024 * 1024
var smallChunk, sumChunk int var smallChunk, sumChunk int
@ -33,16 +34,16 @@ func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks
return inputChunks, nil return inputChunks, nil
} }
return fs.mergeChunks(so, inputChunks, minOffset)
return fs.mergeChunks(ctx, so, inputChunks, minOffset)
} }
func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks)
_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent) _, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
if mergeErr != nil { if mergeErr != nil {
return nil, mergeErr return nil, mergeErr
} }
mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
if mergeErr != nil { if mergeErr != nil {
return return
} }
@ -54,7 +55,7 @@ func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*f
} }
} }
garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks)
if err != nil { if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
mergedChunks, inputChunks) mergedChunks, inputChunks)

21
weed/server/filer_server_handlers_write_upload.go

@ -2,6 +2,7 @@ package weed_server
import ( import (
"bytes" "bytes"
"context"
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"hash" "hash"
@ -27,7 +28,7 @@ var bufPool = sync.Pool{
}, },
} }
func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
func (fs *FilerServer) uploadRequestToChunks(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
query := r.URL.Query() query := r.URL.Query()
isAppend := isAppend(r) isAppend := isAppend(r)
@ -45,10 +46,10 @@ func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Requ
chunkOffset = offsetInt chunkOffset = offsetInt
} }
return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
return fs.uploadReaderToChunks(ctx, reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
} }
func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
md5Hash = md5.New() md5Hash = md5.New()
chunkOffset = startOffset chunkOffset = startOffset
@ -117,7 +118,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
wg.Done() wg.Done()
}() }()
chunks, toChunkErr := fs.dataToChunk(fileName, contentType, buf.Bytes(), offset, so)
chunks, toChunkErr := fs.dataToChunk(ctx, fileName, contentType, buf.Bytes(), offset, so)
if toChunkErr != nil { if toChunkErr != nil {
uploadErrLock.Lock() uploadErrLock.Lock()
if uploadErr == nil { if uploadErr == nil {
@ -152,7 +153,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
for _, chunk := range fileChunks { for _, chunk := range fileChunks {
glog.V(4).Infof("purging failed uploaded %s chunk %s [%d,%d)", fileName, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) glog.V(4).Infof("purging failed uploaded %s chunk %s [%d,%d)", fileName, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
} }
fs.filer.DeleteUncommittedChunks(fileChunks)
fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, md5Hash, 0, uploadErr, nil return nil, md5Hash, 0, uploadErr, nil
} }
slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) int { slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) int {
@ -161,7 +162,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
return fileChunks, md5Hash, chunkOffset, nil, smallContent return fileChunks, md5Hash, chunkOffset, nil, smallContent
} }
func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc() stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc()
start := time.Now() start := time.Now()
@ -184,14 +185,14 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
return nil, err, []byte{} return nil, err, []byte{}
} }
uploadResult, err, data := uploader.Upload(limitedReader, uploadOption)
uploadResult, err, data := uploader.Upload(ctx, limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 { if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount)) stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
} }
return uploadResult, err, data return uploadResult, err, data
} }
func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
func (fs *FilerServer) dataToChunk(ctx context.Context, fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
dataReader := util.NewBytesReader(data) dataReader := util.NewBytesReader(data)
// retry to assign a different file id // retry to assign a different file id
@ -203,14 +204,14 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
err := util.Retry("filerDataToChunk", func() error { err := util.Retry("filerDataToChunk", func() error {
// assign one file id for one chunk // assign one file id for one chunk
fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so)
if uploadErr != nil { if uploadErr != nil {
glog.V(4).Infof("retry later due to assign error: %v", uploadErr) glog.V(4).Infof("retry later due to assign error: %v", uploadErr)
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc() stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
return uploadErr return uploadErr
} }
// upload the chunk to the volume server // upload the chunk to the volume server
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil { if uploadErr != nil {
glog.V(4).Infof("retry later due to upload error: %v", uploadErr) glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc() stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()

2
weed/server/volume_grpc_remote.go

@ -77,7 +77,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
return return
} }
if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil {
if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil && err == nil {
err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr) err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
} }
}(replica.Url) }(replica.Url)

3
weed/server/volume_server_handlers_write.go

@ -16,6 +16,7 @@ import (
) )
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if e := r.ParseForm(); e != nil { if e := r.ParseForm(); e != nil {
glog.V(0).Infoln("form parse error:", e) glog.V(0).Infoln("form parse error:", e)
writeJsonError(w, r, http.StatusBadRequest, e) writeJsonError(w, r, http.StatusBadRequest, e)
@ -45,7 +46,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
ret := operation.UploadResult{} ret := operation.UploadResult{}
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
isUnchanged, writeError := topology.ReplicatedWrite(ctx, vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
if writeError != nil { if writeError != nil {
writeJsonError(w, r, http.StatusInternalServerError, writeError) writeJsonError(w, r, http.StatusInternalServerError, writeError)
return return

2
weed/server/webdav_server.go

@ -556,7 +556,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF return 0, io.EOF
} }
if f.visibleIntervals == nil { if f.visibleIntervals == nil {
f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(context.Background(), filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
f.reader = nil f.reader = nil
} }
if f.reader == nil { if f.reader == nil {

2
weed/shell/command_fs_merge_volumes.go

@ -351,7 +351,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
jwt = security.GenJwtForVolumeServer(security.SigningKey(signingKey), expiresAfterSec, toFid.String()) jwt = security.GenJwtForVolumeServer(security.SigningKey(signingKey), expiresAfterSec, toFid.String())
} }
_, err, _ = uploader.Upload(reader, &operation.UploadOption{
_, err, _ = uploader.Upload(context.Background(), reader, &operation.UploadOption{
UploadUrl: uploadURL, UploadUrl: uploadURL,
Filename: filename, Filename: filename,
IsInputCompressed: isCompressed, IsInputCompressed: isCompressed,

2
weed/shell/command_fs_verify.go

@ -286,7 +286,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
return nil return nil
} }
} }
dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(context.Background(), filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
if resolveErr != nil { if resolveErr != nil {
return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr) return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
} }

2
weed/shell/command_volume_fsck.go

@ -240,7 +240,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
if *c.verbose && entry.Entry.IsDirectory { if *c.verbose && entry.Entry.IsDirectory {
fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name)) fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
} }
dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(context.Background(), filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
if resolveErr != nil { if resolveErr != nil {
return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr) return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
} }

5
weed/topology/store_replicate.go

@ -1,6 +1,7 @@
package topology package topology
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -23,7 +24,7 @@ import (
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
) )
func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
func ReplicatedWrite(ctx context.Context, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
//check JWT //check JWT
jwt := security.GetJwt(r) jwt := security.GetJwt(r)
@ -121,7 +122,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String()) glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
return err return err
} }
_, err = uploader.UploadData(n.Data, uploadOption)
_, err = uploader.UploadData(ctx, n.Data, uploadOption)
if err != nil { if err != nil {
glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String()) glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
} }

20
weed/util/http/http_global_client_util.go

@ -2,6 +2,7 @@ package http
import ( import (
"compress/gzip" "compress/gzip"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -214,11 +215,11 @@ func NormalizeUrl(url string) (string, error) {
return GetGlobalHttpClient().NormalizeHttpScheme(url) return GetGlobalHttpClient().NormalizeHttpScheme(url)
} }
func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
if cipherKey != nil { if cipherKey != nil {
var n int var n int
_, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
_, err := readEncryptedUrl(ctx, fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data) n = copy(buf, data)
}) })
return int64(n), err return int64(n), err
@ -286,13 +287,13 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err return n, err
} }
func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
func ReadUrlAsStream(ctx context.Context, fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
return ReadUrlAsStreamAuthenticated(ctx, fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
} }
func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil { if cipherKey != nil {
return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
return readEncryptedUrl(ctx, fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
} }
req, err := http.NewRequest(http.MethodGet, fileUrl, nil) req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
@ -306,6 +307,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
} else { } else {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
} }
util.ReqWithRequestId(req, ctx)
r, err := GetGlobalHttpClient().Do(req) r, err := GetGlobalHttpClient().Do(req)
if err != nil { if err != nil {
@ -351,7 +353,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
} }
func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
func readEncryptedUrl(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt) encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
if err != nil { if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
@ -447,7 +449,7 @@ func (r *CountingReader) Read(p []byte) (n int, err error) {
return n, err return n, err
} }
func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
var shouldRetry bool var shouldRetry bool
@ -457,7 +459,7 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
if strings.Contains(urlString, "%") { if strings.Contains(urlString, "%") {
urlString = url.PathEscape(urlString) urlString = url.PathEscape(urlString)
} }
shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
if n < len(buffer) { if n < len(buffer) {
x := copy(buffer[n:], data) x := copy(buffer[n:], data)
n += x n += x

9
weed/util/request_id.go

@ -1,6 +1,9 @@
package util package util
import "context"
import (
"context"
"net/http"
)
const ( const (
RequestIdHttpHeader = "X-Request-ID" RequestIdHttpHeader = "X-Request-ID"
@ -18,3 +21,7 @@ func GetRequestID(ctx context.Context) string {
func WithRequestID(ctx context.Context, id string) context.Context { func WithRequestID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, RequestIDKey, id) return context.WithValue(ctx, RequestIDKey, id)
} }
func ReqWithRequestId(req *http.Request, ctx context.Context) {
req.Header.Set(RequestIdHttpHeader, GetRequestID(ctx))
}

9
weed/wdclient/masterclient.go

@ -56,13 +56,14 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
return mc.LookupFileIdWithFallback return mc.LookupFileIdWithFallback
} }
func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
fullUrls, err = mc.vidMap.LookupFileId(fileId)
func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId)
if err == nil && len(fullUrls) > 0 { if err == nil && len(fullUrls) > 0 {
return return
} }
err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
err = pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId}, VolumeOrFileIds: []string{fileId},
}) })
if err != nil { if err != nil {

5
weed/wdclient/vid_map.go

@ -1,6 +1,7 @@
package wdclient package wdclient
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
@ -21,7 +22,7 @@ type HasLookupFileIdFunction interface {
GetLookupFileIdFunction() LookupFileIdFunctionType GetLookupFileIdFunction() LookupFileIdFunctionType
} }
type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
type LookupFileIdFunctionType func(ctx context.Context, fileId string) (targetUrls []string, err error)
type Location struct { type Location struct {
Url string `json:"url,omitempty"` Url string `json:"url,omitempty"`
@ -99,7 +100,7 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return return
} }
func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
func (vc *vidMap) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",") parts := strings.Split(fileId, ",")
if len(parts) != 2 { if len(parts) != 2 {
return nil, errors.New("Invalid fileId " + fileId) return nil, errors.New("Invalid fileId " + fileId)

Loading…
Cancel
Save