mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 16:53:14 +08:00
Merge f524d0801e into c96b2ce8e5
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
||||
@@ -35,7 +36,7 @@ func (c *commandVolumeConfigureReplication) Help() string {
|
||||
`
|
||||
}
|
||||
|
||||
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) {
|
||||
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||
|
||||
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
|
||||
@@ -67,39 +68,50 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
||||
vid := needle.VolumeId(*volumeIdInt)
|
||||
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
|
||||
|
||||
eg, gCtx := errgroup.WithContext(context.Background())
|
||||
_ = gCtx
|
||||
// find all data nodes with volumes that needs replication change
|
||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||
var targetVolumeIds []uint32
|
||||
for _, diskInfo := range dn.DiskInfos {
|
||||
for _, v := range diskInfo.VolumeInfos {
|
||||
if volumeFilter(v) {
|
||||
targetVolumeIds = append(targetVolumeIds, v.Id)
|
||||
eg.Go(func() error {
|
||||
var targetVolumeIds []uint32
|
||||
for _, diskInfo := range dn.DiskInfos {
|
||||
for _, v := range diskInfo.VolumeInfos {
|
||||
if volumeFilter(v) {
|
||||
targetVolumeIds = append(targetVolumeIds, v.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(targetVolumeIds) == 0 {
|
||||
return
|
||||
}
|
||||
err = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dn), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
for _, targetVolumeId := range targetVolumeIds {
|
||||
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
|
||||
VolumeId: targetVolumeId,
|
||||
Replication: replicaPlacement.String(),
|
||||
})
|
||||
if configureErr != nil {
|
||||
return configureErr
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return errors.New(resp.Error)
|
||||
if len(targetVolumeIds) == 0 {
|
||||
return nil
|
||||
}
|
||||
fmt.Fprintf(writer, "volume server %s has %d volumes\n", dn.Id, len(targetVolumeIds))
|
||||
err = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dn), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
for i, targetVolumeId := range targetVolumeIds {
|
||||
if i%100 == 0 {
|
||||
fmt.Fprintf(writer, "volume marking progress on %s: %.2f (%d/%d)\n", dn.Id, float32(i)/float32(len(targetVolumeIds))*100, i, len(targetVolumeIds))
|
||||
}
|
||||
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
|
||||
VolumeId: targetVolumeId,
|
||||
Replication: replicaPlacement.String(),
|
||||
})
|
||||
if configureErr != nil {
|
||||
return configureErr
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return errors.New(resp.Error)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
err = eg.Wait()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user