diff --git a/go.mod b/go.mod index 179ea2395..ef46a358a 100644 --- a/go.mod +++ b/go.mod @@ -123,6 +123,7 @@ require ( require ( github.com/Jille/raft-grpc-transport v1.6.1 + github.com/ThreeDotsLabs/watermill v1.4.7 github.com/a-h/templ v0.3.906 github.com/arangodb/go-driver v1.6.6 github.com/armon/go-metrics v0.4.1 @@ -162,6 +163,12 @@ require ( require github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect +require ( + github.com/cenkalti/backoff/v3 v3.2.2 // indirect + github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect +) + require ( cel.dev/expr v0.23.0 // indirect cloud.google.com/go/auth v0.16.2 // indirect diff --git a/go.sum b/go.sum index 508210f47..73ce2762f 100644 --- a/go.sum +++ b/go.sum @@ -622,6 +622,8 @@ github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0= +github.com/ThreeDotsLabs/watermill v1.4.7 h1:LiF4wMP400/psRTdHL/IcV1YIv9htHYFggbe2d6cLeI= +github.com/ThreeDotsLabs/watermill v1.4.7/go.mod h1:Ks20MyglVnqjpha1qq0kjaQ+J9ay7bdnjszQ4cW9FMU= github.com/a-h/templ v0.3.906 h1:ZUThc8Q9n04UATaCwaG60pB1AqbulLmYEAMnWV63svg= github.com/a-h/templ v0.3.906/go.mod h1:FFAu4dI//ESmEN7PQkJ7E7QfnSEMdcnu7QrAY8Dn334= github.com/aalpar/deheap v0.0.0-20210914013432-0cc84d79dec3 h1:hhdWprfSpFbN7lz3W1gM40vOgvSh1WCSMxYD6gGB4Hs= @@ -732,6 +734,8 @@ github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCN github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= github.com/calebcase/tmpfile v1.0.3 h1:BZrOWZ79gJqQ3XbAQlihYZf/YCV0H4KPIdM5K5oMpJo= github.com/calebcase/tmpfile v1.0.3/go.mod h1:UAUc01aHeC+pudPagY/lWvt2qS9ZO5Zzof6/tIUzqeI= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -1291,6 +1295,8 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linxGnu/grocksdb v1.10.1 h1:YX6gUcKvSC3d0s9DaqgbU+CRkZHzlELgHu1Z/kmtslg= github.com/linxGnu/grocksdb v1.10.1/go.mod h1:C3CNe9UYc9hlEM2pC82AqiGS3LRW537u9LFV4wIZuHk= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/lpar/date v1.0.0 h1:bq/zVqFTUmsxvd/CylidY4Udqpr9BOFrParoP6p0x/I= github.com/lpar/date v1.0.0/go.mod h1:KjYe0dDyMQTgpqcUz4LEIeM5VZwhggjVx/V2dtc8NSo= github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35 h1:PpXWgLPs+Fqr325bN2FD2ISlRRztXibcX6e8f5FR5Dc= diff --git a/weed/notification/webhook/filter.go b/weed/notification/webhook/filter.go new file mode 100644 index 000000000..f346d6c93 --- /dev/null +++ b/weed/notification/webhook/filter.go @@ -0,0 +1,64 @@ +package webhook + +import ( + "strings" + + 
"github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +type filter struct { + eventTypes map[eventType]bool + pathPrefixes []string +} + +func newFilter(cfg *config) *filter { + f := &filter{ + eventTypes: make(map[eventType]bool), + pathPrefixes: cfg.pathPrefixes, + } + + if len(cfg.eventTypes) == 0 { + f.eventTypes[eventTypeCreate] = true + f.eventTypes[eventTypeDelete] = true + f.eventTypes[eventTypeUpdate] = true + f.eventTypes[eventTypeRename] = true + } else { + for _, et := range cfg.eventTypes { + t := eventType(et) + if !t.valid() { + glog.Warningf("invalid event type: %v", t) + + continue + } + + f.eventTypes[t] = true + } + } + + return f +} + +func (f *filter) shouldPublish(key string, notification *filer_pb.EventNotification) bool { + if !f.matchesPath(key) { + return false + } + + eventType := detectEventType(notification) + + return f.eventTypes[eventType] +} + +func (f *filter) matchesPath(key string) bool { + if len(f.pathPrefixes) == 0 { + return true + } + + for _, prefix := range f.pathPrefixes { + if strings.HasPrefix(key, prefix) { + return true + } + } + + return false +} diff --git a/weed/notification/webhook/filter_test.go b/weed/notification/webhook/filter_test.go new file mode 100644 index 000000000..e95a085fe --- /dev/null +++ b/weed/notification/webhook/filter_test.go @@ -0,0 +1,225 @@ +package webhook + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +func TestFilterEventTypes(t *testing.T) { + tests := []struct { + name string + eventTypes []string + notification *filer_pb.EventNotification + expectedType eventType + shouldPublish bool + }{ + { + name: "create event - allowed", + eventTypes: []string{"create", "delete"}, + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "test.txt"}, + }, + expectedType: eventTypeCreate, + shouldPublish: true, + }, + { + name: "create event - not allowed", + eventTypes: []string{"delete", "update"}, + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "test.txt"}, + }, + expectedType: eventTypeCreate, + shouldPublish: false, + }, + { + name: "delete event - allowed", + eventTypes: []string{"create", "delete"}, + notification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "test.txt"}, + }, + expectedType: eventTypeDelete, + shouldPublish: true, + }, + { + name: "update event - allowed", + eventTypes: []string{"update"}, + notification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "test.txt"}, + NewEntry: &filer_pb.Entry{Name: "test.txt"}, + }, + expectedType: eventTypeUpdate, + shouldPublish: true, + }, + { + name: "rename event - allowed", + eventTypes: []string{"rename"}, + notification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "old.txt"}, + NewEntry: &filer_pb.Entry{Name: "new.txt"}, + NewParentPath: "/new/path", + }, + expectedType: eventTypeRename, + shouldPublish: true, + }, + { + name: "rename event - not allowed", + eventTypes: []string{"create", "delete", "update"}, + notification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "old.txt"}, + NewEntry: &filer_pb.Entry{Name: "new.txt"}, + NewParentPath: "/new/path", + }, + expectedType: eventTypeRename, + shouldPublish: false, + }, + { + name: "all events allowed when empty", + eventTypes: []string{}, + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "test.txt"}, + }, + expectedType: eventTypeCreate, + shouldPublish: true, + }, + } + + for 
_, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &config{eventTypes: tt.eventTypes} + f := newFilter(cfg) + + eventType := detectEventType(tt.notification) + if eventType != tt.expectedType { + t.Errorf("detectEventType() = %v, want %v", eventType, tt.expectedType) + } + + shouldPublish := f.shouldPublish("/test/path", tt.notification) + if shouldPublish != tt.shouldPublish { + t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish) + } + }) + } +} + +func TestFilterPathPrefixes(t *testing.T) { + tests := []struct { + name string + pathPrefixes []string + key string + shouldPublish bool + }{ + { + name: "matches single prefix", + pathPrefixes: []string{"/data/"}, + key: "/data/file.txt", + shouldPublish: true, + }, + { + name: "matches one of multiple prefixes", + pathPrefixes: []string{"/data/", "/logs/", "/tmp/"}, + key: "/logs/app.log", + shouldPublish: true, + }, + { + name: "no match", + pathPrefixes: []string{"/data/", "/logs/"}, + key: "/other/file.txt", + shouldPublish: false, + }, + { + name: "empty prefixes allows all", + pathPrefixes: []string{}, + key: "/any/path/file.txt", + shouldPublish: true, + }, + { + name: "exact prefix match", + pathPrefixes: []string{"/data"}, + key: "/data", + shouldPublish: true, + }, + { + name: "partial match not allowed", + pathPrefixes: []string{"/data/"}, + key: "/database/file.txt", + shouldPublish: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &config{ + pathPrefixes: tt.pathPrefixes, + eventTypes: []string{"create"}, + } + f := newFilter(cfg) + + notification := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "test.txt"}, + } + + shouldPublish := f.shouldPublish(tt.key, notification) + if shouldPublish != tt.shouldPublish { + t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish) + } + }) + } +} + +func TestFilterCombined(t *testing.T) { + cfg := &config{ + eventTypes: []string{"create", "update"}, + pathPrefixes: []string{"/data/", "/logs/"}, + } + f := newFilter(cfg) + + tests := []struct { + name string + key string + notification *filer_pb.EventNotification + shouldPublish bool + }{ + { + name: "allowed event and path", + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: true, + }, + { + name: "allowed event but wrong path", + key: "/other/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + { + name: "wrong event but allowed path", + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + { + name: "wrong event and wrong path", + key: "/other/file.txt", + notification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + shouldPublish := f.shouldPublish(tt.key, tt.notification) + if shouldPublish != tt.shouldPublish { + t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish) + } + }) + } +} diff --git a/weed/notification/webhook/http.go b/weed/notification/webhook/http.go index 13b7f30d9..bb6a11a09 100644 --- a/weed/notification/webhook/http.go +++ b/weed/notification/webhook/http.go @@ -2,30 +2,43 @@ package webhook import ( "bytes" + "context" "encoding/json" + "errors" "fmt" + "io" "net/http" + "time" + 
"github.com/seaweedfs/seaweedfs/weed/glog" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" - "google.golang.org/protobuf/proto" ) type httpClient struct { endpoint string token string + timeout time.Duration } func newHTTPClient(cfg *config) (*httpClient, error) { return &httpClient{ endpoint: cfg.endpoint, token: cfg.authBearerToken, + timeout: time.Duration(cfg.timeoutSeconds) * time.Second, }, nil } -func (h *httpClient) sendMessage(key string, message proto.Message) error { +func (h *httpClient) sendMessage(message *webhookMessage) error { + // Serialize the protobuf message to JSON for HTTP payload + notificationData, err := json.Marshal(message.Notification) + if err != nil { + return fmt.Errorf("failed to marshal notification: %v", err) + } + payload := map[string]interface{}{ - "key": key, - "message": message, + "key": message.Key, + "event_type": message.EventType, + "message": json.RawMessage(notificationData), } jsonData, err := json.Marshal(payload) @@ -43,8 +56,18 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error { req.Header.Set("Authorization", "Bearer "+h.token) } + if h.timeout > 0 { + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) + defer cancel() + req = req.WithContext(ctx) + } + resp, err := util_http.Do(req) if err != nil { + if err = drainResponse(resp); err != nil { + glog.Errorf("failed to drain response: %v", err) + } + return fmt.Errorf("failed to send request: %v", err) } defer resp.Body.Close() @@ -55,3 +78,16 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error { return nil } + +func drainResponse(resp *http.Response) error { + if resp == nil || resp.Body == nil { + return nil + } + + _, err := io.ReadAll(resp.Body) + + return errors.Join( + err, + resp.Body.Close(), + ) +} diff --git a/weed/notification/webhook/http_test.go b/weed/notification/webhook/http_test.go index 5a008d2a5..f7ef006ae 100644 --- a/weed/notification/webhook/http_test.go +++ b/weed/notification/webhook/http_test.go @@ -48,7 +48,7 @@ func TestHttpClientSendMessage(t *testing.T) { }, } - err = client.sendMessage("/test/path", message) + err = client.sendMessage(newWebhookMessage("/test/path", message)) if err != nil { t.Fatalf("Failed to send message: %v", err) } @@ -57,6 +57,10 @@ func TestHttpClientSendMessage(t *testing.T) { t.Errorf("Expected key '/test/path', got %v", receivedPayload["key"]) } + if receivedPayload["event_type"] != "create" { + t.Errorf("Expected event_type 'create', got %v", receivedPayload["event_type"]) + } + if receivedPayload["message"] == nil { t.Error("Expected message to be present") } @@ -92,7 +96,7 @@ func TestHttpClientSendMessageWithoutToken(t *testing.T) { message := &filer_pb.EventNotification{} - err = client.sendMessage("/test/path", message) + err = client.sendMessage(newWebhookMessage("/test/path", message)) if err != nil { t.Fatalf("Failed to send message: %v", err) } @@ -120,7 +124,7 @@ func TestHttpClientSendMessageServerError(t *testing.T) { message := &filer_pb.EventNotification{} - err = client.sendMessage("/test/path", message) + err = client.sendMessage(newWebhookMessage("/test/path", message)) if err == nil { t.Error("Expected error for server error response") } @@ -139,7 +143,7 @@ func TestHttpClientSendMessageNetworkError(t *testing.T) { message := &filer_pb.EventNotification{} - err = client.sendMessage("/test/path", message) + err = client.sendMessage(newWebhookMessage("/test/path", message)) if err == nil { t.Error("Expected error for network failure") } 
diff --git a/weed/notification/webhook/types.go b/weed/notification/webhook/types.go new file mode 100644 index 000000000..5cd79c7da --- /dev/null +++ b/weed/notification/webhook/types.go @@ -0,0 +1,182 @@ +package webhook + +import ( + "fmt" + "net/url" + "slices" + "strconv" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/protobuf/proto" +) + +const ( + queueName = "webhook" + pubSubTopicName = "webhook_topic" + deadLetterTopic = "webhook_dead_letter" +) + +type eventType string + +const ( + eventTypeCreate eventType = "create" + eventTypeDelete eventType = "delete" + eventTypeUpdate eventType = "update" + eventTypeRename eventType = "rename" +) + +func (e eventType) valid() bool { + return slices.Contains([]eventType{ + eventTypeCreate, + eventTypeDelete, + eventTypeUpdate, + eventTypeRename, + }, + e, + ) +} + +var ( + pubSubHandlerNameTemplate = func(n int) string { + return "webhook_handler_" + strconv.Itoa(n) + } +) + +type client interface { + sendMessage(message *webhookMessage) error +} + +type webhookMessage struct { + Key string `json:"key"` + EventType string `json:"event_type"` + Notification *filer_pb.EventNotification `json:"message_data"` +} + +func newWebhookMessage(key string, message proto.Message) *webhookMessage { + notification, ok := message.(*filer_pb.EventNotification) + if !ok { + return nil + } + + eventType := string(detectEventType(notification)) + + return &webhookMessage{ + Key: key, + EventType: eventType, + Notification: notification, + } +} + +type config struct { + endpoint string + authBearerToken string + timeoutSeconds int + + maxRetries int + backoffSeconds int + maxBackoffSeconds int + nWorkers int + bufferSize int + + eventTypes []string + pathPrefixes []string +} + +func newConfigWithDefaults(configuration util.Configuration, prefix string) *config { + c := &config{ + endpoint: configuration.GetString(prefix + "endpoint"), + authBearerToken: configuration.GetString(prefix + "bearer_token"), + timeoutSeconds: 10, + maxRetries: 3, + backoffSeconds: 3, + maxBackoffSeconds: 30, + nWorkers: 5, + bufferSize: 10_000, + } + + if bufferSize := configuration.GetInt(prefix + "buffer_size"); bufferSize > 0 { + c.bufferSize = bufferSize + } + if workers := configuration.GetInt(prefix + "workers"); workers > 0 { + c.nWorkers = workers + } + if maxRetries := configuration.GetInt(prefix + "max_retries"); maxRetries > 0 { + c.maxRetries = maxRetries + } + if backoffSeconds := configuration.GetInt(prefix + "backoff_seconds"); backoffSeconds > 0 { + c.backoffSeconds = backoffSeconds + } + if maxBackoffSeconds := configuration.GetInt(prefix + "max_backoff_seconds"); maxBackoffSeconds > 0 { + c.maxBackoffSeconds = maxBackoffSeconds + } + if timeout := configuration.GetInt(prefix + "timeout_seconds"); timeout > 0 { + c.timeoutSeconds = timeout + } + + c.eventTypes = configuration.GetStringSlice(prefix + "event_types") + c.pathPrefixes = configuration.GetStringSlice(prefix + "path_prefixes") + + return c +} + +func (c *config) validate() error { + if c.endpoint == "" { + return fmt.Errorf("webhook endpoint is required") + } + + _, err := url.Parse(c.endpoint) + if err != nil { + return fmt.Errorf("invalid webhook endpoint: %w", err) + } + + if c.timeoutSeconds < 1 || c.timeoutSeconds > 300 { + return fmt.Errorf("timeout must be between 1 and 300 seconds, got %d", c.timeoutSeconds) + } + + if c.maxRetries < 0 || c.maxRetries > 10 { + return fmt.Errorf("max retries must be between 0 and 10, got %d", 
c.maxRetries) + } + + if c.backoffSeconds < 1 || c.backoffSeconds > 60 { + return fmt.Errorf("backoff seconds must be between 1 and 60, got %d", c.backoffSeconds) + } + + if c.maxBackoffSeconds < c.backoffSeconds || c.maxBackoffSeconds > 300 { + return fmt.Errorf("max backoff seconds must be between %d and 300, got %d", c.backoffSeconds, c.maxBackoffSeconds) + } + + if c.nWorkers < 1 || c.nWorkers > 100 { + return fmt.Errorf("workers must be between 1 and 100, got %d", c.nWorkers) + } + + if c.bufferSize < 100 || c.bufferSize > 1_000_000 { + return fmt.Errorf("buffer size must be between 100 and 1,000,000, got %d", c.bufferSize) + } + + return nil +} + +func detectEventType(notification *filer_pb.EventNotification) eventType { + hasOldEntry := notification.OldEntry != nil + hasNewEntry := notification.NewEntry != nil + hasNewParentPath := notification.NewParentPath != "" + + if !hasOldEntry && hasNewEntry { + return eventTypeCreate + } + + if hasOldEntry && !hasNewEntry { + return eventTypeDelete + } + + if hasOldEntry && hasNewEntry { + if hasNewParentPath { + return eventTypeRename + } + + return eventTypeUpdate + } + + return eventTypeUpdate +} diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go index d209b74e2..d8f9a0734 100644 --- a/weed/notification/webhook/webhook_queue.go +++ b/weed/notification/webhook/webhook_queue.go @@ -1,52 +1,82 @@ package webhook import ( + "context" + "errors" "fmt" - "net/url" + "time" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "github.com/ThreeDotsLabs/watermill/message/router/plugin" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/notification" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/protobuf/proto" ) -// client defines the interface for transport client -// could be extended to support gRPC -type client interface { - sendMessage(key string, message proto.Message) error -} - func init() { - notification.MessageQueues = append(notification.MessageQueues, &WebhookQueue{}) + notification.MessageQueues = append(notification.MessageQueues, &Queue{}) } -type WebhookQueue struct { - client client +type Queue struct { + router *message.Router + queueChannel *gochannel.GoChannel + config *config + client client + filter *filter + + ctx context.Context + cancel context.CancelFunc } -type config struct { - endpoint string - authBearerToken string +func (w *Queue) GetName() string { + return queueName } -func (c *config) validate() error { - _, err := url.Parse(c.endpoint) +func (w *Queue) SendMessage(key string, msg proto.Message) error { + eventNotification, ok := msg.(*filer_pb.EventNotification) + if !ok { + return nil + } + + if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) { + return nil + } + + m := newWebhookMessage(key, msg) + if m == nil { + return nil + } + + wMsg, err := m.toWaterMillMessage() if err != nil { - return fmt.Errorf("invalid webhook endpoint %w", err) + return err } - return nil + return w.queueChannel.Publish(pubSubTopicName, wMsg) } -func (w *WebhookQueue) GetName() string { - return "webhook" -} - -func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix string) error { - c := &config{ - endpoint: configuration.GetString(prefix + "endpoint"), - authBearerToken: 
configuration.GetString(prefix + "bearer_token"),
+func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) {
+	payload, err := proto.Marshal(w.Notification)
+	if err != nil {
+		return nil, err
 	}
 
+	msg := message.NewMessage(watermill.NewUUID(), payload)
+	// Set event type and key as metadata
+	msg.Metadata.Set("event_type", w.EventType)
+	msg.Metadata.Set("key", w.Key)
+
+	return msg, nil
+}
+
+func (w *Queue) Initialize(configuration util.Configuration, prefix string) error {
+	c := newConfigWithDefaults(configuration, prefix)
+
 	if err := c.validate(); err != nil {
 		return err
 	}
@@ -54,18 +84,124 @@ func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix strin
 	return w.initialize(c)
 }
 
-func (w *WebhookQueue) initialize(cfg *config) error {
-	client, err := newHTTPClient(cfg)
+func (w *Queue) initialize(cfg *config) error {
+	w.ctx, w.cancel = context.WithCancel(context.Background())
+	w.config = cfg
+	w.filter = newFilter(cfg)
+
+	hClient, err := newHTTPClient(cfg)
 	if err != nil {
-		return fmt.Errorf("failed to create webhook client: %v", err)
+		return fmt.Errorf("failed to create webhook http client: %w", err)
 	}
 
-	w.client = client
+	w.client = hClient
+
+	if err = w.setupWatermillQueue(cfg); err != nil {
+		return fmt.Errorf("failed to setup watermill queue: %w", err)
+	}
+
+	if err = w.logDeadLetterMessages(); err != nil {
+		return err
+	}
+
 	return nil
 }
 
-func (w *WebhookQueue) SendMessage(key string, message proto.Message) error {
-	if w.client == nil {
-		return fmt.Errorf("webhook client not initialized")
+func (w *Queue) setupWatermillQueue(cfg *config) error {
+	logger := watermill.NewStdLogger(false, false)
+	pubSubConfig := gochannel.Config{
+		OutputChannelBuffer: int64(cfg.bufferSize),
+		Persistent:          false,
 	}
 
-	return w.client.sendMessage(key, message)
+	w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger)
+
+	router, err := message.NewRouter(
+		message.RouterConfig{
+			CloseTimeout: 60 * time.Second,
+		},
+		logger,
+	)
+	if err != nil {
+		return fmt.Errorf("failed to create router: %v", err)
+	}
+	w.router = router
+
+	retryMiddleware := middleware.Retry{
+		MaxRetries:          cfg.maxRetries,
+		InitialInterval:     time.Duration(cfg.backoffSeconds) * time.Second,
+		MaxInterval:         time.Duration(cfg.maxBackoffSeconds) * time.Second,
+		Multiplier:          2.0,
+		RandomizationFactor: 0.3,
+		Logger:              logger,
+	}.Middleware
+
+	poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic)
+	if err != nil {
+		return fmt.Errorf("failed to create poison queue: %v", err)
+	}
+
+	router.AddPlugin(plugin.SignalsHandler)
+	router.AddMiddleware(retryMiddleware, poisonQueue)
+
+	for i := 0; i < cfg.nWorkers; i++ {
+		router.AddNoPublisherHandler(
+			pubSubHandlerNameTemplate(i),
+			pubSubTopicName,
+			w.queueChannel,
+			w.handleWebhook,
+		)
+	}
+
+	go func() {
+		// cancel the queue context so the dead letter logger exits in case the context was not already canceled by the shutdown signal
+		defer w.cancel()
+
+		if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) {
+			glog.Errorf("webhook pubsub worker stopped with error: %v", err)
+		}
+
+		glog.Info("webhook pubsub worker stopped")
+	}()
+
+	return nil
+}
+
+func (w *Queue) handleWebhook(msg *message.Message) error {
+	var n filer_pb.EventNotification
+	if err := proto.Unmarshal(msg.Payload, &n); err != nil {
+		glog.Errorf("failed to unmarshal protobuf message: %v", err)
+		return err
+	}
+
+	// Reconstruct webhook message from metadata and payload
+	webhookMsg := &webhookMessage{
+		Key:          msg.Metadata.Get("key"),
+		EventType:    msg.Metadata.Get("event_type"),
+		Notification: &n,
+	}
+
+	if err := w.client.sendMessage(webhookMsg); err != nil {
+		glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err)
+		return err
+	}
+
+	return nil
+}
+
+func (w *Queue) logDeadLetterMessages() error {
+	ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic)
+	if err != nil {
+		return err
+	}
+
+	go func() {
+		for {
+			select {
+			case msg, ok := <-ch:
+				if !ok {
+					return
+				}
+				glog.Errorf("received dead letter message: %s, key: %s", string(msg.Payload), msg.Metadata.Get("key"))
+				// acknowledge the dead letter message so the subscription is not left waiting on it
+				msg.Ack()
+			case <-w.ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return nil
 }
diff --git a/weed/notification/webhook/webhook_queue_test.go b/weed/notification/webhook/webhook_queue_test.go
new file mode 100644
index 000000000..52a290149
--- /dev/null
+++ b/weed/notification/webhook/webhook_queue_test.go
@@ -0,0 +1,536 @@
+package webhook
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"google.golang.org/protobuf/proto"
+)
+
+func TestConfigValidation(t *testing.T) {
+	tests := []struct {
+		name    string
+		config  *config
+		wantErr bool
+		errMsg  string
+	}{
+		{
+			name: "valid config",
+			config: &config{
+				endpoint:          "https://example.com/webhook",
+				authBearerToken:   "test-token",
+				timeoutSeconds:    30,
+				maxRetries:        3,
+				backoffSeconds:    5,
+				maxBackoffSeconds: 30,
+				nWorkers:          5,
+				bufferSize:        10000,
+			},
+			wantErr: false,
+		},
+		{
+			name: "empty endpoint",
+			config: &config{
+				endpoint:          "",
+				timeoutSeconds:    30,
+				maxRetries:        3,
+				backoffSeconds:    5,
+				maxBackoffSeconds: 30,
+				nWorkers:          5,
+				bufferSize:        10000,
+			},
+			wantErr: true,
+			errMsg:  "endpoint is required",
+		},
+		{
+			name: "invalid URL",
+			config: &config{
+				endpoint:          "://invalid-url",
+				timeoutSeconds:    30,
+				maxRetries:        3,
+				backoffSeconds:    5,
+				maxBackoffSeconds: 30,
+				nWorkers:          5,
+				bufferSize:        10000,
+			},
+			wantErr: true,
+			errMsg:  "invalid webhook endpoint",
+		},
+		{
+			name: "timeout too large",
+			config: &config{
+				endpoint:          "https://example.com/webhook",
+				timeoutSeconds:    301,
+				maxRetries:        3,
+				backoffSeconds:    5,
+				maxBackoffSeconds: 30,
+				nWorkers:          5,
+				bufferSize:        10000,
+			},
+			wantErr: true,
+			errMsg:  "timeout must be between",
+		},
+		{
+			name: "too many retries",
+			config: &config{
+				endpoint:          "https://example.com/webhook",
+				timeoutSeconds:    30,
+				maxRetries:        11,
+				backoffSeconds:    5,
+				maxBackoffSeconds: 30,
+				nWorkers:          5,
+				bufferSize:        10000,
+			},
+			wantErr: true,
+			errMsg:  "max retries must be between",
+		},
+		{
+			name: "too many workers",
+			config: &config{
+				endpoint:          "https://example.com/webhook",
+				timeoutSeconds:    30,
+				maxRetries:        3,
+				backoffSeconds:    5,
+				maxBackoffSeconds: 30,
+				nWorkers:          101,
+				bufferSize:        10000,
+			},
+			wantErr: true,
+			errMsg:  "workers must be between",
+		},
+		{
+			name: "buffer too large",
+			config: &config{
+				endpoint:          "https://example.com/webhook",
+				timeoutSeconds:    30,
+				maxRetries:        3,
+				backoffSeconds:    5,
+				maxBackoffSeconds: 30,
+				nWorkers:          5,
+				bufferSize:        1000001,
+			},
+			wantErr: true,
+			errMsg:  "buffer size must be between",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := tt.config.validate()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("validate() error = %v, wantErr %v", err, tt.wantErr)
+			}
+			if err != nil && tt.errMsg != "" {
+				if err.Error() == "" || !strings.Contains(err.Error(), tt.errMsg) {
+					t.Errorf("validate() error message = %v, want to contain %v", err.Error(), tt.errMsg)
+				}
+			}
+		})
+	}
+}
+
+func 
TestWebhookMessageSerialization(t *testing.T) { + msg := &filer_pb.EventNotification{ + OldEntry: nil, + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + IsDirectory: false, + }, + } + + webhookMsg := newWebhookMessage("/test/path", msg) + + wmMsg, err := webhookMsg.toWaterMillMessage() + if err != nil { + t.Fatalf("Failed to convert to watermill message: %v", err) + } + + // Unmarshal the protobuf payload directly + var eventNotification filer_pb.EventNotification + err = proto.Unmarshal(wmMsg.Payload, &eventNotification) + if err != nil { + t.Fatalf("Failed to unmarshal protobuf message: %v", err) + } + + // Check metadata + if wmMsg.Metadata.Get("key") != "/test/path" { + t.Errorf("Expected key '/test/path', got %v", wmMsg.Metadata.Get("key")) + } + + if wmMsg.Metadata.Get("event_type") != "create" { + t.Errorf("Expected event type 'create', got %v", wmMsg.Metadata.Get("event_type")) + } + + if eventNotification.NewEntry.Name != "test.txt" { + t.Errorf("Expected file name 'test.txt', got %v", eventNotification.NewEntry.Name) + } +} + +func TestQueueInitialize(t *testing.T) { + cfg := &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 10, + maxRetries: 3, + backoffSeconds: 3, + maxBackoffSeconds: 60, + nWorkers: 1, + bufferSize: 100, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Errorf("Initialize() error = %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + if q.router == nil { + t.Error("Expected router to be initialized") + } + if q.queueChannel == nil { + t.Error("Expected queueChannel to be initialized") + } + if q.client == nil { + t.Error("Expected client to be initialized") + } + if q.config == nil { + t.Error("Expected config to be initialized") + } +} + +// TestQueueSendMessage test sending messages to the queue +func TestQueueSendMessage(t *testing.T) { + cfg := &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 1, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Fatalf("Failed to initialize queue: %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + msg := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + err = q.SendMessage("/test/path", msg) + if err != nil { + t.Errorf("SendMessage() error = %v", err) + } +} + +func TestQueueHandleWebhook(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &config{ + endpoint: server.URL, + authBearerToken: "test-token", + timeoutSeconds: 1, + maxRetries: 0, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + } + + client, _ := newHTTPClient(cfg) + q := &Queue{ + client: client, + } + + message := newWebhookMessage("/test/path", &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + }) + + wmMsg, err := message.toWaterMillMessage() + if err != nil { + t.Fatalf("Failed to create watermill message: %v", err) + } + + err = q.handleWebhook(wmMsg) + if err != nil { + t.Errorf("handleWebhook() error = %v", err) + } +} + +func TestQueueEndToEnd(t *testing.T) 
{ + // Simplified test - just verify the queue can be created and message can be sent + // without needing full end-to-end processing + cfg := &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 1, + maxRetries: 0, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Fatalf("Failed to initialize queue: %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + msg := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + err = q.SendMessage("/test/path", msg) + if err != nil { + t.Errorf("SendMessage() error = %v", err) + } +} + +func TestQueueRetryMechanism(t *testing.T) { + cfg := &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 1, + maxRetries: 3, // Test that this config is used + backoffSeconds: 2, + maxBackoffSeconds: 10, + nWorkers: 1, + bufferSize: 10, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Fatalf("Failed to initialize queue: %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + // Verify that the queue is properly configured for retries + if q.config.maxRetries != 3 { + t.Errorf("Expected maxRetries=3, got %d", q.config.maxRetries) + } + + if q.config.backoffSeconds != 2 { + t.Errorf("Expected backoffSeconds=2, got %d", q.config.backoffSeconds) + } + + if q.config.maxBackoffSeconds != 10 { + t.Errorf("Expected maxBackoffSeconds=10, got %d", q.config.maxBackoffSeconds) + } + + // Test that we can send a message (retry behavior is handled by Watermill middleware) + msg := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "test.txt"}, + } + + err = q.SendMessage("/test/retry", msg) + if err != nil { + t.Errorf("SendMessage() error = %v", err) + } +} + +func TestQueueSendMessageWithFilter(t *testing.T) { + tests := []struct { + name string + cfg *config + key string + notification *filer_pb.EventNotification + shouldPublish bool + }{ + { + name: "allowed event type", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"create"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: true, + }, + { + name: "filtered event type", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"update", "rename"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + { + name: "allowed path prefix", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + pathPrefixes: []string{"/data/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: true, + }, + { + name: "filtered path prefix", + cfg: &config{ + endpoint: 
"https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + pathPrefixes: []string{"/logs/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + { + name: "combined filters - both pass", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"create", "delete"}, + pathPrefixes: []string{"/data/", "/logs/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: true, + }, + { + name: "combined filters - event fails", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"update", "delete"}, + pathPrefixes: []string{"/data/", "/logs/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + { + name: "combined filters - path fails", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"create", "delete"}, + pathPrefixes: []string{"/logs/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + shouldPublish := newFilter(tt.cfg).shouldPublish(tt.key, tt.notification) + if shouldPublish != tt.shouldPublish { + t.Errorf("Expected shouldPublish=%v, got %v", tt.shouldPublish, shouldPublish) + } + }) + } +}