Browse Source
[Notifications] Support webhook notifications (#6962)
[Notifications] Support webhook notifications (#6962)
Add webhook notification supportpull/6967/head
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 275 additions and 0 deletions
-
57weed/notification/webhook/http.go
-
146weed/notification/webhook/http_test.go
-
71weed/notification/webhook/webhook_queue.go
-
1weed/server/filer_server.go
@ -0,0 +1,57 @@ |
|||
package webhook |
|||
|
|||
import ( |
|||
"bytes" |
|||
"encoding/json" |
|||
"fmt" |
|||
"net/http" |
|||
|
|||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" |
|||
"google.golang.org/protobuf/proto" |
|||
) |
|||
|
|||
type httpClient struct { |
|||
endpoint string |
|||
token string |
|||
} |
|||
|
|||
func newHTTPClient(cfg *config) (*httpClient, error) { |
|||
return &httpClient{ |
|||
endpoint: cfg.endpoint, |
|||
token: cfg.authBearerToken, |
|||
}, nil |
|||
} |
|||
|
|||
func (h *httpClient) sendMessage(key string, message proto.Message) error { |
|||
payload := map[string]interface{}{ |
|||
"key": key, |
|||
"message": message, |
|||
} |
|||
|
|||
jsonData, err := json.Marshal(payload) |
|||
if err != nil { |
|||
return fmt.Errorf("failed to marshal message: %v", err) |
|||
} |
|||
|
|||
req, err := http.NewRequest(http.MethodPost, h.endpoint, bytes.NewBuffer(jsonData)) |
|||
if err != nil { |
|||
return fmt.Errorf("failed to create request: %v", err) |
|||
} |
|||
|
|||
req.Header.Set("Content-Type", "application/json") |
|||
if h.token != "" { |
|||
req.Header.Set("Authorization", "Bearer "+h.token) |
|||
} |
|||
|
|||
resp, err := util_http.Do(req) |
|||
if err != nil { |
|||
return fmt.Errorf("failed to send request: %v", err) |
|||
} |
|||
defer resp.Body.Close() |
|||
|
|||
if resp.StatusCode < 200 || resp.StatusCode >= 300 { |
|||
return fmt.Errorf("webhook returned status code: %d", resp.StatusCode) |
|||
} |
|||
|
|||
return nil |
|||
} |
@ -0,0 +1,146 @@ |
|||
package webhook |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"io" |
|||
"net/http" |
|||
"net/http/httptest" |
|||
"testing" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" |
|||
) |
|||
|
|||
func init() { |
|||
util_http.InitGlobalHttpClient() |
|||
} |
|||
|
|||
func TestHttpClientSendMessage(t *testing.T) { |
|||
var receivedPayload map[string]interface{} |
|||
var receivedHeaders http.Header |
|||
|
|||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
|||
receivedHeaders = r.Header |
|||
body, _ := io.ReadAll(r.Body) |
|||
if err := json.Unmarshal(body, &receivedPayload); err != nil { |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
return |
|||
} |
|||
w.WriteHeader(http.StatusOK) |
|||
})) |
|||
defer server.Close() |
|||
|
|||
cfg := &config{ |
|||
endpoint: server.URL, |
|||
authBearerToken: "test-token", |
|||
} |
|||
|
|||
client, err := newHTTPClient(cfg) |
|||
if err != nil { |
|||
t.Fatalf("Failed to create HTTP client: %v", err) |
|||
} |
|||
|
|||
message := &filer_pb.EventNotification{ |
|||
OldEntry: nil, |
|||
NewEntry: &filer_pb.Entry{ |
|||
Name: "test.txt", |
|||
IsDirectory: false, |
|||
}, |
|||
} |
|||
|
|||
err = client.sendMessage("/test/path", message) |
|||
if err != nil { |
|||
t.Fatalf("Failed to send message: %v", err) |
|||
} |
|||
|
|||
if receivedPayload["key"] != "/test/path" { |
|||
t.Errorf("Expected key '/test/path', got %v", receivedPayload["key"]) |
|||
} |
|||
|
|||
if receivedPayload["message"] == nil { |
|||
t.Error("Expected message to be present") |
|||
} |
|||
|
|||
if receivedHeaders.Get("Content-Type") != "application/json" { |
|||
t.Errorf("Expected Content-Type 'application/json', got %s", receivedHeaders.Get("Content-Type")) |
|||
} |
|||
|
|||
expectedAuth := "Bearer test-token" |
|||
if receivedHeaders.Get("Authorization") != expectedAuth { |
|||
t.Errorf("Expected Authorization '%s', got %s", expectedAuth, receivedHeaders.Get("Authorization")) |
|||
} |
|||
} |
|||
|
|||
func TestHttpClientSendMessageWithoutToken(t *testing.T) { |
|||
var receivedHeaders http.Header |
|||
|
|||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
|||
receivedHeaders = r.Header |
|||
w.WriteHeader(http.StatusOK) |
|||
})) |
|||
defer server.Close() |
|||
|
|||
cfg := &config{ |
|||
endpoint: server.URL, |
|||
authBearerToken: "", |
|||
} |
|||
|
|||
client, err := newHTTPClient(cfg) |
|||
if err != nil { |
|||
t.Fatalf("Failed to create HTTP client: %v", err) |
|||
} |
|||
|
|||
message := &filer_pb.EventNotification{} |
|||
|
|||
err = client.sendMessage("/test/path", message) |
|||
if err != nil { |
|||
t.Fatalf("Failed to send message: %v", err) |
|||
} |
|||
|
|||
if receivedHeaders.Get("Authorization") != "" { |
|||
t.Errorf("Expected no Authorization header, got %s", receivedHeaders.Get("Authorization")) |
|||
} |
|||
} |
|||
|
|||
func TestHttpClientSendMessageServerError(t *testing.T) { |
|||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
})) |
|||
defer server.Close() |
|||
|
|||
cfg := &config{ |
|||
endpoint: server.URL, |
|||
authBearerToken: "test-token", |
|||
} |
|||
|
|||
client, err := newHTTPClient(cfg) |
|||
if err != nil { |
|||
t.Fatalf("Failed to create HTTP client: %v", err) |
|||
} |
|||
|
|||
message := &filer_pb.EventNotification{} |
|||
|
|||
err = client.sendMessage("/test/path", message) |
|||
if err == nil { |
|||
t.Error("Expected error for server error response") |
|||
} |
|||
} |
|||
|
|||
func TestHttpClientSendMessageNetworkError(t *testing.T) { |
|||
cfg := &config{ |
|||
endpoint: "http://localhost:99999", |
|||
authBearerToken: "", |
|||
} |
|||
|
|||
client, err := newHTTPClient(cfg) |
|||
if err != nil { |
|||
t.Fatalf("Failed to create HTTP client: %v", err) |
|||
} |
|||
|
|||
message := &filer_pb.EventNotification{} |
|||
|
|||
err = client.sendMessage("/test/path", message) |
|||
if err == nil { |
|||
t.Error("Expected error for network failure") |
|||
} |
|||
} |
@ -0,0 +1,71 @@ |
|||
package webhook |
|||
|
|||
import ( |
|||
"fmt" |
|||
"net/url" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/notification" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
"google.golang.org/protobuf/proto" |
|||
) |
|||
|
|||
// client defines the interface for transport client
|
|||
// could be extended to support gRPC
|
|||
type client interface { |
|||
sendMessage(key string, message proto.Message) error |
|||
} |
|||
|
|||
func init() { |
|||
notification.MessageQueues = append(notification.MessageQueues, &WebhookQueue{}) |
|||
} |
|||
|
|||
type WebhookQueue struct { |
|||
client client |
|||
} |
|||
|
|||
type config struct { |
|||
endpoint string |
|||
authBearerToken string |
|||
} |
|||
|
|||
func (c *config) validate() error { |
|||
_, err := url.Parse(c.endpoint) |
|||
if err != nil { |
|||
return fmt.Errorf("invalid webhook endpoint %w", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (w *WebhookQueue) GetName() string { |
|||
return "webhook" |
|||
} |
|||
|
|||
func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix string) error { |
|||
c := &config{ |
|||
endpoint: configuration.GetString(prefix + "endpoint"), |
|||
authBearerToken: configuration.GetString(prefix + "bearer_token"), |
|||
} |
|||
|
|||
if err := c.validate(); err != nil { |
|||
return err |
|||
} |
|||
|
|||
return w.initialize(c) |
|||
} |
|||
|
|||
func (w *WebhookQueue) initialize(cfg *config) error { |
|||
client, err := newHTTPClient(cfg) |
|||
if err != nil { |
|||
return fmt.Errorf("failed to create webhook client: %v", err) |
|||
} |
|||
w.client = client |
|||
return nil |
|||
} |
|||
|
|||
func (w *WebhookQueue) SendMessage(key string, message proto.Message) error { |
|||
if w.client == nil { |
|||
return fmt.Errorf("webhook client not initialized") |
|||
} |
|||
return w.client.sendMessage(key, message) |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue