diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go index 8cdd4210b..81cb89678 100644 --- a/test/erasure_coding/ec_integration_test.go +++ b/test/erasure_coding/ec_integration_test.go @@ -473,7 +473,7 @@ func findWeedBinary() string { func waitForServer(address string, timeout time.Duration) error { start := time.Now() for time.Since(start) < timeout { - if conn, err := grpc.Dial(address, grpc.WithInsecure()); err == nil { + if conn, err := grpc.NewClient(address, grpc.WithInsecure()); err == nil { conn.Close() return nil } diff --git a/test/kafka/go.mod b/test/kafka/go.mod index 9b8008c1f..09f74b545 100644 --- a/test/kafka/go.mod +++ b/test/kafka/go.mod @@ -20,7 +20,7 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect cloud.google.com/go/compute/metadata v0.8.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2 // indirect diff --git a/test/kafka/go.sum b/test/kafka/go.sum index b3723870d..809ee7cf9 100644 --- a/test/kafka/go.sum +++ b/test/kafka/go.sum @@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1 h1:5YTBM8QDVIBN3sxBil89WfdAAqDZbyJTgh688DSxX5w= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1/go.mod h1:YD5h/ldMsG0XiIw7PdyNhLxaM317eFh5yNLccNfGdyw= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0 h1:wL5IEG5zb7BVv1Kv0Xm92orq+5hB5Nipn3B5tn4Rqfk= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0/go.mod h1:J7MUC/wtRpfGVbQ5sIItY5/FuVWmvzlY21WAOfQnq/I= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.0 h1:KpMC6LFL7mqpExyMC9jVOYRiVhLmamjeZfRsUpB7l4s= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.0/go.mod h1:J7MUC/wtRpfGVbQ5sIItY5/FuVWmvzlY21WAOfQnq/I= github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2 h1:yz1bePFlP5Vws5+8ez6T3HWXPmwOK7Yvq8QxDBD3SKY= github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2/go.mod h1:Pa9ZNPuoNu/GztvBSKk9J1cDJW6vk/n0zLtV4mgd8N8= github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA= @@ -538,8 +538,8 @@ github.com/putdotio/go-putio/putio v0.0.0-20200123120452-16d982cac2b8 h1:Y258uzX github.com/putdotio/go-putio/putio v0.0.0-20200123120452-16d982cac2b8/go.mod h1:bSJjRokAHHOhA+XFxplld8w2R/dXLH7Z3BZ532vhFwU= github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= -github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg= -github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= +github.com/quic-go/quic-go v0.54.1 h1:4ZAWm0AhCb6+hE+l5Q1NAL0iRn/ZrMwqHRGQiFwj2eg= +github.com/quic-go/quic-go v0.54.1/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= github.com/rclone/rclone v1.71.1 h1:cpODfWTRz5i/WAzXsyW85tzfIKNsd1aq8CE8lUB+0zg= github.com/rclone/rclone v1.71.1/go.mod h1:NLyX57FrnZ9nVLTY5TRdMmGelrGKbIRYGcgRkNdqqlA= github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= diff --git a/test/postgres/producer.go b/test/postgres/producer.go index ecaea6344..2d49519e8 100644 --- a/test/postgres/producer.go +++ b/test/postgres/producer.go @@ -299,7 +299,7 @@ func discoverFiler(masterHTTPAddress string) (string, error) { httpAddr := pb.ServerAddress(masterHTTPAddress) masterGRPCAddress := httpAddr.ToGrpcAddress() - conn, err := grpc.Dial(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return "", fmt.Errorf("failed to connect to master at %s: %v", masterGRPCAddress, err) } @@ -334,7 +334,7 @@ func discoverBroker(masterHTTPAddress string) (string, error) { return "", fmt.Errorf("failed to discover filer: %v", err) } - conn, err := grpc.Dial(filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return "", fmt.Errorf("failed to connect to filer at %s: %v", filerAddress, err) } diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index 78ba6d7de..74410aab6 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -335,19 +335,15 @@ func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *wo // handleTaskRequest processes task requests from workers func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) { - // glog.Infof("DEBUG handleTaskRequest: Worker %s requesting tasks with capabilities %v", conn.workerID, conn.capabilities) if s.adminServer.maintenanceManager == nil { - glog.Infof("DEBUG handleTaskRequest: maintenance manager is nil") return } // Get next task from maintenance manager task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities) - // glog.Infof("DEBUG handleTaskRequest: GetNextTask returned task: %v", task != nil) if task != nil { - glog.Infof("DEBUG handleTaskRequest: Assigning task %s (type: %s) to worker %s", task.ID, task.Type, conn.workerID) // Use typed params directly - master client should already be configured in the params var taskParams *worker_pb.TaskParams @@ -383,12 +379,10 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo select { case conn.outgoing <- assignment: - glog.Infof("DEBUG handleTaskRequest: Successfully assigned task %s to worker %s", task.ID, conn.workerID) case <-time.After(time.Second): glog.Warningf("Failed to send task assignment to worker %s", conn.workerID) } } else { - // glog.Infof("DEBUG handleTaskRequest: No tasks available for worker %s", conn.workerID) } } diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index e92a50c9d..6335c4174 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -38,7 +38,6 @@ func NewMaintenanceHandlers(adminServer *dash.AdminServer) *MaintenanceHandlers // ShowTaskDetail displays the task detail page func (h *MaintenanceHandlers) ShowTaskDetail(c *gin.Context) { taskID := c.Param("id") - glog.Infof("DEBUG ShowTaskDetail: Starting for task ID: %s", taskID) taskDetail, err := h.adminServer.GetMaintenanceTaskDetail(taskID) if err != nil { @@ -47,7 +46,6 @@ func (h *MaintenanceHandlers) ShowTaskDetail(c *gin.Context) { return } - glog.Infof("DEBUG ShowTaskDetail: got task detail for %s, task type: %s, status: %s", taskID, taskDetail.Task.Type, taskDetail.Task.Status) c.Header("Content-Type", "text/html") taskDetailComponent := app.TaskDetail(taskDetail) @@ -59,7 +57,6 @@ func (h *MaintenanceHandlers) ShowTaskDetail(c *gin.Context) { return } - glog.Infof("DEBUG ShowTaskDetail: template rendered successfully for task %s", taskID) } // ShowMaintenanceQueue displays the maintenance queue page diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go index 553f32eb8..20f1ea97d 100644 --- a/weed/admin/maintenance/maintenance_integration.go +++ b/weed/admin/maintenance/maintenance_integration.go @@ -299,42 +299,33 @@ func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetec // CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool { - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Checking task %s (type: %s)", task.ID, task.Type) // Convert existing types to task types using mapping taskType, exists := s.revTaskTypeMap[task.Type] if !exists { - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Unknown task type %s for scheduling, falling back to existing logic", task.Type) return false // Fallback to existing logic for unknown types } - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Mapped task type %s to %s", task.Type, taskType) // Convert task objects taskObject := s.convertTaskToTaskSystem(task) if taskObject == nil { - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Failed to convert task %s for scheduling", task.ID) return false } - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Successfully converted task %s", task.ID) runningTaskObjects := s.convertTasksToTaskSystem(runningTasks) workerObjects := s.convertWorkersToTaskSystem(availableWorkers) - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Converted %d running tasks and %d workers", len(runningTaskObjects), len(workerObjects)) // Get the appropriate scheduler scheduler := s.taskRegistry.GetScheduler(taskType) if scheduler == nil { - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: No scheduler found for task type %s", taskType) return false } - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Found scheduler for task type %s", taskType) canSchedule := scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects) - glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Scheduler decision for task %s: %v", task.ID, canSchedule) return canSchedule } diff --git a/weed/filer_client/filer_discovery.go b/weed/filer_client/filer_discovery.go index c16cb5b75..49cfcd314 100644 --- a/weed/filer_client/filer_discovery.go +++ b/weed/filer_client/filer_discovery.go @@ -51,7 +51,7 @@ func (fds *FilerDiscoveryService) discoverFilersFromMaster(masterAddr pb.ServerA // Convert HTTP master address to gRPC address (HTTP port + 10000) grpcAddr := masterAddr.ToGrpcAddress() - conn, err := grpc.Dial(grpcAddr, fds.grpcDialOption) + conn, err := grpc.NewClient(grpcAddr, fds.grpcDialOption) if err != nil { return nil, fmt.Errorf("failed to connect to master at %s: %v", grpcAddr, err) } diff --git a/weed/iamapi/iamapi_management_handlers.go b/weed/iamapi/iamapi_management_handlers.go index 573d6dabc..1a8f852cd 100644 --- a/weed/iamapi/iamapi_management_handlers.go +++ b/weed/iamapi/iamapi_management_handlers.go @@ -322,14 +322,12 @@ func GetActions(policy *policy_engine.PolicyDocument) ([]string, error) { // Parse "arn:aws:s3:::my-bucket/shared/*" res := strings.Split(resource, ":") if len(res) != 6 || res[0] != "arn" || res[1] != "aws" || res[2] != "s3" { - glog.Infof("not a valid resource: %s", res) continue } for _, action := range statement.Action.Strings() { // Parse "s3:Get*" act := strings.Split(action, ":") if len(act) != 2 || act[0] != "s3" { - glog.Infof("not a valid action: %s", act) continue } statementAction := MapToStatementAction(act[1]) diff --git a/weed/mq/broker/broker_grpc_fetch.go b/weed/mq/broker/broker_grpc_fetch.go index 19024d852..4eb17d4fb 100644 --- a/weed/mq/broker/broker_grpc_fetch.go +++ b/weed/mq/broker/broker_grpc_fetch.go @@ -105,9 +105,6 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM requestedOffset := req.StartOffset // Read messages from LogBuffer (stateless read) - glog.Infof("[FetchMessage] About to read from LogBuffer: topic=%s partition=%v offset=%d maxMessages=%d maxBytes=%d", - t.Name, partition, requestedOffset, maxMessages, maxBytes) - logEntries, nextOffset, highWaterMark, endOfPartition, err := localPartition.LogBuffer.ReadMessagesAtOffset( requestedOffset, maxMessages, @@ -122,9 +119,6 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM nextOffset, endOfPartition, localPartition.LogBuffer.GetLogStartOffset()) } - glog.Infof("[FetchMessage] Read completed: topic=%s partition=%v offset=%d -> %d entries, nextOffset=%d, hwm=%d, eop=%v, err=%v", - t.Name, partition, requestedOffset, len(logEntries), nextOffset, highWaterMark, endOfPartition, err) - if err != nil { // Check if this is an "offset out of range" error errMsg := err.Error() diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go index af3330b03..eea1b1907 100644 --- a/weed/mq/kafka/gateway/coordinator_registry.go +++ b/weed/mq/kafka/gateway/coordinator_registry.go @@ -80,7 +80,7 @@ func NewCoordinatorRegistry(gatewayAddress string, masters []pb.ServerAddress, g for _, master := range masters { // Use the same discovery logic as filer_discovery.go grpcAddr := master.ToGrpcAddress() - conn, err := grpc.Dial(grpcAddr, grpcDialOption) + conn, err := grpc.NewClient(grpcAddr, grpcDialOption) if err != nil { continue } diff --git a/weed/mq/kafka/integration/broker_client_fetch.go b/weed/mq/kafka/integration/broker_client_fetch.go index 25af9e809..016f8ccdf 100644 --- a/weed/mq/kafka/integration/broker_client_fetch.go +++ b/weed/mq/kafka/integration/broker_client_fetch.go @@ -80,10 +80,6 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string } } - // CRITICAL DEBUGGING: Log what broker returned - glog.Infof("[FETCH-STATELESS-CLIENT] Broker response for %s[%d] offset %d: messages=%d, nextOffset=%d, hwm=%d, logStart=%d, endOfPartition=%v", - topic, partition, startOffset, len(resp.Messages), resp.NextOffset, resp.HighWaterMark, resp.LogStartOffset, resp.EndOfPartition) - // CRITICAL: If broker returns 0 messages but hwm > startOffset, something is wrong if len(resp.Messages) == 0 && resp.HighWaterMark > startOffset { glog.Errorf("[FETCH-STATELESS-CLIENT] CRITICAL BUG: Broker returned 0 messages for %s[%d] offset %d, but HWM=%d (should have %d messages available)", diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 6b38a71e1..58a96f5d8 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -181,7 +181,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers } pending := make([]pendingFetch, 0) - persistentFetchStart := time.Now() // Phase 1: Dispatch all fetch requests to partition readers (non-blocking) for _, topic := range fetchRequest.Topics { @@ -285,8 +284,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers } done: - _ = time.Since(persistentFetchStart) // persistentFetchDuration - // ==================================================================== // BUILD RESPONSE FROM FETCHED DATA // Now assemble the response in the correct order using fetched results @@ -1132,7 +1129,6 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB return nil } - // For system topics like _schemas, _consumer_offsets, etc., // return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.) // and should NOT be processed as RecordValue protobuf messages. diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go index 61cd19f78..192872850 100644 --- a/weed/mq/kafka/protocol/fetch_multibatch.go +++ b/weed/mq/kafka/protocol/fetch_multibatch.go @@ -8,7 +8,6 @@ import ( "fmt" "hash/crc32" "strings" - "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression" @@ -61,7 +60,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName // Assume average message size + batch overhead // Client requested maxBytes, we should use most of it // Start with larger batches to maximize throughput - estimatedMsgSize := int32(1024) // Typical message size with overhead + estimatedMsgSize := int32(1024) // Typical message size with overhead recordsPerBatch := (maxBytes - 200) / estimatedMsgSize // Use available space efficiently if recordsPerBatch < 100 { recordsPerBatch = 100 // Minimum 100 records per batch @@ -116,9 +115,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName // Fetch records for this batch // Pass context to respect Kafka fetch request's MaxWaitTime - getRecordsStartTime := time.Now() smqRecords, err := f.handler.seaweedMQHandler.GetStoredRecords(ctx, topicName, partitionID, currentOffset, int(recordsToFetch)) - _ = time.Since(getRecordsStartTime) // getRecordsDuration if err != nil || len(smqRecords) == 0 { break diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go index 0117e3809..6583c6489 100644 --- a/weed/mq/kafka/protocol/fetch_partition_reader.go +++ b/weed/mq/kafka/protocol/fetch_partition_reader.go @@ -2,7 +2,6 @@ package protocol import ( "context" - "fmt" "sync" "time" @@ -120,21 +119,9 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition startTime := time.Now() result := &partitionFetchResult{} - // Log request START with full details - glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID) - defer func() { result.fetchDuration = time.Since(startTime) - // Log request END with results - resultStatus := "EMPTY" - if len(result.recordBatch) > 0 { - resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch)) - } - glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000) - // Send result back to client select { case req.resultChan <- result: @@ -189,9 +176,6 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm) result.recordBatch = []byte{} } else { - // Log successful fetch with details - glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch)) result.recordBatch = recordBatch pr.bufferMu.Lock() pr.currentOffset = newOffset diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index ab1b1cb21..849d1148d 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -590,7 +590,6 @@ func decodeVarint(data []byte) (int64, int) { // handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+) func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { - startTime := time.Now() // For now, use simplified parsing similar to v0/v1 but handle v2+ response format // In v2+, the main differences are: @@ -731,7 +730,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if len(records) == 0 { errorCode = 42 // INVALID_RECORD } else { - var firstOffsetSet bool for idx, kv := range records { offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) @@ -746,11 +744,8 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if idx == 0 { baseOffset = offsetProduced - firstOffsetSet = true } } - - _ = firstOffsetSet } } else { // Try to extract anyway - this might be a Noop record @@ -815,7 +810,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if len(response) < 20 { } - _ = time.Since(startTime) // duration return response, nil } diff --git a/weed/mq/offset/benchmark_test.go b/weed/mq/offset/benchmark_test.go index d82729142..0fdacf127 100644 --- a/weed/mq/offset/benchmark_test.go +++ b/weed/mq/offset/benchmark_test.go @@ -227,7 +227,7 @@ func BenchmarkOffsetSubscription(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { subscriptionID := fmt.Sprintf("bench-sub-%d", i) - sub, err := subscriber.CreateSubscription( + _, err := subscriber.CreateSubscription( subscriptionID, "test-namespace", "test-topic", partition, @@ -238,7 +238,6 @@ func BenchmarkOffsetSubscription(b *testing.B) { b.Fatalf("Failed to create subscription: %v", err) } subscriber.CloseSubscription(subscriptionID) - _ = sub } }) @@ -338,7 +337,7 @@ func BenchmarkSMQOffsetIntegration(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { subscriptionID := fmt.Sprintf("integration-sub-%d", i) - sub, err := integration.CreateSubscription( + _, err := integration.CreateSubscription( subscriptionID, "test-namespace", "test-topic", partition, @@ -349,7 +348,6 @@ func BenchmarkSMQOffsetIntegration(b *testing.B) { b.Fatalf("Failed to create subscription: %v", err) } integration.CloseSubscription(subscriptionID) - _ = sub } }) diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index 3e6517678..c1b1cab6f 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -58,7 +58,7 @@ func (c *BrokerClient) discoverFiler() error { return nil // already discovered } - conn, err := grpc.Dial(c.masterAddress, c.grpcDialOption) + conn, err := grpc.NewClient(c.masterAddress, c.grpcDialOption) if err != nil { return fmt.Errorf("failed to connect to master at %s: %v", c.masterAddress, err) } @@ -99,7 +99,7 @@ func (c *BrokerClient) findBrokerBalancer() error { return fmt.Errorf("failed to discover filer: %v", err) } - conn, err := grpc.Dial(c.filerAddress, c.grpcDialOption) + conn, err := grpc.NewClient(c.filerAddress, c.grpcDialOption) if err != nil { return fmt.Errorf("failed to connect to filer at %s: %v", c.filerAddress, err) } @@ -143,7 +143,7 @@ type filerClientImpl struct { // WithFilerClient executes a function with a connected filer client func (f *filerClientImpl) WithFilerClient(followRedirect bool, fn func(client filer_pb.SeaweedFilerClient) error) error { - conn, err := grpc.Dial(f.filerAddress, f.grpcDialOption) + conn, err := grpc.NewClient(f.filerAddress, f.grpcDialOption) if err != nil { return fmt.Errorf("failed to connect to filer at %s: %v", f.filerAddress, err) } @@ -317,7 +317,7 @@ func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName return err } - conn, err := grpc.Dial(c.brokerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(c.brokerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return fmt.Errorf("failed to connect to broker at %s: %v", c.brokerAddress, err) } @@ -429,7 +429,7 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi glog.V(2).Infof("Found broker at address: %s", c.brokerAddress) // Step 2: Connect to broker - conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption) + conn, err := grpc.NewClient(c.brokerAddress, c.grpcDialOption) if err != nil { glog.V(2).Infof("Failed to connect to broker %s: %v", c.brokerAddress, err) // Return empty slice if connection fails - prevents double-counting diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index d63e10364..d181d51da 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -486,7 +486,6 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl for _, deleteEntry := range deleteEntries { //delete unused part data - glog.Infof("completeMultipartUpload cleanup %s upload %s unused %s", *input.Bucket, *input.UploadId, deleteEntry.Name) if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil { glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err) } diff --git a/weed/s3api/s3_sse_kms.go b/weed/s3api/s3_sse_kms.go index 11c3bf643..3b721aa26 100644 --- a/weed/s3api/s3_sse_kms.go +++ b/weed/s3api/s3_sse_kms.go @@ -423,10 +423,8 @@ func CreateSSEKMSDecryptedReader(r io.Reader, sseKey *SSEKMSKey) (io.Reader, err var iv []byte if sseKey.ChunkOffset > 0 { iv = calculateIVWithOffset(sseKey.IV, sseKey.ChunkOffset) - glog.Infof("Using calculated IV with offset %d for chunk decryption", sseKey.ChunkOffset) } else { iv = sseKey.IV - // glog.Infof("Using base IV for chunk decryption (offset=0)") } // Create AES cipher with the decrypted data key diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index f1d9d7f7c..47efa728a 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -32,7 +32,6 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { err := pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) if errors.Is(err, filer_pb.ErrNotFound) { - glog.Infof("s3 circuit breaker not configured") return nil } if err != nil { @@ -42,7 +41,6 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { }) if err != nil { - glog.Infof("s3 circuit breaker not configured correctly: %v", err) } return cb diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 75c9a9e91..f30522292 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -921,9 +921,6 @@ func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *htt } } isMultipartSSEKMS = sseKMSChunks > 1 - - glog.Infof("SSE-KMS object detection: chunks=%d, sseKMSChunks=%d, isMultipartSSEKMS=%t", - len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS) } } @@ -1131,10 +1128,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr // Create readers for each chunk, decrypting them independently var readers []io.Reader - for i, chunk := range chunks { - glog.Infof("Processing chunk %d/%d: fileId=%s, offset=%d, size=%d, sse_type=%d", - i+1, len(entry.GetChunks()), chunk.GetFileIdString(), chunk.GetOffset(), chunk.GetSize(), chunk.GetSseType()) - + for _, chunk := range chunks { // Get this chunk's encrypted data chunkReader, err := s3a.createEncryptedChunkReader(chunk) if err != nil { @@ -1153,8 +1147,6 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr } else { // ChunkOffset is already set from the stored metadata (PartOffset) chunkSSEKMSKey = kmsKey - glog.Infof("Using per-chunk SSE-KMS metadata for chunk %s: keyID=%s, IV=%x, partOffset=%d", - chunk.GetFileIdString(), kmsKey.KeyID, kmsKey.IV[:8], kmsKey.ChunkOffset) } } @@ -1170,7 +1162,6 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr kmsKey.ChunkOffset = chunk.GetOffset() chunkSSEKMSKey = kmsKey } - glog.Infof("Using fallback object-level SSE-KMS metadata for chunk %s with offset %d", chunk.GetFileIdString(), chunk.GetOffset()) } } } @@ -1410,7 +1401,6 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox return nil, fmt.Errorf("failed to create SSE-C decrypted reader for chunk %s: %v", chunk.GetFileIdString(), decErr) } readers = append(readers, decryptedReader) - glog.Infof("Created SSE-C decrypted reader for chunk %s using stored metadata", chunk.GetFileIdString()) } else { return nil, fmt.Errorf("SSE-C chunk %s missing required metadata", chunk.GetFileIdString()) } diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 45972b600..a71b52a39 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -1113,20 +1113,9 @@ func (s3a *S3ApiServer) downloadChunkData(srcUrl string, offset, size int64) ([] // copyMultipartSSECChunks handles copying multipart SSE-C objects // Returns chunks and destination metadata that should be applied to the destination entry func (s3a *S3ApiServer) copyMultipartSSECChunks(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { - glog.Infof("copyMultipartSSECChunks called: copySourceKey=%v, destKey=%v, path=%s", copySourceKey != nil, destKey != nil, dstPath) - - var sourceKeyMD5, destKeyMD5 string - if copySourceKey != nil { - sourceKeyMD5 = copySourceKey.KeyMD5 - } - if destKey != nil { - destKeyMD5 = destKey.KeyMD5 - } - glog.Infof("Key MD5 comparison: source=%s, dest=%s, equal=%t", sourceKeyMD5, destKeyMD5, sourceKeyMD5 == destKeyMD5) // For multipart SSE-C, always use decrypt/reencrypt path to ensure proper metadata handling // The standard copyChunks() doesn't preserve SSE metadata, so we need per-chunk processing - glog.Infof("Taking multipart SSE-C reencrypt path to preserve metadata: %s", dstPath) // Different keys or key changes: decrypt and re-encrypt each chunk individually glog.V(2).Infof("Multipart SSE-C reencrypt copy (different keys): %s", dstPath) @@ -1175,11 +1164,9 @@ func (s3a *S3ApiServer) copyMultipartSSECChunks(entry *filer_pb.Entry, copySourc // copyMultipartSSEKMSChunks handles copying multipart SSE-KMS objects (unified with SSE-C approach) // Returns chunks and destination metadata that should be applied to the destination entry func (s3a *S3ApiServer) copyMultipartSSEKMSChunks(entry *filer_pb.Entry, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) { - glog.Infof("copyMultipartSSEKMSChunks called: destKeyID=%s, path=%s", destKeyID, dstPath) // For multipart SSE-KMS, always use decrypt/reencrypt path to ensure proper metadata handling // The standard copyChunks() doesn't preserve SSE metadata, so we need per-chunk processing - glog.Infof("Taking multipart SSE-KMS reencrypt path to preserve metadata: %s", dstPath) var dstChunks []*filer_pb.FileChunk @@ -1217,7 +1204,6 @@ func (s3a *S3ApiServer) copyMultipartSSEKMSChunks(entry *filer_pb.Entry, destKey } if kmsMetadata, serErr := SerializeSSEKMSMetadata(sseKey); serErr == nil { dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata - glog.Infof("Created object-level KMS metadata for GET compatibility") } else { glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr) } @@ -1444,10 +1430,6 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo // copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios // This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { - glog.Infof("copyMultipartCrossEncryption called: %s→%s, path=%s", - s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, false), - s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, false), dstPath) - var dstChunks []*filer_pb.FileChunk // Parse destination encryption parameters @@ -1462,16 +1444,13 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h if err != nil { return nil, nil, fmt.Errorf("failed to parse destination SSE-C headers: %w", err) } - glog.Infof("Destination SSE-C: keyMD5=%s", destSSECKey.KeyMD5) } else if state.DstSSEKMS { var err error destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, err = ParseSSEKMSCopyHeaders(r) if err != nil { return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err) } - glog.Infof("Destination SSE-KMS: keyID=%s, bucketKey=%t", destKMSKeyID, destKMSBucketKeyEnabled) } else { - glog.Infof("Destination: Unencrypted") } // Parse source encryption parameters @@ -1482,7 +1461,6 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h if err != nil { return nil, nil, fmt.Errorf("failed to parse source SSE-C headers: %w", err) } - glog.Infof("Source SSE-C: keyMD5=%s", sourceSSECKey.KeyMD5) } // Process each chunk with unified cross-encryption logic @@ -1529,7 +1507,6 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h StoreIVInMetadata(dstMetadata, iv) dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256") dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destSSECKey.KeyMD5) - glog.Infof("Created SSE-C object-level metadata from first chunk") } } } @@ -1545,7 +1522,6 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h } if kmsMetadata, serErr := SerializeSSEKMSMetadata(sseKey); serErr == nil { dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata - glog.Infof("Created SSE-KMS object-level metadata") } else { glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr) } @@ -1738,7 +1714,6 @@ func (s3a *S3ApiServer) getEncryptionTypeString(isSSEC, isSSEKMS, isSSES3 bool) // copyChunksWithSSEC handles SSE-C aware copying with smart fast/slow path selection // Returns chunks and destination metadata that should be applied to the destination entry func (s3a *S3ApiServer) copyChunksWithSSEC(entry *filer_pb.Entry, r *http.Request) ([]*filer_pb.FileChunk, map[string][]byte, error) { - glog.Infof("copyChunksWithSSEC called for %s with %d chunks", r.URL.Path, len(entry.GetChunks())) // Parse SSE-C headers copySourceKey, err := ParseSSECCopySourceHeaders(r) @@ -1764,8 +1739,6 @@ func (s3a *S3ApiServer) copyChunksWithSSEC(entry *filer_pb.Entry, r *http.Reques } isMultipartSSEC = sseCChunks > 1 - glog.Infof("SSE-C copy analysis: total chunks=%d, sseC chunks=%d, isMultipart=%t", len(entry.GetChunks()), sseCChunks, isMultipartSSEC) - if isMultipartSSEC { glog.V(2).Infof("Detected multipart SSE-C object with %d encrypted chunks for copy", sseCChunks) return s3a.copyMultipartSSECChunks(entry, copySourceKey, destKey, r.URL.Path) @@ -1933,7 +1906,6 @@ func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, cop // copyChunksWithSSEKMS handles SSE-KMS aware copying with smart fast/slow path selection // Returns chunks and destination metadata like SSE-C for consistency func (s3a *S3ApiServer) copyChunksWithSSEKMS(entry *filer_pb.Entry, r *http.Request, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) { - glog.Infof("copyChunksWithSSEKMS called for %s with %d chunks", r.URL.Path, len(entry.GetChunks())) // Parse SSE-KMS headers from copy request destKeyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r) @@ -1952,8 +1924,6 @@ func (s3a *S3ApiServer) copyChunksWithSSEKMS(entry *filer_pb.Entry, r *http.Requ } isMultipartSSEKMS = sseKMSChunks > 1 - glog.Infof("SSE-KMS copy analysis: total chunks=%d, sseKMS chunks=%d, isMultipart=%t", len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS) - if isMultipartSSEKMS { glog.V(2).Infof("Detected multipart SSE-KMS object with %d encrypted chunks for copy", sseKMSChunks) return s3a.copyMultipartSSEKMSChunks(entry, destKeyID, encryptionContext, bucketKeyEnabled, r.URL.Path, bucket) diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index 3d83b585b..ef1182fc2 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -318,16 +318,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ // Check for SSE-C headers in the current request first sseCustomerAlgorithm := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm) if sseCustomerAlgorithm != "" { - glog.Infof("PutObjectPartHandler: detected SSE-C headers, handling as SSE-C part upload") // SSE-C part upload - headers are already present, let putToFiler handle it } else { // No SSE-C headers, check for SSE-KMS settings from upload directory - glog.Infof("PutObjectPartHandler: attempting to retrieve upload entry for bucket %s, uploadID %s", bucket, uploadID) if uploadEntry, err := s3a.getEntry(s3a.genUploadsFolder(bucket), uploadID); err == nil { - glog.Infof("PutObjectPartHandler: upload entry found, Extended metadata: %v", uploadEntry.Extended != nil) if uploadEntry.Extended != nil { // Check if this upload uses SSE-KMS - glog.Infof("PutObjectPartHandler: checking for SSE-KMS key in extended metadata") if keyIDBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; exists { keyID := string(keyIDBytes) @@ -385,7 +381,6 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ // Pass the base IV to putToFiler via header r.Header.Set(s3_constants.SeaweedFSSSEKMSBaseIVHeader, base64.StdEncoding.EncodeToString(baseIV)) - glog.Infof("PutObjectPartHandler: inherited SSE-KMS settings from upload %s, keyID %s - letting putToFiler handle encryption", uploadID, keyID) } else { // Check if this upload uses SSE-S3 if err := s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID); err != nil { @@ -396,7 +391,6 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ } } } else { - glog.Infof("PutObjectPartHandler: failed to retrieve upload entry: %v", err) } } @@ -501,9 +495,7 @@ type CompletedPart struct { // handleSSES3MultipartHeaders handles SSE-S3 multipart upload header setup to reduce nesting complexity func (s3a *S3ApiServer) handleSSES3MultipartHeaders(r *http.Request, uploadEntry *filer_pb.Entry, uploadID string) error { - glog.Infof("PutObjectPartHandler: checking for SSE-S3 settings in extended metadata") if encryptionTypeBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSES3Encryption]; exists && string(encryptionTypeBytes) == s3_constants.SSEAlgorithmAES256 { - glog.Infof("PutObjectPartHandler: found SSE-S3 encryption type, setting up headers") // Set SSE-S3 headers to indicate server-side encryption r.Header.Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) @@ -538,7 +530,6 @@ func (s3a *S3ApiServer) handleSSES3MultipartHeaders(r *http.Request, uploadEntry // Pass the base IV to putToFiler via header for offset calculation r.Header.Set(s3_constants.SeaweedFSSSES3BaseIVHeader, base64.StdEncoding.EncodeToString(baseIV)) - glog.Infof("PutObjectPartHandler: inherited SSE-S3 settings from upload %s - letting putToFiler handle encryption", uploadID) } return nil } diff --git a/weed/shell/command_mount_configure.go b/weed/shell/command_mount_configure.go index 5b224c39e..185857b9a 100644 --- a/weed/shell/command_mount_configure.go +++ b/weed/shell/command_mount_configure.go @@ -4,12 +4,13 @@ import ( "context" "flag" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" _ "google.golang.org/grpc/resolver/passthrough" - "io" ) func init() { @@ -53,7 +54,7 @@ func (c *commandMountConfigure) Do(args []string, commandEnv *CommandEnv, writer } localSocket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", mountDirHash) - clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) + clientConn, err := grpc.NewClient("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index e8140d3aa..2ee4309e4 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -152,8 +152,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano() } // collect each volume file ids - eg, gCtx := errgroup.WithContext(context.Background()) - _ = gCtx + eg, _ := errgroup.WithContext(context.Background()) for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo { dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo eg.Go(func() error { diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 692c47b44..5aeb285ca 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -615,7 +615,6 @@ func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() { defer logBuffer.diskChunkCache.mu.Unlock() if len(logBuffer.diskChunkCache.chunks) > 0 { - glog.Infof("[DiskCache] Invalidating all %d cached chunks after flush", len(logBuffer.diskChunkCache.chunks)) logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk) } } diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index b57f7742f..b48413bc8 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -34,9 +34,6 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages endOfPartition bool, err error, ) { - glog.Infof("[StatelessRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d", - startOffset, maxMessages, maxBytes) - // Quick validation if maxMessages <= 0 { maxMessages = 100 // Default reasonable batch size @@ -54,13 +51,9 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages bufferStartOffset := logBuffer.bufferStartOffset highWaterMark = currentBufferEnd - glog.Infof("[StatelessRead] Buffer state: startOffset=%d, bufferStart=%d, bufferEnd=%d, HWM=%d, pos=%d", - startOffset, bufferStartOffset, currentBufferEnd, highWaterMark, logBuffer.pos) - // Special case: empty buffer (no data written yet) if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 { logBuffer.RUnlock() - glog.Infof("[StatelessRead] PATH: Empty buffer (no data written yet)") // Return empty result - partition exists but has no data yet // Preserve the requested offset in nextOffset return messages, startOffset, 0, true, nil @@ -68,7 +61,6 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages // Check if requested offset is in current buffer if startOffset >= bufferStartOffset && startOffset < currentBufferEnd { - glog.Infof("[StatelessRead] PATH: Attempting to read from current/previous memory buffers") // Read from current buffer glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d", bufferStartOffset, currentBufferEnd) @@ -137,9 +129,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages } else { // Offset is not in current buffer - check previous buffers FIRST before going to disk // This handles the case where data was just flushed but is still in prevBuffers - glog.Infof("[StatelessRead] PATH: Offset %d not in current buffer [%d-%d), checking previous buffers first", - startOffset, bufferStartOffset, currentBufferEnd) - + for _, prevBuf := range logBuffer.prevBuffers.buffers { if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset { if prevBuf.size > 0 { @@ -155,9 +145,6 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages return nil, startOffset, highWaterMark, false, err } - glog.Infof("[StatelessRead] SUCCESS: Found %d messages in previous buffer, nextOffset=%d", - len(messages), nextOffset) - endOfPartition = false // More data might exist return messages, nextOffset, highWaterMark, endOfPartition, nil } @@ -177,8 +164,6 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages // 1. startOffset < bufferStartOffset: Historical data // 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above) if startOffset < currentBufferEnd { - glog.Infof("[StatelessRead] PATH: Data not in memory, attempting DISK READ") - // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured if startOffset < bufferStartOffset { glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d", @@ -199,8 +184,6 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages startOffset, bufferStartOffset, currentBufferEnd) } - glog.Infof("[StatelessRead] ReadFromDiskFn is configured, calling readHistoricalDataFromDisk...") - // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented) // The ReadFromDiskFn should handle its own timeouts and not block indefinitely diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk( @@ -215,9 +198,6 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages if len(diskMessages) == 0 { glog.Errorf("[StatelessRead] WARNING: Disk read returned 0 messages for offset %d (HWM=%d, bufferStart=%d)", startOffset, highWaterMark, bufferStartOffset) - } else { - glog.Infof("[StatelessRead] SUCCESS: Disk read returned %d messages, nextOffset=%d", - len(diskMessages), diskNextOffset) } // Return disk data @@ -243,22 +223,14 @@ func readHistoricalDataFromDisk( ) (messages []*filer_pb.LogEntry, nextOffset int64, err error) { const chunkSize = 1000 // Size of each cached chunk - glog.Infof("[DiskRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d, HWM=%d", - startOffset, maxMessages, maxBytes, highWaterMark) - // Calculate chunk start offset (aligned to chunkSize boundary) chunkStartOffset := (startOffset / chunkSize) * chunkSize - glog.Infof("[DiskRead] Calculated chunkStartOffset=%d (aligned from %d)", chunkStartOffset, startOffset) - // Try to get from cache first cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset) if cacheHit { // Found in cache - extract requested messages - glog.Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d), cachedMessages=%d", - chunkStartOffset, startOffset, len(cachedMessages)) - result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes) if err != nil { @@ -271,7 +243,6 @@ func readHistoricalDataFromDisk( // it's not on disk. glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)", startOffset, chunkStartOffset, len(cachedMessages)) - glog.Infof("[DiskCache] Returning empty to let memory buffers handle offset %d", startOffset) // Return empty but NO ERROR - this signals "not on disk, try memory" return nil, startOffset, nil @@ -281,9 +252,6 @@ func readHistoricalDataFromDisk( return result, nextOff, nil } - glog.Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk via ReadFromDiskFn", - chunkStartOffset) - // Not in cache - read entire chunk from disk for caching chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize) chunkNextOffset := chunkStartOffset @@ -309,7 +277,6 @@ func readHistoricalDataFromDisk( } // Read chunk from disk - glog.Infof("[DiskRead] Calling ReadFromDiskFn with position offset=%d...", chunkStartOffset) _, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn) if readErr != nil { @@ -317,20 +284,15 @@ func readHistoricalDataFromDisk( return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr) } - glog.Infof("[DiskRead] ReadFromDiskFn completed successfully, read %d messages", len(chunkMessages)) - // Cache the chunk for future reads if len(chunkMessages) > 0 { cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages) - glog.Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)", - chunkStartOffset, chunkNextOffset-1, len(chunkMessages)) } else { glog.Errorf("[DiskRead] WARNING: ReadFromDiskFn returned 0 messages for chunkStart=%d", chunkStartOffset) } // Extract requested messages from the chunk result, resNextOffset, resErr := extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes) - glog.Infof("[DiskRead] EXIT: Returning %d messages, nextOffset=%d, err=%v", len(result), resNextOffset, resErr) return result, resNextOffset, resErr } @@ -356,7 +318,6 @@ func invalidateCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) { if _, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists { delete(logBuffer.diskChunkCache.chunks, chunkStartOffset) - glog.Infof("[DiskCache] Invalidated chunk at offset %d", chunkStartOffset) } } @@ -415,10 +376,6 @@ func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset in // Requested offset is beyond the cached chunk // This happens when disk files only contain partial data // The requested offset might be in the gap between disk and memory - glog.Infof("[DiskCache] Requested offset %d is beyond cached chunk (chunkStart=%d, cachedSize=%d, positionInChunk=%d)", - startOffset, chunkStartOffset, len(chunkMessages), positionInChunk) - glog.Infof("[DiskCache] Chunk contains offsets %d-%d, requested %d - data not on disk", - chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1, startOffset) // Return empty (data not on disk) - caller will check memory buffers return nil, startOffset, nil @@ -488,7 +445,6 @@ func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, max if !foundStart { // Find the first message at or after startOffset if logEntry.Offset >= startOffset { - glog.Infof("[parseMessages] Found first message at/after startOffset %d: logEntry.Offset=%d", startOffset, logEntry.Offset) foundStart = true nextOffset = logEntry.Offset } else { @@ -510,9 +466,6 @@ func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, max pos += 4 + int(size) } - glog.Infof("[parseMessages] Parsed buffer: requested startOffset=%d, messagesInBuffer=%d, messagesReturned=%d, nextOffset=%d", - startOffset, messagesInBuffer, len(messages), nextOffset) - glog.V(4).Infof("[parseMessages] Parsed %d messages, nextOffset=%d, totalBytes=%d", len(messages), nextOffset, totalBytes)