Merge remote-tracking branch 'origin/master'

This commit is contained in:
bingoohuang
2021-02-18 14:05:51 +08:00
147 changed files with 4359 additions and 2928 deletions

View File

@@ -2,6 +2,7 @@ package storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io/ioutil"
"os"
"path/filepath"
@@ -19,6 +20,7 @@ import (
type DiskLocation struct {
Directory string
IdxDirectory string
DiskType types.DiskType
MaxVolumeCount int
OriginalMaxVolumeCount int
MinFreeSpacePercent float32
@@ -32,7 +34,7 @@ type DiskLocation struct {
isDiskSpaceLow bool
}
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string) *DiskLocation {
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType types.DiskType) *DiskLocation {
dir = util.ResolvePath(dir)
if idxDir == "" {
idxDir = dir
@@ -42,6 +44,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32
location := &DiskLocation{
Directory: dir,
IdxDirectory: idxDir,
DiskType: diskType,
MaxVolumeCount: maxVolumeCount,
OriginalMaxVolumeCount: maxVolumeCount,
MinFreeSpacePercent: minFreeSpacePercent,

View File

@@ -57,7 +57,7 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error) {
ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId)
ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId)
if err != nil {
if err == os.ErrNotExist {
return os.ErrNotExist
@@ -68,7 +68,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
defer l.ecVolumesLock.Unlock()
ecVolume, found := l.ecVolumes[vid]
if !found {
ecVolume, err = erasure_coding.NewEcVolume(l.Directory, l.IdxDirectory, collection, vid)
ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid)
if err != nil {
return fmt.Errorf("failed to create ec volume %d: %v", vid, err)
}

View File

@@ -2,6 +2,7 @@ package erasure_coding
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"path"
"strconv"
@@ -20,11 +21,12 @@ type EcVolumeShard struct {
dir string
ecdFile *os.File
ecdFileSize int64
DiskType types.DiskType
}
func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType}
baseFileName := v.FileName()

View File

@@ -36,10 +36,11 @@ type EcVolume struct {
Version needle.Version
ecjFile *os.File
ecjFileAccessLock sync.Mutex
diskType types.DiskType
}
func NewEcVolume(dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid}
func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
dataBaseFileName := EcShardFileName(collection, dir, int(vid))
indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
@@ -191,6 +192,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
m = &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
Collection: s.Collection,
DiskType: string(ev.diskType),
}
messages = append(messages, m)
}

View File

@@ -10,13 +10,15 @@ type EcVolumeInfo struct {
VolumeId needle.VolumeId
Collection string
ShardBits ShardBits
DiskType string
}
func NewEcVolumeInfo(collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
return &EcVolumeInfo{
Collection: collection,
VolumeId: vid,
ShardBits: shardBits,
DiskType: diskType,
}
}
@@ -45,6 +47,7 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo {
VolumeId: ecInfo.VolumeId,
Collection: ecInfo.Collection,
ShardBits: ecInfo.ShardBits.Minus(other.ShardBits),
DiskType: ecInfo.DiskType,
}
return ret
@@ -55,6 +58,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.
Id: uint32(ecInfo.VolumeId),
EcIndexBits: uint32(ecInfo.ShardBits),
Collection: ecInfo.Collection,
DiskType: ecInfo.DiskType,
}
}

View File

@@ -52,11 +52,11 @@ func (s *Store) String() (str string) {
return
}
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind) (s *Store) {
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder)
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i])
location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
@@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -78,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
if e != nil {
return e
}
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb)
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType)
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
@@ -100,9 +100,12 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume {
}
return nil
}
func (s *Store) FindFreeLocation() (ret *DiskLocation) {
func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
if diskType != location.DiskType {
continue
}
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
currentFreeCount *= erasure_coding.DataShardsCount
currentFreeCount -= location.EcVolumesLen()
@@ -114,11 +117,11 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.FindFreeLocation(); location != nil {
if location := s.FindFreeLocation(diskType); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
@@ -130,6 +133,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
ReplicaPlacement: uint32(replicaPlacement.Byte()),
Version: uint32(volume.Version()),
Ttl: ttl.ToUint32(),
DiskType: string(diskType),
}
return nil
} else {
@@ -169,6 +173,7 @@ func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) {
ReadOnly: v.IsReadOnly(),
Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision),
DiskType: v.DiskType().String(),
}
s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()
@@ -202,13 +207,13 @@ func (s *Store) GetRack() string {
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0
maxVolumeCounts := make(map[string]uint32)
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount)
location.volumesLock.RLock()
for _, v := range location.volumes {
curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
@@ -280,15 +285,15 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
return &master_pb.Heartbeat{
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxFileKey: NeedleIdToUint64(maxFileKey),
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
HasNoVolumes: len(volumeMessages) == 0,
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCounts: maxVolumeCounts,
MaxFileKey: NeedleIdToUint64(maxFileKey),
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
HasNoVolumes: len(volumeMessages) == 0,
}
}
@@ -371,6 +376,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
}
return nil
}
@@ -390,6 +396,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
}
for _, location := range s.Locations {
@@ -414,6 +421,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
}
for _, location := range s.Locations {
if err := location.DeleteVolume(i); err == nil {
@@ -463,6 +471,9 @@ func (s *Store) GetVolumeSizeLimit() uint64 {
func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
volumeSizeLimit := s.GetVolumeSizeLimit()
if volumeSizeLimit == 0 {
return
}
for _, diskLocation := range s.Locations {
if diskLocation.OriginalMaxVolumeCount == 0 {
currentMaxVolumeCount := diskLocation.MaxVolumeCount

View File

@@ -58,6 +58,7 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
DiskType: string(location.DiskType),
}
return nil
} else if err == os.ErrNotExist {
@@ -82,6 +83,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
Id: uint32(vid),
Collection: ecShard.Collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
DiskType: string(ecShard.DiskType),
}
for _, location := range s.Locations {

View File

@@ -0,0 +1,33 @@
package types
import (
"strings"
)
type DiskType string
const (
HardDriveType DiskType = ""
SsdType = "ssd"
)
func ToDiskType(vt string) (diskType DiskType) {
vt = strings.ToLower(vt)
diskType = HardDriveType
switch vt {
case "", "hdd":
diskType = HardDriveType
case "ssd":
diskType = SsdType
default:
diskType = DiskType(vt)
}
return
}
func (diskType DiskType) String() string {
if diskType == "" {
return ""
}
return string(diskType)
}

View File

@@ -171,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 {
return v.nm.IndexFileSize()
}
func (v *Volume) DiskType() types.DiskType {
return v.location.DiskType
}
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
@@ -262,6 +266,7 @@ func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.Volume
Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(),
DiskType: string(v.location.DiskType),
}
volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()

View File

@@ -2,6 +2,7 @@ package storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"io"
"os"
@@ -148,3 +149,18 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V
}
return n.AppendAtNs, err
}
func (v *Volume) checkIdxFile() error {
datFileSize, _, err := v.DataBackend.GetStat()
if err != nil {
return fmt.Errorf("get stat %s: %v", v.FileName(".dat"), err)
}
if datFileSize <= super_block.SuperBlockSize {
return nil
}
indexFileName := v.FileName(".idx")
if util.FileExists(indexFileName) {
return nil
}
return fmt.Errorf("idx file %s does not exists", indexFileName)
}

View File

@@ -14,6 +14,7 @@ type VolumeInfo struct {
Size uint64
ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
DiskType string
Collection string
Version needle.Version
FileCount int
@@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
ModifiedAtSecond: m.ModifiedAtSecond,
RemoteStorageName: m.RemoteStorageName,
RemoteStorageKey: m.RemoteStorageKey,
DiskType: m.DiskType,
}
rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
@@ -62,6 +64,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu
}
vi.ReplicaPlacement = rp
vi.Ttl = needle.LoadTTLFromUint32(m.Ttl)
vi.DiskType = m.DiskType
return vi, nil
}
@@ -90,6 +93,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe
ModifiedAtSecond: vi.ModifiedAtSecond,
RemoteStorageName: vi.RemoteStorageName,
RemoteStorageKey: vi.RemoteStorageKey,
DiskType: vi.DiskType,
}
}

View File

@@ -96,6 +96,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
v.dirIdx = v.dir
}
}
// check volume idx files
if err := v.checkIdxFile(); err != nil {
glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err)
}
var indexFile *os.File
if v.noWriteOrDelete {
glog.V(0).Infoln("open to read file", v.FileName(".idx"))