diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 553644f5f..079bac676 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -237,13 +237,29 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
 	resp := &master_pb.LookupEcVolumeResponse{}
 
-	ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId))
+	// Determine which generation to look up
+	targetGeneration := req.Generation
+	if targetGeneration == 0 {
+		// If no specific generation requested, use the active generation
+		if activeGen, found := ms.Topo.GetEcActiveGeneration(needle.VolumeId(req.VolumeId)); found {
+			targetGeneration = activeGen
+		}
+		// If no active generation found, fall back to 0 for backward compatibility
+	}
+
+	ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId), targetGeneration)
 	if !found {
 		return resp, fmt.Errorf("ec volume %d not found", req.VolumeId)
 	}
 
 	resp.VolumeId = req.VolumeId
+	// Set the active generation from the master's tracking
+	if activeGen, found := ms.Topo.GetEcActiveGeneration(needle.VolumeId(req.VolumeId)); found {
+		resp.ActiveGeneration = activeGen
+	} else {
+		resp.ActiveGeneration = ecLocations.Generation // fall back to the generation we found
+	}
 
 	for shardId, shardLocations := range ecLocations.Locations {
 		var locations []*master_pb.Location
@@ -255,8 +271,9 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
 			})
 		}
 		resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{
-			ShardId:   uint32(shardId),
-			Locations: locations,
+			ShardId:    uint32(shardId),
+			Locations:  locations,
+			Generation: ecLocations.Generation, // set generation for each shard location
 		})
 	}
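The generation-resolution order implemented above is: an explicit `req.Generation` wins; otherwise the master's tracked active generation is used; otherwise generation 0 is assumed for pre-generation volumes. A minimal standalone sketch of that rule (the `resolveGeneration` helper and the sample values are illustrative, not SeaweedFS API):

```go
package main

import "fmt"

// resolveGeneration mirrors the fallback order used in LookupEcVolume:
// an explicit request wins, then the master's active generation, then 0.
func resolveGeneration(requested, activeGen uint32, hasActive bool) uint32 {
	if requested != 0 {
		return requested // caller pinned a specific generation
	}
	if hasActive {
		return activeGen // master tracks an active generation
	}
	return 0 // backward compatibility: legacy volumes are generation 0
}

func main() {
	fmt.Println(resolveGeneration(3, 5, true))  // 3: explicit request wins
	fmt.Println(resolveGeneration(0, 5, true))  // 5: active generation used
	fmt.Println(resolveGeneration(0, 0, false)) // 0: no tracking, legacy default
}
```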
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index 53b352168..bafadaf45 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -16,6 +16,7 @@ type EcVolumeInfo struct {
 	DiskId      uint32  // ID of the disk this EC volume is on
 	ExpireAtSec uint64  // ec volume destroy time, calculated from the ec volume was created
 	ShardSizes  []int64 // optimized: sizes for shards in order of set bits in ShardBits
+	Generation  uint32  // generation of this EC volume, defaults to 0 for backward compatibility
 }
 
 func (ecInfo *EcVolumeInfo) AddShardId(id ShardId) {
@@ -80,6 +81,7 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo {
 		DiskType:    ecInfo.DiskType,
 		DiskId:      ecInfo.DiskId,
 		ExpireAtSec: ecInfo.ExpireAtSec,
+		Generation:  ecInfo.Generation,
 	}
 
 	// Initialize optimized ShardSizes for the result
@@ -107,6 +109,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.
 		DiskType:    ecInfo.DiskType,
 		ExpireAtSec: ecInfo.ExpireAtSec,
 		DiskId:      ecInfo.DiskId,
+		Generation:  ecInfo.Generation,
 	}
 
 	// Directly set the optimized ShardSizes
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 750c00ea2..419520752 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -32,9 +32,13 @@ type Topology struct {
 	NodeImpl
 
 	collectionMap  *util.ConcurrentReadMap
-	ecShardMap     map[needle.VolumeId]*EcShardLocations
+	ecShardMap     map[EcVolumeGenerationKey]*EcShardLocations
 	ecShardMapLock sync.RWMutex
 
+	// Track the active generation for each EC volume
+	ecActiveGenerationMap     map[needle.VolumeId]uint32
+	ecActiveGenerationMapLock sync.RWMutex
+
 	pulse int64
 
 	volumeSizeLimit uint64
@@ -68,7 +72,8 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
 	t.diskUsages = newDiskUsages()
 	t.children = make(map[NodeId]Node)
 	t.collectionMap = util.NewConcurrentReadMap()
-	t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
+	t.ecShardMap = make(map[EcVolumeGenerationKey]*EcShardLocations)
+	t.ecActiveGenerationMap = make(map[needle.VolumeId]uint32)
 	t.pulse = int64(pulse)
 	t.volumeSizeLimit = volumeSizeLimit
 	t.replicationAsMin = replicationAsMin
@@ -211,7 +216,12 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
 		}
 	}
 
-	if locations, found := t.LookupEcShards(vid); found {
+	// Use the active generation for EC shard lookup, fall back to 0 for backward compatibility
+	activeGeneration := uint32(0)
+	if activeGen, found := t.GetEcActiveGeneration(vid); found {
+		activeGeneration = activeGen
+	}
+	if locations, found := t.LookupEcShards(vid, activeGeneration); found {
 		for _, loc := range locations.Locations {
 			dataNodes = append(dataNodes, loc...)
 		}
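The move from `map[needle.VolumeId]*EcShardLocations` to `map[EcVolumeGenerationKey]*EcShardLocations` relies on Go's rule that a struct whose fields are all comparable can serve directly as a map key, so each (volume, generation) pair gets its own entry and two generations of the same volume can coexist during re-encoding. A small sketch of the pattern, using local stand-in types rather than the topology package:

```go
package main

import "fmt"

// volumeId and genKey are local stand-ins for needle.VolumeId and
// the EcVolumeGenerationKey introduced in topology_ec.go.
type volumeId uint32

type genKey struct {
	VolumeId   volumeId
	Generation uint32
}

func main() {
	locations := map[genKey]string{}

	// The same volume id under two generations occupies two distinct
	// entries, so old and new shard sets can coexist during re-encoding.
	locations[genKey{VolumeId: 7, Generation: 0}] = "old shard set"
	locations[genKey{VolumeId: 7, Generation: 1}] = "new shard set"

	fmt.Println(len(locations))                                // 2
	fmt.Println(locations[genKey{VolumeId: 7, Generation: 1}]) // new shard set
}
```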
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
index 844e92f55..faca9a5b4 100644
--- a/weed/topology/topology_ec.go
+++ b/weed/topology/topology_ec.go
@@ -1,6 +1,8 @@
 package topology
 
 import (
+	"fmt"
+
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -8,8 +10,19 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 )
 
+// EcVolumeGenerationKey represents a unique key for an EC volume with generation
+type EcVolumeGenerationKey struct {
+	VolumeId   needle.VolumeId
+	Generation uint32
+}
+
+func (k EcVolumeGenerationKey) String() string {
+	return fmt.Sprintf("v%d-g%d", k.VolumeId, k.Generation)
+}
+
 type EcShardLocations struct {
 	Collection string
+	Generation uint32 // generation of this set of shard locations
 	Locations  [erasure_coding.TotalShardsCount][]*DataNode
 }
 
@@ -26,6 +39,7 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
 			DiskId:      shardInfo.DiskId,
 			ExpireAtSec: shardInfo.ExpireAtSec,
 			ShardSizes:  shardInfo.ShardSizes,
+			Generation:  shardInfo.Generation, // extract generation from heartbeat
 		}
 
 		shards = append(shards, ecVolumeInfo)
@@ -54,6 +68,7 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards
 			DiskId:      shardInfo.DiskId,
 			ExpireAtSec: shardInfo.ExpireAtSec,
 			ShardSizes:  shardInfo.ShardSizes,
+			Generation:  shardInfo.Generation, // extract generation from incremental heartbeat
 		}
 
 		newShards = append(newShards, ecVolumeInfo)
@@ -68,6 +83,7 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards
 			DiskId:      shardInfo.DiskId,
 			ExpireAtSec: shardInfo.ExpireAtSec,
 			ShardSizes:  shardInfo.ShardSizes,
+			Generation:  shardInfo.Generation, // extract generation from incremental heartbeat
 		}
 
 		deletedShards = append(deletedShards, ecVolumeInfo)
@@ -83,9 +99,10 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards
 	}
 }
 
-func NewEcShardLocations(collection string) *EcShardLocations {
+func NewEcShardLocations(collection string, generation uint32) *EcShardLocations {
 	return &EcShardLocations{
 		Collection: collection,
+		Generation: generation,
 	}
 }
@@ -120,14 +137,27 @@ func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, d
 	t.ecShardMapLock.Lock()
 	defer t.ecShardMapLock.Unlock()
 
-	locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+	key := EcVolumeGenerationKey{
+		VolumeId:   ecShardInfos.VolumeId,
+		Generation: ecShardInfos.Generation,
+	}
+	locations, found := t.ecShardMap[key]
 	if !found {
-		locations = NewEcShardLocations(ecShardInfos.Collection)
-		t.ecShardMap[ecShardInfos.VolumeId] = locations
+		locations = NewEcShardLocations(ecShardInfos.Collection, ecShardInfos.Generation)
+		t.ecShardMap[key] = locations
 	}
 	for _, shardId := range ecShardInfos.ShardIds() {
 		locations.AddShard(shardId, dn)
 	}
+
+	// Update the active generation if this one is newer, or if this is the first time we see this volume
+	t.ecActiveGenerationMapLock.Lock()
+	currentActive, exists := t.ecActiveGenerationMap[ecShardInfos.VolumeId]
+	if !exists || ecShardInfos.Generation >= currentActive {
+		t.ecActiveGenerationMap[ecShardInfos.VolumeId] = ecShardInfos.Generation
+		glog.V(2).Infof("Updated active generation for EC volume %d to %d", ecShardInfos.VolumeId, ecShardInfos.Generation)
+	}
+	t.ecActiveGenerationMapLock.Unlock()
 }
 
 func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
@@ -135,20 +165,66 @@ func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo,
 	t.ecShardMapLock.Lock()
 	defer t.ecShardMapLock.Unlock()
 
-	locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+	key := EcVolumeGenerationKey{
+		VolumeId:   ecShardInfos.VolumeId,
+		Generation: ecShardInfos.Generation,
+	}
+	locations, found := t.ecShardMap[key]
 	if !found {
 		return
 	}
 	for _, shardId := range ecShardInfos.ShardIds() {
 		locations.DeleteShard(shardId, dn)
 	}
+
+	// Check if this generation is now empty and clean up if needed
+	isEmpty := true
+	for _, shardLocations := range locations.Locations {
+		if len(shardLocations) > 0 {
+			isEmpty = false
+			break
+		}
+	}
+
+	if isEmpty {
+		// Remove the empty generation from the map
+		delete(t.ecShardMap, key)
+		glog.V(2).Infof("Removed empty EC volume generation %d:%d", ecShardInfos.VolumeId, ecShardInfos.Generation)
+
+		// Check if this was the active generation and update if needed
+		t.ecActiveGenerationMapLock.Lock()
+		if activeGen, exists := t.ecActiveGenerationMap[ecShardInfos.VolumeId]; exists && activeGen == ecShardInfos.Generation {
+			// Find the highest remaining generation for this volume
+			maxGeneration := uint32(0)
+			hasRemaining := false
+			for otherKey := range t.ecShardMap {
+				if otherKey.VolumeId == ecShardInfos.VolumeId && otherKey.Generation > maxGeneration {
+					maxGeneration = otherKey.Generation
+					hasRemaining = true
+				}
+			}
+
+			if hasRemaining {
+				t.ecActiveGenerationMap[ecShardInfos.VolumeId] = maxGeneration
+				glog.V(1).Infof("Updated active generation for EC volume %d to %d after cleanup", ecShardInfos.VolumeId, maxGeneration)
+			} else {
+				delete(t.ecActiveGenerationMap, ecShardInfos.VolumeId)
+				glog.V(1).Infof("Removed active generation tracking for EC volume %d (no generations remain)", ecShardInfos.VolumeId)
+			}
+		}
+		t.ecActiveGenerationMapLock.Unlock()
+	}
 }
 
-func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
+func (t *Topology) LookupEcShards(vid needle.VolumeId, generation uint32) (locations *EcShardLocations, found bool) {
 	t.ecShardMapLock.RLock()
 	defer t.ecShardMapLock.RUnlock()
 
-	locations, found = t.ecShardMap[vid]
+	key := EcVolumeGenerationKey{
+		VolumeId:   vid,
+		Generation: generation,
+	}
+	locations, found = t.ecShardMap[key]
 	return
 }
 
@@ -179,14 +255,53 @@ func (t *Topology) DeleteEcCollection(collection string) {
 	t.ecShardMapLock.Lock()
 	defer t.ecShardMapLock.Unlock()
 
-	var vids []needle.VolumeId
-	for vid, ecVolumeLocation := range t.ecShardMap {
+	var keysToDelete []EcVolumeGenerationKey
+	var volumeIdsToDelete []needle.VolumeId
+	for key, ecVolumeLocation := range t.ecShardMap {
 		if ecVolumeLocation.Collection == collection {
-			vids = append(vids, vid)
+			keysToDelete = append(keysToDelete, key)
+			volumeIdsToDelete = append(volumeIdsToDelete, key.VolumeId)
 		}
 	}
-	for _, vid := range vids {
-		delete(t.ecShardMap, vid)
+	for _, key := range keysToDelete {
+		delete(t.ecShardMap, key)
 	}
+
+	// Also clean up active generation tracking
+	t.ecActiveGenerationMapLock.Lock()
+	for _, vid := range volumeIdsToDelete {
+		delete(t.ecActiveGenerationMap, vid)
+	}
+	t.ecActiveGenerationMapLock.Unlock()
+}
+
+// GetEcActiveGeneration returns the current active generation for an EC volume
+func (t *Topology) GetEcActiveGeneration(vid needle.VolumeId) (uint32, bool) {
+	t.ecActiveGenerationMapLock.RLock()
+	defer t.ecActiveGenerationMapLock.RUnlock()
+
+	generation, found := t.ecActiveGenerationMap[vid]
+	return generation, found
+}
+
+// SetEcActiveGeneration sets the active generation for an EC volume
+func (t *Topology) SetEcActiveGeneration(vid needle.VolumeId, generation uint32) {
+	t.ecActiveGenerationMapLock.Lock()
+	defer t.ecActiveGenerationMapLock.Unlock()
+
+	t.ecActiveGenerationMap[vid] = generation
+	glog.V(1).Infof("Set active generation for EC volume %d to %d", vid, generation)
+}
+
+// ListEcVolumesWithActiveGeneration returns all EC volumes and their active generations
+func (t *Topology) ListEcVolumesWithActiveGeneration() map[needle.VolumeId]uint32 {
+	t.ecActiveGenerationMapLock.RLock()
+	defer t.ecActiveGenerationMapLock.RUnlock()
+
+	result := make(map[needle.VolumeId]uint32)
+	for vid, generation := range t.ecActiveGenerationMap {
+		result[vid] = generation
+	}
+	return result
+}
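Taken together, `RegisterEcShards` and `UnRegisterEcShards` maintain one invariant: the active generation is the highest generation that still has shards registered. The following self-contained model sketches that bookkeeping under simplified local types (`tracker`, `register`, and `unregister` are hypothetical stand-ins, not the real `Topology` methods):

```go
package main

import "fmt"

type volumeId uint32

type tracker struct {
	shardCounts map[volumeId]map[uint32]int // generation -> live shard count
	active      map[volumeId]uint32
}

func newTracker() *tracker {
	return &tracker{
		shardCounts: map[volumeId]map[uint32]int{},
		active:      map[volumeId]uint32{},
	}
}

// register mirrors RegisterEcShards: record shards and promote the
// active generation when an equal or newer generation appears.
func (t *tracker) register(vid volumeId, gen uint32, shards int) {
	if t.shardCounts[vid] == nil {
		t.shardCounts[vid] = map[uint32]int{}
	}
	t.shardCounts[vid][gen] += shards
	if cur, ok := t.active[vid]; !ok || gen >= cur {
		t.active[vid] = gen
	}
}

// unregister mirrors the cleanup in UnRegisterEcShards: when a generation
// empties out, the active pointer falls back to the highest remaining one.
func (t *tracker) unregister(vid volumeId, gen uint32, shards int) {
	t.shardCounts[vid][gen] -= shards
	if t.shardCounts[vid][gen] > 0 {
		return
	}
	delete(t.shardCounts[vid], gen)
	if t.active[vid] != gen {
		return
	}
	maxGen, has := uint32(0), false
	for g := range t.shardCounts[vid] {
		if g >= maxGen {
			maxGen, has = g, true
		}
	}
	if has {
		t.active[vid] = maxGen
	} else {
		delete(t.active, vid)
	}
}

func main() {
	tr := newTracker()
	tr.register(7, 0, 14)     // initial encoding
	tr.register(7, 1, 14)     // re-encoded copy appears
	fmt.Println(tr.active[7]) // 1: newest generation wins

	tr.unregister(7, 1, 14)   // new generation removed again
	fmt.Println(tr.active[7]) // 0: falls back to highest remaining
}
```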
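One design point worth noting in the patch: `ecShardMapLock` is always acquired before `ecActiveGenerationMapLock` (in `RegisterEcShards` and `UnRegisterEcShards`) and never in the opposite order, so the two-lock scheme stays deadlock-free; read-side helpers such as `GetEcActiveGeneration` and `ListEcVolumesWithActiveGeneration` take only the narrower generation lock.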