mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-14 08:56:19 +08:00
1. adding statistics reporting
2. refactor version to util package
This commit is contained in:
26
go/stats/disk.go
Normal file
26
go/stats/disk.go
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
// +build !windows
|
||||||
|
|
||||||
|
package stats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DiskStatus struct {
|
||||||
|
All uint64 `json:"all"`
|
||||||
|
Used uint64 `json:"used"`
|
||||||
|
Free uint64 `json:"free"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func DiskUsage(path string) (disk *DiskStatus) {
|
||||||
|
disk = &DiskStatus{}
|
||||||
|
fs := syscall.Statfs_t{}
|
||||||
|
err := syscall.Statfs(path, &fs)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
disk.All = fs.Blocks * uint64(fs.Bsize)
|
||||||
|
disk.Free = fs.Bfree * uint64(fs.Bsize)
|
||||||
|
disk.Used = disk.All - disk.Free
|
||||||
|
return
|
||||||
|
}
|
17
go/stats/disk_windows.go
Normal file
17
go/stats/disk_windows.go
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
// +build windows
|
||||||
|
|
||||||
|
package stats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DiskStatus struct {
|
||||||
|
All uint64 `json:"all"`
|
||||||
|
Used uint64 `json:"used"`
|
||||||
|
Free uint64 `json:"free"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func DiskUsage(path string) (disk *DiskStatus) {
|
||||||
|
return
|
||||||
|
}
|
83
go/stats/duration_counter.go
Normal file
83
go/stats/duration_counter.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package stats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TimedValue struct {
|
||||||
|
t time.Time
|
||||||
|
val int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTimedValue(t time.Time, val int64) *TimedValue {
|
||||||
|
return &TimedValue{t: t, val: val}
|
||||||
|
}
|
||||||
|
|
||||||
|
type RoundRobinCounter struct {
|
||||||
|
LastIndex int
|
||||||
|
Values []int64
|
||||||
|
Counts []int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRoundRobinCounter(slots int) *RoundRobinCounter {
|
||||||
|
return &RoundRobinCounter{LastIndex: -1, Values: make([]int64, slots), Counts: make([]int64, slots)}
|
||||||
|
}
|
||||||
|
func (rrc *RoundRobinCounter) Add(index int, val int64) {
|
||||||
|
for rrc.LastIndex != index {
|
||||||
|
rrc.LastIndex++
|
||||||
|
if rrc.LastIndex >= len(rrc.Values) {
|
||||||
|
if index >= len(rrc.Values) {
|
||||||
|
break //just avoid endless loop
|
||||||
|
}
|
||||||
|
rrc.LastIndex = 0
|
||||||
|
}
|
||||||
|
rrc.Values[rrc.LastIndex] = 0
|
||||||
|
rrc.Counts[rrc.LastIndex] = 0
|
||||||
|
}
|
||||||
|
rrc.Values[index] += val
|
||||||
|
rrc.Counts[index]++
|
||||||
|
}
|
||||||
|
func (rrc *RoundRobinCounter) Max() (max int64) {
|
||||||
|
for _, val := range rrc.Values {
|
||||||
|
if max < val {
|
||||||
|
max = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
func (rrc *RoundRobinCounter) Count() (cnt int64) {
|
||||||
|
for _, c := range rrc.Counts {
|
||||||
|
cnt += c
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
func (rrc *RoundRobinCounter) Sum() (sum int64) {
|
||||||
|
for _, val := range rrc.Values {
|
||||||
|
sum += val
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
type DurationCounter struct {
|
||||||
|
MinuteCounter *RoundRobinCounter
|
||||||
|
HourCounter *RoundRobinCounter
|
||||||
|
DayCounter *RoundRobinCounter
|
||||||
|
WeekCounter *RoundRobinCounter
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDurationCounter() *DurationCounter {
|
||||||
|
return &DurationCounter{
|
||||||
|
MinuteCounter: NewRoundRobinCounter(60),
|
||||||
|
HourCounter: NewRoundRobinCounter(60),
|
||||||
|
DayCounter: NewRoundRobinCounter(24),
|
||||||
|
WeekCounter: NewRoundRobinCounter(7),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add is for cumulative counts
|
||||||
|
func (sc *DurationCounter) Add(tv *TimedValue) {
|
||||||
|
sc.MinuteCounter.Add(tv.t.Second(), tv.val)
|
||||||
|
sc.HourCounter.Add(tv.t.Minute(), tv.val)
|
||||||
|
sc.DayCounter.Add(tv.t.Hour(), tv.val)
|
||||||
|
sc.WeekCounter.Add(int(tv.t.Weekday()), tv.val)
|
||||||
|
}
|
113
go/stats/stats.go
Normal file
113
go/stats/stats.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
package stats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ServerStats struct {
|
||||||
|
Requests *DurationCounter
|
||||||
|
Connections *DurationCounter
|
||||||
|
AssignRequests *DurationCounter
|
||||||
|
ReadRequests *DurationCounter
|
||||||
|
WriteRequests *DurationCounter
|
||||||
|
DeleteRequests *DurationCounter
|
||||||
|
BytesIn *DurationCounter
|
||||||
|
BytesOut *DurationCounter
|
||||||
|
}
|
||||||
|
|
||||||
|
type Channels struct {
|
||||||
|
Connections chan *TimedValue
|
||||||
|
Requests chan *TimedValue
|
||||||
|
AssignRequests chan *TimedValue
|
||||||
|
ReadRequests chan *TimedValue
|
||||||
|
WriteRequests chan *TimedValue
|
||||||
|
DeleteRequests chan *TimedValue
|
||||||
|
BytesIn chan *TimedValue
|
||||||
|
BytesOut chan *TimedValue
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
Chan *Channels
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Chan = &Channels{
|
||||||
|
Connections: make(chan *TimedValue, 100),
|
||||||
|
Requests: make(chan *TimedValue, 100),
|
||||||
|
AssignRequests: make(chan *TimedValue, 100),
|
||||||
|
ReadRequests: make(chan *TimedValue, 100),
|
||||||
|
WriteRequests: make(chan *TimedValue, 100),
|
||||||
|
DeleteRequests: make(chan *TimedValue, 100),
|
||||||
|
BytesIn: make(chan *TimedValue, 100),
|
||||||
|
BytesOut: make(chan *TimedValue, 100),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServerStats() *ServerStats {
|
||||||
|
return &ServerStats{
|
||||||
|
Requests: NewDurationCounter(),
|
||||||
|
Connections: NewDurationCounter(),
|
||||||
|
AssignRequests: NewDurationCounter(),
|
||||||
|
ReadRequests: NewDurationCounter(),
|
||||||
|
WriteRequests: NewDurationCounter(),
|
||||||
|
DeleteRequests: NewDurationCounter(),
|
||||||
|
BytesIn: NewDurationCounter(),
|
||||||
|
BytesOut: NewDurationCounter(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConnectionOpen() {
|
||||||
|
Chan.Connections <- NewTimedValue(time.Now(), 1)
|
||||||
|
}
|
||||||
|
func ConnectionClose() {
|
||||||
|
Chan.Connections <- NewTimedValue(time.Now(), -1)
|
||||||
|
}
|
||||||
|
func RequestOpen() {
|
||||||
|
Chan.Requests <- NewTimedValue(time.Now(), 1)
|
||||||
|
}
|
||||||
|
func RequestClose() {
|
||||||
|
Chan.Requests <- NewTimedValue(time.Now(), -1)
|
||||||
|
}
|
||||||
|
func AssignRequest() {
|
||||||
|
Chan.AssignRequests <- NewTimedValue(time.Now(), 1)
|
||||||
|
}
|
||||||
|
func ReadRequest() {
|
||||||
|
Chan.ReadRequests <- NewTimedValue(time.Now(), 1)
|
||||||
|
}
|
||||||
|
func WriteRequest() {
|
||||||
|
Chan.WriteRequests <- NewTimedValue(time.Now(), 1)
|
||||||
|
}
|
||||||
|
func DeleteRequest() {
|
||||||
|
Chan.DeleteRequests <- NewTimedValue(time.Now(), 1)
|
||||||
|
}
|
||||||
|
func BytesIn(val int64) {
|
||||||
|
Chan.BytesIn <- NewTimedValue(time.Now(), val)
|
||||||
|
}
|
||||||
|
func BytesOut(val int64) {
|
||||||
|
Chan.BytesOut <- NewTimedValue(time.Now(), val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *ServerStats) Start() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case tv := <-Chan.Connections:
|
||||||
|
ss.Connections.Add(tv)
|
||||||
|
case tv := <-Chan.Requests:
|
||||||
|
ss.Requests.Add(tv)
|
||||||
|
case tv := <-Chan.AssignRequests:
|
||||||
|
ss.AssignRequests.Add(tv)
|
||||||
|
case tv := <-Chan.ReadRequests:
|
||||||
|
ss.ReadRequests.Add(tv)
|
||||||
|
case tv := <-Chan.WriteRequests:
|
||||||
|
ss.WriteRequests.Add(tv)
|
||||||
|
case tv := <-Chan.ReadRequests:
|
||||||
|
ss.ReadRequests.Add(tv)
|
||||||
|
case tv := <-Chan.DeleteRequests:
|
||||||
|
ss.DeleteRequests.Add(tv)
|
||||||
|
case tv := <-Chan.BytesIn:
|
||||||
|
ss.BytesIn.Add(tv)
|
||||||
|
case tv := <-Chan.BytesOut:
|
||||||
|
ss.BytesOut.Add(tv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
7
go/util/constants.go
Normal file
7
go/util/constants.go
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
package util
|
||||||
|
|
||||||
|
import ()
|
||||||
|
|
||||||
|
const (
|
||||||
|
VERSION = "0.51"
|
||||||
|
)
|
@@ -1,6 +1,7 @@
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"code.google.com/p/weed-fs/go/stats"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -18,6 +19,7 @@ func (l *Listener) Accept() (net.Conn, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
stats.ConnectionOpen()
|
||||||
tc := &Conn{
|
tc := &Conn{
|
||||||
Conn: c,
|
Conn: c,
|
||||||
ReadTimeout: l.ReadTimeout,
|
ReadTimeout: l.ReadTimeout,
|
||||||
@@ -34,20 +36,33 @@ type Conn struct {
|
|||||||
WriteTimeout time.Duration
|
WriteTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Read(b []byte) (int, error) {
|
func (c *Conn) Read(b []byte) (count int, e error) {
|
||||||
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
|
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return c.Conn.Read(b)
|
count, e = c.Conn.Read(b)
|
||||||
|
if e == nil {
|
||||||
|
stats.BytesIn(int64(count))
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Write(b []byte) (int, error) {
|
func (c *Conn) Write(b []byte) (count int, e error) {
|
||||||
err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
|
err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return c.Conn.Write(b)
|
count, e = c.Conn.Write(b)
|
||||||
|
if e == nil {
|
||||||
|
stats.BytesOut(int64(count))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Close() error {
|
||||||
|
stats.ConnectionClose()
|
||||||
|
return c.Conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewListener(addr string, timeout time.Duration) (net.Listener, error) {
|
func NewListener(addr string, timeout time.Duration) (net.Listener, error) {
|
||||||
|
@@ -98,7 +98,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func runbenchmark(cmd *Command, args []string) bool {
|
func runbenchmark(cmd *Command, args []string) bool {
|
||||||
fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
|
fmt.Printf("This is Weed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
|
||||||
if *b.cpuprofile != "" {
|
if *b.cpuprofile != "" {
|
||||||
f, err := os.Create(*b.cpuprofile)
|
f, err := os.Create(*b.cpuprofile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -56,11 +56,11 @@ func runMaster(cmd *Command, args []string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
ms := weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder,
|
ms := weed_server.NewMasterServer(r, *mport, *metaFolder,
|
||||||
*volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList,
|
*volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList,
|
||||||
)
|
)
|
||||||
|
|
||||||
glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport))
|
glog.V(0).Infoln("Start Weed Master", util.VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport))
|
||||||
|
|
||||||
listener, e := util.NewListener(
|
listener, e := util.NewListener(
|
||||||
*masterIp+":"+strconv.Itoa(*mport),
|
*masterIp+":"+strconv.Itoa(*mport),
|
||||||
@@ -76,7 +76,7 @@ func runMaster(cmd *Command, args []string) bool {
|
|||||||
if *masterPeers != "" {
|
if *masterPeers != "" {
|
||||||
peers = strings.Split(*masterPeers, ",")
|
peers = strings.Split(*masterPeers, ",")
|
||||||
}
|
}
|
||||||
raftServer := weed_server.NewRaftServer(r, VERSION, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder, ms.Topo, *mpulse)
|
raftServer := weed_server.NewRaftServer(r, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder, ms.Topo, *mpulse)
|
||||||
ms.SetRaftServer(raftServer)
|
ms.SetRaftServer(raftServer)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@@ -104,11 +104,11 @@ func runServer(cmd *Command, args []string) bool {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
ms := weed_server.NewMasterServer(r, VERSION, *masterPort, *masterMetaFolder,
|
ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder,
|
||||||
*masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *garbageThreshold, serverWhiteList,
|
*masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *garbageThreshold, serverWhiteList,
|
||||||
)
|
)
|
||||||
|
|
||||||
glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *serverIp+":"+strconv.Itoa(*masterPort))
|
glog.V(0).Infoln("Start Weed Master", util.VERSION, "at port", *serverIp+":"+strconv.Itoa(*masterPort))
|
||||||
masterListener, e := util.NewListener(
|
masterListener, e := util.NewListener(
|
||||||
*serverIp+":"+strconv.Itoa(*masterPort),
|
*serverIp+":"+strconv.Itoa(*masterPort),
|
||||||
time.Duration(*serverTimeout)*time.Second,
|
time.Duration(*serverTimeout)*time.Second,
|
||||||
@@ -124,7 +124,7 @@ func runServer(cmd *Command, args []string) bool {
|
|||||||
if *serverPeers != "" {
|
if *serverPeers != "" {
|
||||||
peers = strings.Split(*serverPeers, ",")
|
peers = strings.Split(*serverPeers, ",")
|
||||||
}
|
}
|
||||||
raftServer := weed_server.NewRaftServer(r, VERSION, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder, ms.Topo, *volumePulse)
|
raftServer := weed_server.NewRaftServer(r, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder, ms.Topo, *volumePulse)
|
||||||
ms.SetRaftServer(raftServer)
|
ms.SetRaftServer(raftServer)
|
||||||
volumeWait.Done()
|
volumeWait.Done()
|
||||||
}()
|
}()
|
||||||
@@ -138,11 +138,11 @@ func runServer(cmd *Command, args []string) bool {
|
|||||||
volumeWait.Wait()
|
volumeWait.Wait()
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
r := http.NewServeMux()
|
r := http.NewServeMux()
|
||||||
weed_server.NewVolumeServer(r, VERSION, *serverIp, *volumePort, *volumePublicUrl, folders, maxCounts,
|
weed_server.NewVolumeServer(r, *serverIp, *volumePort, *volumePublicUrl, folders, maxCounts,
|
||||||
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList,
|
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList,
|
||||||
)
|
)
|
||||||
|
|
||||||
glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*serverIp+":"+strconv.Itoa(*volumePort))
|
glog.V(0).Infoln("Start Weed volume server", util.VERSION, "at http://"+*serverIp+":"+strconv.Itoa(*volumePort))
|
||||||
volumeListener, e := util.NewListener(
|
volumeListener, e := util.NewListener(
|
||||||
*serverIp+":"+strconv.Itoa(*volumePort),
|
*serverIp+":"+strconv.Itoa(*volumePort),
|
||||||
time.Duration(*serverTimeout)*time.Second,
|
time.Duration(*serverTimeout)*time.Second,
|
||||||
|
@@ -1,14 +1,11 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"code.google.com/p/weed-fs/go/util"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
VERSION = "0.51"
|
|
||||||
)
|
|
||||||
|
|
||||||
var cmdVersion = &Command{
|
var cmdVersion = &Command{
|
||||||
Run: runVersion,
|
Run: runVersion,
|
||||||
UsageLine: "version",
|
UsageLine: "version",
|
||||||
@@ -21,6 +18,6 @@ func runVersion(cmd *Command, args []string) bool {
|
|||||||
cmd.Usage()
|
cmd.Usage()
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
|
fmt.Printf("version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@@ -74,11 +74,11 @@ func runVolume(cmd *Command, args []string) bool {
|
|||||||
|
|
||||||
r := http.NewServeMux()
|
r := http.NewServeMux()
|
||||||
|
|
||||||
weed_server.NewVolumeServer(r, VERSION, *ip, *vport, *publicUrl, folders, maxCounts,
|
weed_server.NewVolumeServer(r, *ip, *vport, *publicUrl, folders, maxCounts,
|
||||||
*masterNode, *vpulse, *dataCenter, *rack, volumeWhiteList,
|
*masterNode, *vpulse, *dataCenter, *rack, volumeWhiteList,
|
||||||
)
|
)
|
||||||
|
|
||||||
glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
|
glog.V(0).Infoln("Start Weed volume server", util.VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
|
||||||
listener, e := util.NewListener(
|
listener, e := util.NewListener(
|
||||||
*ip+":"+strconv.Itoa(*vport),
|
*ip+":"+strconv.Itoa(*vport),
|
||||||
time.Duration(*vTimeout)*time.Second,
|
time.Duration(*vTimeout)*time.Second,
|
||||||
|
@@ -4,7 +4,9 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"code.google.com/p/weed-fs/go/glog"
|
"code.google.com/p/weed-fs/go/glog"
|
||||||
"code.google.com/p/weed-fs/go/operation"
|
"code.google.com/p/weed-fs/go/operation"
|
||||||
|
"code.google.com/p/weed-fs/go/stats"
|
||||||
"code.google.com/p/weed-fs/go/storage"
|
"code.google.com/p/weed-fs/go/storage"
|
||||||
|
"code.google.com/p/weed-fs/go/util"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
@@ -14,6 +16,14 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var serverStats *stats.ServerStats
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
serverStats = stats.NewServerStats()
|
||||||
|
go serverStats.Start()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) {
|
func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) {
|
||||||
w.Header().Set("Content-Type", "application/javascript")
|
w.Header().Set("Content-Type", "application/javascript")
|
||||||
var bytes []byte
|
var bytes []byte
|
||||||
@@ -152,3 +162,15 @@ func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly b
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
func statsCounterHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
m["Version"] = util.VERSION
|
||||||
|
m["Statistics"] = serverStats
|
||||||
|
writeJsonQuiet(w, r, m)
|
||||||
|
}
|
||||||
|
func statsMemoryHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
m["Version"] = util.VERSION
|
||||||
|
m["Statistics"] = serverStats
|
||||||
|
writeJsonQuiet(w, r, m)
|
||||||
|
}
|
||||||
|
@@ -23,7 +23,6 @@ type MasterServer struct {
|
|||||||
defaultReplicaPlacement string
|
defaultReplicaPlacement string
|
||||||
garbageThreshold string
|
garbageThreshold string
|
||||||
whiteList []string
|
whiteList []string
|
||||||
version string
|
|
||||||
|
|
||||||
Topo *topology.Topology
|
Topo *topology.Topology
|
||||||
vg *replication.VolumeGrowth
|
vg *replication.VolumeGrowth
|
||||||
@@ -32,7 +31,7 @@ type MasterServer struct {
|
|||||||
bounedLeaderChan chan int
|
bounedLeaderChan chan int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
|
func NewMasterServer(r *mux.Router, port int, metaFolder string,
|
||||||
volumeSizeLimitMB uint,
|
volumeSizeLimitMB uint,
|
||||||
pulseSeconds int,
|
pulseSeconds int,
|
||||||
confFile string,
|
confFile string,
|
||||||
@@ -41,7 +40,6 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
|
|||||||
whiteList []string,
|
whiteList []string,
|
||||||
) *MasterServer {
|
) *MasterServer {
|
||||||
ms := &MasterServer{
|
ms := &MasterServer{
|
||||||
version: version,
|
|
||||||
volumeSizeLimitMB: volumeSizeLimitMB,
|
volumeSizeLimitMB: volumeSizeLimitMB,
|
||||||
pulseSeconds: pulseSeconds,
|
pulseSeconds: pulseSeconds,
|
||||||
defaultReplicaPlacement: defaultReplicaPlacement,
|
defaultReplicaPlacement: defaultReplicaPlacement,
|
||||||
@@ -68,6 +66,8 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
|
|||||||
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler)))
|
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler)))
|
||||||
r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler))
|
r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler))
|
||||||
r.HandleFunc("/{filekey}", ms.redirectHandler)
|
r.HandleFunc("/{filekey}", ms.redirectHandler)
|
||||||
|
r.HandleFunc("/stats/counter", secure(ms.whiteList, statsCounterHandler))
|
||||||
|
r.HandleFunc("/stats/memory", secure(ms.whiteList, statsMemoryHandler))
|
||||||
|
|
||||||
ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
|
ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
|
||||||
|
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package weed_server
|
package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"code.google.com/p/weed-fs/go/stats"
|
||||||
"code.google.com/p/weed-fs/go/storage"
|
"code.google.com/p/weed-fs/go/storage"
|
||||||
"code.google.com/p/weed-fs/go/util"
|
"code.google.com/p/weed-fs/go/util"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@@ -37,6 +38,7 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
|
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
stats.AssignRequest()
|
||||||
c, e := strconv.Atoi(r.FormValue("count"))
|
c, e := strconv.Atoi(r.FormValue("count"))
|
||||||
if e != nil {
|
if e != nil {
|
||||||
c = 1
|
c = 1
|
||||||
@@ -119,7 +121,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
m["Version"] = ms.version
|
m["Version"] = util.VERSION
|
||||||
m["Topology"] = ms.Topo.ToMap()
|
m["Topology"] = ms.Topo.ToMap()
|
||||||
writeJsonQuiet(w, r, m)
|
writeJsonQuiet(w, r, m)
|
||||||
}
|
}
|
||||||
@@ -159,7 +161,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
|
|||||||
|
|
||||||
func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
|
func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
m["Version"] = ms.version
|
m["Version"] = util.VERSION
|
||||||
m["Volumes"] = ms.Topo.ToVolumeMap()
|
m["Volumes"] = ms.Topo.ToVolumeMap()
|
||||||
writeJsonQuiet(w, r, m)
|
writeJsonQuiet(w, r, m)
|
||||||
}
|
}
|
||||||
|
@@ -21,14 +21,12 @@ type RaftServer struct {
|
|||||||
raftServer raft.Server
|
raftServer raft.Server
|
||||||
dataDir string
|
dataDir string
|
||||||
httpAddr string
|
httpAddr string
|
||||||
version string
|
|
||||||
router *mux.Router
|
router *mux.Router
|
||||||
topo *topology.Topology
|
topo *topology.Topology
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
|
func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
|
||||||
s := &RaftServer{
|
s := &RaftServer{
|
||||||
version: version,
|
|
||||||
peers: peers,
|
peers: peers,
|
||||||
httpAddr: httpAddr,
|
httpAddr: httpAddr,
|
||||||
dataDir: dataDir,
|
dataDir: dataDir,
|
||||||
|
@@ -15,15 +15,13 @@ type VolumeServer struct {
|
|||||||
rack string
|
rack string
|
||||||
whiteList []string
|
whiteList []string
|
||||||
store *storage.Store
|
store *storage.Store
|
||||||
version string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publicUrl string, folders []string, maxCounts []int,
|
func NewVolumeServer(r *http.ServeMux, ip string, port int, publicUrl string, folders []string, maxCounts []int,
|
||||||
masterNode string, pulseSeconds int,
|
masterNode string, pulseSeconds int,
|
||||||
dataCenter string, rack string,
|
dataCenter string, rack string,
|
||||||
whiteList []string) *VolumeServer {
|
whiteList []string) *VolumeServer {
|
||||||
vs := &VolumeServer{
|
vs := &VolumeServer{
|
||||||
version: version,
|
|
||||||
masterNode: masterNode,
|
masterNode: masterNode,
|
||||||
pulseSeconds: pulseSeconds,
|
pulseSeconds: pulseSeconds,
|
||||||
dataCenter: dataCenter,
|
dataCenter: dataCenter,
|
||||||
@@ -40,6 +38,8 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
|
|||||||
r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler))
|
r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler))
|
||||||
r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler))
|
r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler))
|
||||||
r.HandleFunc("/admin/delete_collection", secure(vs.whiteList, vs.deleteCollectionHandler))
|
r.HandleFunc("/admin/delete_collection", secure(vs.whiteList, vs.deleteCollectionHandler))
|
||||||
|
r.HandleFunc("/stats/counter", secure(vs.whiteList, statsCounterHandler))
|
||||||
|
r.HandleFunc("/stats/memory", secure(vs.whiteList, statsMemoryHandler))
|
||||||
r.HandleFunc("/", vs.storeHandler)
|
r.HandleFunc("/", vs.storeHandler)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@@ -4,7 +4,9 @@ import (
|
|||||||
"code.google.com/p/weed-fs/go/glog"
|
"code.google.com/p/weed-fs/go/glog"
|
||||||
"code.google.com/p/weed-fs/go/operation"
|
"code.google.com/p/weed-fs/go/operation"
|
||||||
"code.google.com/p/weed-fs/go/replication"
|
"code.google.com/p/weed-fs/go/replication"
|
||||||
|
"code.google.com/p/weed-fs/go/stats"
|
||||||
"code.google.com/p/weed-fs/go/storage"
|
"code.google.com/p/weed-fs/go/storage"
|
||||||
|
"code.google.com/p/weed-fs/go/util"
|
||||||
"mime"
|
"mime"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -16,7 +18,7 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
|
|||||||
|
|
||||||
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
|
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
m["Version"] = vs.version
|
m["Version"] = util.VERSION
|
||||||
m["Volumes"] = vs.store.Status()
|
m["Volumes"] = vs.store.Status()
|
||||||
writeJsonQuiet(w, r, m)
|
writeJsonQuiet(w, r, m)
|
||||||
}
|
}
|
||||||
@@ -86,14 +88,19 @@ func (vs *VolumeServer) submitFromVolumeServerHandler(w http.ResponseWriter, r *
|
|||||||
func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) {
|
func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
case "GET":
|
case "GET":
|
||||||
|
stats.ReadRequest()
|
||||||
vs.GetOrHeadHandler(w, r, true)
|
vs.GetOrHeadHandler(w, r, true)
|
||||||
case "HEAD":
|
case "HEAD":
|
||||||
|
stats.ReadRequest()
|
||||||
vs.GetOrHeadHandler(w, r, false)
|
vs.GetOrHeadHandler(w, r, false)
|
||||||
case "DELETE":
|
case "DELETE":
|
||||||
|
stats.DeleteRequest()
|
||||||
secure(vs.whiteList, vs.DeleteHandler)(w, r)
|
secure(vs.whiteList, vs.DeleteHandler)(w, r)
|
||||||
case "PUT":
|
case "PUT":
|
||||||
|
stats.WriteRequest()
|
||||||
secure(vs.whiteList, vs.PostHandler)(w, r)
|
secure(vs.whiteList, vs.PostHandler)(w, r)
|
||||||
case "POST":
|
case "POST":
|
||||||
|
stats.WriteRequest()
|
||||||
secure(vs.whiteList, vs.PostHandler)(w, r)
|
secure(vs.whiteList, vs.PostHandler)(w, r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user