From 93d07793189de1a1bfe756acad648c4817a21aa3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 15 Dec 2025 17:36:35 -0800 Subject: [PATCH] fix: add S3 bucket traffic sent metric tracking (#7774) * fix: add S3 bucket traffic sent metric tracking The BucketTrafficSent() function was defined but never called, causing the S3 Bucket Traffic Sent Grafana dashboard panel to not display data. Added BucketTrafficSent() calls in the streaming functions: - streamFromVolumeServers: for inline and chunked content - streamFromVolumeServersWithSSE: for encrypted range and full object requests The traffic received metric already worked because BucketTrafficReceived() was properly called in putToFiler() for both regular and multipart uploads. * feat: add S3 API Calls per Bucket panel to Grafana dashboards Added a new panel showing API calls per bucket using the existing SeaweedFS_s3_request_total metric aggregated by bucket. Updated all Grafana dashboard files: - other/metrics/grafana_seaweedfs.json - other/metrics/grafana_seaweedfs_k8s.json - other/metrics/grafana_seaweedfs_heartbeat.json - k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json * address PR comments: use actual bytes written for traffic metrics - Use actual bytes written from w.Write instead of expected size for inline content - Add countingWriter wrapper to track actual bytes for chunked content streaming - Update streamDecryptedRangeFromChunks to return actual bytes written for SSE - Remove redundant nil check that caused linter warning - Fix duplicate panel id 86 in grafana_seaweedfs.json (changed to 90) - Fix overlapping panel positions in grafana_seaweedfs_k8s.json (rebalanced x positions) * fix grafana k8s dashboard: rebalance S3 panels to avoid overlap - Panel 86 (S3 API Calls per Bucket): w:6, x:0, y:15 - Panel 67 (S3 Request Duration 95th): w:6, x:6, y:15 - Panel 68 (S3 Request Duration 80th): w:6, x:12, y:15 - Panel 65 (S3 Request Duration 99th): w:6, x:18, y:15 All four S3 panels now fit in a single row (y:15) with width 6 each. Filer row header at y:22 and subsequent panels remain correctly positioned. * add input validation and clarify comments in adjustRangeForPart - Add validation that partStartOffset <= partEndOffset at function start - Add clarifying comments for suffix-range handling where clientEnd temporarily holds the suffix length before being reassigned * align pluginVersion for panel 86 to 10.3.1 in k8s dashboard * track partial writes for accurate egress traffic accounting - Change condition from 'err == nil' to 'written > 0' for inline content - Move BucketTrafficSent before error check for chunked content streaming - Track traffic even on partial SSE range writes - Track traffic even on partial full SSE object copies This ensures egress traffic is counted even when writes fail partway through, providing more accurate bandwidth metrics. --- .../seaweedfs-grafana-dashboard.json | 99 ++++++++++++++++- other/metrics/grafana_seaweedfs.json | 97 +++++++++++++++- .../metrics/grafana_seaweedfs_heartbeat.json | 79 +++++++++++++ other/metrics/grafana_seaweedfs_k8s.json | 105 +++++++++++++++++- weed/s3api/s3api_object_handlers.go | 85 +++++++++----- 5 files changed, 428 insertions(+), 37 deletions(-) diff --git a/k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json b/k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json index 30b43f867..c4939592f 100644 --- a/k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json +++ b/k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json @@ -1652,17 +1652,112 @@ } ] }, - "unit": "s", + "unit": "reqps", "unitScale": true }, "overrides": [] }, "gridPos": { "h": 7, - "w": 24, + "w": 12, "x": 0, "y": 41 }, + "id": 86, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.3.1", + "targets": [ + { + "expr": "sum(rate(SeaweedFS_s3_request_total{namespace=\"$NAMESPACE\"}[$__interval])) by (bucket)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{bucket}}", + "refId": "A" + } + ], + "title": "S3 API Calls per Bucket", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s", + "unitScale": true + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 41 + }, "id": 72, "links": [], "options": { diff --git a/other/metrics/grafana_seaweedfs.json b/other/metrics/grafana_seaweedfs.json index 147677294..36705b450 100644 --- a/other/metrics/grafana_seaweedfs.json +++ b/other/metrics/grafana_seaweedfs.json @@ -1319,6 +1319,99 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fill": 1, + "fillGradient": 0, + "fieldConfig": { + "defaults": { + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 16 + }, + "hiddenSeries": false, + "id": 90, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "8.1.2", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(SeaweedFS_s3_request_total[$__interval])) by (bucket)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{bucket}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "S3 API Calls per Bucket", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "reqps", + "logBase": 1, + "min": 0, + "show": true + }, + { + "format": "short", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -1332,8 +1425,8 @@ "grid": {}, "gridPos": { "h": 7, - "w": 24, - "x": 0, + "w": 12, + "x": 12, "y": 16 }, "hiddenSeries": false, diff --git a/other/metrics/grafana_seaweedfs_heartbeat.json b/other/metrics/grafana_seaweedfs_heartbeat.json index e3ab94eb9..59821f136 100644 --- a/other/metrics/grafana_seaweedfs_heartbeat.json +++ b/other/metrics/grafana_seaweedfs_heartbeat.json @@ -875,6 +875,85 @@ } ] }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS-DEV}", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 16 + }, + "hiddenSeries": false, + "id": 86, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(SeaweedFS_s3_request_total[$__interval])) by (bucket)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{bucket}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "S3 API Calls per Bucket", + "tooltip": { + "msResolution": true, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "reqps", + "logBase": 1, + "min": 0, + "show": true + }, + { + "format": "short", + "logBase": 1, + "show": false + } + ] + }, { "aliasColors": {}, "bars": false, diff --git a/other/metrics/grafana_seaweedfs_k8s.json b/other/metrics/grafana_seaweedfs_k8s.json index 50f56c7bd..adae8a157 100644 --- a/other/metrics/grafana_seaweedfs_k8s.json +++ b/other/metrics/grafana_seaweedfs_k8s.json @@ -447,6 +447,99 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$DS_PROMETHEUS", + "fill": 1, + "fillGradient": 0, + "fieldConfig": { + "defaults": { + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 15 + }, + "hiddenSeries": false, + "id": 86, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "8.1.2", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(SeaweedFS_s3_request_total{namespace=\"$namespace\",service=~\"$service-api\"}[$__interval])) by (bucket)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{bucket}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "S3 API Calls per Bucket", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "reqps", + "logBase": 1, + "min": 0, + "show": true + }, + { + "format": "short", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -466,8 +559,8 @@ "grid": {}, "gridPos": { "h": 7, - "w": 8, - "x": 0, + "w": 6, + "x": 12, "y": 15 }, "hiddenSeries": false, @@ -577,8 +670,8 @@ "grid": {}, "gridPos": { "h": 7, - "w": 8, - "x": 8, + "w": 6, + "x": 6, "y": 15 }, "hiddenSeries": false, @@ -688,8 +781,8 @@ "grid": {}, "gridPos": { "h": 7, - "w": 8, - "x": 16, + "w": 6, + "x": 18, "y": 15 }, "hiddenSeries": false, diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 93d857cdc..42d198673 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -24,8 +24,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - "github.com/seaweedfs/seaweedfs/weed/util/mem" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -45,6 +45,18 @@ var corsHeaders = []string{ // Package-level to avoid per-call allocations in writeZeroBytes var zeroBuf = make([]byte, 32*1024) +// countingWriter wraps an io.Writer to count bytes written +type countingWriter struct { + w io.Writer + written int64 +} + +func (cw *countingWriter) Write(p []byte) (int, error) { + n, err := cw.w.Write(p) + cw.written += int64(n) + return n, err +} + // adjustRangeForPart adjusts a client's Range header to absolute offsets within a part. // Parameters: // - partStartOffset: the absolute start offset of the part in the object @@ -56,6 +68,11 @@ var zeroBuf = make([]byte, 32*1024) // - adjustedEnd: the adjusted absolute end offset // - error: nil on success, error if the range is invalid func adjustRangeForPart(partStartOffset, partEndOffset int64, clientRangeHeader string) (adjustedStart, adjustedEnd int64, err error) { + // Validate inputs + if partStartOffset > partEndOffset { + return 0, 0, fmt.Errorf("invalid part boundaries: start %d > end %d", partStartOffset, partEndOffset) + } + // If no range header, return the full part if clientRangeHeader == "" || !strings.HasPrefix(clientRangeHeader, "bytes=") { return partStartOffset, partEndOffset, nil @@ -92,14 +109,15 @@ func adjustRangeForPart(partStartOffset, partEndOffset int64, clientRangeHeader } // Handle suffix-range (e.g., "bytes=-100" means last 100 bytes) + // When parts[0] is empty, the parsed clientEnd value represents the suffix length, + // not the actual end position. We compute the actual start/end from the suffix length. if parts[0] == "" { - // suffix-range: clientEnd is actually the suffix length - suffixLength := clientEnd + suffixLength := clientEnd // clientEnd temporarily holds the suffix length if suffixLength > partSize { suffixLength = partSize } clientStart = partSize - suffixLength - clientEnd = partSize - 1 + clientEnd = partSize - 1 // Now clientEnd holds the actual end position } // Validate range is within part boundaries @@ -201,7 +219,6 @@ func removeDuplicateSlashes(object string) string { return result.String() } - // hasChildren checks if a path has any child objects (is a directory with contents) // // This helper function is used to distinguish implicit directories from regular files or empty directories. @@ -720,13 +737,6 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // This eliminates the 19ms filer proxy overhead // SSE decryption is handled inline during streaming - // Safety check: entry must be valid before streaming - if objectEntryForSSE == nil { - glog.Errorf("GetObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - // Detect SSE encryption type primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) @@ -887,13 +897,19 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) w.WriteHeader(http.StatusPartialContent) - _, err := w.Write(entry.Content[start:end]) + written, err := w.Write(entry.Content[start:end]) + if written > 0 { + BucketTrafficSent(int64(written), r) + } return err } // Non-range request for inline content s3a.setResponseHeaders(w, r, entry, totalSize) w.WriteHeader(http.StatusOK) - _, err := w.Write(entry.Content) + written, err := w.Write(entry.Content) + if written > 0 { + BucketTrafficSent(int64(written), r) + } return err } @@ -977,17 +993,22 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R w.WriteHeader(http.StatusOK) } - // Stream directly to response + // Stream directly to response with counting wrapper tStreamExec := time.Now() glog.V(4).Infof("streamFromVolumeServers: starting streamFn, offset=%d, size=%d", offset, size) - err = streamFn(w) + cw := &countingWriter{w: w} + err = streamFn(cw) streamExecTime = time.Since(tStreamExec) + // Track traffic even on partial writes for accurate egress accounting + if cw.written > 0 { + BucketTrafficSent(cw.written, r) + } if err != nil { - glog.Errorf("streamFromVolumeServers: streamFn failed: %v", err) + glog.Errorf("streamFromVolumeServers: streamFn failed after writing %d bytes: %v", cw.written, err) // Streaming error after WriteHeader was called - response already partially written return newStreamErrorWithResponse(err) } - glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully") + glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully, wrote %d bytes", cw.written) return nil } @@ -1189,9 +1210,13 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r if isRangeRequest { glog.V(2).Infof("Using range-aware SSE decryption for offset=%d size=%d", offset, size) streamFetchTime = 0 // No full stream fetch in range-aware path - err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) + written, err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) decryptSetupTime = time.Since(tDecryptSetup) copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path + // Track traffic even on partial writes for accurate egress accounting + if written > 0 { + BucketTrafficSent(written, r) + } if err != nil { // Error after WriteHeader - response already written return newStreamErrorWithResponse(err) @@ -1338,6 +1363,10 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r buf := make([]byte, 128*1024) copied, copyErr := io.CopyBuffer(w, decryptedReader, buf) copyTime = time.Since(tCopy) + // Track traffic even on partial writes for accurate egress accounting + if copied > 0 { + BucketTrafficSent(copied, r) + } if copyErr != nil { glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr) // Error after WriteHeader - response already written @@ -1349,7 +1378,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r // streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks // This implements the filer's ViewFromChunks approach for optimal range performance -func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) error { +// Returns the number of bytes written and any error +func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) (int64, error) { // Use filer's ViewFromChunks to resolve only needed chunks for the range lookupFileIdFn := s3a.createLookupFileIdFunction() chunkViews := filer.ViewFromChunks(ctx, lookupFileIdFn, entry.GetChunks(), offset, size) @@ -1366,7 +1396,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io gap := chunkView.ViewOffset - targetOffset glog.V(4).Infof("Writing %d zero bytes for gap [%d,%d)", gap, targetOffset, chunkView.ViewOffset) if err := writeZeroBytes(w, gap); err != nil { - return fmt.Errorf("failed to write zero padding: %w", err) + return totalWritten, fmt.Errorf("failed to write zero padding: %w", err) } totalWritten += gap targetOffset = chunkView.ViewOffset @@ -1381,7 +1411,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io } } if fileChunk == nil { - return fmt.Errorf("chunk %s not found in entry", chunkView.FileId) + return totalWritten, fmt.Errorf("chunk %s not found in entry", chunkView.FileId) } // Fetch and decrypt this chunk view @@ -1401,7 +1431,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io } if err != nil { - return fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err) + return totalWritten, fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err) } // Copy the decrypted chunk data @@ -1414,12 +1444,12 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io } if copyErr != nil { glog.Errorf("streamDecryptedRangeFromChunks: copy error after writing %d bytes (expected %d): %v", written, chunkView.ViewSize, copyErr) - return fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr) + return totalWritten, fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr) } if written != int64(chunkView.ViewSize) { glog.Errorf("streamDecryptedRangeFromChunks: size mismatch - wrote %d bytes but expected %d", written, chunkView.ViewSize) - return fmt.Errorf("size mismatch: wrote %d bytes but expected %d for chunk %s", written, chunkView.ViewSize, chunkView.FileId) + return totalWritten, fmt.Errorf("size mismatch: wrote %d bytes but expected %d for chunk %s", written, chunkView.ViewSize, chunkView.FileId) } totalWritten += written @@ -1432,12 +1462,13 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io if remaining > 0 { glog.V(4).Infof("Writing %d trailing zero bytes", remaining) if err := writeZeroBytes(w, remaining); err != nil { - return fmt.Errorf("failed to write trailing zeros: %w", err) + return totalWritten, fmt.Errorf("failed to write trailing zeros: %w", err) } + totalWritten += remaining } glog.V(3).Infof("Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)", totalWritten, offset, offset+size) - return nil + return totalWritten, nil } // writeZeroBytes writes n zero bytes to writer using the package-level zero buffer