Browse Source

fix(filer): apply default disk type after location-prefix resolution in gRPC AssignVolume (#8836)

* fix(filer): apply default disk type after location-prefix resolution in gRPC AssignVolume

The gRPC AssignVolume path was applying the filer's default DiskType to
the request before calling detectStorageOption. This caused the default
to shadow any disk type configured via a filer location-prefix rule,
diverging from the HTTP write path which applies the default only when
no rule matches.

Extract resolveAssignStorageOption to apply the filer default disk type
after detectStorageOption, so location-prefix rules take precedence.

* fix(filer): apply default disk type after location-prefix resolution in TUS upload path

Same class of bug as the gRPC AssignVolume fix: the TUS tusWriteData
handler called detectStorageOption0 but never applied the filer's
default DiskType when no location-prefix rule matched. This made TUS
uploads ignore the -disk flag entirely.
pull/8729/merge
Chris Lu 13 hours ago
committed by GitHub
parent
commit
e5ad5e8d4a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 21
      weed/server/filer_grpc_server.go
  2. 68
      weed/server/filer_grpc_server_test.go
  3. 5
      weed/server/filer_server_tus_handlers.go

21
weed/server/filer_grpc_server.go

@ -388,11 +388,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
if req.DiskType == "" {
req.DiskType = fs.option.DiskType
}
so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
so, err := fs.resolveAssignStorageOption(ctx, req)
if err != nil { if err != nil {
glog.V(3).InfofCtx(ctx, "AssignVolume: %v", err) glog.V(3).InfofCtx(ctx, "AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
@ -424,6 +420,21 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
}, nil }, nil
} }
func (fs *FilerServer) resolveAssignStorageOption(ctx context.Context, req *filer_pb.AssignVolumeRequest) (*operation.StorageOption, error) {
so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
if err != nil {
return nil, err
}
// Mirror the HTTP write path: only apply the filer's default disk when the
// matched locationPrefix rule did not already select one.
if so.DiskType == "" {
so.DiskType = fs.option.DiskType
}
return so, nil
}
func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) { func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {
glog.V(4).InfofCtx(ctx, "CollectionList %v", req) glog.V(4).InfofCtx(ctx, "CollectionList %v", req)

68
weed/server/filer_grpc_server_test.go

@ -0,0 +1,68 @@
package weed_server
import (
"context"
"testing"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
func TestResolveAssignStorageOptionUsesBucketRuleBeforeFilerDiskDefault(t *testing.T) {
fc := filer.NewFilerConf()
if err := fc.SetLocationConf(&filer_pb.FilerConf_PathConf{
LocationPrefix: "/buckets/zot",
DiskType: "disk",
}); err != nil {
t.Fatalf("set location conf: %v", err)
}
fs := &FilerServer{
option: &FilerOption{
DiskType: "hdd",
},
filer: &filer.Filer{
DirBucketsPath: "/buckets",
FilerConf: fc,
MaxFilenameLength: 255,
},
}
so, err := fs.resolveAssignStorageOption(context.Background(), &filer_pb.AssignVolumeRequest{
Path: "/buckets/zot/.uploads/upload-id/0001_part.part",
})
if err != nil {
t.Fatalf("resolve assign storage option: %v", err)
}
if got, want := so.Collection, "zot"; got != want {
t.Fatalf("collection = %q, want %q", got, want)
}
if got, want := so.DiskType, "disk"; got != want {
t.Fatalf("disk type = %q, want %q", got, want)
}
}
func TestResolveAssignStorageOptionFallsBackToFilerDiskDefault(t *testing.T) {
fs := &FilerServer{
option: &FilerOption{
DiskType: "hdd",
},
filer: &filer.Filer{
DirBucketsPath: "/buckets",
FilerConf: filer.NewFilerConf(),
MaxFilenameLength: 255,
},
}
so, err := fs.resolveAssignStorageOption(context.Background(), &filer_pb.AssignVolumeRequest{
Path: "/tmp/unmatched/file.bin",
})
if err != nil {
t.Fatalf("resolve assign storage option: %v", err)
}
if got, want := so.DiskType, "hdd"; got != want {
t.Fatalf("disk type = %q, want %q", got, want)
}
}

5
weed/server/filer_server_tus_handlers.go

@ -321,6 +321,11 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of
return 0, fmt.Errorf("detect storage option: %w", err) return 0, fmt.Errorf("detect storage option: %w", err)
} }
// When DiskType is empty, use filer's -disk
if so.DiskType == "" {
so.DiskType = fs.option.DiskType
}
// Read first bytes for MIME type detection // Read first bytes for MIME type detection
sniffSize := int64(512) sniffSize := int64(512)
if contentLength < sniffSize { if contentLength < sniffSize {

Loading…
Cancel
Save