mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-22 00:29:23 +08:00
volume: detect and drop volumes with disk IO error
from Jethro in slack: is it possible to make the assign request a bit smarter? Currently I’m in a state where a disk has failed but all assign requests are still being sent to this volume. It would be cool if the master saw this and stopped using this volume. e=HTTP(http://x:8089/913,045a782b63176edf) not 200 but 500 Internal Server Error Body={"size":740167,"error":"failed to write to local disk: write /mnt/v9/913.dat: input/output error","eTag":"ee4381e202212ff3aee647704c036689"} e=HTTP(http://x:8089/913,045a782c90240077) not 200 but 500 Internal Server Error Body={"size":792779,"error":"failed to write to local disk: write /mnt/v9/913.dat: input/output error","eTag":"c43463ccc11eb6eb2fc306f407a6a953"} e=HTTP(http://x:8089/913,045a782e6b7901ea) not 200 but 500 Internal Server Error Body={"size":3962392,"error":"failed to write to local disk: write /mnt/v9/913.dat: input/output error","eTag":"04c91198e9b276c81f11dbf189af5d28"}
This commit is contained in:
@@ -221,7 +221,12 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
|||||||
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
|
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
|
||||||
deleteVids = append(deleteVids, v.Id)
|
deleteVids = append(deleteVids, v.Id)
|
||||||
} else {
|
} else {
|
||||||
glog.V(0).Infoln("volume", v.Id, "is expired.")
|
glog.V(0).Infoln("volume %d is expired", v.Id)
|
||||||
|
}
|
||||||
|
if v.lastIoError != nil {
|
||||||
|
deleteVids = append(deleteVids, v.Id)
|
||||||
|
} else {
|
||||||
|
glog.Warningf("volume %d has IO error", v.Id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
collectionVolumeSize[v.Collection] += volumeMessage.Size
|
collectionVolumeSize[v.Collection] += volumeMessage.Size
|
||||||
|
@@ -46,6 +46,8 @@ type Volume struct {
|
|||||||
|
|
||||||
volumeInfo *volume_server_pb.VolumeInfo
|
volumeInfo *volume_server_pb.VolumeInfo
|
||||||
location *DiskLocation
|
location *DiskLocation
|
||||||
|
|
||||||
|
lastIoError error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
|
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
|
||||||
@@ -86,10 +88,10 @@ func (v *Volume) IndexFileName() (fileName string) {
|
|||||||
func (v *Volume) FileName(ext string) (fileName string) {
|
func (v *Volume) FileName(ext string) (fileName string) {
|
||||||
switch ext {
|
switch ext {
|
||||||
case ".idx", ".cpx", ".ldb":
|
case ".idx", ".cpx", ".ldb":
|
||||||
return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))+ext
|
return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) + ext
|
||||||
}
|
}
|
||||||
// .dat, .cpd, .vif
|
// .dat, .cpd, .vif
|
||||||
return VolumeFileName(v.dir, v.Collection, int(v.Id))+ext
|
return VolumeFileName(v.dir, v.Collection, int(v.Id)) + ext
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) Version() needle.Version {
|
func (v *Volume) Version() needle.Version {
|
||||||
|
@@ -19,6 +19,18 @@ var ErrorNotFound = errors.New("not found")
|
|||||||
var ErrorDeleted = errors.New("already deleted")
|
var ErrorDeleted = errors.New("already deleted")
|
||||||
var ErrorSizeMismatch = errors.New("size mismatch")
|
var ErrorSizeMismatch = errors.New("size mismatch")
|
||||||
|
|
||||||
|
func (v *Volume) checkReadWriteError(err error) {
|
||||||
|
if err == nil {
|
||||||
|
if v.lastIoError != nil {
|
||||||
|
v.lastIoError = nil
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err.Error() == "input/output error" {
|
||||||
|
v.lastIoError = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// isFileUnchanged checks whether this needle to write is same as last one.
|
// isFileUnchanged checks whether this needle to write is same as last one.
|
||||||
// It requires serialized access in the same volume.
|
// It requires serialized access in the same volume.
|
||||||
func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
|
func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
|
||||||
@@ -115,7 +127,9 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
|
|||||||
|
|
||||||
// append to dat file
|
// append to dat file
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
|
offset, size, _, err = n.Append(v.DataBackend, v.Version())
|
||||||
|
v.checkReadWriteError(err)
|
||||||
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -179,7 +193,9 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
|
|||||||
|
|
||||||
// append to dat file
|
// append to dat file
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
|
offset, size, _, err = n.Append(v.DataBackend, v.Version())
|
||||||
|
v.checkReadWriteError(err)
|
||||||
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
v.lastAppendAtNs = n.AppendAtNs
|
v.lastAppendAtNs = n.AppendAtNs
|
||||||
@@ -214,6 +230,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
|
|||||||
n.Data = nil
|
n.Data = nil
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
offset, _, _, err := n.Append(v.DataBackend, v.Version())
|
offset, _, _, err := n.Append(v.DataBackend, v.Version())
|
||||||
|
v.checkReadWriteError(err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
@@ -252,6 +269,7 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
|
|||||||
n.Data = nil
|
n.Data = nil
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
offset, _, _, err := n.Append(v.DataBackend, v.Version())
|
offset, _, _, err := n.Append(v.DataBackend, v.Version())
|
||||||
|
v.checkReadWriteError(err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
@@ -289,6 +307,7 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
|
|||||||
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
|
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
|
||||||
err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
|
err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
|
||||||
}
|
}
|
||||||
|
v.checkReadWriteError(err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user