mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-08-01 17:46:55 +08:00
forget shards that are broken
This commit is contained in:
parent
aaab2c25f5
commit
689930f092
@ -157,13 +157,17 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_co
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ecVolume.ShardLocationsLock.RLock()
|
ecVolume.ShardLocationsLock.RLock()
|
||||||
sourceDataNodes, _ := ecVolume.ShardLocations[shardId]
|
sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
|
||||||
ecVolume.ShardLocationsLock.RUnlock()
|
ecVolume.ShardLocationsLock.RUnlock()
|
||||||
|
|
||||||
// try reading directly
|
// try reading directly
|
||||||
_, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
|
if hasShardIdLocation {
|
||||||
if err == nil {
|
_, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
|
||||||
return
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
|
||||||
|
forgetShardId(ecVolume, shardId)
|
||||||
}
|
}
|
||||||
|
|
||||||
// try reading by recovering from other shards
|
// try reading by recovering from other shards
|
||||||
@ -176,6 +180,13 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_co
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
|
||||||
|
// failed to access the source data nodes, clear it up
|
||||||
|
ecVolume.ShardLocationsLock.Lock()
|
||||||
|
delete(ecVolume.ShardLocations, shardId)
|
||||||
|
ecVolume.ShardLocationsLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
|
func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
|
||||||
|
|
||||||
if ecVolume.ShardLocationsRefreshTime.Add(10 * time.Minute).After(time.Now()) {
|
if ecVolume.ShardLocationsRefreshTime.Add(10 * time.Minute).After(time.Now()) {
|
||||||
@ -265,7 +276,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
|
func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
|
||||||
glog.V(1).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
|
glog.V(4).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
|
||||||
|
|
||||||
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
|
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -294,7 +305,8 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *e
|
|||||||
data := make([]byte, len(buf))
|
data := make([]byte, len(buf))
|
||||||
nRead, readErr := s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
|
nRead, readErr := s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
|
||||||
if readErr != nil {
|
if readErr != nil {
|
||||||
glog.V(4).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
|
glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
|
||||||
|
forgetShardId(ecVolume, shardId)
|
||||||
}
|
}
|
||||||
if nRead == len(buf) {
|
if nRead == len(buf) {
|
||||||
bufs[shardId] = data
|
bufs[shardId] = data
|
||||||
@ -309,7 +321,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *e
|
|||||||
glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
|
glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
glog.V(3).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
|
glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
|
||||||
|
|
||||||
copy(buf, bufs[shardIdToRecover])
|
copy(buf, bufs[shardIdToRecover])
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user