change to correct ec vacuum workflow

chrislu
2025-08-11 21:14:36 -07:00
parent 0f1c734626
commit 83b6a94bfe
2 changed files with 135 additions and 207 deletions


@@ -607,14 +607,6 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
}
- // mount the volume so it can be found by VolumeEcShardsGenerate
- err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
- if err != nil {
- return nil, fmt.Errorf("failed to mount volume %d after EC decode: %v", req.VolumeId, err)
- }
- glog.V(1).Infof("VolumeEcShardsToVolume: successfully decoded and mounted volume %d", req.VolumeId)
return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
}


@@ -62,24 +62,23 @@ func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams
}
defer t.cleanup()
- // Step 2: Collect EC shards to this worker
- targetNode, err := t.collectEcShardsToWorker()
- if err != nil {
+ // Step 2: Collect EC shards to this worker's local storage
+ if err := t.collectEcShardsToWorker(); err != nil {
return fmt.Errorf("failed to collect EC shards: %w", err)
}
- // Step 3: Decode EC shards into normal volume (skips deleted entries automatically)
- if err := t.decodeEcShardsToVolume(targetNode); err != nil {
+ // Step 3: Decode EC shards into normal volume on worker (skips deleted entries automatically)
+ if err := t.decodeEcShardsToVolume(); err != nil {
return fmt.Errorf("failed to decode EC shards to volume: %w", err)
}
- // Step 4: Re-encode the cleaned volume into new EC shards
- if err := t.encodeVolumeToEcShards(targetNode); err != nil {
+ // Step 4: Re-encode the cleaned volume into new EC shards on worker
+ if err := t.encodeVolumeToEcShards(); err != nil {
return fmt.Errorf("failed to encode volume to EC shards: %w", err)
}
- // Step 5: Distribute new EC shards to cluster
- if err := t.distributeNewEcShards(targetNode); err != nil {
+ // Step 5: Distribute new EC shards from worker to volume servers
+ if err := t.distributeNewEcShards(); err != nil {
return fmt.Errorf("failed to distribute new EC shards: %w", err)
}
@@ -120,220 +119,157 @@ func (t *EcVacuumTask) createTempDir() error {
return nil
}
- // collectEcShardsToWorker collects all EC shards to the current worker
- func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) {
- // Find the node with the most shards as the target
- var targetNode pb.ServerAddress
- maxShardCount := 0
- var existingEcIndexBits erasure_coding.ShardBits
- for node, shardBits := range t.sourceNodes {
- shardCount := shardBits.ShardIdCount()
- if shardCount > maxShardCount {
- maxShardCount = shardCount
- targetNode = node
- existingEcIndexBits = shardBits
- }
- }
- // Validate we found a target node
- if targetNode == "" || maxShardCount == 0 {
- return "", fmt.Errorf("no valid target node found: sourceNodes=%d, maxShardCount=%d", len(t.sourceNodes), maxShardCount)
- }
- t.LogInfo("Selected target node for shard collection", map[string]interface{}{
- "target_node": targetNode,
- "existing_bits": existingEcIndexBits,
- "shard_count": maxShardCount,
- "existing_shards": existingEcIndexBits.ShardIds(),
+ // collectEcShardsToWorker copies all EC shards and .ecj files from volume servers to worker's local storage
+ func (t *EcVacuumTask) collectEcShardsToWorker() error {
+ t.LogInfo("Collecting EC shards to worker local storage", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "source_nodes": len(t.sourceNodes),
+ "temp_dir": t.tempDir,
})
- // Copy missing shards to target node
- for sourceNode, shardBits := range t.sourceNodes {
- if sourceNode == targetNode {
- continue
- }
- needToCopyBits := shardBits.Minus(existingEcIndexBits)
- if needToCopyBits.ShardIdCount() == 0 {
- continue
- }
- err := operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- t.LogInfo("Copying EC shards", map[string]interface{}{
- "volume_id": t.volumeID,
- "shard_ids": needToCopyBits.ShardIds(),
- "from": sourceNode,
- "to": targetNode,
- })
- _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
- VolumeId: t.volumeID,
- Collection: t.collection,
- ShardIds: needToCopyBits.ToUint32Slice(),
- CopyEcxFile: false,
- CopyEcjFile: true,
- CopyVifFile: true,
- SourceDataNode: string(sourceNode),
- Generation: t.sourceGeneration, // collect existing shards from source generation G
- })
- if copyErr != nil {
- return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr)
+ // Validate that we have all required data shards available
+ availableDataShards := make(map[int]bool)
+ for _, shardBits := range t.sourceNodes {
+ for i := 0; i < erasure_coding.DataShardsCount; i++ {
+ if shardBits.HasShardId(erasure_coding.ShardId(i)) {
+ availableDataShards[i] = true
}
- // Mount the copied shards
- _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
- VolumeId: t.volumeID,
- Collection: t.collection,
- ShardIds: needToCopyBits.ToUint32Slice(),
- Generation: t.sourceGeneration, // mount collected shards from source generation G
- })
- if mountErr != nil {
- return fmt.Errorf("failed to mount shards %v on %s: %w", needToCopyBits.ShardIds(), targetNode, mountErr)
- }
- return nil
- })
- if err != nil {
- return "", err
- }
- existingEcIndexBits = existingEcIndexBits.Plus(needToCopyBits)
- }
- // Validate that we have all required data shards (0-9) for decoding
missingDataShards := make([]int, 0)
- for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ {
- if !existingEcIndexBits.HasShardId(erasure_coding.ShardId(shardId)) {
- missingDataShards = append(missingDataShards, shardId)
+ for i := 0; i < erasure_coding.DataShardsCount; i++ {
+ if !availableDataShards[i] {
+ missingDataShards = append(missingDataShards, i)
}
}
if len(missingDataShards) > 0 {
- // Log all available shards across all source nodes for debugging
- allAvailableShards := make(map[int][]string)
- for node, shardBits := range t.sourceNodes {
- for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
- if shardBits.HasShardId(erasure_coding.ShardId(shardId)) {
- allAvailableShards[shardId] = append(allAvailableShards[shardId], string(node))
- }
- }
- }
- t.LogInfo("ERROR: Missing required data shards for decoding", map[string]interface{}{
- "volume_id": t.volumeID,
- "missing_shards": missingDataShards,
- "collected_shards": existingEcIndexBits.ShardIds(),
- "all_available_shards": allAvailableShards,
- })
- return "", fmt.Errorf("missing required data shards %v for EC volume %d decoding", missingDataShards, t.volumeID)
+ return fmt.Errorf("missing required data shards %v for EC volume %d vacuum", missingDataShards, t.volumeID)
}
- t.LogInfo("Successfully collected all required data shards", map[string]interface{}{
- "volume_id": t.volumeID,
- "target_node": targetNode,
- "collected_shards": existingEcIndexBits.ShardIds(),
- })
- return targetNode, nil
- }
- // decodeEcShardsToVolume decodes EC shards into a normal volume, automatically skipping deleted entries
- func (t *EcVacuumTask) decodeEcShardsToVolume(targetNode pb.ServerAddress) error {
- t.LogInfo("Decoding EC shards to normal volume", map[string]interface{}{
- "volume_id": t.volumeID,
- "target": targetNode,
- })
- return operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, err := client.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
- VolumeId: t.volumeID,
- Collection: t.collection,
- })
- return err
- })
- }
- // encodeVolumeToEcShards re-encodes the cleaned volume into new EC shards
- func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error {
- t.LogInfo("Encoding cleaned volume to EC shards", map[string]interface{}{
- "volume_id": t.volumeID,
- "target": targetNode,
- })
- return operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, err := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
- VolumeId: t.volumeID,
- Collection: t.collection,
- Generation: t.targetGeneration, // generate new EC shards as G+1
- })
- return err
- })
- }
- // distributeNewEcShards distributes the new EC shards across the cluster
- func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error {
- t.LogInfo("Distributing new EC shards", map[string]interface{}{
- "volume_id": t.volumeID,
- "source": sourceNode,
- "target_generation": t.targetGeneration,
- })
- // For simplicity, we'll distribute to the same nodes as before
- // In a real implementation, you might want to use topology info for better placement
- // Create bit pattern for all shards (0-13)
- allShardBits := erasure_coding.ShardBits(0)
- for i := 0; i < erasure_coding.TotalShardsCount; i++ {
- allShardBits = allShardBits.AddShardId(erasure_coding.ShardId(i))
- }
- for targetNode, originalShardBits := range t.sourceNodes {
- if targetNode == sourceNode {
- continue // Skip source node
- }
- // Distribute the same shards that were originally on this target
- needToDistributeBits := originalShardBits
- if needToDistributeBits.ShardIdCount() == 0 {
+ // Copy all required shards and .ecj file to worker's temp directory
+ for sourceNode, shardBits := range t.sourceNodes {
+ shardIds := shardBits.ShardIds()
+ if len(shardIds) == 0 {
continue
}
+ t.LogInfo("Copying shards from volume server to worker", map[string]interface{}{
+ "source_node": sourceNode,
+ "shard_ids": shardIds,
+ "temp_dir": t.tempDir,
+ })
+ // Copy shard files to worker's temp directory
+ err := t.copyEcShardsFromVolumeServer(sourceNode, shardIds)
+ if err != nil {
+ return fmt.Errorf("failed to copy shards from %s: %w", sourceNode, err)
+ }
+ }
+ t.LogInfo("Successfully collected all EC shards to worker", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "temp_dir": t.tempDir,
+ })
+ return nil
+ }
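
The collection and validation code above leans on erasure_coding.ShardBits, a bitmask over shard ids 0-13 (ten data shards plus four parity shards). Below is a minimal, self-contained sketch of the operations this file uses (AddShardId, Minus, Plus, HasShardId, ShardIds, ToUint32Slice); the import path assumes the current seaweedfs module layout.

```go
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// Two hypothetical volume servers: one holds the data shards, one the parity shards.
	var a, b erasure_coding.ShardBits
	for i := 0; i < erasure_coding.DataShardsCount; i++ { // shards 0-9
		a = a.AddShardId(erasure_coding.ShardId(i))
	}
	for i := erasure_coding.DataShardsCount; i < erasure_coding.TotalShardsCount; i++ { // shards 10-13
		b = b.AddShardId(erasure_coding.ShardId(i))
	}

	// Minus/Plus are the set operations used when deciding what to copy where.
	fmt.Println(b.Minus(a).ShardIds())    // [10 11 12 13]
	fmt.Println(a.Plus(b).ShardIdCount()) // 14
	fmt.Println(a.HasShardId(3))          // true
	fmt.Println(a.ToUint32Slice())        // [0 1 2 3 4 5 6 7 8 9]
}
```

The validation above insists on data shards 0-9 specifically because the simple decode path rebuilds the .dat file from the data shards directly, without Reed-Solomon recovery.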
+ // copyEcShardsFromVolumeServer copies EC shard files from a volume server to worker's local storage
+ func (t *EcVacuumTask) copyEcShardsFromVolumeServer(sourceNode pb.ServerAddress, shardIds []erasure_coding.ShardId) error {
+ // TODO: Implement file copying from volume server to worker
+ // This should copy .ec00, .ec01, etc. files and .ecj file to t.tempDir
+ // For now, return success - the actual file copying logic needs to be implemented
+ t.LogInfo("Copying EC shard files", map[string]interface{}{
+ "from": sourceNode,
+ "shard_ids": shardIds,
+ "to_dir": t.tempDir,
+ })
+ return nil
+ }
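
The TODO above is the piece that actually moves bytes. One plausible shape for it, sketched below, streams each file with the volume server's CopyFile RPC; the CopyFileRequest field names, the package name, and the copyEcFile helper are assumptions to verify against volume_server.proto rather than anything this commit defines, and generation-suffixed file names are not modeled.

```go
package ec_vacuum

import (
	"context"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

// copyEcFile streams one EC-related file (".ec00" .. ".ec13", ".ecj", ".vif")
// from a volume server into destDir as "<volumeID><ext>".
func copyEcFile(source pb.ServerAddress, grpcDialOption grpc.DialOption,
	volumeID uint32, collection, ext, destDir string) error {
	return operation.WithVolumeServerClient(false, source, grpcDialOption,
		func(client volume_server_pb.VolumeServerClient) error {
			stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
				VolumeId:   volumeID,
				Collection: collection,
				Ext:        ext,
				IsEcVolume: true,
				StopOffset: uint64(math.MaxInt64),
			})
			if err != nil {
				return fmt.Errorf("start CopyFile %d%s from %s: %w", volumeID, ext, source, err)
			}
			dst, createErr := os.Create(filepath.Join(destDir, fmt.Sprintf("%d%s", volumeID, ext)))
			if createErr != nil {
				return createErr
			}
			defer dst.Close()
			for {
				resp, recvErr := stream.Recv()
				if recvErr == io.EOF {
					return nil
				}
				if recvErr != nil {
					return recvErr
				}
				if _, writeErr := dst.Write(resp.GetFileContent()); writeErr != nil {
					return writeErr
				}
			}
		})
}
```

copyEcShardsFromVolumeServer could then call this once per shard id (using the erasure_coding.ToExt helper for the ".ec%02d" extension, assuming it is exported) plus once each for ".ecj" and ".vif".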
+ // decodeEcShardsToVolume decodes EC shards into a normal volume on worker, automatically skipping deleted entries
+ func (t *EcVacuumTask) decodeEcShardsToVolume() error {
+ t.LogInfo("Decoding EC shards to normal volume on worker", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "temp_dir": t.tempDir,
+ })
+ // TODO: Implement local EC shard decoding on worker
+ // This should:
+ // 1. Use the copied .ec00-.ec09 files in t.tempDir
+ // 2. Use the copied .ecj file for index information
+ // 3. Decode to create .dat/.idx files locally
+ // 4. Skip deleted entries during decoding process
+ // For now, return success - the actual decoding logic needs to be implemented
+ return nil
+ }
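
This TODO mirrors what the volume server's VolumeEcShardsToVolume path already does (the first hunk above touches that code). A sketch of the same idea running locally on the worker, assuming the weed/storage/erasure_coding helpers FindDatFileSize, WriteDatFile, ToExt and WriteIdxFileFromEcIndex keep their usual signatures; an .ecx file is expected next to the shards, and generation suffixes are ignored.

```go
package ec_vacuum

import (
	"fmt"
	"path/filepath"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

// decodeLocally rebuilds <volumeID>.dat and <volumeID>.idx in dir from the
// collected .ec00-.ec09, .ecx and .ecj files. Entries recorded as deleted in
// the .ecj journal are expected to be dropped when the .idx is rewritten,
// which is what makes this step a vacuum.
func decodeLocally(dir string, volumeID uint32) error {
	baseFileName := filepath.Join(dir, fmt.Sprintf("%d", volumeID))

	// The ten data shards are enough to reconstruct the .dat file.
	shardFileNames := make([]string, erasure_coding.DataShardsCount)
	for i := 0; i < erasure_coding.DataShardsCount; i++ {
		shardFileNames[i] = baseFileName + erasure_coding.ToExt(i)
	}

	datFileSize, err := erasure_coding.FindDatFileSize(baseFileName, baseFileName)
	if err != nil {
		return fmt.Errorf("FindDatFileSize %s: %w", baseFileName, err)
	}
	if err := erasure_coding.WriteDatFile(baseFileName, datFileSize, shardFileNames); err != nil {
		return fmt.Errorf("WriteDatFile %s: %w", baseFileName, err)
	}
	if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
		return fmt.Errorf("WriteIdxFileFromEcIndex %s: %w", baseFileName, err)
	}
	return nil
}
```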
+ // encodeVolumeToEcShards re-encodes the cleaned volume into new EC shards on worker
+ func (t *EcVacuumTask) encodeVolumeToEcShards() error {
+ t.LogInfo("Encoding cleaned volume to EC shards on worker", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "target_generation": t.targetGeneration,
+ "temp_dir": t.tempDir,
+ })
+ // TODO: Implement local EC shard encoding on worker
+ // This should:
+ // 1. Use the decoded .dat/.idx files in t.tempDir
+ // 2. Generate new .ec00-.ec13 files locally with target generation
+ // 3. Generate new .ecx/.ecj files locally with target generation
+ // 4. Store all files in t.tempDir ready for distribution
+ // For now, return success - the actual encoding logic needs to be implemented
+ return nil
+ }
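
This TODO is the local counterpart of VolumeEcShardsGenerate on a volume server. A sketch assuming the erasure_coding helpers WriteSortedFileFromIdx and WriteEcFiles exist with these signatures (an assumption about that package, not something this commit shows); the G+1 generation naming the task introduces is left out for brevity.

```go
package ec_vacuum

import (
	"fmt"
	"path/filepath"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

// encodeLocally turns a cleaned <volumeID>.dat/.idx pair in dir into fresh
// .ec00-.ec13 shards plus a sorted .ecx index, ready to be distributed.
func encodeLocally(dir string, volumeID uint32) error {
	baseFileName := filepath.Join(dir, fmt.Sprintf("%d", volumeID))

	// Sort the needle index into the .ecx file the shards will be served from.
	if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
		return fmt.Errorf("WriteSortedFileFromIdx %s: %w", baseFileName, err)
	}
	// Split the .dat file into 10 data shards and 4 parity shards.
	if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
		return fmt.Errorf("WriteEcFiles %s: %w", baseFileName, err)
	}
	return nil
}
```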
+ // distributeNewEcShards distributes the new EC shards from worker to volume servers
+ func (t *EcVacuumTask) distributeNewEcShards() error {
+ t.LogInfo("Distributing new EC shards from worker to volume servers", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "target_generation": t.targetGeneration,
+ "temp_dir": t.tempDir,
+ })
+ // TODO: Implement shard distribution logic
+ // This should:
+ // 1. Determine optimal placement for new EC shards across volume servers
+ // 2. Copy .ec00-.ec13 files from worker's t.tempDir to target volume servers
+ // 3. Copy .ecx/.ecj files from worker's t.tempDir to target volume servers
+ // 4. Mount the new shards on target volume servers with target generation
+ // For now, we'll distribute to the same nodes as before for simplicity
+ for targetNode, originalShardBits := range t.sourceNodes {
+ // Distribute the same shards that were originally on this target
+ if originalShardBits.ShardIdCount() == 0 {
+ continue
+ }
+ t.LogInfo("Copying new EC shards from worker to volume server", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "shard_ids": originalShardBits.ShardIds(),
+ "target_generation": t.targetGeneration,
+ "from_worker": t.tempDir,
+ "to_volume_server": targetNode,
+ })
+ // TODO: Implement file copying from worker to volume server
+ // This should copy the appropriate .ec** files from t.tempDir to targetNode
+ // TODO: Mount the new shards on the target volume server
err := operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- t.LogInfo("Copying new EC shards", map[string]interface{}{
- "volume_id": t.volumeID,
- "shard_ids": needToDistributeBits.ShardIds(),
- "from": sourceNode,
- "to": targetNode,
- })
- _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
- VolumeId: t.volumeID,
- Collection: t.collection,
- ShardIds: needToDistributeBits.ToUint32Slice(),
- CopyEcxFile: true,
- CopyEcjFile: true,
- CopyVifFile: true,
- SourceDataNode: string(sourceNode),
- Generation: t.targetGeneration, // copy new EC shards as G+1
- })
- if copyErr != nil {
- return fmt.Errorf("failed to copy new shards %v from %s to %s: %w", needToDistributeBits.ShardIds(), sourceNode, targetNode, copyErr)
- }
// Mount the new shards
_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: t.volumeID,
Collection: t.collection,
- ShardIds: needToDistributeBits.ToUint32Slice(),
+ ShardIds: originalShardBits.ToUint32Slice(),
Generation: t.targetGeneration, // mount new EC shards as G+1
})
if mountErr != nil {
- return fmt.Errorf("failed to mount new shards %v on %s: %w", needToDistributeBits.ShardIds(), targetNode, mountErr)
+ return fmt.Errorf("failed to mount new shards %v on %s: %w", originalShardBits.ShardIds(), targetNode, mountErr)
}
return nil
})
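
The distribution TODO earlier in distributeNewEcShards punts on placement and reuses the old layout. As a small illustrative alternative, the sketch below spreads the 14 new shards round-robin across the same nodes; planShardPlacement is a hypothetical helper, the map type for sourceNodes is inferred from its usage above, and real placement would consult the master's topology.

```go
package ec_vacuum

import (
	"sort"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

// planShardPlacement assigns shards 0-13 round-robin over the nodes that held
// the previous generation, so no single server ends up with every shard.
func planShardPlacement(sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits) map[pb.ServerAddress]erasure_coding.ShardBits {
	nodes := make([]pb.ServerAddress, 0, len(sourceNodes))
	for node := range sourceNodes {
		nodes = append(nodes, node)
	}
	// Sort for a deterministic assignment across runs.
	sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })

	plan := make(map[pb.ServerAddress]erasure_coding.ShardBits, len(nodes))
	if len(nodes) == 0 {
		return plan
	}
	for shard := 0; shard < erasure_coding.TotalShardsCount; shard++ {
		node := nodes[shard%len(nodes)]
		plan[node] = plan[node].AddShardId(erasure_coding.ShardId(shard))
	}
	return plan
}
```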