mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-09 03:56:13 +08:00
Collecting volume locations for volumes before EC encoding
fix https://github.com/seaweedfs/seaweedfs/issues/6963
This commit is contained in:
@@ -4,10 +4,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||||
@@ -115,6 +116,14 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
|||||||
balanceCollections = []string{*collection}
|
balanceCollections = []string{*collection}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Collect volume locations BEFORE EC encoding starts to avoid race condition
|
||||||
|
// where the master metadata is updated after EC encoding but before deletion
|
||||||
|
fmt.Printf("Collecting volume locations for %d volumes before EC encoding...\n", len(volumeIds))
|
||||||
|
volumeLocationsMap, err := volumeLocations(commandEnv, volumeIds)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// encode all requested volumes...
|
// encode all requested volumes...
|
||||||
if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
|
if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
|
||||||
return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
|
return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
|
||||||
@@ -123,10 +132,12 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
|||||||
if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
|
if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
|
||||||
return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
|
return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
|
||||||
}
|
}
|
||||||
// ...then delete original volumes.
|
// ...then delete original volumes using pre-collected locations.
|
||||||
if err := doDeleteVolumes(commandEnv, volumeIds, *maxParallelization); err != nil {
|
fmt.Printf("Deleting original volumes after EC encoding...\n")
|
||||||
return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
|
if err := doDeleteVolumesWithLocations(commandEnv, volumeIds, volumeLocationsMap, *maxParallelization); err != nil {
|
||||||
|
return fmt.Errorf("delete original volumes after EC encoding: %v", err)
|
||||||
}
|
}
|
||||||
|
fmt.Printf("Successfully completed EC encoding for %d volumes\n", len(volumeIds))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -150,7 +161,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
|
|||||||
}
|
}
|
||||||
locations, err := volumeLocations(commandEnv, volumeIds)
|
locations, err := volumeLocations(commandEnv, volumeIds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return fmt.Errorf("failed to get volume locations for EC encoding: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark volumes as readonly
|
// mark volumes as readonly
|
||||||
@@ -207,18 +218,22 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func doDeleteVolumes(commandEnv *CommandEnv, volumeIds []needle.VolumeId, maxParallelization int) error {
|
// doDeleteVolumesWithLocations deletes volumes using pre-collected location information
|
||||||
|
// This avoids race conditions where master metadata is updated after EC encoding
|
||||||
|
func doDeleteVolumesWithLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId, volumeLocationsMap map[needle.VolumeId][]wdclient.Location, maxParallelization int) error {
|
||||||
if !commandEnv.isLocked() {
|
if !commandEnv.isLocked() {
|
||||||
return fmt.Errorf("lock is lost")
|
return fmt.Errorf("lock is lost")
|
||||||
}
|
}
|
||||||
locations, err := volumeLocations(commandEnv, volumeIds)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ewg := NewErrorWaitGroup(maxParallelization)
|
ewg := NewErrorWaitGroup(maxParallelization)
|
||||||
for _, vid := range volumeIds {
|
for _, vid := range volumeIds {
|
||||||
for _, l := range locations[vid] {
|
locations, found := volumeLocationsMap[vid]
|
||||||
|
if !found {
|
||||||
|
fmt.Printf("warning: no locations found for volume %d, skipping deletion\n", vid)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, l := range locations {
|
||||||
ewg.Add(func() error {
|
ewg.Add(func() error {
|
||||||
if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
|
if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
|
||||||
return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
|
return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
|
||||||
|
|||||||
Reference in New Issue
Block a user