mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-20 07:09:56 +08:00
resolve concurrent modification problem
n and err can be modified concurrently
This commit is contained in:
@@ -157,11 +157,8 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_co
|
||||
}
|
||||
} else {
|
||||
ecVolume.ShardLocationsLock.RLock()
|
||||
sourceDataNodes, found := ecVolume.ShardLocations[shardId]
|
||||
sourceDataNodes, _ := ecVolume.ShardLocations[shardId]
|
||||
ecVolume.ShardLocationsLock.RUnlock()
|
||||
if !found || len(sourceDataNodes) == 0 {
|
||||
return nil, fmt.Errorf("failed to find ec shard %d.%d", ecVolume.VolumeId, shardId)
|
||||
}
|
||||
|
||||
// try reading directly
|
||||
_, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
|
||||
@@ -215,6 +212,10 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
|
||||
|
||||
func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
|
||||
|
||||
if len(sourceDataNodes) == 0 {
|
||||
return 0, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
|
||||
}
|
||||
|
||||
for _, sourceDataNode := range sourceDataNodes {
|
||||
glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
|
||||
n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset)
|
||||
@@ -291,13 +292,12 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *e
|
||||
go func(shardId erasure_coding.ShardId, locations []string) {
|
||||
defer wg.Done()
|
||||
data := make([]byte, len(buf))
|
||||
n, err = s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("readRemoteEcShardInterval %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
|
||||
nRead, readErr := s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
|
||||
if readErr != nil {
|
||||
glog.V(4).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
|
||||
}
|
||||
if n == len(buf) {
|
||||
if nRead == len(buf) {
|
||||
bufs[shardId] = data
|
||||
return
|
||||
}
|
||||
}(shardId, locations)
|
||||
}
|
||||
@@ -306,6 +306,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *e
|
||||
wg.Wait()
|
||||
|
||||
if err = enc.ReconstructData(bufs); err != nil {
|
||||
glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
|
||||
return 0, err
|
||||
}
|
||||
glog.V(3).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
|
||||
|
Reference in New Issue
Block a user