mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 08:46:54 +08:00
backup: handle volume not found when backing up (#7465)
* handle volume not found when backing up * error handling on reading volume ttl and replication * fix Inconsistent error handling: should continue to next location. * adjust messages * close volume * refactor * refactor * proper v.Close()
This commit is contained in:
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||
@@ -66,6 +68,116 @@ var cmdBackup = &Command{
|
||||
`,
|
||||
}
|
||||
|
||||
// parseTTL parses the TTL from user input or volume stats.
|
||||
// Returns (ttl, error, isFatal) where isFatal=true for invalid user input.
|
||||
func parseTTL(userTTL string, statsTTL string) (*needle.TTL, error, bool) {
|
||||
if userTTL != "" {
|
||||
ttl, err := needle.ReadTTL(userTTL)
|
||||
if err != nil {
|
||||
// User-provided TTL is invalid - this is fatal
|
||||
return nil, fmt.Errorf("invalid user-provided ttl %s: %w", userTTL, err), true
|
||||
}
|
||||
return ttl, nil, false
|
||||
}
|
||||
|
||||
ttl, err := needle.ReadTTL(statsTTL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parsing ttl %s from stats: %w", statsTTL, err), false
|
||||
}
|
||||
return ttl, nil, false
|
||||
}
|
||||
|
||||
// parseReplication parses the replication from user input or volume stats.
|
||||
// Returns (replication, error, isFatal) where isFatal=true for invalid user input.
|
||||
func parseReplication(userReplication string, statsReplication string) (*super_block.ReplicaPlacement, error, bool) {
|
||||
if userReplication != "" {
|
||||
replication, err := super_block.NewReplicaPlacementFromString(userReplication)
|
||||
if err != nil {
|
||||
// User-provided replication is invalid - this is fatal
|
||||
return nil, fmt.Errorf("invalid user-provided replication %s: %w", userReplication, err), true
|
||||
}
|
||||
return replication, nil, false
|
||||
}
|
||||
|
||||
replication, err := super_block.NewReplicaPlacementFromString(statsReplication)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parsing replication %s from stats: %w", statsReplication, err), false
|
||||
}
|
||||
return replication, nil, false
|
||||
}
|
||||
|
||||
// backupFromLocation attempts to backup a volume from a specific volume server location.
|
||||
// Returns (error, isFatal) where isFatal=true means the error is due to invalid user input
|
||||
// and should not be retried with other locations.
|
||||
func backupFromLocation(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId) (error, bool) {
|
||||
stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting volume status: %w", err), false
|
||||
}
|
||||
|
||||
// Parse TTL
|
||||
ttl, err, isFatal := parseTTL(*s.ttl, stats.Ttl)
|
||||
if err != nil {
|
||||
return err, isFatal
|
||||
}
|
||||
|
||||
// Parse replication
|
||||
replication, err, isFatal := parseReplication(*s.replication, stats.Replication)
|
||||
if err != nil {
|
||||
return err, isFatal
|
||||
}
|
||||
|
||||
ver := needle.Version(stats.Version)
|
||||
|
||||
// Create or load the volume
|
||||
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating or reading volume: %w", err), false
|
||||
}
|
||||
|
||||
// Handle compaction if needed
|
||||
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
|
||||
if err = v.Compact2(0, 0, nil); err != nil {
|
||||
v.Close()
|
||||
return fmt.Errorf("compacting volume: %w", err), false
|
||||
}
|
||||
if err = v.CommitCompact(); err != nil {
|
||||
v.Close()
|
||||
return fmt.Errorf("committing compaction: %w", err), false
|
||||
}
|
||||
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
|
||||
if _, err = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); err != nil {
|
||||
v.Close()
|
||||
return fmt.Errorf("writing superblock: %w", err), false
|
||||
}
|
||||
}
|
||||
|
||||
datSize, _, _ := v.FileStat()
|
||||
|
||||
// If local volume is larger than remote, recreate it
|
||||
if datSize > stats.TailOffset {
|
||||
if err := v.Destroy(false); err != nil {
|
||||
v.Close()
|
||||
return fmt.Errorf("destroying volume: %w", err), false
|
||||
}
|
||||
v.Close() // Close the destroyed volume
|
||||
// recreate an empty volume
|
||||
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("recreating volume: %w", err), false
|
||||
}
|
||||
}
|
||||
|
||||
// Perform the incremental backup
|
||||
if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil {
|
||||
v.Close()
|
||||
return fmt.Errorf("incremental backup: %w", err), false
|
||||
}
|
||||
|
||||
v.Close()
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func runBackup(cmd *Command, args []string) bool {
|
||||
|
||||
util.LoadSecurityConfiguration()
|
||||
@@ -88,83 +200,35 @@ func runBackup(cmd *Command, args []string) bool {
|
||||
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
|
||||
return true
|
||||
}
|
||||
volumeServer := lookup.Locations[0].ServerAddress()
|
||||
|
||||
stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
|
||||
if err != nil {
|
||||
fmt.Printf("Error get volume %d status: %v\n", vid, err)
|
||||
return true
|
||||
}
|
||||
var ttl *needle.TTL
|
||||
if *s.ttl != "" {
|
||||
ttl, err = needle.ReadTTL(*s.ttl)
|
||||
if err != nil {
|
||||
fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err)
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
ttl, err = needle.ReadTTL(stats.Ttl)
|
||||
if err != nil {
|
||||
fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
|
||||
return true
|
||||
}
|
||||
}
|
||||
var replication *super_block.ReplicaPlacement
|
||||
if *s.replication != "" {
|
||||
replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
|
||||
if err != nil {
|
||||
fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
|
||||
if err != nil {
|
||||
fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
ver := needle.Version(stats.Version)
|
||||
|
||||
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
|
||||
if err != nil {
|
||||
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
|
||||
if len(lookup.Locations) == 0 {
|
||||
fmt.Printf("Error: volume %d has no locations available\n", vid)
|
||||
return true
|
||||
}
|
||||
|
||||
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
|
||||
if err = v.Compact2(0, 0, nil); err != nil {
|
||||
fmt.Printf("Compact Volume before synchronizing %v\n", err)
|
||||
return true
|
||||
}
|
||||
if err = v.CommitCompact(); err != nil {
|
||||
fmt.Printf("Commit Compact before synchronizing %v\n", err)
|
||||
return true
|
||||
}
|
||||
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
|
||||
v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
|
||||
}
|
||||
// Try each available location until one succeeds
|
||||
var lastErr error
|
||||
for i, location := range lookup.Locations {
|
||||
volumeServer := location.ServerAddress()
|
||||
fmt.Printf("Attempting to backup volume %d from location %d/%d: %s\n", vid, i+1, len(lookup.Locations), volumeServer)
|
||||
|
||||
datSize, _, _ := v.FileStat()
|
||||
|
||||
if datSize > stats.TailOffset {
|
||||
// remove the old data
|
||||
if err := v.Destroy(false); err != nil {
|
||||
fmt.Printf("Error destroying volume: %v\n", err)
|
||||
}
|
||||
// recreate an empty volume
|
||||
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
|
||||
err, isFatal := backupFromLocation(volumeServer, grpcDialOption, vid)
|
||||
if err != nil {
|
||||
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
|
||||
return true
|
||||
fmt.Printf("Error backing up volume %d from %s: %v\n", vid, volumeServer, err)
|
||||
lastErr = err
|
||||
// Check if this is a fatal user-input error
|
||||
if isFatal {
|
||||
return true
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
defer v.Close()
|
||||
|
||||
if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil {
|
||||
fmt.Printf("Error synchronizing volume %d: %v\n", vid, err)
|
||||
// Success!
|
||||
fmt.Printf("Successfully backed up volume %d from %s\n", vid, volumeServer)
|
||||
return true
|
||||
}
|
||||
|
||||
// All locations failed
|
||||
fmt.Printf("Failed to backup volume %d after trying all %d locations. Last error: %v\n", vid, len(lookup.Locations), lastErr)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user