cleanupGracePeriod
@@ -21,25 +21,27 @@ import (
 // EcVacuumTask represents an EC vacuum task that collects, decodes, and re-encodes EC volumes
 type EcVacuumTask struct {
     *base.BaseTask
-    volumeID         uint32
-    collection       string
-    sourceNodes      map[pb.ServerAddress]erasure_coding.ShardBits
-    sourceGeneration uint32 // generation to vacuum from (G)
-    targetGeneration uint32 // generation to create (G+1)
-    tempDir          string
-    grpcDialOption   grpc.DialOption
-    masterAddress    pb.ServerAddress // master server address for activation RPC
+    volumeID           uint32
+    collection         string
+    sourceNodes        map[pb.ServerAddress]erasure_coding.ShardBits
+    sourceGeneration   uint32 // generation to vacuum from (G)
+    targetGeneration   uint32 // generation to create (G+1)
+    tempDir            string
+    grpcDialOption     grpc.DialOption
+    masterAddress      pb.ServerAddress // master server address for activation RPC
+    cleanupGracePeriod time.Duration // grace period before cleaning up old generation
 }

 // NewEcVacuumTask creates a new EC vacuum task instance
 func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits, sourceGeneration uint32) *EcVacuumTask {
     return &EcVacuumTask{
-        BaseTask:         base.NewBaseTask(id, types.TaskType("ec_vacuum")),
-        volumeID:         volumeID,
-        collection:       collection,
-        sourceNodes:      sourceNodes,
-        sourceGeneration: sourceGeneration,     // generation to vacuum from (G)
-        targetGeneration: sourceGeneration + 1, // generation to create (G+1)
+        BaseTask:           base.NewBaseTask(id, types.TaskType("ec_vacuum")),
+        volumeID:           volumeID,
+        collection:         collection,
+        sourceNodes:        sourceNodes,
+        sourceGeneration:   sourceGeneration,     // generation to vacuum from (G)
+        targetGeneration:   sourceGeneration + 1, // generation to create (G+1)
+        cleanupGracePeriod: 5 * time.Minute,      // default 5 minute grace period
     }
 }

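For context, a scheduler would wire the task up roughly as in the sketch below. This is a hypothetical usage sketch, not code from this commit: the addresses, volume id, collection, and shard bitmask are illustrative, and it uses the setters added at the end of this diff (SetGrpcDialOption, SetMasterAddress, SetCleanupGracePeriod) plus google.golang.org/grpc/credentials/insecure for the dial option:

    // Hypothetical wiring; every concrete value below is made up for illustration.
    sourceNodes := map[pb.ServerAddress]erasure_coding.ShardBits{
        pb.ServerAddress("10.0.0.1:8080"): erasure_coding.ShardBits(0x3FFF), // shards 0-13
    }
    task := NewEcVacuumTask("ec-vacuum-123", 123, "images", sourceNodes, 2)
    // sourceGeneration is 2 (G), so the task re-encodes into generation 3 (G+1)
    // and by default waits 5 minutes before deleting the generation-2 shards.
    task.SetGrpcDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))
    task.SetMasterAddress(pb.ServerAddress("10.0.0.100:9333"))
    task.SetCleanupGracePeriod(30 * time.Minute) // override the 5-minute default
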
@@ -51,6 +53,7 @@ func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams
         "source_generation": t.sourceGeneration,
         "target_generation": t.targetGeneration,
         "shard_nodes":       len(t.sourceNodes),
+        "cleanup_grace":     t.cleanupGracePeriod,
     })

     // Step 1: Create temporary working directory
@@ -94,8 +97,11 @@ func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams
     }

     t.LogInfo("EC vacuum task completed successfully", map[string]interface{}{
-        "volume_id":  t.volumeID,
-        "collection": t.collection,
+        "volume_id":         t.volumeID,
+        "collection":        t.collection,
+        "source_generation": t.sourceGeneration,
+        "target_generation": t.targetGeneration,
+        "note":              "Zero-downtime vacuum completed with generation transition",
     })

     return nil
@@ -334,38 +340,131 @@ func (t *EcVacuumTask) activateNewGeneration() error {
     })
 }

-// cleanupOldEcShards removes the original volume after successful vacuum
+// cleanupOldEcShards removes the old generation EC shards after successful activation
 func (t *EcVacuumTask) cleanupOldEcShards() error {
-    t.LogInfo("Cleaning up original volume", map[string]interface{}{
-        "volume_id": t.volumeID,
+    t.LogInfo("Starting cleanup of old generation EC shards", map[string]interface{}{
+        "volume_id":         t.volumeID,
+        "source_generation": t.sourceGeneration,
+        "grace_period":      t.cleanupGracePeriod,
     })

-    // Remove the original normal volume from the source node
-    for targetNode := range t.sourceNodes {
-        err := operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
-            _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
-                VolumeId: t.volumeID,
-            })
-            // Ignore errors if volume doesn't exist
-            if err != nil {
-                t.LogInfo("Volume delete completed or volume not found", map[string]interface{}{
-                    "volume_id": t.volumeID,
-                    "node":      targetNode,
-                    "note":      "This is normal if volume was already cleaned up",
-                })
-            }
-            return nil
-        })
-        if err != nil {
-            return err
-        }
-        break // Only need to delete from one node
-    }
+    // Step 1: Grace period - wait before cleanup
+    if t.cleanupGracePeriod > 0 {
+        t.LogInfo("Waiting grace period before cleanup", map[string]interface{}{
+            "grace_period": t.cleanupGracePeriod,
+            "reason":       "ensuring activation stability",
+        })
+
+        time.Sleep(t.cleanupGracePeriod)
+    }

+    // Step 2: Safety check - verify new generation is actually active
+    if err := t.verifyNewGenerationActive(); err != nil {
+        t.LogWarning("Skipping cleanup due to safety check failure", map[string]interface{}{
+            "error":  err.Error(),
+            "action": "manual cleanup may be needed",
+        })
+        return nil // Don't fail the task, but log the issue
+    }
+
+    // Step 3: Unmount and delete old generation shards from each node
+    var cleanupErrors []string
+    for node, shardBits := range t.sourceNodes {
+        if err := t.cleanupOldShardsFromNode(node, shardBits); err != nil {
+            cleanupErrors = append(cleanupErrors, fmt.Sprintf("node %s: %v", node, err))
+            t.LogWarning("Failed to cleanup shards from node", map[string]interface{}{
+                "node":  node,
+                "error": err.Error(),
+            })
+        }
+    }
+
+    // Step 4: Report cleanup results
+    if len(cleanupErrors) > 0 {
+        t.LogWarning("Cleanup completed with errors", map[string]interface{}{
+            "errors": cleanupErrors,
+            "note":   "some old generation files may remain",
+        })
+        // Don't fail the task for cleanup errors - vacuum was successful
+        return nil
+    }
+
+    t.LogInfo("Successfully cleaned up old generation EC shards", map[string]interface{}{
+        "volume_id":         t.volumeID,
+        "source_generation": t.sourceGeneration,
+    })
     return nil
 }
+
+// verifyNewGenerationActive checks with master that the new generation is active
+func (t *EcVacuumTask) verifyNewGenerationActive() error {
+    if t.masterAddress == "" {
+        t.LogWarning("Cannot verify generation activation - master address not set", map[string]interface{}{
+            "note": "skipping safety check",
+        })
+        return nil // Skip verification if we don't have master access
+    }
+
+    return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error {
+        resp, err := client.LookupEcVolume(context.Background(), &master_pb.LookupEcVolumeRequest{
+            VolumeId: t.volumeID,
+        })
+        if err != nil {
+            return fmt.Errorf("failed to lookup EC volume from master: %w", err)
+        }
+
+        if resp.ActiveGeneration != t.targetGeneration {
+            return fmt.Errorf("safety check failed: master active generation is %d, expected %d",
+                resp.ActiveGeneration, t.targetGeneration)
+        }
+
+        t.LogInfo("Safety check passed - new generation is active", map[string]interface{}{
+            "volume_id":         t.volumeID,
+            "active_generation": resp.ActiveGeneration,
+        })
+        return nil
+    })
+}
+
+// cleanupOldShardsFromNode unmounts and deletes old generation shards from a specific node
+func (t *EcVacuumTask) cleanupOldShardsFromNode(node pb.ServerAddress, shardBits erasure_coding.ShardBits) error {
+    return operation.WithVolumeServerClient(false, node, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+        shardIds := shardBits.ToUint32Slice()
+
+        t.LogInfo("Unmounting old generation shards", map[string]interface{}{
+            "node":              node,
+            "volume_id":         t.volumeID,
+            "source_generation": t.sourceGeneration,
+            "shard_ids":         shardIds,
+        })
+
+        // Step 1: Unmount old generation shards
+        _, unmountErr := client.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
+            VolumeId:   t.volumeID,
+            ShardIds:   shardIds,
+            Generation: t.sourceGeneration,
+        })
+        if unmountErr != nil {
+            // Log but continue - files might already be unmounted
+            t.LogInfo("Unmount completed or shards already unmounted", map[string]interface{}{
+                "node":  node,
+                "error": unmountErr.Error(),
+                "note":  "this is normal if shards were already unmounted",
+            })
+        }
+
+        // Step 2: Delete old generation files
+        // Note: The volume server should handle file deletion when unmounting,
+        // but we could add explicit file deletion here if needed in the future
+
+        t.LogInfo("Successfully cleaned up old generation shards from node", map[string]interface{}{
+            "node":              node,
+            "volume_id":         t.volumeID,
+            "source_generation": t.sourceGeneration,
+        })
+        return nil
+    })
+}

 // cleanup removes temporary files and directories
 func (t *EcVacuumTask) cleanup() {
     if t.tempDir != "" {
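cleanupOldShardsFromNode drives the unmount from ShardBits, the per-node bitmask recording which shards a server holds. Below is a minimal sketch of the mapping the cleanup relies on; the values are made up, and it assumes the AddShardId helper from weed/storage/erasure_coding alongside the ToUint32Slice call used above:

    // Illustrative only: build a bitmask for shards 0, 1, and 3, then flatten it
    // into the id slice that VolumeEcShardsUnmount expects.
    bits := erasure_coding.ShardBits(0).AddShardId(0).AddShardId(1).AddShardId(3)
    shardIds := bits.ToUint32Slice() // []uint32{0, 1, 3}
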
@@ -401,3 +500,8 @@ func (t *EcVacuumTask) SetGrpcDialOption(option grpc.DialOption) {
 func (t *EcVacuumTask) SetMasterAddress(address pb.ServerAddress) {
     t.masterAddress = address
 }
+
+// SetCleanupGracePeriod sets the grace period before cleaning up old generation
+func (t *EcVacuumTask) SetCleanupGracePeriod(period time.Duration) {
+    t.cleanupGracePeriod = period
+}
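Finally, a small test sketch (hypothetical, not part of this commit) that pins down the constructor defaults and the setter: generation G maps to target G+1, the grace period defaults to 5 minutes, and setting it to zero disables the pre-cleanup sleep, since cleanupOldEcShards only waits when cleanupGracePeriod > 0. It assumes the standard testing and time packages in the same package as the task:

    func TestEcVacuumTaskDefaults(t *testing.T) {
        task := NewEcVacuumTask("t1", 42, "col", nil, 7)
        if task.sourceGeneration != 7 || task.targetGeneration != 8 {
            t.Fatalf("expected generation transition 7 -> 8, got %d -> %d",
                task.sourceGeneration, task.targetGeneration)
        }
        if task.cleanupGracePeriod != 5*time.Minute {
            t.Fatalf("expected default 5m grace period, got %v", task.cleanupGracePeriod)
        }
        task.SetCleanupGracePeriod(0) // zero disables the pre-cleanup wait
        if task.cleanupGracePeriod != 0 {
            t.Fatalf("expected grace period 0 after override, got %v", task.cleanupGracePeriod)
        }
    }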