fix command_volume_tier_upload bug (#7041)

* fix command_volume_tier_upload bug: Avoid deleting volumes under the same collection

* simplify a bit

---------

Co-authored-by: hzxialei <hzxialei@corp.netease.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
This commit is contained in:
chalet 2025-07-29 03:34:43 +08:00 committed by GitHub
parent 6225e3caab
commit 1009b3cdce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -4,17 +4,21 @@ import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
func init() {
@ -110,9 +114,33 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ
func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
// find volume location
existingLocations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
if !found {
return fmt.Errorf("volume %d not found", vid)
topoInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return fmt.Errorf("collect topology info: %v", err)
}
var existingLocations []wdclient.Location
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
for _, disk := range dn.DiskInfos {
for _, vi := range disk.VolumeInfos {
if needle.VolumeId(vi.Id) == vid && (collection == "" || vi.Collection == collection) {
fmt.Printf("find volume %d from Url:%s, GrpcPort:%d, DC:%s\n", vid, dn.Id, dn.GrpcPort, string(dc))
existingLocations = append(existingLocations, wdclient.Location{
Url: dn.Id,
PublicUrl: dn.Id,
GrpcPort: int(dn.GrpcPort),
DataCenter: string(dc),
})
}
}
}
})
if len(existingLocations) == 0 {
if collection == "" {
return fmt.Errorf("volume %d not found", vid)
}
return fmt.Errorf("volume %d not found in collection %s", vid, collection)
}
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false)
@ -135,7 +163,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
if i == 0 {
continue
}
fmt.Printf("delete volume %d from %s\n", vid, location.Url)
fmt.Printf("delete volume %d from Url:%s\n", vid, location.Url)
err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false)
if err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)