refactoring

This commit is contained in:
Chris Lu
2019-04-20 12:05:28 -07:00
parent 6fc1f53018
commit 2ffe98443b
5 changed files with 128 additions and 126 deletions

View File

@@ -18,7 +18,6 @@ func init() {
}
type commandVolumeMove struct {
grpcDialOption grpc.DialOption
}
func (c *commandVolumeMove) Name() string {
@@ -49,9 +48,6 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *commandEnv, writer io.
fmt.Fprintf(writer, "received args: %+v\n", args)
return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
}
c.grpcDialOption = commandEnv.option.GrpcDialOption
ctx := context.Background()
sourceVolumeServer, targetVolumeServer, volumeIdString := args[0], args[1], args[2]
volumeId, err := needle.NewVolumeId(volumeIdString)
@@ -63,30 +59,36 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *commandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
ctx := context.Background()
return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := c.copyVolume(ctx, volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
if err = c.tailVolume(ctx, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, 5*time.Second); err != nil {
if err = tailVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
if err = c.deleteVolume(ctx, volumeId, sourceVolumeServer); err != nil {
if err = deleteVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer); err != nil {
return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
}
log.Printf("moved volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
return nil
}
func (c *commandVolumeMove) copyVolume(ctx context.Context, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
err = operation.WithVolumeServerClient(targetVolumeServer, c.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: sourceVolumeServer,
@@ -100,13 +102,13 @@ func (c *commandVolumeMove) copyVolume(ctx context.Context, volumeId needle.Volu
return
}
func (c *commandVolumeMove) tailVolume(ctx context.Context, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, timeout time.Duration) (err error) {
func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
return operation.WithVolumeServerClient(targetVolumeServer, c.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
DrainingSeconds: uint32(timeout.Seconds()),
IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
SourceVolumeServer: sourceVolumeServer,
})
return replicateErr
@@ -114,8 +116,8 @@ func (c *commandVolumeMove) tailVolume(ctx context.Context, volumeId needle.Volu
}
func (c *commandVolumeMove) deleteVolume(ctx context.Context, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, c.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, unmountErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
})