mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-20 22:57:57 +08:00
Shell: support regular expression for collection selection (#7158)
* support regular expression for collection selection * refactor * ordering * fix exact match * Update command_volume_balance_test.go * simplify * Update command_volume_balance.go * comment
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
@@ -53,6 +54,11 @@ func (c *commandEcEncode) Help() string {
|
||||
If you only have less than 4 volume servers, with erasure coding, at least you can afford to
|
||||
have 4 corrupted shard files.
|
||||
|
||||
The -collection parameter supports regular expressions for pattern matching:
|
||||
- Use exact match: ec.encode -collection="^mybucket$"
|
||||
- Match multiple buckets: ec.encode -collection="bucket.*"
|
||||
- Match all collections: ec.encode -collection=".*"
|
||||
|
||||
Options:
|
||||
-verbose: show detailed reasons why volumes are not selected for encoding
|
||||
|
||||
@@ -112,12 +118,11 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
volumeIds = append(volumeIds, vid)
|
||||
balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
|
||||
} else {
|
||||
// apply to all volumes for the given collection
|
||||
volumeIds, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose)
|
||||
// apply to all volumes for the given collection pattern (regex)
|
||||
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
balanceCollections = []string{*collection}
|
||||
}
|
||||
|
||||
// Collect volume locations BEFORE EC encoding starts to avoid race condition
|
||||
@@ -270,7 +275,13 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
|
||||
|
||||
}
|
||||
|
||||
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, sourceDiskType *types.DiskType, fullPercentage float64, quietPeriod time.Duration, verbose bool) (vids []needle.VolumeId, err error) {
|
||||
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, collectionPattern string, sourceDiskType *types.DiskType, fullPercentage float64, quietPeriod time.Duration, verbose bool) (vids []needle.VolumeId, matchedCollections []string, err error) {
|
||||
// compile regex pattern for collection matching
|
||||
collectionRegex, err := compileCollectionPattern(collectionPattern)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid collection pattern '%s': %v", collectionPattern, err)
|
||||
}
|
||||
|
||||
// collect topology information
|
||||
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
|
||||
if err != nil {
|
||||
@@ -280,7 +291,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
|
||||
quietSeconds := int64(quietPeriod / time.Second)
|
||||
nowUnixSeconds := time.Now().Unix()
|
||||
|
||||
fmt.Printf("collect volumes quiet for: %d seconds and %.1f%% full\n", quietSeconds, fullPercentage)
|
||||
fmt.Printf("collect volumes with collection pattern '%s', quiet for: %d seconds and %.1f%% full\n", collectionPattern, quietSeconds, fullPercentage)
|
||||
|
||||
// Statistics for verbose mode
|
||||
var (
|
||||
@@ -294,6 +305,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
|
||||
)
|
||||
|
||||
vidMap := make(map[uint32]bool)
|
||||
collectionSet := make(map[string]bool)
|
||||
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||
for _, diskInfo := range dn.DiskInfos {
|
||||
for _, v := range diskInfo.VolumeInfos {
|
||||
@@ -309,16 +321,19 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
|
||||
continue
|
||||
}
|
||||
|
||||
// check collection
|
||||
if v.Collection != selectedCollection {
|
||||
// check collection against regex pattern
|
||||
if !collectionRegex.MatchString(v.Collection) {
|
||||
wrongCollection++
|
||||
if verbose {
|
||||
fmt.Printf("skip volume %d on %s: wrong collection (expected: %s, actual: %s)\n",
|
||||
v.Id, dn.Id, selectedCollection, v.Collection)
|
||||
fmt.Printf("skip volume %d on %s: collection doesn't match pattern (pattern: %s, actual: %s)\n",
|
||||
v.Id, dn.Id, collectionPattern, v.Collection)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// track matched collection
|
||||
collectionSet[v.Collection] = true
|
||||
|
||||
// check disk type
|
||||
if sourceDiskType != nil && types.ToDiskType(v.DiskType) != *sourceDiskType {
|
||||
wrongDiskType++
|
||||
@@ -393,11 +408,18 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
|
||||
}
|
||||
}
|
||||
|
||||
// Convert collection set to slice
|
||||
for collection := range collectionSet {
|
||||
matchedCollections = append(matchedCollections, collection)
|
||||
}
|
||||
sort.Strings(matchedCollections)
|
||||
|
||||
// Print summary statistics in verbose mode or when no volumes selected
|
||||
if verbose || len(vids) == 0 {
|
||||
fmt.Printf("\nVolume selection summary:\n")
|
||||
fmt.Printf(" Total volumes examined: %d\n", totalVolumes)
|
||||
fmt.Printf(" Selected for encoding: %d\n", len(vids))
|
||||
fmt.Printf(" Collections matched: %v\n", matchedCollections)
|
||||
|
||||
if totalVolumes > 0 {
|
||||
fmt.Printf("\nReasons for exclusion:\n")
|
||||
@@ -405,7 +427,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
|
||||
fmt.Printf(" Remote volumes: %d\n", remoteVolumes)
|
||||
}
|
||||
if wrongCollection > 0 {
|
||||
fmt.Printf(" Wrong collection: %d\n", wrongCollection)
|
||||
fmt.Printf(" Collection doesn't match pattern: %d\n", wrongCollection)
|
||||
}
|
||||
if wrongDiskType > 0 {
|
||||
fmt.Printf(" Wrong disk type: %d\n", wrongDiskType)
|
||||
|
Reference in New Issue
Block a user