diff --git a/weed/notification/webhook/http.go b/weed/notification/webhook/http.go new file mode 100644 index 000000000..13b7f30d9 --- /dev/null +++ b/weed/notification/webhook/http.go @@ -0,0 +1,57 @@ +package webhook + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "google.golang.org/protobuf/proto" +) + +type httpClient struct { + endpoint string + token string +} + +func newHTTPClient(cfg *config) (*httpClient, error) { + return &httpClient{ + endpoint: cfg.endpoint, + token: cfg.authBearerToken, + }, nil +} + +func (h *httpClient) sendMessage(key string, message proto.Message) error { + payload := map[string]interface{}{ + "key": key, + "message": message, + } + + jsonData, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal message: %v", err) + } + + req, err := http.NewRequest(http.MethodPost, h.endpoint, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + if h.token != "" { + req.Header.Set("Authorization", "Bearer "+h.token) + } + + resp, err := util_http.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("webhook returned status code: %d", resp.StatusCode) + } + + return nil +} diff --git a/weed/notification/webhook/http_test.go b/weed/notification/webhook/http_test.go new file mode 100644 index 000000000..5a008d2a5 --- /dev/null +++ b/weed/notification/webhook/http_test.go @@ -0,0 +1,146 @@ +package webhook + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" +) + +func init() { + util_http.InitGlobalHttpClient() +} + +func TestHttpClientSendMessage(t *testing.T) { + var receivedPayload map[string]interface{} + var receivedHeaders http.Header + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedHeaders = r.Header + body, _ := io.ReadAll(r.Body) + if err := json.Unmarshal(body, &receivedPayload); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &config{ + endpoint: server.URL, + authBearerToken: "test-token", + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{ + OldEntry: nil, + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + IsDirectory: false, + }, + } + + err = client.sendMessage("/test/path", message) + if err != nil { + t.Fatalf("Failed to send message: %v", err) + } + + if receivedPayload["key"] != "/test/path" { + t.Errorf("Expected key '/test/path', got %v", receivedPayload["key"]) + } + + if receivedPayload["message"] == nil { + t.Error("Expected message to be present") + } + + if receivedHeaders.Get("Content-Type") != "application/json" { + t.Errorf("Expected Content-Type 'application/json', got %s", receivedHeaders.Get("Content-Type")) + } + + expectedAuth := "Bearer test-token" + if receivedHeaders.Get("Authorization") != expectedAuth { + t.Errorf("Expected Authorization '%s', got %s", expectedAuth, receivedHeaders.Get("Authorization")) + } +} + +func TestHttpClientSendMessageWithoutToken(t *testing.T) { + var receivedHeaders http.Header + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedHeaders = r.Header + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &config{ + endpoint: server.URL, + authBearerToken: "", + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{} + + err = client.sendMessage("/test/path", message) + if err != nil { + t.Fatalf("Failed to send message: %v", err) + } + + if receivedHeaders.Get("Authorization") != "" { + t.Errorf("Expected no Authorization header, got %s", receivedHeaders.Get("Authorization")) + } +} + +func TestHttpClientSendMessageServerError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + cfg := &config{ + endpoint: server.URL, + authBearerToken: "test-token", + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{} + + err = client.sendMessage("/test/path", message) + if err == nil { + t.Error("Expected error for server error response") + } +} + +func TestHttpClientSendMessageNetworkError(t *testing.T) { + cfg := &config{ + endpoint: "http://localhost:99999", + authBearerToken: "", + } + + client, err := newHTTPClient(cfg) + if err != nil { + t.Fatalf("Failed to create HTTP client: %v", err) + } + + message := &filer_pb.EventNotification{} + + err = client.sendMessage("/test/path", message) + if err == nil { + t.Error("Expected error for network failure") + } +} diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go new file mode 100644 index 000000000..d209b74e2 --- /dev/null +++ b/weed/notification/webhook/webhook_queue.go @@ -0,0 +1,71 @@ +package webhook + +import ( + "fmt" + "net/url" + + "github.com/seaweedfs/seaweedfs/weed/notification" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/protobuf/proto" +) + +// client defines the interface for transport client +// could be extended to support gRPC +type client interface { + sendMessage(key string, message proto.Message) error +} + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &WebhookQueue{}) +} + +type WebhookQueue struct { + client client +} + +type config struct { + endpoint string + authBearerToken string +} + +func (c *config) validate() error { + _, err := url.Parse(c.endpoint) + if err != nil { + return fmt.Errorf("invalid webhook endpoint %w", err) + } + + return nil +} + +func (w *WebhookQueue) GetName() string { + return "webhook" +} + +func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix string) error { + c := &config{ + endpoint: configuration.GetString(prefix + "endpoint"), + authBearerToken: configuration.GetString(prefix + "bearer_token"), + } + + if err := c.validate(); err != nil { + return err + } + + return w.initialize(c) +} + +func (w *WebhookQueue) initialize(cfg *config) error { + client, err := newHTTPClient(cfg) + if err != nil { + return fmt.Errorf("failed to create webhook client: %v", err) + } + w.client = client + return nil +} + +func (w *WebhookQueue) SendMessage(key string, message proto.Message) error { + if w.client == nil { + return fmt.Errorf("webhook client not initialized") + } + return w.client.sendMessage(key, message) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 7b5470684..f395f6d60 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -50,6 +50,7 @@ import ( _ "github.com/seaweedfs/seaweedfs/weed/notification/google_pub_sub" _ "github.com/seaweedfs/seaweedfs/weed/notification/kafka" _ "github.com/seaweedfs/seaweedfs/weed/notification/log" + _ "github.com/seaweedfs/seaweedfs/weed/notification/webhook" "github.com/seaweedfs/seaweedfs/weed/security" )