diff --git a/weed/notification/webhook/http.go b/weed/notification/webhook/http.go index 6b1a0e26d..82a8f8e71 100644 --- a/weed/notification/webhook/http.go +++ b/weed/notification/webhook/http.go @@ -8,16 +8,24 @@ import ( "fmt" "io" "net/http" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) +const ( + maxWebhookRetryDepth = 2 +) + type httpClient struct { - endpoint string - token string - timeout time.Duration + endpoint string + token string + timeout time.Duration + client *http.Client // Reused HTTP client with redirect prevention + endpointMu sync.RWMutex + finalURL string // Cached final URL after following redirects } func newHTTPClient(cfg *config) (*httpClient, error) { @@ -25,10 +33,24 @@ func newHTTPClient(cfg *config) (*httpClient, error) { endpoint: cfg.endpoint, token: cfg.authBearerToken, timeout: time.Duration(cfg.timeoutSeconds) * time.Second, + client: &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + }, }, nil } func (h *httpClient) sendMessage(message *webhookMessage) error { + return h.sendMessageWithRetry(message, 0) +} + +func (h *httpClient) sendMessageWithRetry(message *webhookMessage, depth int) error { + // Prevent infinite recursion + if depth > maxWebhookRetryDepth { + return fmt.Errorf("webhook max retry depth exceeded") + } + // Serialize the protobuf message to JSON for HTTP payload notificationData, err := json.Marshal(message.Notification) if err != nil { @@ -46,7 +68,16 @@ func (h *httpClient) sendMessage(message *webhookMessage) error { return fmt.Errorf("failed to marshal message: %w", err) } - req, err := http.NewRequest(http.MethodPost, h.endpoint, bytes.NewBuffer(jsonData)) + // Use cached final URL if available, otherwise use original endpoint + h.endpointMu.RLock() + targetURL := h.finalURL + usingCachedURL := targetURL != "" + if targetURL == "" { + targetURL = h.endpoint + } + h.endpointMu.RUnlock() + + req, err := http.NewRequest(http.MethodPost, targetURL, bytes.NewBuffer(jsonData)) if err != nil { return fmt.Errorf("failed to create request: %w", err) } @@ -56,29 +87,88 @@ func (h *httpClient) sendMessage(message *webhookMessage) error { req.Header.Set("Authorization", "Bearer "+h.token) } + // Apply timeout via context (not on client) to avoid redundancy if h.timeout > 0 { ctx, cancel := context.WithTimeout(context.Background(), h.timeout) defer cancel() req = req.WithContext(ctx) } - resp, err := util_http.Do(req) + resp, err := h.client.Do(req) if err != nil { - if err = drainResponse(resp); err != nil { - glog.Errorf("failed to drain response: %v", err) + if drainErr := drainResponse(resp); drainErr != nil { + glog.Errorf("failed to drain response: %v", drainErr) + } + + // If using cached URL and request failed, clear cache and retry with original endpoint + if usingCachedURL && depth == 0 { + glog.V(1).Infof("Webhook request to cached URL %s failed, clearing cache and retrying with original endpoint", targetURL) + h.setFinalURL("") + return h.sendMessageWithRetry(message, depth+1) } return fmt.Errorf("failed to send request: %w", err) } - defer resp.Body.Close() + + // Handle redirects by caching the final destination and recreating POST request + if resp.StatusCode >= 300 && resp.StatusCode < 400 { + // Drain and close response body to enable connection reuse + util_http.CloseResponse(resp) + + location := resp.Header.Get("Location") + if location == "" { + return fmt.Errorf("webhook returned redirect status %d without Location header", resp.StatusCode) + } + + // Resolve relative URLs against the request URL + reqURL := req.URL + finalURL, err := reqURL.Parse(location) + if err != nil { + return fmt.Errorf("failed to parse redirect location: %w", err) + } + + // Update finalURL to follow the redirect for this attempt + finalURLStr := finalURL.String() + h.setFinalURL(finalURLStr) + + if depth == 0 { + glog.V(1).Infof("Webhook endpoint redirected from %s to %s, caching final destination", targetURL, finalURLStr) + } else { + glog.V(1).Infof("Webhook endpoint redirected from %s to %s (following redirect on retry)", targetURL, finalURLStr) + } + + // Recreate the POST request to the final destination (increment depth to prevent infinite loops) + return h.sendMessageWithRetry(message, depth+1) + } + + // If using cached URL and got an error response, clear cache and retry with original endpoint + if resp.StatusCode >= 400 && usingCachedURL && depth == 0 { + // Drain and close response body to enable connection reuse + util_http.CloseResponse(resp) + + glog.V(1).Infof("Webhook request to cached URL %s returned error %d, clearing cache and retrying with original endpoint", targetURL, resp.StatusCode) + h.setFinalURL("") + return h.sendMessageWithRetry(message, depth+1) + } if resp.StatusCode < 200 || resp.StatusCode >= 300 { + // Drain and close response body before returning error to enable connection reuse + util_http.CloseResponse(resp) return fmt.Errorf("webhook returned status code: %d", resp.StatusCode) } + // Drain and close response body on success to enable connection reuse + util_http.CloseResponse(resp) + return nil } +func (h *httpClient) setFinalURL(url string) { + h.endpointMu.Lock() + h.finalURL = url + h.endpointMu.Unlock() +} + func drainResponse(resp *http.Response) error { if resp == nil || resp.Body == nil { return nil diff --git a/weed/notification/webhook/http_test.go b/weed/notification/webhook/http_test.go index f7ef006ae..f64c01f2d 100644 --- a/weed/notification/webhook/http_test.go +++ b/weed/notification/webhook/http_test.go @@ -148,3 +148,375 @@ func TestHttpClientSendMessageNetworkError(t *testing.T) { t.Error("Expected error for network failure") } } + +// TestHttpClientFollowsRedirectAsPost verifies that redirects are followed with POST method preserved +func TestHttpClientFollowsRedirectAsPost(t *testing.T) { + redirectCalled := false + finalCalled := false + var finalMethod string + var finalBody map[string]interface{} + + // Create final destination server + finalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + finalCalled = true + finalMethod = r.Method + body, _ := io.ReadAll(r.Body) + json.Unmarshal(body, &finalBody) + w.WriteHeader(http.StatusOK) + })) + defer finalServer.Close() + + // Create redirect server + redirectServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + redirectCalled = true + // Return 301 redirect to final server + http.Redirect(w, r, finalServer.URL, http.StatusMovedPermanently) + })) + defer redirectServer.Close() + + cfg := &config{ + endpoint: redirectServer.URL, + authBearerToken: "test-token", + timeoutSeconds: 5, + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + // Send message - should follow redirect and recreate POST request + err = client.sendMessage(newWebhookMessage("/test/path", message)) + if err != nil { + t.Fatalf("Failed to send message: %v", err) + } + + if !redirectCalled { + t.Error("Expected redirect server to be called") + } + + if !finalCalled { + t.Error("Expected final server to be called after redirect") + } + + if finalMethod != "POST" { + t.Errorf("Expected POST method at final destination, got %s", finalMethod) + } + + if finalBody["key"] != "/test/path" { + t.Errorf("Expected key '/test/path' at final destination, got %v", finalBody["key"]) + } + + // Verify the final URL is cached + client.endpointMu.RLock() + cachedURL := client.finalURL + client.endpointMu.RUnlock() + + if cachedURL != finalServer.URL { + t.Errorf("Expected cached URL %s, got %s", finalServer.URL, cachedURL) + } +} + +// TestHttpClientUsesCachedRedirect verifies that subsequent requests use the cached redirect destination +func TestHttpClientUsesCachedRedirect(t *testing.T) { + redirectCount := 0 + finalCount := 0 + + // Create final destination server + finalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + finalCount++ + w.WriteHeader(http.StatusOK) + })) + defer finalServer.Close() + + // Create redirect server + redirectServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + redirectCount++ + http.Redirect(w, r, finalServer.URL, http.StatusMovedPermanently) + })) + defer redirectServer.Close() + + cfg := &config{ + endpoint: redirectServer.URL, + authBearerToken: "test-token", + timeoutSeconds: 5, + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + // First request - should hit redirect server + err = client.sendMessage(newWebhookMessage("/test/path1", message)) + if err != nil { + t.Fatalf("Failed to send first message: %v", err) + } + + if redirectCount != 1 { + t.Errorf("Expected 1 redirect call, got %d", redirectCount) + } + if finalCount != 1 { + t.Errorf("Expected 1 final call, got %d", finalCount) + } + + // Second request - should use cached URL and skip redirect server + err = client.sendMessage(newWebhookMessage("/test/path2", message)) + if err != nil { + t.Fatalf("Failed to send second message: %v", err) + } + + if redirectCount != 1 { + t.Errorf("Expected redirect server to be called only once (cached), got %d calls", redirectCount) + } + if finalCount != 2 { + t.Errorf("Expected 2 final calls, got %d", finalCount) + } +} + +// TestHttpClientPreservesPostMethod verifies POST method is preserved and not converted to GET +func TestHttpClientPreservesPostMethod(t *testing.T) { + var receivedMethod string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedMethod = r.Method + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &config{ + endpoint: server.URL, + authBearerToken: "test-token", + timeoutSeconds: 5, + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + err = client.sendMessage(newWebhookMessage("/test/path", message)) + if err != nil { + t.Fatalf("Failed to send message: %v", err) + } + + if receivedMethod != "POST" { + t.Errorf("Expected POST method, got %s", receivedMethod) + } +} + +// TestHttpClientInvalidatesCacheOnError verifies that cache is invalidated when cached URL fails +func TestHttpClientInvalidatesCacheOnError(t *testing.T) { + finalServerDown := false // Start with server UP + originalCallCount := 0 + finalCallCount := 0 + + // Create final destination server that can be toggled + finalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + finalCallCount++ + if finalServerDown { + w.WriteHeader(http.StatusServiceUnavailable) + } else { + w.WriteHeader(http.StatusOK) + } + })) + defer finalServer.Close() + + // Create redirect server + redirectServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + originalCallCount++ + http.Redirect(w, r, finalServer.URL, http.StatusMovedPermanently) + })) + defer redirectServer.Close() + + cfg := &config{ + endpoint: redirectServer.URL, + authBearerToken: "test-token", + timeoutSeconds: 5, + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + // First request - should follow redirect and cache the final URL + err = client.sendMessage(newWebhookMessage("/test/path1", message)) + if err != nil { + t.Fatalf("Failed to send first message: %v", err) + } + + if originalCallCount != 1 { + t.Errorf("Expected 1 original call, got %d", originalCallCount) + } + if finalCallCount != 1 { + t.Errorf("Expected 1 final call, got %d", finalCallCount) + } + + // Verify cache was set + client.endpointMu.RLock() + cachedURL := client.finalURL + client.endpointMu.RUnlock() + if cachedURL != finalServer.URL { + t.Errorf("Expected cached URL %s, got %s", finalServer.URL, cachedURL) + } + + // Second request with cached URL working - should use cache + err = client.sendMessage(newWebhookMessage("/test/path2", message)) + if err != nil { + t.Fatalf("Failed to send second message: %v", err) + } + + if originalCallCount != 1 { + t.Errorf("Expected still 1 original call (using cache), got %d", originalCallCount) + } + if finalCallCount != 2 { + t.Errorf("Expected 2 final calls, got %d", finalCallCount) + } + + // Third request - bring final server DOWN, should invalidate cache and retry with original + // Flow: cached URL (fail, depth=0) -> clear cache -> retry original (depth=1) -> redirect -> final (fail, depth=2) + finalServerDown = true + err = client.sendMessage(newWebhookMessage("/test/path3", message)) + if err == nil { + t.Error("Expected error when cached URL fails and retry also fails") + } + + // originalCallCount: 1 (initial) + 1 (retry after cache invalidation) = 2 + if originalCallCount != 2 { + t.Errorf("Expected 2 original calls, got %d", originalCallCount) + } + // finalCallCount: 2 (previous) + 1 (cached fail) + 1 (retry after redirect) = 4 + if finalCallCount != 4 { + t.Errorf("Expected 4 final calls, got %d", finalCallCount) + } + + // Verify final URL is still set (to the failed destination from the redirect) + client.endpointMu.RLock() + finalURLAfterError := client.finalURL + client.endpointMu.RUnlock() + if finalURLAfterError != finalServer.URL { + t.Errorf("Expected finalURL to be %s after error, got %s", finalServer.URL, finalURLAfterError) + } + + // Fourth request - bring final server back UP + // Since cache still has the final URL, it should use it directly + finalServerDown = false + err = client.sendMessage(newWebhookMessage("/test/path4", message)) + if err != nil { + t.Fatalf("Failed to send fourth message after recovery: %v", err) + } + + // Should have used the cached URL directly (no new original call) + // originalCallCount: still 2 + if originalCallCount != 2 { + t.Errorf("Expected 2 original calls (using cache), got %d", originalCallCount) + } + // finalCallCount: 4 + 1 = 5 + if finalCallCount != 5 { + t.Errorf("Expected 5 final calls, got %d", finalCallCount) + } + + // Verify cache was re-established + client.endpointMu.RLock() + reestablishedCache := client.finalURL + client.endpointMu.RUnlock() + if reestablishedCache != finalServer.URL { + t.Errorf("Expected cache to be re-established to %s, got %s", finalServer.URL, reestablishedCache) + } +} + +// TestHttpClientInvalidatesCacheOnNetworkError verifies cache invalidation on network errors +func TestHttpClientInvalidatesCacheOnNetworkError(t *testing.T) { + originalCallCount := 0 + var finalServer *httptest.Server + // Create redirect server + redirectServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + originalCallCount++ + if finalServer != nil { + http.Redirect(w, r, finalServer.URL, http.StatusMovedPermanently) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + })) + defer redirectServer.Close() + + // Create final destination server + finalServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + cfg := &config{ + endpoint: redirectServer.URL, + authBearerToken: "test-token", + timeoutSeconds: 5, + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + // First request - establish cache + err = client.sendMessage(newWebhookMessage("/test/path1", message)) + if err != nil { + t.Fatalf("Failed to send first message: %v", err) + } + + if originalCallCount != 1 { + t.Errorf("Expected 1 original call, got %d", originalCallCount) + } + + // Close final server to simulate network error + cachedURL := finalServer.URL + finalServer.Close() + finalServer = nil + + // Second request - cached URL is down, should invalidate and retry with original + err = client.sendMessage(newWebhookMessage("/test/path2", message)) + if err == nil { + t.Error("Expected error when network fails") + } + + if originalCallCount != 2 { + t.Errorf("Expected 2 original calls (retry after cache invalidation), got %d", originalCallCount) + } + + // Verify cache was cleared + client.endpointMu.RLock() + clearedCache := client.finalURL + client.endpointMu.RUnlock() + if clearedCache == cachedURL { + t.Errorf("Expected cache to be invalidated, but still has %s", clearedCache) + } +} diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go index e034e9537..2ac9db3eb 100644 --- a/weed/notification/webhook/webhook_queue.go +++ b/weed/notification/webhook/webhook_queue.go @@ -31,6 +31,9 @@ type Queue struct { ctx context.Context cancel context.CancelFunc + + // Semaphore for controlling concurrent webhook requests + sem chan struct{} } func (w *Queue) GetName() string { @@ -89,6 +92,9 @@ func (w *Queue) initialize(cfg *config) error { w.config = cfg w.filter = newFilter(cfg) + // Initialize semaphore for controlling concurrent webhook requests + w.sem = make(chan struct{}, cfg.nWorkers) + hClient, err := newHTTPClient(cfg) if err != nil { return fmt.Errorf("failed to create webhook http client: %w", err) @@ -141,14 +147,17 @@ func (w *Queue) setupWatermillQueue(cfg *config) error { router.AddPlugin(plugin.SignalsHandler) router.AddMiddleware(retryMiddleware, poisonQueue) - for i := 0; i < cfg.nWorkers; i++ { - router.AddNoPublisherHandler( - pubSubHandlerNameTemplate(i), - pubSubTopicName, - w.queueChannel, - w.handleWebhook, - ) - } + // Add a single handler to avoid duplicate message delivery. + // With gochannel's default behavior, each handler call creates + // a separate subscription, and all subscriptions receive their own copy of each message. + // Using a single handler ensures each webhook is sent only once. + // Concurrency is controlled via semaphore in handleWebhook based on nWorkers config. + router.AddConsumerHandler( + "webhook_handler", + pubSubTopicName, + w.queueChannel, + w.handleWebhook, + ) go func() { // cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already @@ -165,6 +174,10 @@ func (w *Queue) setupWatermillQueue(cfg *config) error { } func (w *Queue) handleWebhook(msg *message.Message) error { + // Acquire semaphore slot (blocks if at capacity) + w.sem <- struct{}{} + defer func() { <-w.sem }() + var n filer_pb.EventNotification if err := proto.Unmarshal(msg.Payload, &n); err != nil { glog.Errorf("failed to unmarshal protobuf message: %v", err) diff --git a/weed/notification/webhook/webhook_queue_test.go b/weed/notification/webhook/webhook_queue_test.go index 52a290149..a084ef975 100644 --- a/weed/notification/webhook/webhook_queue_test.go +++ b/weed/notification/webhook/webhook_queue_test.go @@ -273,6 +273,7 @@ func TestQueueHandleWebhook(t *testing.T) { client, _ := newHTTPClient(cfg) q := &Queue{ client: client, + sem: make(chan struct{}, cfg.nWorkers), } message := newWebhookMessage("/test/path", &filer_pb.EventNotification{ @@ -386,6 +387,78 @@ func TestQueueRetryMechanism(t *testing.T) { } } +// TestQueueNoDuplicateWebhooks verifies that webhooks are sent only once regardless of worker count +func TestQueueNoDuplicateWebhooks(t *testing.T) { + tests := []struct { + name string + nWorkers int + expected int + }{ + {"1 worker", 1, 1}, + {"5 workers", 5, 1}, + {"10 workers", 10, 1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + callCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount++ + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &config{ + endpoint: server.URL, + authBearerToken: "test-token", + timeoutSeconds: 5, + maxRetries: 0, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: tt.nWorkers, + bufferSize: 10, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Fatalf("Failed to initialize queue: %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + // Wait for router and subscriber to be fully ready + time.Sleep(200 * time.Millisecond) + + msg := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + err = q.SendMessage("/test/path", msg) + if err != nil { + t.Errorf("SendMessage() error = %v", err) + } + + // Wait for message processing + time.Sleep(500 * time.Millisecond) + + if callCount != tt.expected { + t.Errorf("Expected %d webhook call(s), got %d with %d workers", tt.expected, callCount, tt.nWorkers) + } + }) + } +} + func TestQueueSendMessageWithFilter(t *testing.T) { tests := []struct { name string