mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 10:57:24 +08:00
pb shard info uses ShardBits instead one message for one shard
This commit is contained in:
@@ -2,8 +2,10 @@ package erasure_coding
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
@@ -44,6 +46,10 @@ func (shards *EcVolumeShards) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) boo
|
||||
}
|
||||
}
|
||||
*shards = append(*shards, ecVolumeShard)
|
||||
sort.Slice(shards, func(i, j int) bool {
|
||||
return (*shards)[i].VolumeId < (*shards)[j].VolumeId ||
|
||||
(*shards)[i].VolumeId == (*shards)[j].VolumeId && (*shards)[i].ShardId < (*shards)[j].ShardId
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -68,14 +74,19 @@ func (shards *EcVolumeShards) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (shards *EcVolumeShards) ToVolumeInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
|
||||
func (shards *EcVolumeShards) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
|
||||
prevVolumeId := needle.VolumeId(math.MaxUint32)
|
||||
var m *master_pb.VolumeEcShardInformationMessage
|
||||
for _, s := range *shards {
|
||||
m := &master_pb.VolumeEcShardInformationMessage{
|
||||
Id: uint32(s.VolumeId),
|
||||
Collection: s.Collection,
|
||||
EcIndex: uint32(s.ShardId),
|
||||
if s.VolumeId != prevVolumeId {
|
||||
m = &master_pb.VolumeEcShardInformationMessage{
|
||||
Id: uint32(s.VolumeId),
|
||||
Collection: s.Collection,
|
||||
}
|
||||
messages = append(messages, m)
|
||||
}
|
||||
messages = append(messages, m)
|
||||
prevVolumeId = s.VolumeId
|
||||
m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@@ -1,8 +1,6 @@
|
||||
package erasure_coding
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
)
|
||||
@@ -11,59 +9,74 @@ import (
|
||||
type EcVolumeInfo struct {
|
||||
VolumeId needle.VolumeId
|
||||
Collection string
|
||||
shardIds uint16 // use bits to indicate the shard id
|
||||
ShardBits ShardBits
|
||||
}
|
||||
|
||||
func NewEcVolumeInfo(collection string, vid needle.VolumeId) *EcVolumeInfo {
|
||||
func NewEcVolumeInfo(collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
|
||||
return &EcVolumeInfo{
|
||||
Collection: collection,
|
||||
VolumeId: vid,
|
||||
ShardBits: shardBits,
|
||||
}
|
||||
}
|
||||
|
||||
func (ecInfo *EcVolumeInfo) AddShardId(id ShardId) {
|
||||
ecInfo.shardIds |= (1 << id)
|
||||
ecInfo.ShardBits = ecInfo.ShardBits.AddShardId(id)
|
||||
}
|
||||
|
||||
func (ecInfo *EcVolumeInfo) RemoveShardId(id ShardId) {
|
||||
ecInfo.shardIds &^= (1 << id)
|
||||
ecInfo.ShardBits = ecInfo.ShardBits.RemoveShardId(id)
|
||||
}
|
||||
|
||||
func (ecInfo *EcVolumeInfo) HasShardId(id ShardId) bool {
|
||||
return ecInfo.shardIds&(1<<id) > 0
|
||||
return ecInfo.ShardBits.HasShardId(id)
|
||||
}
|
||||
|
||||
func (ecInfo *EcVolumeInfo) ShardIds() (ret []ShardId) {
|
||||
for i := ShardId(0); i < DataShardsCount+ParityShardsCount; i++ {
|
||||
if ecInfo.HasShardId(i) {
|
||||
ret = append(ret, i)
|
||||
}
|
||||
}
|
||||
return
|
||||
return ecInfo.ShardBits.ShardIds()
|
||||
}
|
||||
|
||||
func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) (*EcVolumeInfo) {
|
||||
ret := &EcVolumeInfo{
|
||||
VolumeId: ecInfo.VolumeId,
|
||||
Collection: ecInfo.Collection,
|
||||
shardIds: ecInfo.shardIds &^ other.shardIds,
|
||||
ShardBits: ecInfo.ShardBits.Minus(other.ShardBits),
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret []*master_pb.VolumeEcShardInformationMessage) {
|
||||
for _, shard := range ecInfo.ShardIds() {
|
||||
ret = append(ret, &master_pb.VolumeEcShardInformationMessage{
|
||||
Id: uint32(ecInfo.VolumeId),
|
||||
EcIndex: uint32(shard),
|
||||
Collection: ecInfo.Collection,
|
||||
})
|
||||
func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.VolumeEcShardInformationMessage) {
|
||||
return &master_pb.VolumeEcShardInformationMessage{
|
||||
Id: uint32(ecInfo.VolumeId),
|
||||
EcIndexBits: uint32(ecInfo.ShardBits),
|
||||
Collection: ecInfo.Collection,
|
||||
}
|
||||
}
|
||||
|
||||
type ShardBits uint32 // use bits to indicate the shard id, use 32 bits just for possible future extension
|
||||
|
||||
func (b ShardBits) AddShardId(id ShardId) ShardBits {
|
||||
return b | (1 << id)
|
||||
}
|
||||
|
||||
func (b ShardBits) RemoveShardId(id ShardId) ShardBits {
|
||||
return b &^ (1 << id)
|
||||
}
|
||||
|
||||
func (b ShardBits) HasShardId(id ShardId) bool {
|
||||
return b&(1<<id) > 0
|
||||
}
|
||||
|
||||
func (b ShardBits) ShardIds() (ret []ShardId) {
|
||||
for i := ShardId(0); i < DataShardsCount+ParityShardsCount; i++ {
|
||||
if b.HasShardId(i) {
|
||||
ret = append(ret, i)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ecInfo *EcVolumeInfo) String() string {
|
||||
return fmt.Sprintf("id:%d shard:%v collection:%v", ecInfo.VolumeId, ecInfo.ShardIds(), ecInfo.Collection)
|
||||
func (b ShardBits) Minus(other ShardBits) (ShardBits) {
|
||||
return b &^ other
|
||||
}
|
||||
|
@@ -9,7 +9,7 @@ func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
|
||||
for _, location := range s.Locations {
|
||||
location.ecShardsLock.RLock()
|
||||
for _, ecShards := range location.ecShards {
|
||||
ecShardMessages = append(ecShardMessages, ecShards.ToVolumeInformationMessage()...)
|
||||
ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
|
||||
}
|
||||
location.ecShardsLock.RUnlock()
|
||||
}
|
||||
|
Reference in New Issue
Block a user