Browse Source

Merge branch 'master' into 6649

pull/7403/head
Chris Lu 1 month ago
committed by GitHub
parent
commit
61afec8ce6
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 9
      .github/workflows/test-s3-over-https-using-awscli.yml
  2. 15
      test/s3/cors/s3_cors_http_test.go
  3. 51
      weed/s3api/cors/middleware.go
  4. 405
      weed/s3api/cors/middleware_test.go
  5. 41
      weed/s3api/s3api_bucket_cors_handlers.go
  6. 81
      weed/s3api/s3api_conditional_headers_test.go
  7. 5
      weed/s3api/s3api_object_handlers_put.go
  8. 40
      weed/util/log_buffer/log_buffer_flush_gap_test.go

9
.github/workflows/test-s3-over-https-using-awscli.yml

@ -77,3 +77,12 @@ jobs:
aws --no-verify-ssl s3 cp --no-progress s3://bucket/test-multipart downloaded
diff -q generated downloaded
rm -f generated downloaded
- name: Test GetObject with If-Match
run: |
set -e
dd if=/dev/urandom of=generated bs=1M count=32
ETAG=$(aws --no-verify-ssl s3api put-object --bucket bucket --key test-get-obj --body generated | jq -r .ETag)
aws --no-verify-ssl s3api get-object --bucket bucket --key test-get-obj --if-match ${ETAG:1:32} downloaded
diff -q generated downloaded
rm -f generated downloaded

15
test/s3/cors/s3_cors_http_test.go

@ -398,13 +398,15 @@ func TestCORSHeaderMatching(t *testing.T) {
}
}
// TestCORSWithoutConfiguration tests CORS behavior when no configuration is set
// TestCORSWithoutConfiguration tests CORS behavior when no bucket-level configuration is set
// With the fallback feature, buckets without explicit CORS config will use the global CORS settings
func TestCORSWithoutConfiguration(t *testing.T) {
client := getS3Client(t)
bucketName := createTestBucket(t, client)
defer cleanupTestBucket(t, client, bucketName)
// Test preflight request without CORS configuration
// Test preflight request without bucket-level CORS configuration
// The global CORS fallback (default: "*") should be used
httpClient := &http.Client{Timeout: 10 * time.Second}
req, err := http.NewRequest("OPTIONS", fmt.Sprintf("%s/%s/test-object", getDefaultConfig().Endpoint, bucketName), nil)
@ -412,15 +414,16 @@ func TestCORSWithoutConfiguration(t *testing.T) {
req.Header.Set("Origin", "https://example.com")
req.Header.Set("Access-Control-Request-Method", "GET")
req.Header.Set("Access-Control-Request-Headers", "Content-Type")
resp, err := httpClient.Do(req)
require.NoError(t, err, "Should be able to send OPTIONS request")
defer resp.Body.Close()
// Without CORS configuration, CORS headers should not be present
assert.Empty(t, resp.Header.Get("Access-Control-Allow-Origin"), "Should not have Allow-Origin header without CORS config")
assert.Empty(t, resp.Header.Get("Access-Control-Allow-Methods"), "Should not have Allow-Methods header without CORS config")
assert.Empty(t, resp.Header.Get("Access-Control-Allow-Headers"), "Should not have Allow-Headers header without CORS config")
// With fallback CORS (global default: "*"), CORS headers should be present
assert.Equal(t, "https://example.com", resp.Header.Get("Access-Control-Allow-Origin"), "Should have Allow-Origin header from global fallback")
assert.Contains(t, resp.Header.Get("Access-Control-Allow-Methods"), "GET", "Should have GET in Allow-Methods from global fallback")
assert.Contains(t, resp.Header.Get("Access-Control-Allow-Headers"), "Content-Type", "Should have requested headers in Allow-Headers from global fallback")
}
// TestCORSMethodMatching tests method matching

51
weed/s3api/cors/middleware.go

@ -22,16 +22,47 @@ type CORSConfigGetter interface {
type Middleware struct {
bucketChecker BucketChecker
corsConfigGetter CORSConfigGetter
fallbackConfig *CORSConfiguration // Global CORS configuration as fallback
}
// NewMiddleware creates a new CORS middleware instance
func NewMiddleware(bucketChecker BucketChecker, corsConfigGetter CORSConfigGetter) *Middleware {
// NewMiddleware creates a new CORS middleware instance with optional global fallback config
func NewMiddleware(bucketChecker BucketChecker, corsConfigGetter CORSConfigGetter, fallbackConfig *CORSConfiguration) *Middleware {
return &Middleware{
bucketChecker: bucketChecker,
corsConfigGetter: corsConfigGetter,
fallbackConfig: fallbackConfig,
}
}
// getCORSConfig retrieves the applicable CORS configuration, trying bucket-specific first, then fallback.
// Returns the configuration and a boolean indicating if any configuration was found.
// Only falls back to global config when there's explicitly no bucket-level config.
// For other errors (e.g., access denied), returns false to let the handler deny the request.
func (m *Middleware) getCORSConfig(bucket string) (*CORSConfiguration, bool) {
config, errCode := m.corsConfigGetter.GetCORSConfiguration(bucket)
switch errCode {
case s3err.ErrNone:
if config != nil {
// Found a bucket-specific config, use it.
return config, true
}
// No bucket config, proceed to fallback.
case s3err.ErrNoSuchCORSConfiguration:
// No bucket config, proceed to fallback.
default:
// Any other error means we should not proceed.
return nil, false
}
// No bucket-level config found, try global fallback
if m.fallbackConfig != nil {
return m.fallbackConfig, true
}
return nil, false
}
// Handler returns the CORS middleware handler
func (m *Middleware) Handler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -58,10 +89,10 @@ func (m *Middleware) Handler(next http.Handler) http.Handler {
return
}
// Load CORS configuration from cache
config, errCode := m.corsConfigGetter.GetCORSConfiguration(bucket)
if errCode != s3err.ErrNone || config == nil {
// No CORS configuration, handle based on request type
// Get CORS configuration (bucket-specific or fallback)
config, found := m.getCORSConfig(bucket)
if !found {
// No CORS configuration at all, handle based on request type
if corsReq.IsPreflightRequest {
// Preflight request without CORS config should fail
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
@ -126,10 +157,10 @@ func (m *Middleware) HandleOptionsRequest(w http.ResponseWriter, r *http.Request
return
}
// Load CORS configuration from cache
config, errCode := m.corsConfigGetter.GetCORSConfiguration(bucket)
if errCode != s3err.ErrNone || config == nil {
// No CORS configuration for OPTIONS request should return access denied
// Get CORS configuration (bucket-specific or fallback)
config, found := m.getCORSConfig(bucket)
if !found {
// No CORS configuration at all for OPTIONS request should return access denied
if corsReq.IsPreflightRequest {
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return

405
weed/s3api/cors/middleware_test.go

@ -0,0 +1,405 @@
package cors
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// Mock implementations for testing
type mockBucketChecker struct {
bucketExists bool
}
func (m *mockBucketChecker) CheckBucket(r *http.Request, bucket string) s3err.ErrorCode {
if m.bucketExists {
return s3err.ErrNone
}
return s3err.ErrNoSuchBucket
}
type mockCORSConfigGetter struct {
config *CORSConfiguration
errCode s3err.ErrorCode
}
func (m *mockCORSConfigGetter) GetCORSConfiguration(bucket string) (*CORSConfiguration, s3err.ErrorCode) {
return m.config, m.errCode
}
// TestMiddlewareFallbackConfig tests that the middleware uses fallback config when bucket-level config is not available
func TestMiddlewareFallbackConfig(t *testing.T) {
tests := []struct {
name string
bucketConfig *CORSConfiguration
fallbackConfig *CORSConfiguration
requestOrigin string
requestMethod string
isOptions bool
expectedStatus int
expectedOriginHeader string
description string
}{
{
name: "No bucket config, fallback to global config with wildcard",
bucketConfig: nil,
fallbackConfig: &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "HEAD"},
AllowedHeaders: []string{"*"},
},
},
},
requestOrigin: "https://example.com",
requestMethod: "GET",
isOptions: false,
expectedStatus: http.StatusOK,
expectedOriginHeader: "https://example.com",
description: "Should use fallback global config when no bucket config exists",
},
{
name: "No bucket config, fallback to global config with specific origin",
bucketConfig: nil,
fallbackConfig: &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"https://example.com"},
AllowedMethods: []string{"GET", "POST"},
AllowedHeaders: []string{"*"},
},
},
},
requestOrigin: "https://example.com",
requestMethod: "GET",
isOptions: false,
expectedStatus: http.StatusOK,
expectedOriginHeader: "https://example.com",
description: "Should use fallback config with specific origin match",
},
{
name: "No bucket config, fallback rejects non-matching origin",
bucketConfig: nil,
fallbackConfig: &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"https://allowed.com"},
AllowedMethods: []string{"GET"},
AllowedHeaders: []string{"*"},
},
},
},
requestOrigin: "https://notallowed.com",
requestMethod: "GET",
isOptions: false,
expectedStatus: http.StatusOK,
expectedOriginHeader: "",
description: "Should not apply CORS headers when origin doesn't match fallback config",
},
{
name: "Bucket config takes precedence over fallback",
bucketConfig: &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"https://bucket-specific.com"},
AllowedMethods: []string{"GET"},
AllowedHeaders: []string{"*"},
},
},
},
fallbackConfig: &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "POST"},
AllowedHeaders: []string{"*"},
},
},
},
requestOrigin: "https://bucket-specific.com",
requestMethod: "GET",
isOptions: false,
expectedStatus: http.StatusOK,
expectedOriginHeader: "https://bucket-specific.com",
description: "Bucket-level config should be used instead of fallback",
},
{
name: "Bucket config rejects, even though fallback would allow",
bucketConfig: &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"https://restricted.com"},
AllowedMethods: []string{"GET"},
AllowedHeaders: []string{"*"},
},
},
},
fallbackConfig: &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "POST"},
AllowedHeaders: []string{"*"},
},
},
},
requestOrigin: "https://example.com",
requestMethod: "GET",
isOptions: false,
expectedStatus: http.StatusOK,
expectedOriginHeader: "",
description: "Bucket-level config is authoritative, fallback should not apply",
},
{
name: "No config at all, no CORS headers",
bucketConfig: nil,
fallbackConfig: nil,
requestOrigin: "https://example.com",
requestMethod: "GET",
isOptions: false,
expectedStatus: http.StatusOK,
expectedOriginHeader: "",
description: "Without any config, no CORS headers should be applied",
},
{
name: "OPTIONS preflight with fallback config",
bucketConfig: nil,
fallbackConfig: &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"https://example.com"},
AllowedMethods: []string{"GET", "POST"},
AllowedHeaders: []string{"*"},
},
},
},
requestOrigin: "https://example.com",
requestMethod: "OPTIONS",
isOptions: true,
expectedStatus: http.StatusOK,
expectedOriginHeader: "https://example.com",
description: "OPTIONS preflight should work with fallback config",
},
{
name: "OPTIONS preflight without any config should fail",
bucketConfig: nil,
fallbackConfig: nil,
requestOrigin: "https://example.com",
requestMethod: "OPTIONS",
isOptions: true,
expectedStatus: http.StatusForbidden,
expectedOriginHeader: "",
description: "OPTIONS preflight without config should return 403",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup mocks
bucketChecker := &mockBucketChecker{bucketExists: true}
configGetter := &mockCORSConfigGetter{
config: tt.bucketConfig,
errCode: s3err.ErrNone,
}
// Create middleware with optional fallback
middleware := NewMiddleware(bucketChecker, configGetter, tt.fallbackConfig)
// Create request with mux variables
req := httptest.NewRequest(tt.requestMethod, "/testbucket/testobject", nil)
req = mux.SetURLVars(req, map[string]string{
"bucket": "testbucket",
"object": "testobject",
})
if tt.requestOrigin != "" {
req.Header.Set("Origin", tt.requestOrigin)
}
if tt.isOptions {
req.Header.Set("Access-Control-Request-Method", "GET")
}
// Create response recorder
w := httptest.NewRecorder()
// Create a simple handler that returns 200 OK
nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
// Execute middleware
if tt.isOptions {
middleware.HandleOptionsRequest(w, req)
} else {
middleware.Handler(nextHandler).ServeHTTP(w, req)
}
// Check status code
if w.Code != tt.expectedStatus {
t.Errorf("%s: expected status %d, got %d", tt.description, tt.expectedStatus, w.Code)
}
// Check CORS header
actualOrigin := w.Header().Get("Access-Control-Allow-Origin")
if actualOrigin != tt.expectedOriginHeader {
t.Errorf("%s: expected Access-Control-Allow-Origin='%s', got '%s'",
tt.description, tt.expectedOriginHeader, actualOrigin)
}
})
}
}
// TestMiddlewareFallbackConfigWithMultipleOrigins tests fallback with multiple allowed origins
func TestMiddlewareFallbackConfigWithMultipleOrigins(t *testing.T) {
fallbackConfig := &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"https://example1.com", "https://example2.com"},
AllowedMethods: []string{"GET", "POST"},
AllowedHeaders: []string{"*"},
},
},
}
bucketChecker := &mockBucketChecker{bucketExists: true}
configGetter := &mockCORSConfigGetter{
config: nil, // No bucket config
errCode: s3err.ErrNone,
}
middleware := NewMiddleware(bucketChecker, configGetter, fallbackConfig)
tests := []struct {
origin string
shouldMatch bool
description string
}{
{
origin: "https://example1.com",
shouldMatch: true,
description: "First allowed origin should match",
},
{
origin: "https://example2.com",
shouldMatch: true,
description: "Second allowed origin should match",
},
{
origin: "https://example3.com",
shouldMatch: false,
description: "Non-allowed origin should not match",
},
}
for _, tt := range tests {
t.Run(tt.description, func(t *testing.T) {
req := httptest.NewRequest("GET", "/testbucket/testobject", nil)
req = mux.SetURLVars(req, map[string]string{
"bucket": "testbucket",
"object": "testobject",
})
req.Header.Set("Origin", tt.origin)
w := httptest.NewRecorder()
nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
middleware.Handler(nextHandler).ServeHTTP(w, req)
actualOrigin := w.Header().Get("Access-Control-Allow-Origin")
if tt.shouldMatch {
if actualOrigin != tt.origin {
t.Errorf("%s: expected Access-Control-Allow-Origin='%s', got '%s'",
tt.description, tt.origin, actualOrigin)
}
} else {
if actualOrigin != "" {
t.Errorf("%s: expected no Access-Control-Allow-Origin header, got '%s'",
tt.description, actualOrigin)
}
}
})
}
}
// TestMiddlewareFallbackWithError tests that real errors (not "no config") don't trigger fallback
func TestMiddlewareFallbackWithError(t *testing.T) {
fallbackConfig := &CORSConfiguration{
CORSRules: []CORSRule{
{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "POST"},
AllowedHeaders: []string{"*"},
},
},
}
tests := []struct {
name string
errCode s3err.ErrorCode
expectedOriginHeader string
description string
}{
{
name: "ErrAccessDenied should not trigger fallback",
errCode: s3err.ErrAccessDenied,
expectedOriginHeader: "",
description: "Access denied errors should not expose CORS headers",
},
{
name: "ErrInternalError should not trigger fallback",
errCode: s3err.ErrInternalError,
expectedOriginHeader: "",
description: "Internal errors should not expose CORS headers",
},
{
name: "ErrNoSuchBucket should not trigger fallback",
errCode: s3err.ErrNoSuchBucket,
expectedOriginHeader: "",
description: "Bucket not found errors should not expose CORS headers",
},
{
name: "ErrNoSuchCORSConfiguration should trigger fallback",
errCode: s3err.ErrNoSuchCORSConfiguration,
expectedOriginHeader: "https://example.com",
description: "Explicit no CORS config should use fallback",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bucketChecker := &mockBucketChecker{bucketExists: true}
configGetter := &mockCORSConfigGetter{
config: nil,
errCode: tt.errCode,
}
middleware := NewMiddleware(bucketChecker, configGetter, fallbackConfig)
req := httptest.NewRequest("GET", "/testbucket/testobject", nil)
req = mux.SetURLVars(req, map[string]string{
"bucket": "testbucket",
"object": "testobject",
})
req.Header.Set("Origin", "https://example.com")
w := httptest.NewRecorder()
nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
middleware.Handler(nextHandler).ServeHTTP(w, req)
actualOrigin := w.Header().Get("Access-Control-Allow-Origin")
if actualOrigin != tt.expectedOriginHeader {
t.Errorf("%s: expected Access-Control-Allow-Origin='%s', got '%s'",
tt.description, tt.expectedOriginHeader, actualOrigin)
}
})
}
}

41
weed/s3api/s3api_bucket_cors_handlers.go

@ -10,6 +10,19 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// Default CORS configuration for global fallback
var (
defaultFallbackAllowedMethods = []string{"GET", "PUT", "POST", "DELETE", "HEAD"}
defaultFallbackExposeHeaders = []string{
"ETag",
"Content-Length",
"Content-Type",
"Last-Modified",
"x-amz-request-id",
"x-amz-version-id",
}
)
// S3BucketChecker implements cors.BucketChecker interface
type S3BucketChecker struct {
server *S3ApiServer
@ -28,12 +41,36 @@ func (g *S3CORSConfigGetter) GetCORSConfiguration(bucket string) (*cors.CORSConf
return g.server.getCORSConfiguration(bucket)
}
// getCORSMiddleware returns a CORS middleware instance with caching
// getCORSMiddleware returns a CORS middleware instance with global fallback config
func (s3a *S3ApiServer) getCORSMiddleware() *cors.Middleware {
bucketChecker := &S3BucketChecker{server: s3a}
corsConfigGetter := &S3CORSConfigGetter{server: s3a}
return cors.NewMiddleware(bucketChecker, corsConfigGetter)
// Create fallback CORS configuration from global AllowedOrigins setting
fallbackConfig := s3a.createFallbackCORSConfig()
return cors.NewMiddleware(bucketChecker, corsConfigGetter, fallbackConfig)
}
// createFallbackCORSConfig creates a CORS configuration from global AllowedOrigins
func (s3a *S3ApiServer) createFallbackCORSConfig() *cors.CORSConfiguration {
if len(s3a.option.AllowedOrigins) == 0 {
return nil
}
// Create a permissive CORS rule based on global allowed origins
// This matches the behavior of handleCORSOriginValidation
rule := cors.CORSRule{
AllowedOrigins: s3a.option.AllowedOrigins,
AllowedMethods: defaultFallbackAllowedMethods,
AllowedHeaders: []string{"*"},
ExposeHeaders: defaultFallbackExposeHeaders,
MaxAgeSeconds: nil, // No max age by default
}
return &cors.CORSConfiguration{
CORSRules: []cors.CORSRule{rule},
}
}
// GetBucketCorsHandler handles Get bucket CORS configuration

81
weed/s3api/s3api_conditional_headers_test.go

@ -2,6 +2,7 @@ package s3api
import (
"bytes"
"encoding/hex"
"fmt"
"net/http"
"net/url"
@ -671,6 +672,86 @@ func TestETagMatching(t *testing.T) {
}
}
// TestGetObjectETagWithMd5AndChunks tests the fix for issue #7274
// When an object has both Attributes.Md5 and multiple chunks, getObjectETag should
// prefer Attributes.Md5 to match the behavior of HeadObject and filer.ETag
func TestGetObjectETagWithMd5AndChunks(t *testing.T) {
s3a := NewS3ApiServerForTest()
if s3a == nil {
t.Skip("S3ApiServer not available for testing")
}
// Create an object with both Md5 and multiple chunks (like in issue #7274)
// Md5: ZjcmMwrCVGNVgb4HoqHe9g== (base64) = 663726330ac254635581be07a2a1def6 (hex)
md5HexString := "663726330ac254635581be07a2a1def6"
md5Bytes, err := hex.DecodeString(md5HexString)
if err != nil {
t.Fatalf("failed to decode md5 hex string: %v", err)
}
entry := &filer_pb.Entry{
Name: "test-multipart-object",
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
FileSize: 5597744,
Md5: md5Bytes,
},
// Two chunks - if we only used ETagChunks, it would return format "hash-2"
Chunks: []*filer_pb.FileChunk{
{
FileId: "chunk1",
Offset: 0,
Size: 4194304,
ETag: "9+yCD2DGwMG5uKwAd+y04Q==",
},
{
FileId: "chunk2",
Offset: 4194304,
Size: 1403440,
ETag: "cs6SVSTgZ8W3IbIrAKmklg==",
},
},
}
// getObjectETag should return the Md5 in hex with quotes
expectedETag := "\"" + md5HexString + "\""
actualETag := s3a.getObjectETag(entry)
if actualETag != expectedETag {
t.Errorf("Expected ETag %s, got %s", expectedETag, actualETag)
}
// Now test that conditional headers work with this ETag
bucket := "test-bucket"
object := "/test-object"
// Test If-Match with the Md5-based ETag (should succeed)
t.Run("IfMatch_WithMd5BasedETag_ShouldSucceed", func(t *testing.T) {
getter := createMockEntryGetter(entry)
req := createTestGetRequest(bucket, object)
// Client sends the ETag from HeadObject (without quotes)
req.Header.Set(s3_constants.IfMatch, md5HexString)
result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object)
if result.ErrorCode != s3err.ErrNone {
t.Errorf("Expected ErrNone when If-Match uses Md5-based ETag, got %v (ETag was %s)", result.ErrorCode, actualETag)
}
})
// Test If-Match with chunk-based ETag format (should fail - this was the old incorrect behavior)
t.Run("IfMatch_WithChunkBasedETag_ShouldFail", func(t *testing.T) {
getter := createMockEntryGetter(entry)
req := createTestGetRequest(bucket, object)
// If we incorrectly calculated ETag from chunks, it would be in format "hash-2"
req.Header.Set(s3_constants.IfMatch, "123294de680f28bde364b81477549f7d-2")
result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object)
if result.ErrorCode != s3err.ErrPreconditionFailed {
t.Errorf("Expected ErrPreconditionFailed when If-Match uses chunk-based ETag format, got %v", result.ErrorCode)
}
})
}
// TestConditionalHeadersIntegration tests conditional headers with full integration
func TestConditionalHeadersIntegration(t *testing.T) {
// This would be a full integration test that requires a running SeaweedFS instance

5
weed/s3api/s3api_object_handlers_put.go

@ -1257,6 +1257,11 @@ func (s3a *S3ApiServer) getObjectETag(entry *filer_pb.Entry) string {
if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
return string(etagBytes)
}
// Check for Md5 in Attributes (matches filer.ETag behavior)
// Note: len(nil slice) == 0 in Go, so no need for explicit nil check
if entry.Attributes != nil && len(entry.Attributes.Md5) > 0 {
return fmt.Sprintf("\"%x\"", entry.Attributes.Md5)
}
// Fallback: calculate ETag from chunks
return s3a.calculateETagFromChunks(entry.Chunks)
}

40
weed/util/log_buffer/log_buffer_flush_gap_test.go

@ -127,16 +127,16 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
t.Logf(" Gap: %d offsets", gap)
if gap > 0 {
t.Errorf("CRITICAL BUG REPRODUCED: OFFSET GAP DETECTED!")
t.Errorf("CRITICAL BUG REPRODUCED: OFFSET GAP DETECTED!")
t.Errorf(" Disk has offsets %d-%d", minFlushedOffset, maxFlushedOffset)
t.Errorf(" Memory buffer starts at: %d", bufferStartOffset)
t.Errorf(" MISSING OFFSETS: %d-%d (%d messages)", maxFlushedOffset+1, bufferStartOffset-1, gap)
t.Errorf(" These messages are LOST - neither on disk nor in memory!")
} else if gap < 0 {
t.Errorf("OFFSET OVERLAP: Memory buffer starts BEFORE last flushed offset!")
t.Errorf("OFFSET OVERLAP: Memory buffer starts BEFORE last flushed offset!")
t.Errorf(" This indicates data corruption or race condition")
} else {
t.Logf("PASS: No gap detected - offsets are continuous")
t.Logf("PASS: No gap detected - offsets are continuous")
}
// Check if we can read all expected offsets
@ -147,9 +147,9 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
buf, _, err := logBuffer.ReadFromBuffer(requestPosition)
isReadable := (buf != nil && len(buf.Bytes()) > 0) || err == ResumeFromDiskError
status := ""
status := "OK"
if !isReadable && err == nil {
status = "NOT READABLE"
status = "NOT READABLE"
}
t.Logf(" Offset %d: %s (buf=%v, err=%v)", testOffset, status, buf != nil, err)
@ -178,9 +178,9 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
t.Logf(" Missing: %d messages", expectedMessageCount-totalAccountedFor)
if totalAccountedFor < expectedMessageCount {
t.Errorf("DATA LOSS CONFIRMED: %d messages are missing!", expectedMessageCount-totalAccountedFor)
t.Errorf("DATA LOSS CONFIRMED: %d messages are missing!", expectedMessageCount-totalAccountedFor)
} else {
t.Logf("All messages accounted for")
t.Logf("All messages accounted for")
}
}
@ -249,7 +249,7 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) {
// CRITICAL: Check if bufferStartOffset advanced correctly
expectedNewStart := beforeFlushOffset
if afterFlushStart != expectedNewStart {
t.Errorf(" bufferStartOffset mismatch!")
t.Errorf(" bufferStartOffset mismatch!")
t.Errorf(" Expected: %d (= offset before flush)", expectedNewStart)
t.Errorf(" Actual: %d", afterFlushStart)
t.Errorf(" Gap: %d offsets", expectedNewStart-afterFlushStart)
@ -331,7 +331,7 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
t.Logf(" Missing: %d", expectedCount-totalAccountedFor)
if totalAccountedFor < expectedCount {
t.Errorf("DATA LOSS in concurrent scenario: %d messages missing!", expectedCount-totalAccountedFor)
t.Errorf("DATA LOSS in concurrent scenario: %d messages missing!", expectedCount-totalAccountedFor)
}
}
@ -441,7 +441,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
for _, msg := range flush.messages {
if allOffsets[msg.Offset] {
t.Errorf(" DUPLICATE: Offset %d appears multiple times!", msg.Offset)
t.Errorf(" DUPLICATE: Offset %d appears multiple times!", msg.Offset)
}
allOffsets[msg.Offset] = true
}
@ -457,7 +457,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
}
if len(missingOffsets) > 0 {
t.Errorf("\nMISSING OFFSETS DETECTED: %d offsets missing", len(missingOffsets))
t.Errorf("\nMISSING OFFSETS DETECTED: %d offsets missing", len(missingOffsets))
if len(missingOffsets) <= 20 {
t.Errorf("Missing: %v", missingOffsets)
} else {
@ -465,7 +465,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
}
t.Errorf("\nThis reproduces the production bug!")
} else {
t.Logf("\nSUCCESS: All %d Kafka offsets accounted for (0-%d)", nextKafkaOffset, nextKafkaOffset-1)
t.Logf("\nSUCCESS: All %d Kafka offsets accounted for (0-%d)", nextKafkaOffset, nextKafkaOffset-1)
}
// Check buffer offset consistency
@ -480,7 +480,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
t.Logf(" Expected (nextKafkaOffset): %d", nextKafkaOffset)
if finalOffset != nextKafkaOffset {
t.Errorf("logBuffer.offset mismatch: expected %d, got %d", nextKafkaOffset, finalOffset)
t.Errorf("logBuffer.offset mismatch: expected %d, got %d", nextKafkaOffset, finalOffset)
}
}
@ -575,14 +575,14 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
}
if len(missingOffsets) > 0 {
t.Errorf("MISSING OFFSETS after flush: %d offsets cannot be read", len(missingOffsets))
t.Errorf("MISSING OFFSETS after flush: %d offsets cannot be read", len(missingOffsets))
if len(missingOffsets) <= 20 {
t.Errorf("Missing: %v", missingOffsets)
} else {
t.Errorf("Missing: %v ... and %d more", missingOffsets[:20], len(missingOffsets)-20)
}
} else {
t.Logf("All 100 offsets can be read after flush")
t.Logf("All 100 offsets can be read after flush")
}
}
@ -646,12 +646,12 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
// CRITICAL CHECK: bufferStartOffset should advance to where offset was before flush
if afterFlushStart != afterAddOffset {
t.Errorf("FLUSH BUG: bufferStartOffset did NOT advance correctly!")
t.Errorf("FLUSH BUG: bufferStartOffset did NOT advance correctly!")
t.Errorf(" Expected bufferStartOffset=%d (= offset after add)", afterAddOffset)
t.Errorf(" Actual bufferStartOffset=%d", afterFlushStart)
t.Errorf(" Gap: %d offsets WILL BE LOST", afterAddOffset-afterFlushStart)
} else {
t.Logf("bufferStartOffset correctly advanced to %d", afterFlushStart)
t.Logf("bufferStartOffset correctly advanced to %d", afterFlushStart)
}
}
@ -668,11 +668,11 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
gap := currentMin - (prevMax + 1)
if gap > 0 {
t.Errorf("GAP between flush #%d and #%d: %d offsets missing!", i-1, i, gap)
t.Errorf("GAP between flush #%d and #%d: %d offsets missing!", i-1, i, gap)
} else if gap < 0 {
t.Errorf("OVERLAP between flush #%d and #%d: %d offsets duplicated!", i-1, i, -gap)
t.Errorf("OVERLAP between flush #%d and #%d: %d offsets duplicated!", i-1, i, -gap)
} else {
t.Logf(" Continuous with previous flush")
t.Logf(" Continuous with previous flush")
}
}
}

Loading…
Cancel
Save