diff --git a/weed/filer/entry.go b/weed/filer/entry.go index 4757d5c9e..25fc26feb 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -92,7 +92,12 @@ func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) { return } message.IsDirectory = entry.IsDirectory() - message.Attributes = EntryAttributeToPb(entry) + // Reuse pre-allocated attributes if available, otherwise allocate + if message.Attributes != nil { + EntryAttributeToExistingPb(entry, message.Attributes) + } else { + message.Attributes = EntryAttributeToPb(entry) + } message.Chunks = entry.GetChunks() message.Extended = entry.Extended message.HardLinkId = entry.HardLinkId diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go index ce9c0484b..1c096c911 100644 --- a/weed/filer/entry_codec.go +++ b/weed/filer/entry_codec.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "os" + "sync" "time" "google.golang.org/protobuf/proto" @@ -11,15 +12,61 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) +// pbEntryPool reduces allocations in EncodeAttributesAndChunks and DecodeAttributesAndChunks +// which are called on every filer store operation +var pbEntryPool = sync.Pool{ + New: func() interface{} { + return &filer_pb.Entry{ + Attributes: &filer_pb.FuseAttributes{}, // Pre-allocate attributes + } + }, +} + +// resetPbEntry clears a protobuf Entry for reuse +func resetPbEntry(e *filer_pb.Entry) { + // Use struct assignment to clear all fields including protobuf internal fields + // (unknownFields, sizeCache) that field-by-field reset would miss + attrs := e.Attributes + *e = filer_pb.Entry{} + if attrs == nil { + attrs = &filer_pb.FuseAttributes{} + } else { + resetFuseAttributes(attrs) + } + e.Attributes = attrs +} + +// resetFuseAttributes clears FuseAttributes for reuse +func resetFuseAttributes(a *filer_pb.FuseAttributes) { + // Use struct assignment to clear all fields including protobuf internal fields + *a = filer_pb.FuseAttributes{} +} + func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) { - message := &filer_pb.Entry{} + message := pbEntryPool.Get().(*filer_pb.Entry) + defer func() { + resetPbEntry(message) + pbEntryPool.Put(message) + }() + entry.ToExistingProtoEntry(message) - return proto.Marshal(message) + + data, err := proto.Marshal(message) + if err != nil { + return nil, err + } + + // Copy the data to a new slice since proto.Marshal may return a slice + // that shares memory with the message (not guaranteed to be a copy) + return append([]byte(nil), data...), nil } func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error { - - message := &filer_pb.Entry{} + message := pbEntryPool.Get().(*filer_pb.Entry) + defer func() { + resetPbEntry(message) + pbEntryPool.Put(message) + }() if err := proto.Unmarshal(blob, message); err != nil { return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err) @@ -50,6 +97,28 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { } } +// EntryAttributeToExistingPb fills an existing FuseAttributes to avoid allocation. +// Safe to call with nil attr (will return early without populating). +func EntryAttributeToExistingPb(entry *Entry, attr *filer_pb.FuseAttributes) { + if attr == nil { + return + } + attr.Crtime = entry.Attr.Crtime.Unix() + attr.Mtime = entry.Attr.Mtime.Unix() + attr.FileMode = uint32(entry.Attr.Mode) + attr.Uid = entry.Uid + attr.Gid = entry.Gid + attr.Mime = entry.Mime + attr.TtlSec = entry.Attr.TtlSec + attr.UserName = entry.Attr.UserName + attr.GroupName = entry.Attr.GroupNames + attr.SymlinkTarget = entry.Attr.SymlinkTarget + attr.Md5 = entry.Attr.Md5 + attr.FileSize = entry.Attr.FileSize + attr.Rdev = entry.Attr.Rdev + attr.Inode = entry.Attr.Inode +} + func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { t := Attr{} diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index 869b3b93d..b5219df20 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -160,18 +160,79 @@ func (fc *FilerConf) DeleteLocationConf(locationPrefix string) { return true }) fc.rules = rules - return } +// emptyPathConf is a singleton for paths with no matching rules +// Callers must NOT mutate the returned value +var emptyPathConf = &filer_pb.FilerConf_PathConf{} + func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf) { + // Convert once to avoid allocation in multi-match case + pathBytes := []byte(path) + + // Fast path: check if any rules match before allocating + // This avoids allocation for paths with no configured rules (common case) + var firstMatch *filer_pb.FilerConf_PathConf + matchCount := 0 + + fc.rules.MatchPrefix(pathBytes, func(key []byte, value *filer_pb.FilerConf_PathConf) bool { + matchCount++ + if matchCount == 1 { + firstMatch = value + return true // continue to check for more matches + } + // Stop after 2 matches - we only need to know if there are multiple + return false + }) + + // No rules match - return singleton (callers must NOT mutate) + if matchCount == 0 { + return emptyPathConf + } + + // Single rule matches - return directly (callers must NOT mutate) + if matchCount == 1 { + return firstMatch + } + + // Multiple rules match - need to merge (allocate new) pathConf = &filer_pb.FilerConf_PathConf{} - fc.rules.MatchPrefix([]byte(path), func(key []byte, value *filer_pb.FilerConf_PathConf) bool { + fc.rules.MatchPrefix(pathBytes, func(key []byte, value *filer_pb.FilerConf_PathConf) bool { mergePathConf(pathConf, value) return true }) return pathConf } +// ClonePathConf creates a mutable copy of an existing PathConf. +// Use this when you need to modify a config (e.g., before calling SetLocationConf). +// +// IMPORTANT: Keep in sync with filer_pb.FilerConf_PathConf fields. +// When adding new fields to the protobuf, update this function accordingly. +func ClonePathConf(src *filer_pb.FilerConf_PathConf) *filer_pb.FilerConf_PathConf { + if src == nil { + return &filer_pb.FilerConf_PathConf{} + } + return &filer_pb.FilerConf_PathConf{ + LocationPrefix: src.LocationPrefix, + Collection: src.Collection, + Replication: src.Replication, + Ttl: src.Ttl, + DiskType: src.DiskType, + Fsync: src.Fsync, + VolumeGrowthCount: src.VolumeGrowthCount, + ReadOnly: src.ReadOnly, + MaxFileNameLength: src.MaxFileNameLength, + DataCenter: src.DataCenter, + Rack: src.Rack, + DataNode: src.DataNode, + DisableChunkDeletion: src.DisableChunkDeletion, + Worm: src.Worm, + WormGracePeriodSeconds: src.WormGracePeriodSeconds, + WormRetentionTimeSeconds: src.WormRetentionTimeSeconds, + } +} + func (fc *FilerConf) GetCollectionTtls(collection string) (ttls map[string]string) { ttls = make(map[string]string) fc.rules.Walk(func(key []byte, value *filer_pb.FilerConf_PathConf) bool { diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go index 02615b814..121ea7e18 100644 --- a/weed/filer/filer_conf_test.go +++ b/weed/filer/filer_conf_test.go @@ -1,6 +1,7 @@ package filer import ( + "reflect" "testing" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -47,3 +48,83 @@ func TestFilerConf(t *testing.T) { assert.Equal(t, false, fc.MatchStorageRule("/buckets/other").ReadOnly) } + +// TestClonePathConf verifies that ClonePathConf copies all exported fields. +// Uses reflection to automatically detect new fields added to the protobuf, +// ensuring the test fails if ClonePathConf is not updated for new fields. +func TestClonePathConf(t *testing.T) { + // Create a fully-populated PathConf with non-zero values for all fields + src := &filer_pb.FilerConf_PathConf{ + LocationPrefix: "/test/path", + Collection: "test_collection", + Replication: "001", + Ttl: "7d", + DiskType: "ssd", + Fsync: true, + VolumeGrowthCount: 5, + ReadOnly: true, + MaxFileNameLength: 255, + DataCenter: "dc1", + Rack: "rack1", + DataNode: "node1", + DisableChunkDeletion: true, + Worm: true, + WormGracePeriodSeconds: 3600, + WormRetentionTimeSeconds: 86400, + } + + clone := ClonePathConf(src) + + // Verify it's a different object + assert.NotSame(t, src, clone, "ClonePathConf should return a new object, not the same pointer") + + // Use reflection to compare all exported fields + // This will automatically catch any new fields added to the protobuf + srcVal := reflect.ValueOf(src).Elem() + cloneVal := reflect.ValueOf(clone).Elem() + srcType := srcVal.Type() + + for i := 0; i < srcType.NumField(); i++ { + field := srcType.Field(i) + + // Skip unexported fields (protobuf internal fields like sizeCache, unknownFields) + if !field.IsExported() { + continue + } + + srcField := srcVal.Field(i) + cloneField := cloneVal.Field(i) + + // Compare field values + if !reflect.DeepEqual(srcField.Interface(), cloneField.Interface()) { + t.Errorf("Field %s not copied correctly: src=%v, clone=%v", + field.Name, srcField.Interface(), cloneField.Interface()) + } + } + + // Additionally verify that all exported fields in src are non-zero + // This ensures we're testing with fully populated data + for i := 0; i < srcType.NumField(); i++ { + field := srcType.Field(i) + if !field.IsExported() { + continue + } + + srcField := srcVal.Field(i) + if srcField.IsZero() { + t.Errorf("Test setup error: field %s has zero value, update test to set a non-zero value", field.Name) + } + } + + // Verify mutation of clone doesn't affect source + clone.Collection = "modified" + clone.ReadOnly = false + assert.Equal(t, "test_collection", src.Collection, "Modifying clone should not affect source Collection") + assert.Equal(t, true, src.ReadOnly, "Modifying clone should not affect source ReadOnly") +} + +func TestClonePathConfNil(t *testing.T) { + clone := ClonePathConf(nil) + assert.NotNil(t, clone, "ClonePathConf(nil) should return a non-nil empty PathConf") + assert.Equal(t, "", clone.LocationPrefix, "ClonePathConf(nil) should return empty PathConf") +} diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 8694db984..5114955c7 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -32,9 +32,10 @@ type VirtualFilerStore interface { } type FilerStoreWrapper struct { - defaultStore FilerStore - pathToStore ptrie.Trie[string] - storeIdToStore map[string]FilerStore + defaultStore FilerStore + pathToStore ptrie.Trie[string] + storeIdToStore map[string]FilerStore + hasPathSpecificStore bool // fast check to skip MatchPrefix when no path-specific stores } func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { @@ -82,10 +83,15 @@ func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, if err != nil { glog.Fatalf("put path specific store: %v", err) } + fsw.hasPathSpecificStore = true } func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) { store = fsw.defaultStore + // Fast path: skip MatchPrefix if no path-specific stores are configured (common case) + if !fsw.hasPathSpecificStore { + return + } if path == "/" || path == "//" { return } diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go index 852ad2701..cbbdc96b2 100644 --- a/weed/filer/foundationdb/foundationdb_store.go +++ b/weed/filer/foundationdb/foundationdb_store.go @@ -730,9 +730,28 @@ func (store *FoundationDBStore) Shutdown() { glog.V(0).Infof("FoundationDB store shutdown") } +// tuplePool reduces allocations in genKey which is called on every FDB operation +var tuplePool = sync.Pool{ + New: func() interface{} { + // Pre-allocate slice with capacity 2 for (dirPath, fileName) + t := make(tuple.Tuple, 2) + return &t + }, +} + // Helper functions func (store *FoundationDBStore) genKey(dirPath, fileName string) fdb.Key { - return store.seaweedfsDir.Pack(tuple.Tuple{dirPath, fileName}) + // Get a tuple from pool to avoid slice allocation + tp := tuplePool.Get().(*tuple.Tuple) + defer func() { + // Clear references before returning to pool to avoid memory leaks + (*tp)[0] = nil + (*tp)[1] = nil + tuplePool.Put(tp) + }() + (*tp)[0] = dirPath + (*tp)[1] = fileName + return store.seaweedfsDir.Pack(*tp) } func (store *FoundationDBStore) extractFileName(key fdb.Key) (string, error) { diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 8460d5949..e199cddbe 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -138,23 +138,25 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor { info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { - incomingMd, _ := metadata.FromIncomingContext(ctx) - idList := incomingMd.Get(request_id.AmzRequestIDHeader) + // Get request ID from incoming metadata var reqID string - if len(idList) > 0 { - reqID = idList[0] + if incomingMd, ok := metadata.FromIncomingContext(ctx); ok { + if idList := incomingMd.Get(request_id.AmzRequestIDHeader); len(idList) > 0 { + reqID = idList[0] + } } if reqID == "" { reqID = uuid.New().String() } - ctx = metadata.NewOutgoingContext(ctx, - metadata.New(map[string]string{ - request_id.AmzRequestIDHeader: reqID, - })) - + // Store request ID in context for handlers to access ctx = request_id.Set(ctx, reqID) + // Also set outgoing context so handlers making downstream gRPC calls + // will automatically propagate the request ID + ctx = metadata.AppendToOutgoingContext(ctx, request_id.AmzRequestIDHeader, reqID) + + // Set trailer with request ID for response grpc.SetTrailer(ctx, metadata.Pairs(request_id.AmzRequestIDHeader, reqID)) return handler(ctx, req) @@ -187,8 +189,8 @@ func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientCon } else { ctx := context.Background() if signature != 0 { - md := metadata.New(map[string]string{"sw-client-id": fmt.Sprintf("%d", signature)}) - ctx = metadata.NewOutgoingContext(ctx, md) + // Optimize: Use AppendToOutgoingContext instead of creating new map + ctx = metadata.AppendToOutgoingContext(ctx, "sw-client-id", fmt.Sprintf("%d", signature)) } grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...) if err != nil { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index a7bd73c35..ae9c46fb2 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -254,8 +254,10 @@ func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCol return nil, ErrReadOnly } - if rule.MaxFileNameLength == 0 { - rule.MaxFileNameLength = fs.filer.MaxFilenameLength + // Use local variable instead of mutating shared rule + maxFileNameLength := rule.MaxFileNameLength + if maxFileNameLength == 0 { + maxFileNameLength = fs.filer.MaxFilenameLength } // required by buckets folder @@ -282,7 +284,7 @@ func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCol DiskType: util.Nvl(diskType, rule.DiskType), Fsync: rule.Fsync, VolumeGrowthCount: rule.VolumeGrowthCount, - MaxFileNameLength: rule.MaxFileNameLength, + MaxFileNameLength: maxFileNameLength, }, nil } diff --git a/weed/shell/command_s3_bucket_quota_check.go b/weed/shell/command_s3_bucket_quota_check.go index bb54b73a4..c92b52117 100644 --- a/weed/shell/command_s3_bucket_quota_check.go +++ b/weed/shell/command_s3_bucket_quota_check.go @@ -5,10 +5,11 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "io" "math" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) func init() { @@ -104,7 +105,10 @@ func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, filerBucketsPath string, entry *filer_pb.Entry, writer io.Writer, collectionSize float64) (hasConfChanges bool) { locPrefix := filerBucketsPath + "/" + entry.Name + "/" - locConf := fc.MatchStorageRule(locPrefix) + existingConf := fc.MatchStorageRule(locPrefix) + + // Create a mutable copy for modification + locConf := filer.ClonePathConf(existingConf) locConf.LocationPrefix = locPrefix if entry.Quota > 0 { diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 853cbe475..1c65fc916 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -17,7 +17,7 @@ import ( ) const BufferSize = 8 * 1024 * 1024 -const PreviousBufferCount = 32 +const PreviousBufferCount = 4 // Errors that can be returned by log buffer operations var ( @@ -862,6 +862,18 @@ var bufferPool = sync.Pool{ }, } +// logEntryPool reduces allocations in readTs which is called frequently during binary search +var logEntryPool = sync.Pool{ + New: func() interface{} { + return &filer_pb.LogEntry{} + }, +} + +// resetLogEntry clears a LogEntry for pool reuse +func resetLogEntry(e *filer_pb.LogEntry) { + proto.Reset(e) +} + func copiedBytes(buf []byte) (copied *bytes.Buffer) { copied = bufferPool.Get().(*bytes.Buffer) copied.Reset() @@ -883,7 +895,13 @@ func readTs(buf []byte, pos int) (size int, ts int64, err error) { } entryData := buf[pos+4 : pos+4+size] - logEntry := &filer_pb.LogEntry{} + + // Use pooled LogEntry to avoid allocation on every call + logEntry := logEntryPool.Get().(*filer_pb.LogEntry) + defer func() { + resetLogEntry(logEntry) + logEntryPool.Put(logEntry) + }() err = proto.Unmarshal(entryData, logEntry) if err != nil { @@ -891,6 +909,6 @@ func readTs(buf []byte, pos int) (size int, ts int64, err error) { // This allows caller to handle corruption gracefully return 0, 0, fmt.Errorf("corrupted log buffer: failed to unmarshal LogEntry at pos %d, size %d: %w", pos, size, err) } - return size, logEntry.TsNs, nil + return size, logEntry.TsNs, nil }