mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-02-09 09:17:28 +08:00
renaming msgqueue to notification
This commit is contained in:
33
weed/notification/configuration.go
Normal file
33
weed/notification/configuration.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
var (
|
||||
MessageQueues []MessageQueue
|
||||
|
||||
Queue MessageQueue
|
||||
)
|
||||
|
||||
func LoadConfiguration(config *viper.Viper) {
|
||||
|
||||
if config == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, store := range MessageQueues {
|
||||
if config.GetBool(store.GetName() + ".enabled") {
|
||||
viperSub := config.Sub(store.GetName())
|
||||
if err := store.Initialize(viperSub); err != nil {
|
||||
glog.Fatalf("Failed to initialize store for %s: %+v",
|
||||
store.GetName(), err)
|
||||
}
|
||||
Queue = store
|
||||
glog.V(0).Infof("Configure message queue for %s", store.GetName())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
79
weed/notification/kafka/kafka_queue.go
Normal file
79
weed/notification/kafka/kafka_queue.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/notification"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
func init() {
|
||||
notification.MessageQueues = append(notification.MessageQueues, &KafkaQueue{})
|
||||
}
|
||||
|
||||
type KafkaQueue struct {
|
||||
topic string
|
||||
producer sarama.AsyncProducer
|
||||
}
|
||||
|
||||
func (k *KafkaQueue) GetName() string {
|
||||
return "kafka"
|
||||
}
|
||||
|
||||
func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) {
|
||||
glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
|
||||
glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic"))
|
||||
return k.initialize(
|
||||
configuration.GetStringSlice("hosts"),
|
||||
configuration.GetString("topic"),
|
||||
)
|
||||
}
|
||||
|
||||
func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) {
|
||||
config := sarama.NewConfig()
|
||||
config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
config.Producer.Partitioner = sarama.NewHashPartitioner
|
||||
config.Producer.Return.Successes = true
|
||||
config.Producer.Return.Errors = true
|
||||
k.producer, err = sarama.NewAsyncProducer(hosts, config)
|
||||
k.topic = topic
|
||||
go k.handleSuccess()
|
||||
go k.handleError()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) {
|
||||
bytes, err := proto.Marshal(message)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
msg := &sarama.ProducerMessage{
|
||||
Topic: k.topic,
|
||||
Key: sarama.StringEncoder(key),
|
||||
Value: sarama.ByteEncoder(bytes),
|
||||
}
|
||||
|
||||
k.producer.Input() <- msg
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *KafkaQueue) handleSuccess() {
|
||||
for {
|
||||
pm := <-k.producer.Successes()
|
||||
if pm != nil {
|
||||
glog.V(3).Infof("producer message success, partition:%d offset:%d key:%v", pm.Partition, pm.Offset, pm.Key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (k *KafkaQueue) handleError() {
|
||||
for {
|
||||
err := <-k.producer.Errors()
|
||||
if err != nil {
|
||||
glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic)
|
||||
}
|
||||
}
|
||||
}
|
||||
29
weed/notification/log/log_queue.go
Normal file
29
weed/notification/log/log_queue.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/notification"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
func init() {
|
||||
notification.MessageQueues = append(notification.MessageQueues, &LogQueue{})
|
||||
}
|
||||
|
||||
type LogQueue struct {
|
||||
}
|
||||
|
||||
func (k *LogQueue) GetName() string {
|
||||
return "log"
|
||||
}
|
||||
|
||||
func (k *LogQueue) Initialize(configuration util.Configuration) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *LogQueue) SendMessage(key string, message proto.Message) (err error) {
|
||||
|
||||
glog.V(0).Infof("%v: %+v", key, message)
|
||||
return nil
|
||||
}
|
||||
14
weed/notification/message_queue.go
Normal file
14
weed/notification/message_queue.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
type MessageQueue interface {
|
||||
// GetName gets the name to locate the configuration in message_queue.toml file
|
||||
GetName() string
|
||||
// Initialize initializes the file store
|
||||
Initialize(configuration util.Configuration) error
|
||||
SendMessage(key string, message proto.Message) error
|
||||
}
|
||||
Reference in New Issue
Block a user