EcVolume creation properly refactored

This commit is contained in:
chrislu
2025-08-10 13:45:08 -07:00
parent ef5f9f629a
commit 8c31d5e331
5 changed files with 70 additions and 20 deletions

View File

@@ -303,7 +303,7 @@ func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_ser
glog.V(0).Infof("VolumeEcShardsMount: %v", req)
for _, shardId := range req.ShardIds {
err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId), req.Generation)
if err != nil {
glog.Errorf("ec shard mount %v: %v", req, err)

View File

@@ -72,9 +72,9 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S
return nil, false
}
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolume, error) {
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, generation uint32) (*erasure_coding.EcVolume, error) {
ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId)
ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId, generation)
if err != nil {
if err == os.ErrNotExist {
return nil, os.ErrNotExist
@@ -85,7 +85,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
defer l.ecVolumesLock.Unlock()
ecVolume, found := l.ecVolumes[vid]
if !found {
ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid)
ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid, generation)
if err != nil {
return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err)
}
@@ -116,7 +116,7 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding
return true
}
func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId) (err error) {
func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId, generation uint32) (err error) {
for _, shard := range shards {
shardId, err := strconv.ParseInt(path.Ext(shard)[3:], 10, 64)
@@ -124,7 +124,7 @@ func (l *DiskLocation) loadEcShards(shards []string, collection string, vid need
return fmt.Errorf("failed to parse ec shard name %v: %w", shard, err)
}
_, err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId))
_, err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId), generation)
if err != nil {
return fmt.Errorf("failed to load ec shard %v: %w", shard, err)
}
@@ -183,8 +183,13 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
}
if ext == ".ecx" && volumeId == prevVolumeId {
if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil {
return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err)
// Parse generation from the first shard filename
generation := uint32(0)
if len(sameVolumeShards) > 0 {
generation = erasure_coding.ParseGenerationFromFileName(sameVolumeShards[0])
}
if err = l.loadEcShards(sameVolumeShards, collection, volumeId, generation); err != nil {
return fmt.Errorf("loadEcShards collection:%v volumeId:%d generation:%d : %v", collection, volumeId, generation, err)
}
prevVolumeId = volumeId
continue

View File

@@ -25,11 +25,11 @@ type EcVolumeShard struct {
DiskType types.DiskType
}
func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId, generation uint32) (v *EcVolumeShard, e error) {
v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType}
baseFileName := v.FileName()
baseFileName := v.FileNameWithGeneration(generation)
// open ecd file
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
@@ -151,3 +151,24 @@ func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {
return n, err
}
// ParseGenerationFromFileName extracts generation from EC volume filename
// Returns 0 for files without generation suffix (backward compatibility)
func ParseGenerationFromFileName(fileName string) uint32 {
// Remove extension first
baseName := fileName
if lastDot := strings.LastIndex(fileName, "."); lastDot >= 0 {
baseName = fileName[:lastDot]
}
// Look for _g{N} pattern at the end
if gIndex := strings.LastIndex(baseName, "_g"); gIndex >= 0 {
generationStr := baseName[gIndex+2:]
if generation, err := strconv.ParseUint(generationStr, 10, 32); err == nil {
return uint32(generation)
}
}
// No generation suffix found, return 0 for backward compatibility
return 0
}

View File

@@ -27,6 +27,7 @@ var (
type EcVolume struct {
VolumeId needle.VolumeId
Collection string
Generation uint32 // generation of this EC volume, defaults to 0 for backward compatibility
dir string
dirIdx string
ecxFile *os.File
@@ -44,11 +45,19 @@ type EcVolume struct {
ExpireAtSec uint64 //ec volume destroy time, calculated from the ec volume was created
}
func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId, generation uint32) (ev *EcVolume, err error) {
ev = &EcVolume{
dir: dir,
dirIdx: dirIdx,
Collection: collection,
VolumeId: vid,
Generation: generation,
diskType: diskType,
}
dataBaseFileName := EcShardFileName(collection, dir, int(vid))
indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
// Use generation-aware filenames
dataBaseFileName := EcShardFileNameWithGeneration(collection, dir, int(vid), generation)
indexBaseFileName := EcShardFileNameWithGeneration(collection, dirIdx, int(vid), generation)
// open ecx file
if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
@@ -74,7 +83,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
ev.datFileSize = volumeInfo.DatFileSize
ev.ExpireAtSec = volumeInfo.ExpireAtSec
} else {
glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName)
glog.V(1).Infof("vif file not found for volume %d generation %d, creating new one: %s", vid, generation, dataBaseFileName)
volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
}
@@ -155,7 +164,7 @@ func (ev *EcVolume) Destroy() {
}
func (ev *EcVolume) FileName(ext string) string {
return ev.FileNameWithGeneration(ext, 0)
return ev.FileNameWithGeneration(ext, ev.Generation)
}
func (ev *EcVolume) FileNameWithGeneration(ext string, generation uint32) string {
@@ -168,7 +177,7 @@ func (ev *EcVolume) FileNameWithGeneration(ext string, generation uint32) string
}
func (ev *EcVolume) DataBaseFileName() string {
return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
return EcShardFileNameWithGeneration(ev.Collection, ev.dir, int(ev.VolumeId), ev.Generation)
}
func (ev *EcVolume) DataBaseFileNameWithGeneration(generation uint32) string {
@@ -176,7 +185,7 @@ func (ev *EcVolume) DataBaseFileNameWithGeneration(generation uint32) string {
}
func (ev *EcVolume) IndexBaseFileName() string {
return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
return EcShardFileNameWithGeneration(ev.Collection, ev.dirIdx, int(ev.VolumeId), ev.Generation)
}
func (ev *EcVolume) IndexBaseFileNameWithGeneration(generation uint32) string {
@@ -190,6 +199,20 @@ func (ev *EcVolume) ShardSize() uint64 {
return 0
}
// String returns a string representation of the EC volume including generation
func (ev *EcVolume) String() string {
return fmt.Sprintf("EcVolume{Id:%d, Collection:%s, Generation:%d, Shards:%d}",
ev.VolumeId, ev.Collection, ev.Generation, len(ev.Shards))
}
// Key returns a unique key for this EC volume including generation
func (ev *EcVolume) Key() string {
if ev.Collection == "" {
return fmt.Sprintf("%d_g%d", ev.VolumeId, ev.Generation)
}
return fmt.Sprintf("%s_%d_g%d", ev.Collection, ev.VolumeId, ev.Generation)
}
func (ev *EcVolume) Size() (size uint64) {
for _, shard := range ev.Shards {
if shardSize := shard.Size(); shardSize > 0 {
@@ -239,6 +262,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages [
DiskType: string(ev.diskType),
ExpireAtSec: ev.ExpireAtSec,
DiskId: diskId,
Generation: ev.Generation, // include generation in heartbeat message
}
messages = append(messages, m)
}

View File

@@ -48,9 +48,9 @@ func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
}
func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, generation uint32) error {
for diskId, location := range s.Locations {
if ecVolume, err := location.LoadEcShard(collection, vid, shardId); err == nil {
if ecVolume, err := location.LoadEcShard(collection, vid, shardId, generation); err == nil {
glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId)
var shardBits erasure_coding.ShardBits