mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 15:17:25 +08:00
volume: add "-dir.idx" option for separate index storage
fix https://github.com/chrislusf/seaweedfs/issues/1265
This commit is contained in:
@@ -53,8 +53,8 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32
|
||||
}
|
||||
|
||||
func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
|
||||
if strings.HasSuffix(filename, ".idx") {
|
||||
base := filename[:len(filename)-len(".idx")]
|
||||
if strings.HasSuffix(filename, ".idx") || strings.HasSuffix(filename, ".vif") {
|
||||
base := filename[:len(filename)-4]
|
||||
collection, volumeId, err := parseCollectionVolumeId(base)
|
||||
return volumeId, collection, err
|
||||
}
|
||||
@@ -76,10 +76,10 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
|
||||
if fileInfo.IsDir() {
|
||||
return false
|
||||
}
|
||||
if !strings.HasSuffix(basename, ".idx") {
|
||||
if !strings.HasSuffix(basename, ".idx") && !strings.HasSuffix(basename, ".vif") {
|
||||
return false
|
||||
}
|
||||
volumeName := basename[:len(basename)-len(".idx")]
|
||||
volumeName := basename[:len(basename)-4]
|
||||
|
||||
// check for incomplete volume
|
||||
noteFile := l.Directory + "/" + volumeName + ".note"
|
||||
@@ -108,7 +108,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
|
||||
}
|
||||
|
||||
// load the volume
|
||||
v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0)
|
||||
v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0)
|
||||
if e != nil {
|
||||
glog.V(0).Infof("new volume %s error %s", volumeName, e)
|
||||
return false
|
||||
|
@@ -68,7 +68,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
|
||||
defer l.ecVolumesLock.Unlock()
|
||||
ecVolume, found := l.ecVolumes[vid]
|
||||
if !found {
|
||||
ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid)
|
||||
ecVolume, err = erasure_coding.NewEcVolume(l.Directory, l.IdxDirectory, collection, vid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create ec volume %d: %v", vid, err)
|
||||
}
|
||||
@@ -122,6 +122,13 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
|
||||
if err != nil {
|
||||
return fmt.Errorf("load all ec shards in dir %s: %v", l.Directory, err)
|
||||
}
|
||||
if l.IdxDirectory != l.Directory {
|
||||
indexFileInfos, err := ioutil.ReadDir(l.IdxDirectory)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load all ec shards in dir %s: %v", l.IdxDirectory, err)
|
||||
}
|
||||
fileInfos = append(fileInfos, indexFileInfos...)
|
||||
}
|
||||
|
||||
sort.Slice(fileInfos, func(i, j int) bool {
|
||||
return fileInfos[i].Name() < fileInfos[j].Name()
|
||||
|
@@ -45,14 +45,14 @@ func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
|
||||
// FindDatFileSize calculate .dat file size from max offset entry
|
||||
// there may be extra deletions after that entry
|
||||
// but they are deletions anyway
|
||||
func FindDatFileSize(baseFileName string) (datSize int64, err error) {
|
||||
func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) {
|
||||
|
||||
version, err := readEcVolumeVersion(baseFileName)
|
||||
version, err := readEcVolumeVersion(dataBaseFileName)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
|
||||
return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
|
||||
}
|
||||
|
||||
err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
|
||||
err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
|
||||
|
||||
if size.IsDeleted() {
|
||||
return nil
|
||||
|
@@ -25,6 +25,7 @@ type EcVolume struct {
|
||||
VolumeId needle.VolumeId
|
||||
Collection string
|
||||
dir string
|
||||
dirIdx string
|
||||
ecxFile *os.File
|
||||
ecxFileSize int64
|
||||
ecxCreatedAt time.Time
|
||||
@@ -37,33 +38,34 @@ type EcVolume struct {
|
||||
ecjFileAccessLock sync.Mutex
|
||||
}
|
||||
|
||||
func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
|
||||
ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
|
||||
func NewEcVolume(dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
|
||||
ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid}
|
||||
|
||||
baseFileName := EcShardFileName(collection, dir, int(vid))
|
||||
dataBaseFileName := EcShardFileName(collection, dir, int(vid))
|
||||
indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
|
||||
|
||||
// open ecx file
|
||||
if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
|
||||
return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
|
||||
if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
|
||||
return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
|
||||
}
|
||||
ecxFi, statErr := ev.ecxFile.Stat()
|
||||
if statErr != nil {
|
||||
return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
|
||||
return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
|
||||
}
|
||||
ev.ecxFileSize = ecxFi.Size()
|
||||
ev.ecxCreatedAt = ecxFi.ModTime()
|
||||
|
||||
// open ecj file
|
||||
if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
||||
return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
|
||||
if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
||||
return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
|
||||
}
|
||||
|
||||
// read volume info
|
||||
ev.Version = needle.Version3
|
||||
if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
|
||||
if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
|
||||
ev.Version = needle.Version(volumeInfo.Version)
|
||||
} else {
|
||||
pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
|
||||
pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
|
||||
}
|
||||
|
||||
ev.ShardLocations = make(map[ShardId][]string)
|
||||
@@ -134,15 +136,26 @@ func (ev *EcVolume) Destroy() {
|
||||
for _, s := range ev.Shards {
|
||||
s.Destroy()
|
||||
}
|
||||
os.Remove(ev.FileName() + ".ecx")
|
||||
os.Remove(ev.FileName() + ".ecj")
|
||||
os.Remove(ev.FileName() + ".vif")
|
||||
os.Remove(ev.FileName(".ecx"))
|
||||
os.Remove(ev.FileName(".ecj"))
|
||||
os.Remove(ev.FileName(".vif"))
|
||||
}
|
||||
|
||||
func (ev *EcVolume) FileName() string {
|
||||
func (ev *EcVolume) FileName(ext string) string {
|
||||
switch ext {
|
||||
case ".ecx", ".ecj":
|
||||
return ev.IndexBaseFileName() + ext
|
||||
}
|
||||
// .vif
|
||||
return ev.DataBaseFileName() + ext
|
||||
}
|
||||
|
||||
func (ev *EcVolume) DataBaseFileName() string {
|
||||
return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
|
||||
}
|
||||
|
||||
func (ev *EcVolume) IndexBaseFileName() string {
|
||||
return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
|
||||
}
|
||||
|
||||
func (ev *EcVolume) ShardSize() uint64 {
|
||||
|
@@ -16,18 +16,18 @@ type SortedFileNeedleMap struct {
|
||||
dbFileSize int64
|
||||
}
|
||||
|
||||
func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
|
||||
m = &SortedFileNeedleMap{baseFileName: baseFileName}
|
||||
func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
|
||||
m = &SortedFileNeedleMap{baseFileName: indexBaseFileName}
|
||||
m.indexFile = indexFile
|
||||
fileName := baseFileName + ".sdx"
|
||||
fileName := indexBaseFileName + ".sdx"
|
||||
if !isSortedFileFresh(fileName, indexFile) {
|
||||
glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name())
|
||||
erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdx")
|
||||
erasure_coding.WriteSortedFileFromIdx(indexBaseFileName, ".sdx")
|
||||
glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name())
|
||||
}
|
||||
glog.V(1).Infof("Opening %s...", fileName)
|
||||
|
||||
if m.dbFile, err = os.Open(baseFileName + ".sdx"); err != nil {
|
||||
if m.dbFile, err = os.Open(indexBaseFileName + ".sdx"); err != nil {
|
||||
return
|
||||
}
|
||||
dbStat, _ := m.dbFile.Stat()
|
||||
|
@@ -121,7 +121,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
|
||||
if location := s.FindFreeLocation(); location != nil {
|
||||
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
|
||||
location.Directory, vid, collection, replicaPlacement, ttl)
|
||||
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
|
||||
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
|
||||
location.SetVolume(vid, volume)
|
||||
glog.V(0).Infof("add volume %d", vid)
|
||||
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
|
||||
|
@@ -21,6 +21,7 @@ import (
|
||||
type Volume struct {
|
||||
Id needle.VolumeId
|
||||
dir string
|
||||
dirIdx string
|
||||
Collection string
|
||||
DataBackend backend.BackendStorageFile
|
||||
nm NeedleMapper
|
||||
@@ -47,9 +48,9 @@ type Volume struct {
|
||||
location *DiskLocation
|
||||
}
|
||||
|
||||
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
|
||||
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
|
||||
// if replicaPlacement is nil, the superblock will be loaded from disk
|
||||
v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
|
||||
v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
|
||||
asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
|
||||
v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
|
||||
v.needleMapKind = needleMapKind
|
||||
@@ -61,7 +62,7 @@ func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapK
|
||||
func (v *Volume) String() string {
|
||||
v.noWriteLock.RLock()
|
||||
defer v.noWriteLock.RUnlock()
|
||||
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
|
||||
return fmt.Sprintf("Id:%v dir:%s dirIdx:%s Collection:%s dataFile:%v nm:%v noWrite:%v canDelete:%v", v.Id, v.dir, v.dirIdx, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
|
||||
}
|
||||
|
||||
func VolumeFileName(dir string, collection string, id int) (fileName string) {
|
||||
@@ -74,10 +75,23 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (v *Volume) FileName() (fileName string) {
|
||||
func (v *Volume) DataFileName() (fileName string) {
|
||||
return VolumeFileName(v.dir, v.Collection, int(v.Id))
|
||||
}
|
||||
|
||||
func (v *Volume) IndexFileName() (fileName string) {
|
||||
return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))
|
||||
}
|
||||
|
||||
func (v *Volume) FileName(ext string) (fileName string) {
|
||||
switch ext {
|
||||
case ".idx", ".cpx", ".ldb":
|
||||
return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))+ext
|
||||
}
|
||||
// .dat, .cpd, .vif
|
||||
return VolumeFileName(v.dir, v.Collection, int(v.Id))+ext
|
||||
}
|
||||
|
||||
func (v *Volume) Version() needle.Version {
|
||||
if v.volumeInfo.Version != 0 {
|
||||
v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
|
||||
|
@@ -124,9 +124,9 @@ func (v *Volume) findLastAppendAtNs() (uint64, error) {
|
||||
}
|
||||
|
||||
func (v *Volume) locateLastAppendEntry() (Offset, error) {
|
||||
indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
|
||||
indexFile, e := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
|
||||
if e != nil {
|
||||
return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
|
||||
return Offset{}, fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), e)
|
||||
}
|
||||
defer indexFile.Close()
|
||||
|
||||
@@ -156,9 +156,9 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
|
||||
|
||||
n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
|
||||
return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToAcutalOffset(), offset.ToAcutalOffset()+NeedleHeaderSize, err)
|
||||
}
|
||||
_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
|
||||
_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+NeedleHeaderSize, bodyLength)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
|
||||
}
|
||||
@@ -168,9 +168,9 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
|
||||
|
||||
// on server side
|
||||
func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
|
||||
indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
|
||||
indexFile, openErr := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
|
||||
if openErr != nil {
|
||||
err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
|
||||
err = fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), openErr)
|
||||
return
|
||||
}
|
||||
defer indexFile.Close()
|
||||
|
@@ -23,7 +23,6 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI
|
||||
}
|
||||
|
||||
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
|
||||
fileName := v.FileName()
|
||||
alreadyHasSuperBlock := false
|
||||
|
||||
hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
|
||||
@@ -34,17 +33,17 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
|
||||
v.LoadRemoteFile()
|
||||
alreadyHasSuperBlock = true
|
||||
} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(fileName + ".dat"); exists {
|
||||
} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
|
||||
// open dat file
|
||||
if !canRead {
|
||||
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
|
||||
return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat"))
|
||||
}
|
||||
var dataFile *os.File
|
||||
if canWrite {
|
||||
dataFile, err = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
|
||||
dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644)
|
||||
} else {
|
||||
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
|
||||
dataFile, err = os.Open(fileName + ".dat")
|
||||
glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat"))
|
||||
dataFile, err = os.Open(v.FileName(".dat"))
|
||||
v.noWriteOrDelete = true
|
||||
}
|
||||
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
|
||||
@@ -54,17 +53,17 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
v.DataBackend = backend.NewDiskFile(dataFile)
|
||||
} else {
|
||||
if createDatIfMissing {
|
||||
v.DataBackend, err = backend.CreateVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
|
||||
v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb)
|
||||
} else {
|
||||
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
|
||||
return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat"))
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if !os.IsPermission(err) {
|
||||
return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, err)
|
||||
return fmt.Errorf("cannot load volume data %s: %v", v.FileName(".dat"), err)
|
||||
} else {
|
||||
return fmt.Errorf("load data file %s.dat: %v", fileName, err)
|
||||
return fmt.Errorf("load data file %s: %v", v.FileName(".dat"), err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,21 +71,27 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
err = v.readSuperBlock()
|
||||
} else {
|
||||
if !v.SuperBlock.Initialized() {
|
||||
return fmt.Errorf("volume %s.dat not initialized", fileName)
|
||||
return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
|
||||
}
|
||||
err = v.maybeWriteSuperBlock()
|
||||
}
|
||||
if err == nil && alsoLoadIndex {
|
||||
// adjust for existing volumes with .idx together with .dat files
|
||||
if v.dirIdx != v.dir {
|
||||
if util.FileExists(v.DataFileName()+".idx") {
|
||||
v.dirIdx = v.dir
|
||||
}
|
||||
}
|
||||
var indexFile *os.File
|
||||
if v.noWriteOrDelete {
|
||||
glog.V(0).Infoln("open to read file", fileName+".idx")
|
||||
if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil {
|
||||
return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err)
|
||||
glog.V(0).Infoln("open to read file", v.FileName(".idx"))
|
||||
if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil {
|
||||
return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err)
|
||||
}
|
||||
} else {
|
||||
glog.V(1).Infoln("open to write file", fileName+".idx")
|
||||
if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
||||
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
|
||||
glog.V(1).Infoln("open to write file", v.FileName(".idx"))
|
||||
if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
||||
return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err)
|
||||
}
|
||||
}
|
||||
if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
|
||||
@@ -95,45 +100,45 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
}
|
||||
|
||||
if v.noWriteOrDelete || v.noWriteCanDelete {
|
||||
if v.nm, err = NewSortedFileNeedleMap(fileName, indexFile); err != nil {
|
||||
glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdx", err)
|
||||
if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil {
|
||||
glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
|
||||
}
|
||||
} else {
|
||||
switch needleMapKind {
|
||||
case NeedleMapInMemory:
|
||||
glog.V(0).Infoln("loading index", fileName+".idx", "to memory")
|
||||
glog.V(0).Infoln("loading index", v.FileName(".idx"), "to memory")
|
||||
if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil {
|
||||
glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", err)
|
||||
glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err)
|
||||
}
|
||||
case NeedleMapLevelDb:
|
||||
glog.V(0).Infoln("loading leveldb", fileName+".ldb")
|
||||
glog.V(0).Infoln("loading leveldb", v.FileName(".ldb"))
|
||||
opts := &opt.Options{
|
||||
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
|
||||
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
|
||||
CompactionTableSizeMultiplier: 10, // default value is 1
|
||||
}
|
||||
if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
||||
}
|
||||
case NeedleMapLevelDbMedium:
|
||||
glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
|
||||
glog.V(0).Infoln("loading leveldb medium", v.FileName(".ldb"))
|
||||
opts := &opt.Options{
|
||||
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
|
||||
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
|
||||
CompactionTableSizeMultiplier: 10, // default value is 1
|
||||
}
|
||||
if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
||||
}
|
||||
case NeedleMapLevelDbLarge:
|
||||
glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
|
||||
glog.V(0).Infoln("loading leveldb large", v.FileName(".ldb"))
|
||||
opts := &opt.Options{
|
||||
BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
|
||||
WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
|
||||
CompactionTableSizeMultiplier: 10, // default value is 1
|
||||
}
|
||||
if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -56,7 +56,8 @@ func (v *Volume) Destroy() (err error) {
|
||||
}
|
||||
}
|
||||
v.Close()
|
||||
removeVolumeFiles(v.FileName())
|
||||
removeVolumeFiles(v.DataFileName())
|
||||
removeVolumeFiles(v.IndexFileName())
|
||||
return
|
||||
}
|
||||
|
||||
|
@@ -14,7 +14,7 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
|
||||
|
||||
func (v *Volume) maybeLoadVolumeInfo() (found bool) {
|
||||
|
||||
v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif")
|
||||
v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
|
||||
|
||||
if v.hasRemoteFile {
|
||||
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
|
||||
@@ -43,7 +43,7 @@ func (v *Volume) LoadRemoteFile() error {
|
||||
|
||||
func (v *Volume) SaveVolumeInfo() error {
|
||||
|
||||
tierFileName := v.FileName() + ".vif"
|
||||
tierFileName := v.FileName(".vif")
|
||||
|
||||
return pb.SaveVolumeInfo(tierFileName, v.volumeInfo)
|
||||
|
||||
|
@@ -49,7 +49,6 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
|
||||
v.isCompacting = false
|
||||
}()
|
||||
|
||||
filePath := v.FileName()
|
||||
v.lastCompactIndexOffset = v.IndexFileSize()
|
||||
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
||||
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
|
||||
@@ -59,7 +58,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
|
||||
if err := v.nm.Sync(); err != nil {
|
||||
glog.V(0).Infof("compact fail to sync volume idx %d", v.Id)
|
||||
}
|
||||
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
|
||||
return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
|
||||
}
|
||||
|
||||
// compact a volume based on deletions in .idx files
|
||||
@@ -75,7 +74,6 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro
|
||||
v.isCompacting = false
|
||||
}()
|
||||
|
||||
filePath := v.FileName()
|
||||
v.lastCompactIndexOffset = v.IndexFileSize()
|
||||
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
||||
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
|
||||
@@ -85,7 +83,7 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro
|
||||
if err := v.nm.Sync(); err != nil {
|
||||
glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
|
||||
}
|
||||
return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
|
||||
return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
|
||||
}
|
||||
|
||||
func (v *Volume) CommitCompact() error {
|
||||
@@ -113,40 +111,40 @@ func (v *Volume) CommitCompact() error {
|
||||
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
|
||||
|
||||
var e error
|
||||
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
|
||||
if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil {
|
||||
glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
|
||||
e = os.Remove(v.FileName() + ".cpd")
|
||||
e = os.Remove(v.FileName(".cpd"))
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
e = os.Remove(v.FileName() + ".cpx")
|
||||
e = os.Remove(v.FileName(".cpx"))
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
} else {
|
||||
if runtime.GOOS == "windows" {
|
||||
e = os.RemoveAll(v.FileName() + ".dat")
|
||||
e = os.RemoveAll(v.FileName(".dat"))
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
e = os.RemoveAll(v.FileName() + ".idx")
|
||||
e = os.RemoveAll(v.FileName(".idx"))
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
var e error
|
||||
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
|
||||
return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
|
||||
if e = os.Rename(v.FileName(".cpd"), v.FileName(".dat")); e != nil {
|
||||
return fmt.Errorf("rename %s: %v", v.FileName(".cpd"), e)
|
||||
}
|
||||
if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
|
||||
return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
|
||||
if e = os.Rename(v.FileName(".cpx"), v.FileName(".idx")); e != nil {
|
||||
return fmt.Errorf("rename %s: %v", v.FileName(".cpx"), e)
|
||||
}
|
||||
}
|
||||
|
||||
//glog.V(3).Infof("Pretending to be vacuuming...")
|
||||
//time.Sleep(20 * time.Second)
|
||||
|
||||
os.RemoveAll(v.FileName() + ".ldb")
|
||||
os.RemoveAll(v.FileName(".ldb"))
|
||||
|
||||
glog.V(3).Infof("Loading volume %d commit file...", v.Id)
|
||||
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
|
||||
@@ -158,8 +156,8 @@ func (v *Volume) CommitCompact() error {
|
||||
func (v *Volume) cleanupCompact() error {
|
||||
glog.V(0).Infof("Cleaning up volume %d vacuuming...", v.Id)
|
||||
|
||||
e1 := os.Remove(v.FileName() + ".cpd")
|
||||
e2 := os.Remove(v.FileName() + ".cpx")
|
||||
e1 := os.Remove(v.FileName(".cpd"))
|
||||
e2 := os.Remove(v.FileName(".cpx"))
|
||||
if e1 != nil {
|
||||
return e1
|
||||
}
|
||||
|
@@ -69,7 +69,7 @@ func TestCompaction(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(dir) // clean up
|
||||
|
||||
v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
@@ -96,7 +96,7 @@ func TestCompaction(t *testing.T) {
|
||||
|
||||
v.Close()
|
||||
|
||||
v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
|
||||
v, err = NewVolume(dir, dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume reloading: %v", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user