mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 16:53:14 +08:00
VolumeEcShardsGenerate updated for generation-specific file creation
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -44,31 +45,49 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
}
|
||||
baseFileName := v.DataFileName()
|
||||
|
||||
if v.Collection != req.Collection {
|
||||
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
|
||||
}
|
||||
|
||||
// Generate output filenames with generation suffix
|
||||
generation := req.Generation
|
||||
// Extract base names by removing file extensions
|
||||
dataFileName := v.DataFileName() // e.g., "/data/collection_123.dat"
|
||||
indexFileName := v.IndexFileName() // e.g., "/index/collection_123.idx"
|
||||
|
||||
// Remove the .dat and .idx extensions to get base filenames
|
||||
dataBaseName := dataFileName[:len(dataFileName)-4] // removes ".dat"
|
||||
indexBaseName := indexFileName[:len(indexFileName)-4] // removes ".idx"
|
||||
|
||||
// Apply generation naming
|
||||
dataBaseFileName := erasure_coding.EcShardFileNameWithGeneration(v.Collection, filepath.Dir(dataBaseName), int(req.VolumeId), generation)
|
||||
indexBaseFileName := erasure_coding.EcShardFileNameWithGeneration(v.Collection, filepath.Dir(indexBaseName), int(req.VolumeId), generation)
|
||||
|
||||
glog.V(1).Infof("VolumeEcShardsGenerate: generating EC shards with generation %d: data=%s, index=%s",
|
||||
generation, dataBaseFileName, indexBaseFileName)
|
||||
|
||||
shouldCleanup := true
|
||||
defer func() {
|
||||
if !shouldCleanup {
|
||||
return
|
||||
}
|
||||
// Clean up generation-specific files on error
|
||||
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
|
||||
os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i))
|
||||
os.Remove(fmt.Sprintf("%s.ec%02d", dataBaseFileName, i))
|
||||
}
|
||||
os.Remove(v.IndexFileName() + ".ecx")
|
||||
os.Remove(indexBaseFileName + ".ecx")
|
||||
os.Remove(dataBaseFileName + ".vif")
|
||||
}()
|
||||
|
||||
// write .ec00 ~ .ec13 files
|
||||
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
|
||||
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
|
||||
// write .ec00 ~ .ec13 files with generation-specific names
|
||||
if err := erasure_coding.WriteEcFiles(dataBaseFileName); err != nil {
|
||||
return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err)
|
||||
}
|
||||
|
||||
// write .ecx file
|
||||
if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
|
||||
return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
|
||||
// write .ecx file with generation-specific name
|
||||
if err := erasure_coding.WriteSortedFileFromIdxToTarget(v.IndexFileName(), indexBaseFileName+".ecx"); err != nil {
|
||||
return nil, fmt.Errorf("WriteSortedFileFromIdxToTarget %s: %v", indexBaseFileName, err)
|
||||
}
|
||||
|
||||
// write .vif files
|
||||
@@ -84,8 +103,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
|
||||
|
||||
datSize, _, _ := v.FileStat()
|
||||
volumeInfo.DatFileSize = int64(datSize)
|
||||
if err := volume_info.SaveVolumeInfo(baseFileName+".vif", volumeInfo); err != nil {
|
||||
return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err)
|
||||
if err := volume_info.SaveVolumeInfo(dataBaseFileName+".vif", volumeInfo); err != nil {
|
||||
return nil, fmt.Errorf("SaveVolumeInfo %s: %v", dataBaseFileName, err)
|
||||
}
|
||||
|
||||
shouldCleanup = false
|
||||
|
||||
@@ -262,6 +262,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
|
||||
_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
Collection: collection,
|
||||
Generation: 0, // shell commands operate on existing (generation 0) volumes
|
||||
})
|
||||
return genErr
|
||||
})
|
||||
|
||||
@@ -26,8 +26,14 @@ const (
|
||||
// WriteSortedFileFromIdx generates .ecx file from existing .idx file
|
||||
// all keys are sorted in ascending order
|
||||
func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
|
||||
return WriteSortedFileFromIdxToTarget(baseFileName, baseFileName+ext)
|
||||
}
|
||||
|
||||
nm, err := readNeedleMap(baseFileName)
|
||||
// WriteSortedFileFromIdxToTarget generates .ecx file from existing .idx file to specified target
|
||||
// all keys are sorted in ascending order
|
||||
func WriteSortedFileFromIdxToTarget(sourceBaseFileName string, targetFileName string) (e error) {
|
||||
|
||||
nm, err := readNeedleMap(sourceBaseFileName)
|
||||
if nm != nil {
|
||||
defer nm.Close()
|
||||
}
|
||||
@@ -35,7 +41,7 @@ func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
|
||||
return fmt.Errorf("readNeedleMap: %w", err)
|
||||
}
|
||||
|
||||
ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
ecxFile, err := os.OpenFile(targetFileName, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open ecx file: %w", err)
|
||||
}
|
||||
|
||||
@@ -205,6 +205,7 @@ func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error
|
||||
_, err := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
Generation: 1, // TODO: implement proper generation tracking in vacuum task
|
||||
})
|
||||
return err
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user