diff --git a/k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json b/k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json index 30b43f867..c4939592f 100644 --- a/k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json +++ b/k8s/charts/seaweedfs/dashboards/seaweedfs-grafana-dashboard.json @@ -1652,17 +1652,112 @@ } ] }, - "unit": "s", + "unit": "reqps", "unitScale": true }, "overrides": [] }, "gridPos": { "h": 7, - "w": 24, + "w": 12, "x": 0, "y": 41 }, + "id": 86, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.3.1", + "targets": [ + { + "expr": "sum(rate(SeaweedFS_s3_request_total{namespace=\"$NAMESPACE\"}[$__interval])) by (bucket)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{bucket}}", + "refId": "A" + } + ], + "title": "S3 API Calls per Bucket", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s", + "unitScale": true + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 41 + }, "id": 72, "links": [], "options": { diff --git a/other/metrics/grafana_seaweedfs.json b/other/metrics/grafana_seaweedfs.json index 147677294..36705b450 100644 --- a/other/metrics/grafana_seaweedfs.json +++ b/other/metrics/grafana_seaweedfs.json @@ -1319,6 +1319,99 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fill": 1, + "fillGradient": 0, + "fieldConfig": { + "defaults": { + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 16 + }, + "hiddenSeries": false, + "id": 90, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "8.1.2", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(SeaweedFS_s3_request_total[$__interval])) by (bucket)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{bucket}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "S3 API Calls per Bucket", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "reqps", + "logBase": 1, + "min": 0, + "show": true + }, + { + "format": "short", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -1332,8 +1425,8 @@ "grid": {}, "gridPos": { "h": 7, - "w": 24, - "x": 0, + "w": 12, + "x": 12, "y": 16 }, "hiddenSeries": false, diff --git a/other/metrics/grafana_seaweedfs_heartbeat.json b/other/metrics/grafana_seaweedfs_heartbeat.json index e3ab94eb9..59821f136 100644 --- a/other/metrics/grafana_seaweedfs_heartbeat.json +++ b/other/metrics/grafana_seaweedfs_heartbeat.json @@ -875,6 +875,85 @@ } ] }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS-DEV}", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 16 + }, + "hiddenSeries": false, + "id": 86, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(SeaweedFS_s3_request_total[$__interval])) by (bucket)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{bucket}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "S3 API Calls per Bucket", + "tooltip": { + "msResolution": true, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "reqps", + "logBase": 1, + "min": 0, + "show": true + }, + { + "format": "short", + "logBase": 1, + "show": false + } + ] + }, { "aliasColors": {}, "bars": false, diff --git a/other/metrics/grafana_seaweedfs_k8s.json b/other/metrics/grafana_seaweedfs_k8s.json index 50f56c7bd..adae8a157 100644 --- a/other/metrics/grafana_seaweedfs_k8s.json +++ b/other/metrics/grafana_seaweedfs_k8s.json @@ -447,6 +447,99 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$DS_PROMETHEUS", + "fill": 1, + "fillGradient": 0, + "fieldConfig": { + "defaults": { + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 15 + }, + "hiddenSeries": false, + "id": 86, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "8.1.2", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(SeaweedFS_s3_request_total{namespace=\"$namespace\",service=~\"$service-api\"}[$__interval])) by (bucket)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{bucket}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "S3 API Calls per Bucket", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "reqps", + "logBase": 1, + "min": 0, + "show": true + }, + { + "format": "short", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -466,8 +559,8 @@ "grid": {}, "gridPos": { "h": 7, - "w": 8, - "x": 0, + "w": 6, + "x": 12, "y": 15 }, "hiddenSeries": false, @@ -577,8 +670,8 @@ "grid": {}, "gridPos": { "h": 7, - "w": 8, - "x": 8, + "w": 6, + "x": 6, "y": 15 }, "hiddenSeries": false, @@ -688,8 +781,8 @@ "grid": {}, "gridPos": { "h": 7, - "w": 8, - "x": 16, + "w": 6, + "x": 18, "y": 15 }, "hiddenSeries": false, diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 93d857cdc..42d198673 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -24,8 +24,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - "github.com/seaweedfs/seaweedfs/weed/util/mem" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -45,6 +45,18 @@ var corsHeaders = []string{ // Package-level to avoid per-call allocations in writeZeroBytes var zeroBuf = make([]byte, 32*1024) +// countingWriter wraps an io.Writer to count bytes written +type countingWriter struct { + w io.Writer + written int64 +} + +func (cw *countingWriter) Write(p []byte) (int, error) { + n, err := cw.w.Write(p) + cw.written += int64(n) + return n, err +} + // adjustRangeForPart adjusts a client's Range header to absolute offsets within a part. // Parameters: // - partStartOffset: the absolute start offset of the part in the object @@ -56,6 +68,11 @@ var zeroBuf = make([]byte, 32*1024) // - adjustedEnd: the adjusted absolute end offset // - error: nil on success, error if the range is invalid func adjustRangeForPart(partStartOffset, partEndOffset int64, clientRangeHeader string) (adjustedStart, adjustedEnd int64, err error) { + // Validate inputs + if partStartOffset > partEndOffset { + return 0, 0, fmt.Errorf("invalid part boundaries: start %d > end %d", partStartOffset, partEndOffset) + } + // If no range header, return the full part if clientRangeHeader == "" || !strings.HasPrefix(clientRangeHeader, "bytes=") { return partStartOffset, partEndOffset, nil @@ -92,14 +109,15 @@ func adjustRangeForPart(partStartOffset, partEndOffset int64, clientRangeHeader } // Handle suffix-range (e.g., "bytes=-100" means last 100 bytes) + // When parts[0] is empty, the parsed clientEnd value represents the suffix length, + // not the actual end position. We compute the actual start/end from the suffix length. if parts[0] == "" { - // suffix-range: clientEnd is actually the suffix length - suffixLength := clientEnd + suffixLength := clientEnd // clientEnd temporarily holds the suffix length if suffixLength > partSize { suffixLength = partSize } clientStart = partSize - suffixLength - clientEnd = partSize - 1 + clientEnd = partSize - 1 // Now clientEnd holds the actual end position } // Validate range is within part boundaries @@ -201,7 +219,6 @@ func removeDuplicateSlashes(object string) string { return result.String() } - // hasChildren checks if a path has any child objects (is a directory with contents) // // This helper function is used to distinguish implicit directories from regular files or empty directories. @@ -720,13 +737,6 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // This eliminates the 19ms filer proxy overhead // SSE decryption is handled inline during streaming - // Safety check: entry must be valid before streaming - if objectEntryForSSE == nil { - glog.Errorf("GetObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - // Detect SSE encryption type primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) @@ -887,13 +897,19 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) w.WriteHeader(http.StatusPartialContent) - _, err := w.Write(entry.Content[start:end]) + written, err := w.Write(entry.Content[start:end]) + if written > 0 { + BucketTrafficSent(int64(written), r) + } return err } // Non-range request for inline content s3a.setResponseHeaders(w, r, entry, totalSize) w.WriteHeader(http.StatusOK) - _, err := w.Write(entry.Content) + written, err := w.Write(entry.Content) + if written > 0 { + BucketTrafficSent(int64(written), r) + } return err } @@ -977,17 +993,22 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R w.WriteHeader(http.StatusOK) } - // Stream directly to response + // Stream directly to response with counting wrapper tStreamExec := time.Now() glog.V(4).Infof("streamFromVolumeServers: starting streamFn, offset=%d, size=%d", offset, size) - err = streamFn(w) + cw := &countingWriter{w: w} + err = streamFn(cw) streamExecTime = time.Since(tStreamExec) + // Track traffic even on partial writes for accurate egress accounting + if cw.written > 0 { + BucketTrafficSent(cw.written, r) + } if err != nil { - glog.Errorf("streamFromVolumeServers: streamFn failed: %v", err) + glog.Errorf("streamFromVolumeServers: streamFn failed after writing %d bytes: %v", cw.written, err) // Streaming error after WriteHeader was called - response already partially written return newStreamErrorWithResponse(err) } - glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully") + glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully, wrote %d bytes", cw.written) return nil } @@ -1189,9 +1210,13 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r if isRangeRequest { glog.V(2).Infof("Using range-aware SSE decryption for offset=%d size=%d", offset, size) streamFetchTime = 0 // No full stream fetch in range-aware path - err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) + written, err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) decryptSetupTime = time.Since(tDecryptSetup) copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path + // Track traffic even on partial writes for accurate egress accounting + if written > 0 { + BucketTrafficSent(written, r) + } if err != nil { // Error after WriteHeader - response already written return newStreamErrorWithResponse(err) @@ -1338,6 +1363,10 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r buf := make([]byte, 128*1024) copied, copyErr := io.CopyBuffer(w, decryptedReader, buf) copyTime = time.Since(tCopy) + // Track traffic even on partial writes for accurate egress accounting + if copied > 0 { + BucketTrafficSent(copied, r) + } if copyErr != nil { glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr) // Error after WriteHeader - response already written @@ -1349,7 +1378,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r // streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks // This implements the filer's ViewFromChunks approach for optimal range performance -func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) error { +// Returns the number of bytes written and any error +func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) (int64, error) { // Use filer's ViewFromChunks to resolve only needed chunks for the range lookupFileIdFn := s3a.createLookupFileIdFunction() chunkViews := filer.ViewFromChunks(ctx, lookupFileIdFn, entry.GetChunks(), offset, size) @@ -1366,7 +1396,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io gap := chunkView.ViewOffset - targetOffset glog.V(4).Infof("Writing %d zero bytes for gap [%d,%d)", gap, targetOffset, chunkView.ViewOffset) if err := writeZeroBytes(w, gap); err != nil { - return fmt.Errorf("failed to write zero padding: %w", err) + return totalWritten, fmt.Errorf("failed to write zero padding: %w", err) } totalWritten += gap targetOffset = chunkView.ViewOffset @@ -1381,7 +1411,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io } } if fileChunk == nil { - return fmt.Errorf("chunk %s not found in entry", chunkView.FileId) + return totalWritten, fmt.Errorf("chunk %s not found in entry", chunkView.FileId) } // Fetch and decrypt this chunk view @@ -1401,7 +1431,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io } if err != nil { - return fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err) + return totalWritten, fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err) } // Copy the decrypted chunk data @@ -1414,12 +1444,12 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io } if copyErr != nil { glog.Errorf("streamDecryptedRangeFromChunks: copy error after writing %d bytes (expected %d): %v", written, chunkView.ViewSize, copyErr) - return fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr) + return totalWritten, fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr) } if written != int64(chunkView.ViewSize) { glog.Errorf("streamDecryptedRangeFromChunks: size mismatch - wrote %d bytes but expected %d", written, chunkView.ViewSize) - return fmt.Errorf("size mismatch: wrote %d bytes but expected %d for chunk %s", written, chunkView.ViewSize, chunkView.FileId) + return totalWritten, fmt.Errorf("size mismatch: wrote %d bytes but expected %d for chunk %s", written, chunkView.ViewSize, chunkView.FileId) } totalWritten += written @@ -1432,12 +1462,13 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io if remaining > 0 { glog.V(4).Infof("Writing %d trailing zero bytes", remaining) if err := writeZeroBytes(w, remaining); err != nil { - return fmt.Errorf("failed to write trailing zeros: %w", err) + return totalWritten, fmt.Errorf("failed to write trailing zeros: %w", err) } + totalWritten += remaining } glog.V(3).Infof("Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)", totalWritten, offset, offset+size) - return nil + return totalWritten, nil } // writeZeroBytes writes n zero bytes to writer using the package-level zero buffer