This commit is contained in:
Chris Lu
2021-07-16 11:46:04 -07:00
parent 3f31c4091a
commit fb7a1be1c4

View File

@@ -157,61 +157,69 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
for _, vid := range underReplicatedVolumeIds { for _, vid := range underReplicatedVolumeIds {
replicas := volumeReplicas[vid] err := c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations)
replica := pickOneReplicaToCopyFrom(replicas) if err != nil {
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) return err
foundNewLocation := false }
hasSkippedCollection := false
keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) }
fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) return nil
for _, dst := range allLocations { }
// check whether data nodes satisfy the constraints
if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
// check collection name pattern replicas := volumeReplicas[vid]
if *c.collectionPattern != "" { replica := pickOneReplicaToCopyFrom(replicas)
matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
if err != nil { foundNewLocation := false
return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) hasSkippedCollection := false
} keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
if !matched { fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
hasSkippedCollection = true for _, dst := range allLocations {
break // check whether data nodes satisfy the constraints
} if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// check collection name pattern
if *c.collectionPattern != "" {
matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
if err != nil {
return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
} }
if !matched {
// ask the volume server to replicate the volume hasSkippedCollection = true
foundNewLocation = true
fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
if !takeAction {
break break
} }
}
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { // ask the volume server to replicate the volume
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ foundNewLocation = true
VolumeId: replica.info.Id, fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
SourceDataNode: replica.location.dataNode.Id,
})
if replicateErr != nil {
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
return nil
})
if err != nil { if !takeAction {
return err
}
// adjust free volume count
dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
break break
} }
}
if !foundNewLocation && !hasSkippedCollection { err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
} VolumeId: replica.info.Id,
SourceDataNode: replica.location.dataNode.Id,
})
if replicateErr != nil {
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
return nil
})
if err != nil {
return err
}
// adjust free volume count
dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
break
}
}
if !foundNewLocation && !hasSkippedCollection {
fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
} }
return nil return nil
} }