copy file

This commit is contained in:
chrislu
2025-08-10 14:24:30 -07:00
parent 9e2e600b6d
commit 56d1ed77c1
5 changed files with 52 additions and 22 deletions

View File

@@ -283,6 +283,7 @@ message CopyFileRequest {
string collection = 5;
bool is_ec_volume = 6;
bool ignore_source_file_not_found = 7;
uint32 generation = 8; // generation of files to copy, defaults to 0 for backward compatibility
}
message CopyFileResponse {
bytes file_content = 1;

View File

@@ -1831,6 +1831,7 @@ type CopyFileRequest struct {
Collection string `protobuf:"bytes,5,opt,name=collection,proto3" json:"collection,omitempty"`
IsEcVolume bool `protobuf:"varint,6,opt,name=is_ec_volume,json=isEcVolume,proto3" json:"is_ec_volume,omitempty"`
IgnoreSourceFileNotFound bool `protobuf:"varint,7,opt,name=ignore_source_file_not_found,json=ignoreSourceFileNotFound,proto3" json:"ignore_source_file_not_found,omitempty"`
Generation uint32 `protobuf:"varint,8,opt,name=generation,proto3" json:"generation,omitempty"` // generation of files to copy, defaults to 0 for backward compatibility
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1914,6 +1915,13 @@ func (x *CopyFileRequest) GetIgnoreSourceFileNotFound() bool {
return false
}
// GetGeneration returns the Generation field of the request.
// It is safe to call on a nil receiver, in which case it returns 0.
func (x *CopyFileRequest) GetGeneration() uint32 {
	if x == nil {
		return 0
	}
	return x.Generation
}
type CopyFileResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
FileContent []byte `protobuf:"bytes,1,opt,name=file_content,json=fileContent,proto3" json:"file_content,omitempty"`
@@ -6265,7 +6273,7 @@ const file_volume_server_proto_rawDesc = "" +
"\x12io_byte_per_second\x18\a \x01(\x03R\x0fioBytePerSecond\"h\n" +
"\x12VolumeCopyResponse\x12)\n" +
"\x11last_append_at_ns\x18\x01 \x01(\x04R\x0elastAppendAtNs\x12'\n" +
"\x0fprocessed_bytes\x18\x02 \x01(\x03R\x0eprocessedBytes\"\x94\x02\n" +
"\x0fprocessed_bytes\x18\x02 \x01(\x03R\x0eprocessedBytes\"\xb4\x02\n" +
"\x0fCopyFileRequest\x12\x1b\n" +
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x10\n" +
"\x03ext\x18\x02 \x01(\tR\x03ext\x12/\n" +
@@ -6277,7 +6285,10 @@ const file_volume_server_proto_rawDesc = "" +
"collection\x12 \n" +
"\fis_ec_volume\x18\x06 \x01(\bR\n" +
"isEcVolume\x12>\n" +
"\x1cignore_source_file_not_found\x18\a \x01(\bR\x18ignoreSourceFileNotFound\"[\n" +
"\x1cignore_source_file_not_found\x18\a \x01(\bR\x18ignoreSourceFileNotFound\x12\x1e\n" +
"\n" +
"generation\x18\b \x01(\rR\n" +
"generation\"[\n" +
"\x10CopyFileResponse\x12!\n" +
"\ffile_content\x18\x01 \x01(\fR\vfileContent\x12$\n" +
"\x0emodified_ts_ns\x18\x02 \x01(\x03R\fmodifiedTsNs\"z\n" +

View File

@@ -131,7 +131,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
nextReportTarget = processed + reportInterval
}
return true
}, throttler); err != nil {
}, throttler, 0); err != nil { // regular volumes use generation 0
return err
}
if sendErr != nil {
@@ -142,14 +142,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
}
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil {
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler, 0); err != nil { // regular volumes use generation 0
return err
}
if modifiedTsNs > 0 {
os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, 1024*1024, dataBaseFileName, ".vif", false, true, nil, throttler); err != nil {
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, 1024*1024, dataBaseFileName, ".vif", false, true, nil, throttler, 0); err != nil { // regular volumes use generation 0
return err
}
if modifiedTsNs > 0 {
@@ -199,10 +199,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond))
return vs.doCopyFileWithGeneration(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, 0)
}
func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) {
// doCopyFileWithGeneration delegates to doCopyFileWithThrottler, forwarding
// every argument unchanged together with the requested generation. The write
// throttler is created here from vs.compactionBytePerSecond, so each call
// gets its own throttler instance.
func (vs *VolumeServer) doCopyFileWithGeneration(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, generation uint32) (modifiedTsNs int64, err error) {
	throttler := util.NewWriteThrottler(vs.compactionBytePerSecond)
	modifiedTsNs, err = vs.doCopyFileWithThrottler(
		client, isEcVolume, collection, vid, compactRevision, stopOffset,
		baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn,
		throttler, generation,
	)
	return modifiedTsNs, err
}
func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler, generation uint32) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@@ -212,6 +216,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe
Collection: collection,
IsEcVolume: isEcVolume,
IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
Generation: generation, // pass generation to source server
})
if err != nil {
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
@@ -332,22 +337,29 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
v.SyncToDisk()
fileName = v.FileName(req.Ext)
} else {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
// Use generation-aware filename for EC volumes
generation := req.Generation
for _, location := range vs.store.Locations {
tName := util.Join(location.Directory, baseFileName)
// Try data directory with generation-aware naming
baseFileName := erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.Directory, int(req.VolumeId), generation)
tName := baseFileName + req.Ext
if util.FileExists(tName) {
fileName = tName
break
}
tName = util.Join(location.IdxDirectory, baseFileName)
// Try index directory with generation-aware naming
baseFileName = erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.IdxDirectory, int(req.VolumeId), generation)
tName = baseFileName + req.Ext
if util.FileExists(tName) {
fileName = tName
break
}
}
if fileName == "" {
if req.IgnoreSourceFileNotFound {
return nil
}
return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
return fmt.Errorf("CopyFile not found ec volume id %d generation %d", req.VolumeId, generation)
}
}

View File

@@ -187,36 +187,40 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
}
}
dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
// Generate target filenames with generation awareness
generation := req.Generation
dataBaseFileName := erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.Directory, int(req.VolumeId), generation)
indexBaseFileName := erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.IdxDirectory, int(req.VolumeId), generation)
glog.V(1).Infof("VolumeEcShardsCopy: copying EC shards with generation %d: data=%s, index=%s",
generation, dataBaseFileName, indexBaseFileName)
err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
// copy ec data slices with generation awareness
for _, shardId := range req.ShardIds {
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil); err != nil {
if _, err := vs.doCopyFileWithGeneration(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil, generation); err != nil {
return err
}
}
if req.CopyEcxFile {
// copy ecx file
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil {
// copy ecx file with generation awareness
if _, err := vs.doCopyFileWithGeneration(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil, generation); err != nil {
return err
}
}
if req.CopyEcjFile {
// copy ecj file
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil); err != nil {
// copy ecj file with generation awareness
if _, err := vs.doCopyFileWithGeneration(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil, generation); err != nil {
return err
}
}
if req.CopyVifFile {
// copy vif file
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil); err != nil {
// copy vif file with generation awareness
if _, err := vs.doCopyFileWithGeneration(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil, generation); err != nil {
return err
}
}

View File

@@ -150,6 +150,7 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) {
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(sourceNode),
Generation: 1, // TODO: implement proper generation tracking in vacuum task
})
if copyErr != nil {
return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr)
@@ -254,6 +255,7 @@ func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(sourceNode),
Generation: 1, // TODO: implement proper generation tracking in vacuum task
})
if copyErr != nil {
return fmt.Errorf("failed to copy new shards %v from %s to %s: %w", needToDistributeBits.ShardIds(), sourceNode, targetNode, copyErr)