mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-22 07:27:25 +08:00
instant notification of new volumes added or deleted
This commit is contained in:
@@ -18,16 +18,18 @@ const (
|
||||
* A VolumeServer contains one Store
|
||||
*/
|
||||
type Store struct {
|
||||
Ip string
|
||||
Port int
|
||||
PublicUrl string
|
||||
Locations []*DiskLocation
|
||||
dataCenter string //optional informaton, overwriting master setting if exists
|
||||
rack string //optional information, overwriting master setting if exists
|
||||
connected bool
|
||||
VolumeSizeLimit uint64 //read from the master
|
||||
Client master_pb.Seaweed_SendHeartbeatClient
|
||||
NeedleMapType NeedleMapType
|
||||
Ip string
|
||||
Port int
|
||||
PublicUrl string
|
||||
Locations []*DiskLocation
|
||||
dataCenter string //optional informaton, overwriting master setting if exists
|
||||
rack string //optional information, overwriting master setting if exists
|
||||
connected bool
|
||||
VolumeSizeLimit uint64 //read from the master
|
||||
Client master_pb.Seaweed_SendHeartbeatClient
|
||||
NeedleMapType NeedleMapType
|
||||
NewVolumeIdChan chan VolumeId
|
||||
DeletedVolumeIdChan chan VolumeId
|
||||
}
|
||||
|
||||
func (s *Store) String() (str string) {
|
||||
@@ -43,6 +45,8 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
|
||||
location.loadExistingVolumes(needleMapKind)
|
||||
s.Locations = append(s.Locations, location)
|
||||
}
|
||||
s.NewVolumeIdChan = make(chan VolumeId, 3)
|
||||
s.DeletedVolumeIdChan = make(chan VolumeId, 3)
|
||||
return
|
||||
}
|
||||
func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
|
||||
@@ -87,6 +91,7 @@ func (s *Store) DeleteCollection(collection string) (e error) {
|
||||
if e != nil {
|
||||
return
|
||||
}
|
||||
// let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumeIdChan
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -119,6 +124,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
|
||||
location.Directory, vid, collection, replicaPlacement, ttl)
|
||||
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil {
|
||||
location.SetVolume(vid, volume)
|
||||
s.NewVolumeIdChan <- vid
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
@@ -232,14 +238,6 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) updateMaster() {
|
||||
if s.Client != nil {
|
||||
if e := s.Client.Send(s.CollectHeartbeat()); e != nil {
|
||||
glog.V(0).Infoln("error when reporting size:", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
|
||||
if v := s.findVolume(i); v != nil && !v.readOnly {
|
||||
return v.deleteNeedle(n)
|
||||
@@ -265,7 +263,7 @@ func (s *Store) HasVolume(i VolumeId) bool {
|
||||
func (s *Store) MountVolume(i VolumeId) error {
|
||||
for _, location := range s.Locations {
|
||||
if found := location.LoadVolume(i, s.NeedleMapType); found == true {
|
||||
s.updateMaster()
|
||||
s.NewVolumeIdChan <- VolumeId(i)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -276,7 +274,7 @@ func (s *Store) MountVolume(i VolumeId) error {
|
||||
func (s *Store) UnmountVolume(i VolumeId) error {
|
||||
for _, location := range s.Locations {
|
||||
if err := location.UnloadVolume(i); err == nil {
|
||||
s.updateMaster()
|
||||
s.DeletedVolumeIdChan <- VolumeId(i)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -287,7 +285,7 @@ func (s *Store) UnmountVolume(i VolumeId) error {
|
||||
func (s *Store) DeleteVolume(i VolumeId) error {
|
||||
for _, location := range s.Locations {
|
||||
if error := location.deleteVolumeById(i); error == nil {
|
||||
s.updateMaster()
|
||||
s.DeletedVolumeIdChan <- VolumeId(i)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user