[Notifications] Improving webhook notifications (#6965)

* worker setup

* fix tests

* start worker

* graceful worker drain

* retry queue

* migrate queue to watermill

* adding filters and improvements

* add the event type to the webhook message

* eliminating redundant JSON serialization

* resolve review comments

* trigger actions

* fix tests

* typo fixes

* read max_backoff_seconds from config

* add more context to the dead letter

* close the http response on errors

* drain the http response body in case not empty

* eliminate exported typesπ
This commit is contained in:
Ibrahim Konsowa
2025-07-15 21:49:37 +04:00
committed by GitHub
parent 74f4e9ba5a
commit d78aa3d2de
9 changed files with 1237 additions and 41 deletions

7
go.mod
View File

@@ -123,6 +123,7 @@ require (
require (
github.com/Jille/raft-grpc-transport v1.6.1
github.com/ThreeDotsLabs/watermill v1.4.7
github.com/a-h/templ v0.3.906
github.com/arangodb/go-driver v1.6.6
github.com/armon/go-metrics v0.4.1
@@ -162,6 +163,12 @@ require (
require github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect
require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
)
require (
cel.dev/expr v0.23.0 // indirect
cloud.google.com/go/auth v0.16.2 // indirect

6
go.sum
View File

@@ -622,6 +622,8 @@ github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A
github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0=
github.com/ThreeDotsLabs/watermill v1.4.7 h1:LiF4wMP400/psRTdHL/IcV1YIv9htHYFggbe2d6cLeI=
github.com/ThreeDotsLabs/watermill v1.4.7/go.mod h1:Ks20MyglVnqjpha1qq0kjaQ+J9ay7bdnjszQ4cW9FMU=
github.com/a-h/templ v0.3.906 h1:ZUThc8Q9n04UATaCwaG60pB1AqbulLmYEAMnWV63svg=
github.com/a-h/templ v0.3.906/go.mod h1:FFAu4dI//ESmEN7PQkJ7E7QfnSEMdcnu7QrAY8Dn334=
github.com/aalpar/deheap v0.0.0-20210914013432-0cc84d79dec3 h1:hhdWprfSpFbN7lz3W1gM40vOgvSh1WCSMxYD6gGB4Hs=
@@ -732,6 +734,8 @@ github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCN
github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
github.com/calebcase/tmpfile v1.0.3 h1:BZrOWZ79gJqQ3XbAQlihYZf/YCV0H4KPIdM5K5oMpJo=
github.com/calebcase/tmpfile v1.0.3/go.mod h1:UAUc01aHeC+pudPagY/lWvt2qS9ZO5Zzof6/tIUzqeI=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -1291,6 +1295,8 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linxGnu/grocksdb v1.10.1 h1:YX6gUcKvSC3d0s9DaqgbU+CRkZHzlELgHu1Z/kmtslg=
github.com/linxGnu/grocksdb v1.10.1/go.mod h1:C3CNe9UYc9hlEM2pC82AqiGS3LRW537u9LFV4wIZuHk=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/lpar/date v1.0.0 h1:bq/zVqFTUmsxvd/CylidY4Udqpr9BOFrParoP6p0x/I=
github.com/lpar/date v1.0.0/go.mod h1:KjYe0dDyMQTgpqcUz4LEIeM5VZwhggjVx/V2dtc8NSo=
github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35 h1:PpXWgLPs+Fqr325bN2FD2ISlRRztXibcX6e8f5FR5Dc=

View File

@@ -0,0 +1,64 @@
package webhook
import (
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
type filter struct {
eventTypes map[eventType]bool
pathPrefixes []string
}
func newFilter(cfg *config) *filter {
f := &filter{
eventTypes: make(map[eventType]bool),
pathPrefixes: cfg.pathPrefixes,
}
if len(cfg.eventTypes) == 0 {
f.eventTypes[eventTypeCreate] = true
f.eventTypes[eventTypeDelete] = true
f.eventTypes[eventTypeUpdate] = true
f.eventTypes[eventTypeRename] = true
} else {
for _, et := range cfg.eventTypes {
t := eventType(et)
if !t.valid() {
glog.Warningf("invalid event type: %v", t)
continue
}
f.eventTypes[t] = true
}
}
return f
}
func (f *filter) shouldPublish(key string, notification *filer_pb.EventNotification) bool {
if !f.matchesPath(key) {
return false
}
eventType := detectEventType(notification)
return f.eventTypes[eventType]
}
func (f *filter) matchesPath(key string) bool {
if len(f.pathPrefixes) == 0 {
return true
}
for _, prefix := range f.pathPrefixes {
if strings.HasPrefix(key, prefix) {
return true
}
}
return false
}

View File

@@ -0,0 +1,225 @@
package webhook
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
func TestFilterEventTypes(t *testing.T) {
tests := []struct {
name string
eventTypes []string
notification *filer_pb.EventNotification
expectedType eventType
shouldPublish bool
}{
{
name: "create event - allowed",
eventTypes: []string{"create", "delete"},
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "test.txt"},
},
expectedType: eventTypeCreate,
shouldPublish: true,
},
{
name: "create event - not allowed",
eventTypes: []string{"delete", "update"},
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "test.txt"},
},
expectedType: eventTypeCreate,
shouldPublish: false,
},
{
name: "delete event - allowed",
eventTypes: []string{"create", "delete"},
notification: &filer_pb.EventNotification{
OldEntry: &filer_pb.Entry{Name: "test.txt"},
},
expectedType: eventTypeDelete,
shouldPublish: true,
},
{
name: "update event - allowed",
eventTypes: []string{"update"},
notification: &filer_pb.EventNotification{
OldEntry: &filer_pb.Entry{Name: "test.txt"},
NewEntry: &filer_pb.Entry{Name: "test.txt"},
},
expectedType: eventTypeUpdate,
shouldPublish: true,
},
{
name: "rename event - allowed",
eventTypes: []string{"rename"},
notification: &filer_pb.EventNotification{
OldEntry: &filer_pb.Entry{Name: "old.txt"},
NewEntry: &filer_pb.Entry{Name: "new.txt"},
NewParentPath: "/new/path",
},
expectedType: eventTypeRename,
shouldPublish: true,
},
{
name: "rename event - not allowed",
eventTypes: []string{"create", "delete", "update"},
notification: &filer_pb.EventNotification{
OldEntry: &filer_pb.Entry{Name: "old.txt"},
NewEntry: &filer_pb.Entry{Name: "new.txt"},
NewParentPath: "/new/path",
},
expectedType: eventTypeRename,
shouldPublish: false,
},
{
name: "all events allowed when empty",
eventTypes: []string{},
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "test.txt"},
},
expectedType: eventTypeCreate,
shouldPublish: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &config{eventTypes: tt.eventTypes}
f := newFilter(cfg)
eventType := detectEventType(tt.notification)
if eventType != tt.expectedType {
t.Errorf("detectEventType() = %v, want %v", eventType, tt.expectedType)
}
shouldPublish := f.shouldPublish("/test/path", tt.notification)
if shouldPublish != tt.shouldPublish {
t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish)
}
})
}
}
func TestFilterPathPrefixes(t *testing.T) {
tests := []struct {
name string
pathPrefixes []string
key string
shouldPublish bool
}{
{
name: "matches single prefix",
pathPrefixes: []string{"/data/"},
key: "/data/file.txt",
shouldPublish: true,
},
{
name: "matches one of multiple prefixes",
pathPrefixes: []string{"/data/", "/logs/", "/tmp/"},
key: "/logs/app.log",
shouldPublish: true,
},
{
name: "no match",
pathPrefixes: []string{"/data/", "/logs/"},
key: "/other/file.txt",
shouldPublish: false,
},
{
name: "empty prefixes allows all",
pathPrefixes: []string{},
key: "/any/path/file.txt",
shouldPublish: true,
},
{
name: "exact prefix match",
pathPrefixes: []string{"/data"},
key: "/data",
shouldPublish: true,
},
{
name: "partial match not allowed",
pathPrefixes: []string{"/data/"},
key: "/database/file.txt",
shouldPublish: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &config{
pathPrefixes: tt.pathPrefixes,
eventTypes: []string{"create"},
}
f := newFilter(cfg)
notification := &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "test.txt"},
}
shouldPublish := f.shouldPublish(tt.key, notification)
if shouldPublish != tt.shouldPublish {
t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish)
}
})
}
}
func TestFilterCombined(t *testing.T) {
cfg := &config{
eventTypes: []string{"create", "update"},
pathPrefixes: []string{"/data/", "/logs/"},
}
f := newFilter(cfg)
tests := []struct {
name string
key string
notification *filer_pb.EventNotification
shouldPublish bool
}{
{
name: "allowed event and path",
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: true,
},
{
name: "allowed event but wrong path",
key: "/other/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
{
name: "wrong event but allowed path",
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
OldEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
{
name: "wrong event and wrong path",
key: "/other/file.txt",
notification: &filer_pb.EventNotification{
OldEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
shouldPublish := f.shouldPublish(tt.key, tt.notification)
if shouldPublish != tt.shouldPublish {
t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish)
}
})
}
}

View File

@@ -2,30 +2,43 @@ package webhook
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/protobuf/proto"
)
type httpClient struct {
endpoint string
token string
timeout time.Duration
}
func newHTTPClient(cfg *config) (*httpClient, error) {
return &httpClient{
endpoint: cfg.endpoint,
token: cfg.authBearerToken,
timeout: time.Duration(cfg.timeoutSeconds) * time.Second,
}, nil
}
func (h *httpClient) sendMessage(key string, message proto.Message) error {
func (h *httpClient) sendMessage(message *webhookMessage) error {
// Serialize the protobuf message to JSON for HTTP payload
notificationData, err := json.Marshal(message.Notification)
if err != nil {
return fmt.Errorf("failed to marshal notification: %v", err)
}
payload := map[string]interface{}{
"key": key,
"message": message,
"key": message.Key,
"event_type": message.EventType,
"message": json.RawMessage(notificationData),
}
jsonData, err := json.Marshal(payload)
@@ -43,8 +56,18 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error {
req.Header.Set("Authorization", "Bearer "+h.token)
}
if h.timeout > 0 {
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
req = req.WithContext(ctx)
}
resp, err := util_http.Do(req)
if err != nil {
if err = drainResponse(resp); err != nil {
glog.Errorf("failed to drain response: %v", err)
}
return fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
@@ -55,3 +78,16 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error {
return nil
}
func drainResponse(resp *http.Response) error {
if resp == nil || resp.Body == nil {
return nil
}
_, err := io.ReadAll(resp.Body)
return errors.Join(
err,
resp.Body.Close(),
)
}

View File

@@ -48,7 +48,7 @@ func TestHttpClientSendMessage(t *testing.T) {
},
}
err = client.sendMessage("/test/path", message)
err = client.sendMessage(newWebhookMessage("/test/path", message))
if err != nil {
t.Fatalf("Failed to send message: %v", err)
}
@@ -57,6 +57,10 @@ func TestHttpClientSendMessage(t *testing.T) {
t.Errorf("Expected key '/test/path', got %v", receivedPayload["key"])
}
if receivedPayload["event_type"] != "create" {
t.Errorf("Expected event_type 'create', got %v", receivedPayload["event_type"])
}
if receivedPayload["message"] == nil {
t.Error("Expected message to be present")
}
@@ -92,7 +96,7 @@ func TestHttpClientSendMessageWithoutToken(t *testing.T) {
message := &filer_pb.EventNotification{}
err = client.sendMessage("/test/path", message)
err = client.sendMessage(newWebhookMessage("/test/path", message))
if err != nil {
t.Fatalf("Failed to send message: %v", err)
}
@@ -120,7 +124,7 @@ func TestHttpClientSendMessageServerError(t *testing.T) {
message := &filer_pb.EventNotification{}
err = client.sendMessage("/test/path", message)
err = client.sendMessage(newWebhookMessage("/test/path", message))
if err == nil {
t.Error("Expected error for server error response")
}
@@ -139,7 +143,7 @@ func TestHttpClientSendMessageNetworkError(t *testing.T) {
message := &filer_pb.EventNotification{}
err = client.sendMessage("/test/path", message)
err = client.sendMessage(newWebhookMessage("/test/path", message))
if err == nil {
t.Error("Expected error for network failure")
}

View File

@@ -0,0 +1,182 @@
package webhook
import (
"fmt"
"net/url"
"slices"
"strconv"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
)
const (
queueName = "webhook"
pubSubTopicName = "webhook_topic"
deadLetterTopic = "webhook_dead_letter"
)
type eventType string
const (
eventTypeCreate eventType = "create"
eventTypeDelete eventType = "delete"
eventTypeUpdate eventType = "update"
eventTypeRename eventType = "rename"
)
func (e eventType) valid() bool {
return slices.Contains([]eventType{
eventTypeCreate,
eventTypeDelete,
eventTypeUpdate,
eventTypeRename,
},
e,
)
}
var (
pubSubHandlerNameTemplate = func(n int) string {
return "webhook_handler_" + strconv.Itoa(n)
}
)
type client interface {
sendMessage(message *webhookMessage) error
}
type webhookMessage struct {
Key string `json:"key"`
EventType string `json:"event_type"`
Notification *filer_pb.EventNotification `json:"message_data"`
}
func newWebhookMessage(key string, message proto.Message) *webhookMessage {
notification, ok := message.(*filer_pb.EventNotification)
if !ok {
return nil
}
eventType := string(detectEventType(notification))
return &webhookMessage{
Key: key,
EventType: eventType,
Notification: notification,
}
}
type config struct {
endpoint string
authBearerToken string
timeoutSeconds int
maxRetries int
backoffSeconds int
maxBackoffSeconds int
nWorkers int
bufferSize int
eventTypes []string
pathPrefixes []string
}
func newConfigWithDefaults(configuration util.Configuration, prefix string) *config {
c := &config{
endpoint: configuration.GetString(prefix + "endpoint"),
authBearerToken: configuration.GetString(prefix + "bearer_token"),
timeoutSeconds: 10,
maxRetries: 3,
backoffSeconds: 3,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10_000,
}
if bufferSize := configuration.GetInt(prefix + "buffer_size"); bufferSize > 0 {
c.bufferSize = bufferSize
}
if workers := configuration.GetInt(prefix + "workers"); workers > 0 {
c.nWorkers = workers
}
if maxRetries := configuration.GetInt(prefix + "max_retries"); maxRetries > 0 {
c.maxRetries = maxRetries
}
if backoffSeconds := configuration.GetInt(prefix + "backoff_seconds"); backoffSeconds > 0 {
c.backoffSeconds = backoffSeconds
}
if maxBackoffSeconds := configuration.GetInt(prefix + "max_backoff_seconds"); maxBackoffSeconds > 0 {
c.maxBackoffSeconds = maxBackoffSeconds
}
if timeout := configuration.GetInt(prefix + "timeout_seconds"); timeout > 0 {
c.timeoutSeconds = timeout
}
c.eventTypes = configuration.GetStringSlice(prefix + "event_types")
c.pathPrefixes = configuration.GetStringSlice(prefix + "path_prefixes")
return c
}
func (c *config) validate() error {
if c.endpoint == "" {
return fmt.Errorf("webhook endpoint is required")
}
_, err := url.Parse(c.endpoint)
if err != nil {
return fmt.Errorf("invalid webhook endpoint: %w", err)
}
if c.timeoutSeconds < 1 || c.timeoutSeconds > 300 {
return fmt.Errorf("timeout must be between 1 and 300 seconds, got %d", c.timeoutSeconds)
}
if c.maxRetries < 0 || c.maxRetries > 10 {
return fmt.Errorf("max retries must be between 0 and 10, got %d", c.maxRetries)
}
if c.backoffSeconds < 1 || c.backoffSeconds > 60 {
return fmt.Errorf("backoff seconds must be between 1 and 60, got %d", c.backoffSeconds)
}
if c.maxBackoffSeconds < c.backoffSeconds || c.maxBackoffSeconds > 300 {
return fmt.Errorf("max backoff seconds must be between %d and 300, got %d", c.backoffSeconds, c.maxBackoffSeconds)
}
if c.nWorkers < 1 || c.nWorkers > 100 {
return fmt.Errorf("workers must be between 1 and 100, got %d", c.nWorkers)
}
if c.bufferSize < 100 || c.bufferSize > 1_000_000 {
return fmt.Errorf("buffer size must be between 100 and 1,000,000, got %d", c.bufferSize)
}
return nil
}
func detectEventType(notification *filer_pb.EventNotification) eventType {
hasOldEntry := notification.OldEntry != nil
hasNewEntry := notification.NewEntry != nil
hasNewParentPath := notification.NewParentPath != ""
if !hasOldEntry && hasNewEntry {
return eventTypeCreate
}
if hasOldEntry && !hasNewEntry {
return eventTypeDelete
}
if hasOldEntry && hasNewEntry {
if hasNewParentPath {
return eventTypeRename
}
return eventTypeUpdate
}
return eventTypeUpdate
}

View File

@@ -1,52 +1,82 @@
package webhook
import (
"context"
"errors"
"fmt"
"net/url"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/notification"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
)
// client defines the interface for transport client
// could be extended to support gRPC
type client interface {
sendMessage(key string, message proto.Message) error
}
func init() {
notification.MessageQueues = append(notification.MessageQueues, &WebhookQueue{})
notification.MessageQueues = append(notification.MessageQueues, &Queue{})
}
type WebhookQueue struct {
type Queue struct {
router *message.Router
queueChannel *gochannel.GoChannel
config *config
client client
filter *filter
ctx context.Context
cancel context.CancelFunc
}
type config struct {
endpoint string
authBearerToken string
func (w *Queue) GetName() string {
return queueName
}
func (c *config) validate() error {
_, err := url.Parse(c.endpoint)
if err != nil {
return fmt.Errorf("invalid webhook endpoint %w", err)
}
func (w *Queue) SendMessage(key string, msg proto.Message) error {
eventNotification, ok := msg.(*filer_pb.EventNotification)
if !ok {
return nil
}
func (w *WebhookQueue) GetName() string {
return "webhook"
}
func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix string) error {
c := &config{
endpoint: configuration.GetString(prefix + "endpoint"),
authBearerToken: configuration.GetString(prefix + "bearer_token"),
}
if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) {
return nil
}
m := newWebhookMessage(key, msg)
if m == nil {
return nil
}
wMsg, err := m.toWaterMillMessage()
if err != nil {
return err
}
return w.queueChannel.Publish(pubSubTopicName, wMsg)
}
func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) {
payload, err := proto.Marshal(w.Notification)
if err != nil {
return nil, err
}
msg := message.NewMessage(watermill.NewUUID(), payload)
// Set event type and key as metadata
msg.Metadata.Set("event_type", w.EventType)
msg.Metadata.Set("key", w.Key)
return msg, nil
}
func (w *Queue) Initialize(configuration util.Configuration, prefix string) error {
c := newConfigWithDefaults(configuration, prefix)
if err := c.validate(); err != nil {
return err
}
@@ -54,18 +84,124 @@ func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix strin
return w.initialize(c)
}
func (w *WebhookQueue) initialize(cfg *config) error {
client, err := newHTTPClient(cfg)
func (w *Queue) initialize(cfg *config) error {
w.ctx, w.cancel = context.WithCancel(context.Background())
w.config = cfg
w.filter = newFilter(cfg)
hClient, err := newHTTPClient(cfg)
if err != nil {
return fmt.Errorf("failed to create webhook client: %v", err)
return fmt.Errorf("failed to create webhook http client: %w", err)
}
w.client = client
w.client = hClient
if err = w.setupWatermillQueue(cfg); err != nil {
return fmt.Errorf("failed to setup watermill queue: %w", err)
}
if err = w.logDeadLetterMessages(); err != nil {
return err
}
return nil
}
func (w *WebhookQueue) SendMessage(key string, message proto.Message) error {
if w.client == nil {
return fmt.Errorf("webhook client not initialized")
func (w *Queue) setupWatermillQueue(cfg *config) error {
logger := watermill.NewStdLogger(false, false)
pubSubConfig := gochannel.Config{
OutputChannelBuffer: int64(cfg.bufferSize),
Persistent: false,
}
return w.client.sendMessage(key, message)
w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger)
router, err := message.NewRouter(
message.RouterConfig{
CloseTimeout: 60 * time.Second,
},
logger,
)
if err != nil {
return fmt.Errorf("failed to create router: %v", err)
}
w.router = router
retryMiddleware := middleware.Retry{
MaxRetries: cfg.maxRetries,
InitialInterval: time.Duration(cfg.backoffSeconds) * time.Second,
MaxInterval: time.Duration(cfg.maxBackoffSeconds) * time.Second,
Multiplier: 2.0,
RandomizationFactor: 0.3,
Logger: logger,
}.Middleware
poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic)
if err != nil {
return fmt.Errorf("failed to create poison queue: %v", err)
}
router.AddPlugin(plugin.SignalsHandler)
router.AddMiddleware(retryMiddleware, poisonQueue)
for i := 0; i < cfg.nWorkers; i++ {
router.AddNoPublisherHandler(
pubSubHandlerNameTemplate(i),
pubSubTopicName,
w.queueChannel,
w.handleWebhook,
)
}
go func() {
// cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
defer w.cancel()
if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) {
glog.Errorf("webhook pubsub worker stopped with error: %v", err)
}
glog.Info("webhook pubsub worker stopped")
}()
return nil
}
func (w *Queue) handleWebhook(msg *message.Message) error {
var n filer_pb.EventNotification
if err := proto.Unmarshal(msg.Payload, &n); err != nil {
glog.Errorf("failed to unmarshal protobuf message: %v", err)
return err
}
// Reconstruct webhook message from metadata and payload
webhookMsg := &webhookMessage{
Key: msg.Metadata.Get("key"),
EventType: msg.Metadata.Get("event_type"),
Notification: &n,
}
if err := w.client.sendMessage(webhookMsg); err != nil {
glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err)
return err
}
return nil
}
func (w *Queue) logDeadLetterMessages() error {
ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic)
if err != nil {
return err
}
go func() {
for {
select {
case msg := <-ch:
glog.Errorf("received dead letter message: %s, key: %s", string(msg.Payload), msg.Metadata["key"])
case <-w.ctx.Done():
return
}
}
}()
return nil
}

View File

@@ -0,0 +1,536 @@
package webhook
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"google.golang.org/protobuf/proto"
)
func TestConfigValidation(t *testing.T) {
tests := []struct {
name string
config *config
wantErr bool
errMsg string
}{
{
name: "valid config",
config: &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: false,
},
{
name: "empty endpoint",
config: &config{
endpoint: "",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: true,
errMsg: "endpoint is required",
},
{
name: "invalid URL",
config: &config{
endpoint: "://invalid-url",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: true,
errMsg: "invalid webhook endpoint",
},
{
name: "timeout too large",
config: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 301,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: true,
errMsg: "timeout must be between",
},
{
name: "too many retries",
config: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 30,
maxRetries: 11,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: true,
errMsg: "max retries must be between",
},
{
name: "too many workers",
config: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 101,
bufferSize: 10000,
},
wantErr: true,
errMsg: "workers must be between",
},
{
name: "buffer too large",
config: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 1000001,
},
wantErr: true,
errMsg: "buffer size must be between",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.config.validate()
if (err != nil) != tt.wantErr {
t.Errorf("validate() error = %v, wantErr %v", err, tt.wantErr)
}
if err != nil && tt.errMsg != "" {
if err.Error() == "" || !strings.Contains(err.Error(), tt.errMsg) {
t.Errorf("validate() error message = %v, want to contain %v", err.Error(), tt.errMsg)
}
}
})
}
}
func TestWebhookMessageSerialization(t *testing.T) {
msg := &filer_pb.EventNotification{
OldEntry: nil,
NewEntry: &filer_pb.Entry{
Name: "test.txt",
IsDirectory: false,
},
}
webhookMsg := newWebhookMessage("/test/path", msg)
wmMsg, err := webhookMsg.toWaterMillMessage()
if err != nil {
t.Fatalf("Failed to convert to watermill message: %v", err)
}
// Unmarshal the protobuf payload directly
var eventNotification filer_pb.EventNotification
err = proto.Unmarshal(wmMsg.Payload, &eventNotification)
if err != nil {
t.Fatalf("Failed to unmarshal protobuf message: %v", err)
}
// Check metadata
if wmMsg.Metadata.Get("key") != "/test/path" {
t.Errorf("Expected key '/test/path', got %v", wmMsg.Metadata.Get("key"))
}
if wmMsg.Metadata.Get("event_type") != "create" {
t.Errorf("Expected event type 'create', got %v", wmMsg.Metadata.Get("event_type"))
}
if eventNotification.NewEntry.Name != "test.txt" {
t.Errorf("Expected file name 'test.txt', got %v", eventNotification.NewEntry.Name)
}
}
func TestQueueInitialize(t *testing.T) {
cfg := &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 10,
maxRetries: 3,
backoffSeconds: 3,
maxBackoffSeconds: 60,
nWorkers: 1,
bufferSize: 100,
}
q := &Queue{}
err := q.initialize(cfg)
if err != nil {
t.Errorf("Initialize() error = %v", err)
}
defer func() {
if q.cancel != nil {
q.cancel()
}
time.Sleep(100 * time.Millisecond)
if q.router != nil {
q.router.Close()
}
}()
if q.router == nil {
t.Error("Expected router to be initialized")
}
if q.queueChannel == nil {
t.Error("Expected queueChannel to be initialized")
}
if q.client == nil {
t.Error("Expected client to be initialized")
}
if q.config == nil {
t.Error("Expected config to be initialized")
}
}
// TestQueueSendMessage test sending messages to the queue
func TestQueueSendMessage(t *testing.T) {
cfg := &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 1,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
}
q := &Queue{}
err := q.initialize(cfg)
if err != nil {
t.Fatalf("Failed to initialize queue: %v", err)
}
defer func() {
if q.cancel != nil {
q.cancel()
}
time.Sleep(100 * time.Millisecond)
if q.router != nil {
q.router.Close()
}
}()
msg := &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{
Name: "test.txt",
},
}
err = q.SendMessage("/test/path", msg)
if err != nil {
t.Errorf("SendMessage() error = %v", err)
}
}
func TestQueueHandleWebhook(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
cfg := &config{
endpoint: server.URL,
authBearerToken: "test-token",
timeoutSeconds: 1,
maxRetries: 0,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
}
client, _ := newHTTPClient(cfg)
q := &Queue{
client: client,
}
message := newWebhookMessage("/test/path", &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{
Name: "test.txt",
},
})
wmMsg, err := message.toWaterMillMessage()
if err != nil {
t.Fatalf("Failed to create watermill message: %v", err)
}
err = q.handleWebhook(wmMsg)
if err != nil {
t.Errorf("handleWebhook() error = %v", err)
}
}
func TestQueueEndToEnd(t *testing.T) {
// Simplified test - just verify the queue can be created and message can be sent
// without needing full end-to-end processing
cfg := &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 1,
maxRetries: 0,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
}
q := &Queue{}
err := q.initialize(cfg)
if err != nil {
t.Fatalf("Failed to initialize queue: %v", err)
}
defer func() {
if q.cancel != nil {
q.cancel()
}
time.Sleep(100 * time.Millisecond)
if q.router != nil {
q.router.Close()
}
}()
msg := &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{
Name: "test.txt",
},
}
err = q.SendMessage("/test/path", msg)
if err != nil {
t.Errorf("SendMessage() error = %v", err)
}
}
func TestQueueRetryMechanism(t *testing.T) {
cfg := &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 1,
maxRetries: 3, // Test that this config is used
backoffSeconds: 2,
maxBackoffSeconds: 10,
nWorkers: 1,
bufferSize: 10,
}
q := &Queue{}
err := q.initialize(cfg)
if err != nil {
t.Fatalf("Failed to initialize queue: %v", err)
}
defer func() {
if q.cancel != nil {
q.cancel()
}
time.Sleep(100 * time.Millisecond)
if q.router != nil {
q.router.Close()
}
}()
// Verify that the queue is properly configured for retries
if q.config.maxRetries != 3 {
t.Errorf("Expected maxRetries=3, got %d", q.config.maxRetries)
}
if q.config.backoffSeconds != 2 {
t.Errorf("Expected backoffSeconds=2, got %d", q.config.backoffSeconds)
}
if q.config.maxBackoffSeconds != 10 {
t.Errorf("Expected maxBackoffSeconds=10, got %d", q.config.maxBackoffSeconds)
}
// Test that we can send a message (retry behavior is handled by Watermill middleware)
msg := &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "test.txt"},
}
err = q.SendMessage("/test/retry", msg)
if err != nil {
t.Errorf("SendMessage() error = %v", err)
}
}
func TestQueueSendMessageWithFilter(t *testing.T) {
tests := []struct {
name string
cfg *config
key string
notification *filer_pb.EventNotification
shouldPublish bool
}{
{
name: "allowed event type",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"create"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: true,
},
{
name: "filtered event type",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"update", "rename"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
{
name: "allowed path prefix",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
pathPrefixes: []string{"/data/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: true,
},
{
name: "filtered path prefix",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
pathPrefixes: []string{"/logs/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
{
name: "combined filters - both pass",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"create", "delete"},
pathPrefixes: []string{"/data/", "/logs/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: true,
},
{
name: "combined filters - event fails",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"update", "delete"},
pathPrefixes: []string{"/data/", "/logs/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
{
name: "combined filters - path fails",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"create", "delete"},
pathPrefixes: []string{"/logs/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
shouldPublish := newFilter(tt.cfg).shouldPublish(tt.key, tt.notification)
if shouldPublish != tt.shouldPublish {
t.Errorf("Expected shouldPublish=%v, got %v", tt.shouldPublish, shouldPublish)
}
})
}
}