Merge pull request #2868 from kmlebedev/hashicorp_raft

hashicorp raft
This commit is contained in:
Chris Lu
2022-04-10 23:00:05 -07:00
committed by GitHub
33 changed files with 1817 additions and 252 deletions

View File

@@ -0,0 +1,66 @@
package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/hashicorp/raft"
)
func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) {
resp := &master_pb.RaftListClusterServersResponse{}
servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
for _, server := range servers {
resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
Id: string(server.ID),
Address: string(server.Address),
Suffrage: server.Suffrage.String(),
})
}
return resp, nil
}
func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
resp := &master_pb.RaftAddServerResponse{}
if ms.Topo.HashicorpRaft.State() != raft.Leader {
return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
}
var idxFuture raft.IndexFuture
if req.Voter {
idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
} else {
idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
}
if err := idxFuture.Error(); err != nil {
return nil, err
}
return resp, nil
}
func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
resp := &master_pb.RaftRemoveServerResponse{}
if ms.Topo.HashicorpRaft.State() != raft.Leader {
return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
}
if !req.Force {
ms.clientChansLock.RLock()
_, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)]
ms.clientChansLock.RUnlock()
if ok {
return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id)
}
}
idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0)
if err := idxFuture.Error(); err != nil {
return nil, err
}
return resp, nil
}

View File

@@ -1,6 +1,7 @@
package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
@@ -17,6 +18,7 @@ import (
"github.com/chrislusf/raft"
"github.com/gorilla/mux"
hashicorpRaft "github.com/hashicorp/raft"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -30,8 +32,9 @@ import (
)
const (
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
RaftServerRemovalTime = 72 * time.Minute
)
type MasterOption struct {
@@ -62,6 +65,9 @@ type MasterServer struct {
boundedLeaderChan chan int
onPeerUpdatDoneCn chan string
onPeerUpdatDoneCnExist bool
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse
@@ -112,6 +118,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
ms.onPeerUpdatDoneCn = make(chan string)
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
seq := ms.createSequencer(option)
if nil == seq {
@@ -160,19 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
var raftServerName string
if raftServer.raftServer != nil {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
raftServerName = ms.Topo.RaftServer.Name()
} else if raftServer.RaftHashicorp != nil {
ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
leaderCh := raftServer.RaftHashicorp.LeaderCh()
prevLeader := ms.Topo.HashicorpRaft.Leader()
go func() {
for {
select {
case isLeader := <-leaderCh:
leader := ms.Topo.HashicorpRaft.Leader()
glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
prevLeader = leader
}
}
}()
raftServerName = ms.Topo.HashicorpRaft.String()
}
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
} else {
if ms.Topo.RaftServer.Leader() != "" {
if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
} else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
}
}
}
@@ -181,31 +212,38 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
} else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
ms.boundedLeaderChan <- 1
defer func() { <-ms.boundedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err))
return
}
glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader())
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
director := proxy.Director
proxy.Director = func(req *http.Request) {
actualHost, err := security.GetActualRemoteHost(req)
if err == nil {
req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
}
director(req)
}
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
// handle requests locally
f(w, r)
return
}
var raftServerLeader string
if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
raftServerLeader = ms.Topo.RaftServer.Leader()
} else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
raftServerLeader = string(ms.Topo.HashicorpRaft.Leader())
}
if raftServerLeader == "" {
f(w, r)
return
}
ms.boundedLeaderChan <- 1
defer func() { <-ms.boundedLeaderChan }()
targetUrl, err := url.Parse("http://" + raftServerLeader)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err))
return
}
glog.V(4).Infoln("proxying to leader", raftServerLeader)
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
director := proxy.Director
proxy.Director = func(req *http.Request) {
actualHost, err := security.GetActualRemoteHost(req)
if err == nil {
req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
}
director(req)
}
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
}
}
@@ -301,3 +339,57 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
return seq
}
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
return
}
glog.V(4).Infof("OnPeerUpdate: %+v", update)
peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress)
isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
if update.IsAdd {
if isLeader {
raftServerFound := false
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerName {
raftServerFound = true
}
}
if !raftServerFound {
glog.V(0).Infof("adding new raft server: %s", peerName)
ms.Topo.HashicorpRaft.AddVoter(
hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
}
if ms.onPeerUpdatDoneCnExist {
ms.onPeerUpdatDoneCn <- peerName
}
} else if isLeader {
go func(peerName string) {
for {
select {
case <-time.After(RaftServerRemovalTime):
err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
Id: peerName,
Force: false,
})
return err
})
if err != nil {
glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
}
return
case peerDone := <-ms.onPeerUpdatDoneCn:
if peerName == peerDone {
return
}
}
}
}(peerName)
ms.onPeerUpdatDoneCnExist = true
}
}

View File

@@ -5,6 +5,8 @@ import (
"time"
"github.com/chrislusf/raft"
hashicorpRaft "github.com/hashicorp/raft"
ui "github.com/chrislusf/seaweedfs/weed/server/master_ui"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -13,20 +15,40 @@ import (
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
args := struct {
Version string
Topology interface{}
RaftServer raft.Server
Stats map[string]interface{}
Counters *stats.ServerStats
VolumeSizeLimitMB uint32
}{
util.Version(),
ms.Topo.ToMap(),
ms.Topo.RaftServer,
infos,
serverStats,
ms.option.VolumeSizeLimitMB,
infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId()
if ms.Topo.RaftServer != nil {
args := struct {
Version string
Topology interface{}
RaftServer raft.Server
Stats map[string]interface{}
Counters *stats.ServerStats
VolumeSizeLimitMB uint32
}{
util.Version(),
ms.Topo.ToMap(),
ms.Topo.RaftServer,
infos,
serverStats,
ms.option.VolumeSizeLimitMB,
}
ui.StatusTpl.Execute(w, args)
} else if ms.Topo.HashicorpRaft != nil {
args := struct {
Version string
Topology interface{}
RaftServer *hashicorpRaft.Raft
Stats map[string]interface{}
Counters *stats.ServerStats
VolumeSizeLimitMB uint32
}{
util.Version(),
ms.Topo.ToMap(),
ms.Topo.HashicorpRaft,
infos,
serverStats,
ms.option.VolumeSizeLimitMB,
}
ui.StatusNewRaftTpl.Execute(w, args)
}
ui.StatusTpl.Execute(w, args)
}

View File

@@ -0,0 +1,121 @@
<!DOCTYPE html>
<html>
<head>
<title>SeaweedFS {{ .Version }}</title>
<link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
</head>
<body>
<div class="container">
<div class="page-header">
<h1>
<a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
SeaweedFS <small>{{ .Version }}</small>
</h1>
</div>
<div class="row">
<div class="col-sm-6">
<h2>Cluster status</h2>
<table class="table table-condensed table-striped">
<tbody>
<tr>
<th>Volume Size Limit</th>
<td>{{ .VolumeSizeLimitMB }}MB</td>
</tr>
<tr>
<th>Free</th>
<td>{{ .Topology.Free }}</td>
</tr>
<tr>
<th>Max</th>
<td>{{ .Topology.Max }}</td>
</tr>
{{ with .RaftServer }}
<tr>
<th>Leader</th>
<td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td>
</tr>
<tr>
<th>Other Masters</th>
<td class="col-sm-5">
<ul class="list-unstyled">
{{ range $k, $p := .GetConfiguration.Configuration.Servers }}
<li><a href="http://{{ $p.ID }}/ui/index.html">{{ $p.ID }}</a></li>
{{ end }}
</ul>
</td>
</tr>
{{ end }}
</tbody>
</table>
</div>
<div class="col-sm-6">
<h2>System Stats</h2>
<table class="table table-condensed table-striped">
<tr>
<th>Concurrent Connections</th>
<td>{{ .Counters.Connections.WeekCounter.Sum }}</td>
</tr>
{{ range $key, $val := .Stats }}
<tr>
<th>{{ $key }}</th>
<td>{{ $val }}</td>
</tr>
{{ end }}
</table>
<h2>Raft Stats</h2>
<table class="table table-condensed table-striped">
<tr>
<th>applied_index</th>
<td>{{ .RaftServer.Stats.applied_index }}</td>
</tr>
<tr>
<th>last_log_term</th>
<td>{{ .RaftServer.Stats.last_log_term }}</td>
</tr>
</table>
</div>
</div>
<div class="row">
<h2>Topology</h2>
<table class="table table-striped">
<thead>
<tr>
<th>Data Center</th>
<th>Rack</th>
<th>RemoteAddr</th>
<th>#Volumes</th>
<th>Volume Ids</th>
<th>#ErasureCodingShards</th>
<th>Max</th>
</tr>
</thead>
<tbody>
{{ range $dc_index, $dc := .Topology.DataCenters }}
{{ range $rack_index, $rack := $dc.Racks }}
{{ range $dn_index, $dn := $rack.DataNodes }}
<tr>
<td><code>{{ $dc.Id }}</code></td>
<td>{{ $rack.Id }}</td>
<td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
{{ if ne $dn.PublicUrl $dn.Url }}
/ <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
{{ end }}
</td>
<td>{{ $dn.Volumes }}</td>
<td>{{ $dn.VolumeIds}}</td>
<td>{{ $dn.EcShards }}</td>
<td>{{ $dn.Max }}</td>
</tr>
{{ end }}
{{ end }}
{{ end }}
</tbody>
</table>
</div>
</div>
</body>
</html>

View File

@@ -8,4 +8,8 @@ import (
//go:embed master.html
var masterHtml string
//go:embed masterNewRaft.html
var masterNewRaftHtml string
var StatusTpl = template.Must(template.New("status").Parse(masterHtml))
var StatusNewRaftTpl = template.Must(template.New("status").Parse(masterNewRaftHtml))

View File

@@ -0,0 +1,183 @@
package weed_server
// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28
// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18
import (
"fmt"
transport "github.com/Jille/raft-grpc-transport"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/hashicorp/raft"
boltdb "github.com/hashicorp/raft-boltdb"
"google.golang.org/grpc"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
)
const (
ldbFile = "logs.dat"
sdbFile = "stable.dat"
updatePeersTimeout = 15 * time.Minute
)
func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int {
peers := make([]pb.ServerAddress, 0, len(mapPeers))
for _, peer := range mapPeers {
peers = append(peers, peer)
}
sort.Slice(peers, func(i, j int) bool {
return strings.Compare(string(peers[i]), string(peers[j])) < 0
})
for i, peer := range peers {
if string(peer) == string(self) {
return i
}
}
return -1
}
func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
for _, peer := range s.peers {
cfg.Servers = append(cfg.Servers, raft.Server{
Suffrage: raft.Voter,
ID: raft.ServerID(peer),
Address: raft.ServerAddress(peer.ToGrpcAddress()),
})
}
return cfg
}
func (s *RaftServer) UpdatePeers() {
for {
select {
case isLeader := <-s.RaftHashicorp.LeaderCh():
if isLeader {
peerLeader := string(s.serverAddr)
existsPeerName := make(map[string]bool)
for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerLeader {
continue
}
existsPeerName[string(server.ID)] = true
}
for _, peer := range s.peers {
peerName := string(peer)
if peerName == peerLeader || existsPeerName[peerName] {
continue
}
glog.V(0).Infof("adding new peer: %s", peerName)
s.RaftHashicorp.AddVoter(
raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
}
for peer, _ := range existsPeerName {
if _, found := s.peers[peer]; !found {
glog.V(0).Infof("removing old peer: %s", peer)
s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
}
}
if _, found := s.peers[peerLeader]; !found {
glog.V(0).Infof("removing old leader peer: %s", peerLeader)
s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
}
}
return
case <-time.After(updatePeersTimeout):
return
}
}
}
func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
serverAddr: option.ServerAddr,
dataDir: option.DataDir,
topo: option.Topo,
}
c := raft.DefaultConfig()
c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change
c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
c.ElectionTimeout = option.ElectionTimeout
if c.LeaderLeaseTimeout > c.HeartbeatTimeout {
c.LeaderLeaseTimeout = c.HeartbeatTimeout
}
if glog.V(4) {
c.LogLevel = "Debug"
} else if glog.V(2) {
c.LogLevel = "Info"
} else if glog.V(1) {
c.LogLevel = "Warn"
} else if glog.V(0) {
c.LogLevel = "Error"
}
if option.RaftBootstrap {
os.RemoveAll(path.Join(s.dataDir, ldbFile))
os.RemoveAll(path.Join(s.dataDir, sdbFile))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
baseDir := s.dataDir
ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, ldbFile))
if err != nil {
return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
}
sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, sdbFile))
if err != nil {
return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
}
fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
if err != nil {
return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
}
s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption})
stateMachine := StateMachine{topo: option.Topo}
s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
if err != nil {
return nil, fmt.Errorf("raft.NewRaft: %v", err)
}
if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 {
cfg := s.AddPeersConfiguration()
// Need to get lock, in case all servers do this at the same time.
peerIdx := getPeerIdx(s.serverAddr, s.peers)
timeSpeep := time.Duration(float64(c.LeaderLeaseTimeout) * (rand.Float64()*0.25 + 1) * float64(peerIdx))
glog.V(0).Infof("Bootstrapping idx: %d sleep: %v new cluster: %+v", peerIdx, timeSpeep, cfg)
time.Sleep(timeSpeep)
f := s.RaftHashicorp.BootstrapCluster(cfg)
if err := f.Error(); err != nil {
return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
} else {
go s.UpdatePeers()
}
ticker := time.NewTicker(c.HeartbeatTimeout * 10)
if glog.V(4) {
go func() {
for {
select {
case <-ticker.C:
cfuture := s.RaftHashicorp.GetConfiguration()
if err = cfuture.Error(); err != nil {
glog.Fatalf("error getting config: %s", err)
}
configuration := cfuture.Configuration()
glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers)
}
}
}()
}
return s, nil
}

View File

@@ -2,6 +2,9 @@ package weed_server
import (
"encoding/json"
transport "github.com/Jille/raft-grpc-transport"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
@@ -12,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/raft"
hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
@@ -26,14 +30,17 @@ type RaftServerOption struct {
RaftResumeState bool
HeartbeatInterval time.Duration
ElectionTimeout time.Duration
RaftBootstrap bool
}
type RaftServer struct {
peers map[string]pb.ServerAddress // initial peers to join with
raftServer raft.Server
dataDir string
serverAddr pb.ServerAddress
topo *topology.Topology
peers map[string]pb.ServerAddress // initial peers to join with
raftServer raft.Server
RaftHashicorp *hashicorpRaft.Raft
TransportManager *transport.Manager
dataDir string
serverAddr pb.ServerAddress
topo *topology.Topology
*raft.GrpcServer
}
@@ -42,6 +49,8 @@ type StateMachine struct {
topo *topology.Topology
}
var _ hashicorpRaft.FSM = &StateMachine{}
func (s StateMachine) Save() ([]byte, error) {
state := topology.MaxVolumeIdCommand{
MaxVolumeId: s.topo.GetMaxVolumeId(),
@@ -61,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
before := s.topo.GetMaxVolumeId()
state := topology.MaxVolumeIdCommand{}
err := json.Unmarshal(l.Data, &state)
if err != nil {
return err
}
s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
return nil
}
func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
return &topology.MaxVolumeIdCommand{
MaxVolumeId: s.topo.GetMaxVolumeId(),
}, nil
}
func (s *StateMachine) Restore(r io.ReadCloser) error {
b, err := ioutil.ReadAll(r)
if err != nil {
return err
}
if err := s.Recovery(b); err != nil {
return err
}
return nil
}
func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
@@ -132,12 +171,17 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
}
func (s *RaftServer) Peers() (members []string) {
peers := s.raftServer.Peers()
for _, p := range peers {
members = append(members, p.Name)
if s.raftServer != nil {
peers := s.raftServer.Peers()
for _, p := range peers {
members = append(members, p.Name)
}
} else if s.RaftHashicorp != nil {
cfg := s.RaftHashicorp.GetConfiguration()
for _, p := range cfg.Configuration().Servers {
members = append(members, string(p.ID))
}
}
return
}

View File

@@ -25,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
}
writeJsonQuiet(w, r, http.StatusOK, ret)
}
func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) {
if s.RaftHashicorp == nil {
writeJsonQuiet(w, r, http.StatusNotFound, nil)
return
}
writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats())
}