This commit is contained in:
chrislu
2022-09-14 23:06:44 -07:00
parent c8645fd232
commit 21c0587900
45 changed files with 730 additions and 694 deletions

View File

@@ -73,39 +73,48 @@ func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (*rem
}
/*
This function update entry.RemoteEntry if the remote has any changes.
This function update entry.RemoteEntry if the remote has any changes.
To pull remote updates, or created for the first time, the criteria is:
entry == nil or (entry.RemoteEntry != nil and (entry.RemoteEntry.RemoteTag != remote.RemoteTag or entry.RemoteEntry.RemoteMTime < remote.RemoteMTime ))
After the meta pull, the entry.RemoteEntry will have:
remoteEntry.LastLocalSyncTsNs == 0
Attributes.FileSize = uint64(remoteEntry.RemoteSize)
Attributes.Mtime = remoteEntry.RemoteMtime
remoteEntry.RemoteTag = actual remote tag
chunks = nil
To pull remote updates, or created for the first time, the criteria is:
When reading the file content or pulling the file content in "remote.cache", the criteria is:
Attributes.FileSize > 0 and len(chunks) == 0
After caching the file content, the entry.RemoteEntry will be
remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano()
Attributes.FileSize = uint64(remoteEntry.RemoteSize)
Attributes.Mtime = remoteEntry.RemoteMtime
chunks = non-emtpy
entry == nil or (entry.RemoteEntry != nil and (entry.RemoteEntry.RemoteTag != remote.RemoteTag or entry.RemoteEntry.RemoteMTime < remote.RemoteMTime ))
When "weed filer.remote.sync" to upload local changes to remote, the criteria is:
Attributes.Mtime > remoteEntry.RemoteMtime
Right after "weed filer.remote.sync", the entry.RemoteEntry will be
remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano()
remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize
remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime
remoteEntry.RemoteTag = actual remote tag
After the meta pull, the entry.RemoteEntry will have:
remoteEntry.LastLocalSyncTsNs == 0
Attributes.FileSize = uint64(remoteEntry.RemoteSize)
Attributes.Mtime = remoteEntry.RemoteMtime
remoteEntry.RemoteTag = actual remote tag
chunks = nil
If entry does not exists, need to pull meta
If entry.RemoteEntry == nil, this is a new local change and should not be overwritten
If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag {
the remote version is updated, need to pull meta
}
When reading the file content or pulling the file content in "remote.cache", the criteria is:
Attributes.FileSize > 0 and len(chunks) == 0
After caching the file content, the entry.RemoteEntry will be
remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano()
Attributes.FileSize = uint64(remoteEntry.RemoteSize)
Attributes.Mtime = remoteEntry.RemoteMtime
chunks = non-emtpy
When "weed filer.remote.sync" to upload local changes to remote, the criteria is:
Attributes.Mtime > remoteEntry.RemoteMtime
Right after "weed filer.remote.sync", the entry.RemoteEntry will be
remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano()
remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize
remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime
remoteEntry.RemoteTag = actual remote tag
If entry does not exists, need to pull meta
If entry.RemoteEntry == nil, this is a new local change and should not be overwritten
If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag {
the remote version is updated, need to pull meta
}
*/
func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *remote_pb.RemoteConf) error {

View File

@@ -164,15 +164,20 @@ func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty
}
// if an entry has synchronized metadata but has not synchronized content
// entry.Attributes.FileSize == entry.RemoteEntry.RemoteSize
// entry.Attributes.Mtime == entry.RemoteEntry.RemoteMtime
// entry.RemoteEntry.LastLocalSyncTsNs == 0
//
// entry.Attributes.FileSize == entry.RemoteEntry.RemoteSize
// entry.Attributes.Mtime == entry.RemoteEntry.RemoteMtime
// entry.RemoteEntry.LastLocalSyncTsNs == 0
//
// if an entry has synchronized metadata but has synchronized content before
// entry.Attributes.FileSize == entry.RemoteEntry.RemoteSize
// entry.Attributes.Mtime == entry.RemoteEntry.RemoteMtime
// entry.RemoteEntry.LastLocalSyncTsNs > 0
//
// entry.Attributes.FileSize == entry.RemoteEntry.RemoteSize
// entry.Attributes.Mtime == entry.RemoteEntry.RemoteMtime
// entry.RemoteEntry.LastLocalSyncTsNs > 0
//
// if an entry has synchronized metadata but has new updates
// entry.Attributes.Mtime * 1,000,000,000 > entry.RemoteEntry.LastLocalSyncTsNs
//
// entry.Attributes.Mtime * 1,000,000,000 > entry.RemoteEntry.LastLocalSyncTsNs
func doSaveRemoteEntry(client filer_pb.SeaweedFilerClient, localDir string, existingEntry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
existingEntry.RemoteEntry = remoteEntry
existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)

View File

@@ -70,7 +70,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
takeAction := !*skipChange
doDeletes := !*noDelete;
doDeletes := !*noDelete
underReplicatedVolumeIdsCount := 1
for underReplicatedVolumeIdsCount > 0 {
@@ -331,34 +331,40 @@ func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
}
/*
if on an existing data node {
return false
}
if different from existing dcs {
if lack on different dcs {
return true
}else{
return false
}
}
if not on primary dc {
return false
}
if different from existing racks {
if lack on different racks {
return true
}else{
return false
}
}
if not on primary rack {
return false
}
if lacks on same rack {
return true
} else {
return false
}
if on an existing data node {
return false
}
if different from existing dcs {
if lack on different dcs {
return true
}else{
return false
}
}
if not on primary dc {
return false
}
if different from existing racks {
if lack on different racks {
return true
}else{
return false
}
}
if not on primary rack {
return false
}
if lacks on same rack {
return true
} else {
return false
}
*/
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

View File

@@ -1,6 +1,8 @@
package shell
import (
"context"
"errors"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -12,11 +14,9 @@ import (
"path/filepath"
"sync"
"time"
"context"
"errors"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@@ -63,7 +63,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
applyChange := tierCommand.Bool("force", false, "actually apply the changes")
ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
replicationString := tierCommand.String("toReplication", "", "the new target replication setting");
replicationString := tierCommand.String("toReplication", "", "the new target replication setting")
if err = tierCommand.Parse(args); err != nil {
return nil
@@ -226,7 +226,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil
}
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64, replicationString *string ) (err error) {
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64, replicationString *string) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -247,25 +247,25 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
}
// If move is successful and replication is not empty, alter moved volume's replication setting
if *replicationString != "" {
err = operation.WithVolumeServerClient(false, newAddress, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: *replicationString,
})
if configureErr != nil {
return configureErr
}
if resp.Error != "" {
return errors.New(resp.Error)
}
return nil
})
// If move is successful and replication is not empty, alter moved volume's replication setting
if *replicationString != "" {
err = operation.WithVolumeServerClient(false, newAddress, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: *replicationString,
})
if configureErr != nil {
return configureErr
}
if resp.Error != "" {
return errors.New(resp.Error)
}
return nil
})
if err != nil {
glog.Errorf("update volume %d replication on %s: %v", vid, locations[0].Url, err)
glog.Errorf("update volume %d replication on %s: %v", vid, locations[0].Url, err)
}
}
}
// remove the remaining replicas
for _, loc := range locations {