Browse Source

fix: use client ID instead of timeout for gRPC signature parameter

The pb.WithGrpcFilerClient signature parameter is meant to be a client
identifier for logging and tracking (added as 'sw-client-id' gRPC metadata
in streaming mode), not a timeout value.

Problem:
  timeoutMs := int32(fc.grpcTimeout.Milliseconds())  // 5000 (5 seconds)
  err := pb.WithGrpcFilerClient(false, timeoutMs, filerAddress, ...)

  - Passing timeout (5000ms) as signature/client ID
  - Misuse of API: signature should be a unique client identifier
  - Timeout is already handled by timeoutCtx passed to gRPC call
  - Inconsistent with other callers (all use 0 or proper client ID)

How WithGrpcFilerClient uses signature parameter:
  func WithGrpcClient(..., signature int32, ...) {
    if streamingMode && signature != 0 {
      md := metadata.New(map[string]string{"sw-client-id": fmt.Sprintf("%d", signature)})
      ctx = metadata.NewOutgoingContext(ctx, md)
    }
    ...
  }

It's for client identification, not timeout control!

Fix:
  1. Added clientId int32 field to FilerClient struct
  2. Initialize with rand.Int31() in NewFilerClient for unique ID
  3. Removed timeoutMs variable (and misleading comment)
  4. Use fc.clientId in pb.WithGrpcFilerClient call

Before:
  err := pb.WithGrpcFilerClient(false, timeoutMs, ...)
                                      ^^^^^^^^^ Wrong! (5000)

After:
  err := pb.WithGrpcFilerClient(false, fc.clientId, ...)
                                      ^^^^^^^^^^^^ Correct! (random int31)

Benefits:
  - Correct API usage (signature = client ID, not timeout)
  - Timeout still works via timeoutCtx (unchanged)
  - Consistent with other pb.WithGrpcFilerClient callers
  - Enables proper client tracking on filer side via gRPC metadata
  - Each FilerClient instance has unique ID for debugging

Examples of correct usage elsewhere:
  weed/iamapi/iamapi_server.go:145     pb.WithGrpcFilerClient(false, 0, ...)
  weed/command/s3.go:215               pb.WithGrpcFilerClient(false, 0, ...)
  weed/shell/commands.go:110           pb.WithGrpcFilerClient(streamingMode, 0, ...)

All use 0 (or a proper signature), not a timeout value.
pull/7518/head
chrislu 2 weeks ago
parent
commit
5426dc4027
  1. 10
      weed/wdclient/filer_client.go

10
weed/wdclient/filer_client.go

@@ -3,6 +3,7 @@ package wdclient
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"strings"
 	"sync/atomic"
 	"time"
@@ -42,7 +43,8 @@ type FilerClient struct {
 	grpcDialOption grpc.DialOption
 	urlPreference  UrlPreference
 	grpcTimeout    time.Duration
 	cacheSize      int   // Number of historical vidMap snapshots to keep
+	clientId       int32 // Unique client identifier for gRPC metadata
 }
// filerVolumeProvider implements VolumeLocationProvider by querying filer // filerVolumeProvider implements VolumeLocationProvider by querying filer
@@ -99,6 +101,7 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, ...
 		urlPreference: urlPref,
 		grpcTimeout:   grpcTimeout,
 		cacheSize:     cacheSize,
+		clientId:      rand.Int31(), // Random client ID for gRPC metadata tracking
 	}
// Create provider that references this FilerClient for failover support // Create provider that references this FilerClient for failover support
@@ -240,9 +243,6 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) ...
 	fc := p.filerClient
 	result := make(map[string][]Location)
-	// Convert grpcTimeout to milliseconds for the signature parameter
-	timeoutMs := int32(fc.grpcTimeout.Milliseconds())
-
 	// Retry transient failures with exponential backoff
 	var lastErr error
 	waitTime := time.Second
@@ -271,7 +271,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) ...
 		// Create a fresh timeout context for each filer attempt
 		// This ensures each retry gets the full grpcTimeout, not a diminishing deadline
 		timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout)
-		err := pb.WithGrpcFilerClient(false, timeoutMs, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+		err := pb.WithGrpcFilerClient(false, fc.clientId, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{
				VolumeIds: volumeIds,
			})

Loading…
Cancel
Save