shell: add volume.configure.replication to change replication for a volume

fix https://github.com/chrislusf/seaweedfs/issues/1192
This commit is contained in:
Chris Lu
2020-02-02 15:37:23 -08:00
parent fb19263a71
commit 40ae533fa3
10 changed files with 536 additions and 252 deletions

View File

@@ -1,13 +1,12 @@
package storage
import (
"fmt"
"io/ioutil"
"os"
"strings"
"sync"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -172,16 +171,10 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
}
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
for _, fileInfo := range fileInfos {
volId, _, err := l.volumeIdFromPath(fileInfo)
if vid == volId && err == nil {
l.loadExistingVolume(fileInfo, needleMapKind)
return true
}
}
if fileInfo, found := l.LocateVolume(vid); found {
l.loadExistingVolume(fileInfo, needleMapKind)
return true
}
return false
}
@@ -217,7 +210,7 @@ func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[need
}
}
for k, _ := range deltaVols {
for k := range deltaVols {
delete(l.volumes, k)
}
return deltaVols
@@ -260,3 +253,16 @@ func (l *DiskLocation) Close() {
return
}
func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) {
if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
for _, fileInfo := range fileInfos {
volId, _, err := l.volumeIdFromPath(fileInfo)
if vid == volId && err == nil {
return fileInfo, true
}
}
}
return nil, false
}

View File

@@ -60,7 +60,7 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
// read volume info
ev.Version = needle.Version3
if volumeInfo, found := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
} else {
pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})

View File

@@ -2,11 +2,14 @@ package storage
import (
"fmt"
"path/filepath"
"strings"
"sync/atomic"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -346,6 +349,31 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
return fmt.Errorf("volume %d not found on disk", i)
}
func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
for _, location := range s.Locations {
fileInfo, found := location.LocateVolume(i)
if !found {
continue
}
// load, modify, save
baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
vifFile := filepath.Join(location.Directory, baseFileName + ".vif")
volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
if err != nil {
return fmt.Errorf("volume %d fail to load vif", i)
}
volumeInfo.Replication = replication
err = pb.SaveVolumeInfo(vifFile, volumeInfo)
if err != nil {
return fmt.Errorf("volume %d fail to save vif", i)
}
return nil
}
return fmt.Errorf("volume %d not found on disk", i)
}
func (s *Store) SetVolumeSizeLimit(x uint64) {
atomic.StoreUint64(&s.volumeSizeLimit, x)
}

View File

@@ -1,6 +1,7 @@
package storage
import (
"fmt"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -36,5 +37,12 @@ func (v *Volume) maybeWriteSuperBlock() error {
func (v *Volume) readSuperBlock() (err error) {
v.SuperBlock, err = super_block.ReadSuperBlock(v.DataBackend)
if v.volumeInfo != nil && v.volumeInfo.Replication != ""{
if replication, err := super_block.NewReplicaPlacementFromString(v.volumeInfo.Replication); err != nil {
return fmt.Errorf("Error parse volume %d replication %s : %v", v.Id, v.volumeInfo.Replication, err)
} else {
v.SuperBlock.ReplicaPlacement = replication
}
}
return err
}

View File

@@ -14,7 +14,7 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
func (v *Volume) maybeLoadVolumeInfo() (found bool) {
v.volumeInfo, v.hasRemoteFile = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif")
v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif")
if v.hasRemoteFile {
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,