read ec volume shards locations from master

This commit is contained in:
Chris Lu
2019-05-28 00:13:13 -07:00
parent 713596e781
commit 2858a954b3
5 changed files with 291 additions and 106 deletions

View File

@@ -9,7 +9,7 @@ import (
type EcShardLocations struct {
Collection string
locations [erasure_coding.TotalShardsCount][]*DataNode
Locations [erasure_coding.TotalShardsCount][]*DataNode
}
func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
@@ -69,18 +69,18 @@ func NewEcShardLocations(collection string) *EcShardLocations {
}
func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
dataNodes := loc.locations[shardId]
dataNodes := loc.Locations[shardId]
for _, n := range dataNodes {
if n.Id() == dn.Id() {
return false
}
}
loc.locations[shardId] = append(dataNodes, dn)
loc.Locations[shardId] = append(dataNodes, dn)
return true
}
func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
dataNodes := loc.locations[shardId]
dataNodes := loc.Locations[shardId]
foundIndex := -1
for index, n := range dataNodes {
if n.Id() == dn.Id() {
@@ -90,7 +90,7 @@ func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *Dat
if foundIndex < 0 {
return false
}
loc.locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
return true
}
@@ -122,3 +122,12 @@ func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo,
locations.DeleteShard(shardId, dn)
}
}
func (t *Topology) LookupEcShards(vid needle.VolumeId)(locations *EcShardLocations, found bool) {
t.ecShardMapLock.RLock()
defer t.ecShardMapLock.RUnlock()
locations, found = t.ecShardMap[vid]
return
}