change server address from string to a type

Chris Lu
2021-09-12 22:47:52 -07:00
parent 2c9d4c8f43
commit e5fc35ed0c
128 changed files with 2138 additions and 2082 deletions
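The commit replaces bare `string` server addresses throughout the shell commands with a dedicated `pb.ServerAddress` type. The hunks below only show the call sites, never the type itself, so here is a minimal sketch of its apparent shape, reconstructed from those call sites. It is an illustration under stated assumptions, not the commit's actual weed/pb source: the stand-in DataNodeInfo struct and all helper internals are invented for the sketch.

```go
// sketch.go — hypothetical reconstruction of pb.ServerAddress, inferred
// from the call sites in this diff; not the commit's actual weed/pb code.
package main

import (
	"fmt"
	"strings"
)

// ServerAddress is a typed "host:port" string. The named type costs nothing
// at runtime, but the compiler now rejects passing a node id, a URL, or any
// other bare string where an address is expected. The explicit
// string(existingLocation) conversions in the hunks below mark the few
// places that still need the raw string (e.g. protobuf request fields).
type ServerAddress string

// ToHttpAddress renders the address for building HTTP URLs, as used by
// commandFsMetaSave and commandS3CleanUploads below.
func (sa ServerAddress) ToHttpAddress() string { return string(sa) }

// DataNodeInfo stands in for master_pb.DataNodeInfo, just for this sketch.
type DataNodeInfo struct{ Id string }

// NewServerAddressFromDataNode derives an address from topology info.
// Assumption: the node id already carries "host:port" (the old code
// compared targetServer.info.Id against an address directly).
func NewServerAddressFromDataNode(dn *DataNodeInfo) ServerAddress {
	return ServerAddress(dn.Id)
}

// ServerAddresses is a comma-separated address list, e.g. a -masters flag
// value, as used when building the MasterClient in NewCommandEnv.
type ServerAddresses string

// ToAddresses splits the list into typed addresses.
func (sa ServerAddresses) ToAddresses() (result []ServerAddress) {
	for _, a := range strings.Split(string(sa), ",") {
		result = append(result, ServerAddress(strings.TrimSpace(a)))
	}
	return result
}

func main() {
	fmt.Println(ServerAddress("localhost:8080").ToHttpAddress())
	fmt.Println(ServerAddresses("m1:9333,m2:9333").ToAddresses())
}
```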

View File

@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"sort"
@@ -215,10 +216,10 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle
duplicatedShardIds := []uint32{uint32(shardId)}
for _, ecNode := range ecNodes[1:] {
-if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
-if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)

View File

@@ -3,6 +3,7 @@ package shell
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"math"
"sort"
@@ -22,20 +23,22 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
if applyBalancing {
+existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
// ask destination node to copy shard and the ecx file from source node, and mount it
-copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
+copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
if err != nil {
return err
}
// unmount the to be deleted shards
-err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
if err != nil {
return err
}
// ask source node to delete the shard, and maybe the ecx file
-err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
if err != nil {
return err
}
@@ -53,13 +56,14 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
-volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
+volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
-err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
+err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-if targetServer.info.Id != existingLocation {
+if targetAddress != existingLocation {
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
@@ -69,7 +73,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
CopyEcxFile: true,
CopyEcjFile: true,
CopyVifFile: true,
-SourceDataNode: existingLocation,
+SourceDataNode: string(existingLocation),
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -86,7 +90,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
}
-if targetServer.info.Id != existingLocation {
+if targetAddress != existingLocation {
copiedShardIds = shardIdsToCopy
glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
}
@@ -233,7 +237,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
return
}
-func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
@@ -248,7 +252,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin
}
-func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error {
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
@@ -261,7 +265,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s
})
}
-func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error {
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)

View File

@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
@@ -100,7 +101,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
return nil
}
-func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
+func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -132,7 +133,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, ta
return nil
}
-func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
@@ -148,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c
}
-func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
+func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) {
maxShardCount := 0
var exisitngEcIndexBits erasure_coding.ShardBits
@@ -185,7 +186,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur
CopyEcxFile: false,
CopyEcjFile: true,
CopyVifFile: true,
-SourceDataNode: loc,
+SourceDataNode: string(loc),
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
@@ -243,14 +244,14 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri
return
}
-func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
+func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
-nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
+nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
-nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)
}
}
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"sync"
"time"
@@ -106,7 +107,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
}
// generate ec shards
-err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].Url)
+err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress())
if err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
@@ -120,7 +121,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
return nil
}
-func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
@@ -161,13 +162,13 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection
}
// unmount the to be deleted shards
-err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
if err != nil {
return err
}
// ask the source volume server to clean up copied ec shards
-err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
+err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
if err != nil {
return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
}
@@ -175,7 +176,7 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection
// ask the source volume server to delete the original volume
for _, location := range existingLocations {
fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
-err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
+err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.ServerAddress())
if err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
}
@@ -194,7 +195,7 @@ func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer
copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
defer wg.Done()
copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
-allocatedEcShardIds, volumeId, collection, existingLocation.Url)
+allocatedEcShardIds, volumeId, collection, existingLocation.ServerAddress())
if copyErr != nil {
err = copyErr
} else {

View File

@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -141,7 +142,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
// clean up working files
// ask the rebuilder to delete the copied shards
-err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
+err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), copiedShardIds)
if err != nil {
fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
}
@@ -153,13 +154,13 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
}
// generate ec shards, and maybe ecx file
-generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
+generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info))
if err != nil {
return err
}
// mount the generated shards
-err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
+err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), generatedShardIds)
if err != nil {
return err
}
@@ -169,7 +170,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
return nil
}
-func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
+func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {
err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
@@ -212,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection
var copyErr error
if applyBalancing {
-copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,

View File

@@ -63,8 +63,8 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
fileName := *outputFileName
if fileName == "" {
t := time.Now()
-fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta",
-commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
+fileName = fmt.Sprintf("%s-%4d%02d%02d-%02d%02d%02d.meta",
+commandEnv.option.FilerAddress.ToHttpAddress(), t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
}
dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
@@ -105,7 +105,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
})
if err == nil {
-fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", commandEnv.option.FilerHost, commandEnv.option.FilerPort, path, fileName)
+fmt.Fprintf(writer, "meta data for http://%s%s is saved to %s\n", commandEnv.option.FilerAddress.ToHttpAddress(), path, fileName)
}
return err

View File

@@ -78,7 +78,7 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io
}
for _, staleUpload := range staleUploads {
-deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerHost, commandEnv.option.FilerPort, uploadsDir, staleUpload)
+deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
fmt.Fprintf(writer, "purge %s\n", deleteUrl)
err = util.Delete(deleteUrl, "")

View File

@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
@@ -325,7 +326,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
}
fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyChange {
-return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType, false)
+return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, false)
}
return nil
}

View File

@@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"io"
@@ -108,10 +109,10 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(aDB *needle_map.MemDb, bDB *nee
aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb()
// read index db
-if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil {
+if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), *verbose, writer); err != nil {
return err
}
-if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil {
+if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), *verbose, writer); err != nil {
return err
}
@@ -155,7 +156,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
for _, needleValue := range missingNeedles {
-needleBlob, err := c.readSourceNeedleBlob(source.location.dataNode.Id, source.info.Id, needleValue)
+needleBlob, err := c.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue)
if err != nil {
return hasChanges, err
}
@@ -170,7 +171,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
hasChanges = true
-if err = c.writeNeedleBlobToTarget(target.location.dataNode.Id, source.info.Id, needleValue, needleBlob); err != nil {
+if err = c.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil {
return hasChanges, err
}
@@ -179,7 +180,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
return
}
-func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer string, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
+func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
@@ -197,7 +198,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer string,
return
}
-func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer string, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
+func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
@@ -211,7 +212,7 @@ func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer stri
}
-func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer string, verbose bool, writer io.Writer) error {
+func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer) error {
var buf bytes.Buffer
if err := c.copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer); err != nil {
@@ -226,7 +227,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect
}
-func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer string, buf *bytes.Buffer, verbose bool, writer io.Writer) error {
+func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error {
return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

View File

@@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -83,7 +84,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
for _, dst := range allLocations {
-err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: replicaPlacement.String(),

View File

@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -44,7 +45,7 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
-sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr
+sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr)
volumeId := needle.VolumeId(*volumeIdInt)

View File

@@ -2,6 +2,7 @@ package shell
import (
"flag"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -41,7 +42,7 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i
return nil
}
-sourceVolumeServer := *nodeStr
+sourceVolumeServer := pb.ServerAddress(*nodeStr)
volumeId := needle.VolumeId(*volumeIdInt)

View File

@@ -2,6 +2,7 @@ package shell
import (
"flag"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
@@ -58,7 +59,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri
if v.Size <= 8 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
if *applyBalancing {
log.Printf("deleting empty volume %d from %s", v.Id, dn.Id)
-if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), dn.Id); deleteErr != nil {
+if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(dn)); deleteErr != nil {
err = deleteErr
}
continue

View File

@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
@@ -147,7 +148,7 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
break
}
-if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
+if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), pb.NewServerAddressFromDataNode(replica.location.dataNode)); err != nil {
return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
}
@@ -200,10 +201,10 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
break
}
-err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: replica.info.Id,
-SourceDataNode: replica.location.dataNode.Id,
+SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
})
if replicateErr != nil {
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)

View File

@@ -5,6 +5,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
"io/ioutil"
@@ -437,7 +438,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri
}
type VInfo struct {
-server string
+server pb.ServerAddress
collection string
isEcVolume bool
}
@@ -459,14 +460,14 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
for _, diskInfo := range t.DiskInfos {
for _, vi := range diskInfo.VolumeInfos {
volumeIdToServer[vi.Id] = VInfo{
-server: t.Id,
+server: pb.NewServerAddressFromDataNode(t),
collection: vi.Collection,
isEcVolume: false,
}
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
volumeIdToServer[ecShardInfo.Id] = VInfo{
-server: t.Id,
+server: pb.NewServerAddressFromDataNode(t),
collection: ecShardInfo.Collection,
isEcVolume: true,
}
@@ -491,7 +492,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
var wg sync.WaitGroup
for _, location := range locations {
wg.Add(1)
-go func(server string, fidList []string) {
+go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
@@ -500,7 +501,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
resultChan <- deleteResults
}
-}(location.Url, fileIds)
+}(location.ServerAddress(), fileIds)
}
wg.Wait()
close(resultChan)

View File

@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -47,7 +48,7 @@ func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.
markWritable = true
}
-sourceVolumeServer := *nodeStr
+sourceVolumeServer := pb.ServerAddress(*nodeStr)
volumeId := needle.VolumeId(*volumeIdInt)

View File

@@ -3,6 +3,7 @@ package shell
import (
"context"
"flag"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -45,7 +46,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
return nil
}
-sourceVolumeServer := *nodeStr
+sourceVolumeServer := pb.ServerAddress(*nodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
@@ -53,7 +54,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
}
-func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),

View File

@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"log"
@@ -62,7 +63,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
-sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr
+sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
@@ -74,7 +75,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
-func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
+func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
@@ -100,7 +101,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
return nil
}
-func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, diskType string) (lastAppendAtNs uint64, err error) {
+func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) {
// check to see if the volume is already read-only and if its not then we need
// to mark it as read-only and then before we return we need to undo what we
@@ -142,7 +143,7 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
-SourceDataNode: sourceVolumeServer,
+SourceDataNode: string(sourceVolumeServer),
DiskType: diskType,
})
if replicateErr == nil {
@@ -154,21 +155,21 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
return
}
-func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
+func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
-SourceVolumeServer: sourceVolumeServer,
+SourceVolumeServer: string(sourceVolumeServer),
})
return replicateErr
})
}
-func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
@@ -177,7 +178,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
})
}
-func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) {
+func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
@@ -195,7 +196,7 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error {
for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
-if err := markVolumeWritable(grpcDialOption, volumeId, location.Url, writable); err != nil {
+if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil {
return err
}
}

View File

@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
"io"
@@ -50,11 +51,11 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri
return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
}
-return volumeServerLeave(commandEnv.option.GrpcDialOption, *volumeServer, writer)
+return volumeServerLeave(commandEnv.option.GrpcDialOption, pb.ServerAddress(*volumeServer), writer)
}
-func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer string, writer io.Writer) (err error) {
+func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) {
return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
if leaveErr != nil {

View File

@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"google.golang.org/grpc"
@@ -112,7 +113,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s
// TODO parallelize this
for _, loc := range locations {
// copy the .dat file from remote tier to local
-err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url)
+err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.ServerAddress())
if err != nil {
return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err)
}
@@ -121,7 +122,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s
return nil
}
-func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
+func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error {
err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{

View File

@@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient"
@@ -20,7 +21,7 @@ func init() {
}
type commandVolumeTierMove struct {
-activeServers map[string]struct{}
+activeServers map[pb.ServerAddress]struct{}
activeServersLock sync.Mutex
activeServersCond *sync.Cond
}
@@ -42,7 +43,7 @@ func (c *commandVolumeTierMove) Help() string {
func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
-c.activeServers = make(map[string]struct{})
+c.activeServers = make(map[pb.ServerAddress]struct{})
c.activeServersCond = sync.NewCond(new(sync.Mutex))
if err = commandEnv.confirmIsLocked(); err != nil {
@@ -117,10 +118,10 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
if isOneOf(dst.dataNode.Id, locations) {
continue
}
-sourceVolumeServer := ""
+var sourceVolumeServer pb.ServerAddress
for _, loc := range locations {
if loc.Url != dst.dataNode.Id {
-sourceVolumeServer = loc.Url
+sourceVolumeServer = loc.ServerAddress()
}
}
if sourceVolumeServer == "" {
@@ -135,16 +136,17 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
break
}
+destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
c.activeServersCond.L.Lock()
_, isSourceActive := c.activeServers[sourceVolumeServer]
-_, isDestActive := c.activeServers[dst.dataNode.Id]
+_, isDestActive := c.activeServers[destServerAddress]
for isSourceActive || isDestActive {
c.activeServersCond.Wait()
_, isSourceActive = c.activeServers[sourceVolumeServer]
-_, isDestActive = c.activeServers[dst.dataNode.Id]
+_, isDestActive = c.activeServers[destServerAddress]
}
c.activeServers[sourceVolumeServer] = struct{}{}
-c.activeServers[dst.dataNode.Id] = struct{}{}
+c.activeServers[destServerAddress] = struct{}{}
c.activeServersCond.L.Unlock()
wg.Add(1)
@@ -153,7 +155,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", vid, sourceVolumeServer, dst.dataNode.Id, err)
}
delete(c.activeServers, sourceVolumeServer)
-delete(c.activeServers, dst.dataNode.Id)
+delete(c.activeServers, destServerAddress)
c.activeServersCond.Signal()
wg.Done()
}(dst)
@@ -170,13 +172,13 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil
}
-func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer string, dst location) (err error) {
+func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location) (err error) {
// mark all replicas as read only
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
-if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString(), true); err != nil {
+if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
@@ -191,8 +193,8 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
// remove the remaining replicas
for _, loc := range locations {
-if loc.Url != dst.dataNode.Id && loc.Url != sourceVolumeServer {
-if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
+if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {
+if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress()); err != nil {
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
}
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"time"
@@ -107,7 +108,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
}
// copy the .dat file to remote tier
-err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile)
+err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].ServerAddress(), dest, keepLocalDatFile)
if err != nil {
return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err)
}
@@ -115,7 +116,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
return nil
}
-func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
+func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error {
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{

View File

@@ -3,6 +3,7 @@ package shell
import (
"context"
"flag"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -45,7 +46,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
-sourceVolumeServer := *nodeStr
+sourceVolumeServer := pb.ServerAddress(*nodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
@@ -53,7 +54,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
}
-func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),

View File

@@ -22,7 +22,7 @@ type ShellOptions struct {
// shell transient context
FilerHost string
FilerPort int64
-FilerAddress string
+FilerAddress pb.ServerAddress
Directory string
}
@@ -46,7 +46,7 @@ var (
func NewCommandEnv(options ShellOptions) *CommandEnv {
ce := &CommandEnv{
env: make(map[string]string),
-MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(*options.Masters, ",")),
+MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()),
option: options,
}
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
@@ -98,8 +98,7 @@ var _ = filer_pb.FilerClient(&CommandEnv{})
func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
-filerGrpcAddress := util.JoinHostPort(ce.option.FilerHost, int(ce.option.FilerPort+10000))
-return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn)
+return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
}
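The final hunk also drops the manual FilerPort+10000 arithmetic from WithFilerClient, so the gRPC port derivation presumably now lives behind the address type as well. Here is a hypothetical sketch of that conversion, extending the sketch above (the method name and the fixed +10000 offset are read off the deleted util.JoinHostPort line, not taken from the commit's weed/pb code; add "strconv" to the sketch's imports):

```go
// ToGrpcAddress derives the gRPC endpoint from the public "host:port"
// address. Assumption: the gRPC port is the HTTP port plus 10000, matching
// the deleted util.JoinHostPort(ce.option.FilerHost, FilerPort+10000) call.
func (sa ServerAddress) ToGrpcAddress() string {
	i := strings.LastIndex(string(sa), ":")
	port, err := strconv.Atoi(string(sa)[i+1:])
	if err != nil {
		return string(sa) // no parsable port; return the address unchanged
	}
	return fmt.Sprintf("%s:%d", string(sa)[:i], port+10000)
}
```

With the offset hidden behind the type, pb.WithGrpcFilerClient can accept the ServerAddress directly and derive the gRPC endpoint itself, which is the one-line simplification visible above.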