load volume info from .vif file, use superblock as a backup

This commit is contained in:
Chris Lu
2019-12-28 12:28:58 -08:00
parent b7bc08cf52
commit c06f7eb48a
9 changed files with 292 additions and 251 deletions

View File

@@ -126,6 +126,7 @@ func (ev *EcVolume) Destroy() {
}
os.Remove(ev.FileName() + ".ecx")
os.Remove(ev.FileName() + ".ecj")
os.Remove(ev.FileName() + ".vif")
}
func (ev *EcVolume) FileName() string {

View File

@@ -70,6 +70,9 @@ func (v *Volume) FileName() (fileName string) {
}
func (v *Volume) Version() needle.Version {
if v.volumeInfo.Version != 0 {
v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
}
return v.SuperBlock.Version
}

View File

@@ -3,7 +3,6 @@ package storage
import (
"fmt"
"os"
"time"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -12,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/util"
)
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
@@ -34,7 +34,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
v.LoadRemoteFile()
alreadyHasSuperBlock = true
} else if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(fileName + ".dat"); exists {
// open dat file
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
@@ -144,20 +144,3 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
return err
}
func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) {
exists = true
fi, err := os.Stat(filename)
if os.IsNotExist(err) {
exists = false
return
}
if fi.Mode()&0400 != 0 {
canRead = true
}
if fi.Mode()&0200 != 0 {
canWrite = true
}
modTime = fi.ModTime()
fileSize = fi.Size()
return
}

View File

@@ -184,8 +184,10 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
return fmt.Errorf("failed to load volume %d: %v", id, err)
}
if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
return fmt.Errorf("failed to process volume %d super block: %v", id, err)
if v.volumeInfo.Version == 0 {
if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
return fmt.Errorf("failed to process volume %d super block: %v", id, err)
}
}
defer v.Close()

View File

@@ -1,66 +1,29 @@
package storage
import (
"bytes"
"fmt"
"io/ioutil"
_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
"github.com/golang/protobuf/jsonpb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
)
func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
return v.volumeInfo
}
func (v *Volume) maybeLoadVolumeInfo() bool {
func (v *Volume) maybeLoadVolumeInfo() {
v.volumeInfo = &volume_server_pb.VolumeInfo{}
var found bool
tierFileName := v.FileName() + ".vif"
v.volumeInfo, found = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif")
if exists, canRead, _, _, _ := checkFile(tierFileName); !exists || !canRead {
if !exists {
return false
}
if !canRead {
glog.Warningf("can not read %s", tierFileName)
}
return false
if found {
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key)
v.hasRemoteFile = true
}
glog.V(0).Infof("maybeLoadVolumeInfo loading volume %d check file", v.Id)
tierData, readErr := ioutil.ReadFile(tierFileName)
if readErr != nil {
glog.Warningf("fail to read %s : %v", tierFileName, readErr)
return false
}
glog.V(0).Infof("maybeLoadVolumeInfo loading volume %d ReadFile", v.Id)
if err := jsonpb.Unmarshal(bytes.NewReader(tierData), v.volumeInfo); err != nil {
glog.Warningf("unmarshal error: %v", err)
return false
}
glog.V(0).Infof("maybeLoadVolumeInfo loading volume %d Unmarshal tierInfo %v", v.Id, v.volumeInfo)
if len(v.volumeInfo.GetFiles()) == 0 {
return false
}
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key)
v.hasRemoteFile = true
return true
}
func (v *Volume) HasRemoteFile() bool {
@@ -83,24 +46,6 @@ func (v *Volume) SaveVolumeInfo() error {
tierFileName := v.FileName() + ".vif"
if exists, _, canWrite, _, _ := checkFile(tierFileName); exists && !canWrite {
return fmt.Errorf("%s not writable", tierFileName)
}
return pb.SaveVolumeInfo(tierFileName, v.volumeInfo)
m := jsonpb.Marshaler{
EmitDefaults: true,
Indent: " ",
}
text, marshalErr := m.MarshalToString(v.GetVolumeInfo())
if marshalErr != nil {
return fmt.Errorf("marshal volume %d tier info: %v", v.Id, marshalErr)
}
writeErr := ioutil.WriteFile(tierFileName, []byte(text), 0755)
if writeErr != nil {
return fmt.Errorf("fail to write %s : %v", tierFileName, writeErr)
}
return nil
}