mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 10:17:24 +08:00
avoid reusing context object
fix https://github.com/chrislusf/seaweedfs/issues/1182
This commit is contained in:
@@ -116,7 +116,7 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
|
||||
func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
|
||||
for _, location := range s.Locations {
|
||||
if localEcVolume, found := location.FindEcVolume(vid); found {
|
||||
|
||||
@@ -133,7 +133,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
|
||||
if len(intervals) > 1 {
|
||||
glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
|
||||
}
|
||||
bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals)
|
||||
bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
|
||||
}
|
||||
@@ -152,14 +152,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
|
||||
return 0, fmt.Errorf("ec shard %d not found", vid)
|
||||
}
|
||||
|
||||
func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
|
||||
func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
|
||||
|
||||
if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
|
||||
if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
|
||||
return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
|
||||
}
|
||||
|
||||
for i, interval := range intervals {
|
||||
if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil {
|
||||
if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil {
|
||||
return nil, isDeleted, e
|
||||
} else {
|
||||
if isDeleted {
|
||||
@@ -175,7 +175,7 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, n
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
|
||||
func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
|
||||
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
|
||||
data = make([]byte, interval.Size)
|
||||
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
|
||||
@@ -190,7 +190,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl
|
||||
|
||||
// try reading directly
|
||||
if hasShardIdLocation {
|
||||
_, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
|
||||
_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
@@ -199,7 +199,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl
|
||||
}
|
||||
|
||||
// try reading by recovering from other shards
|
||||
_, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset)
|
||||
_, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
@@ -215,7 +215,7 @@ func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.Sha
|
||||
ecVolume.ShardLocationsLock.Unlock()
|
||||
}
|
||||
|
||||
func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
|
||||
func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) {
|
||||
|
||||
shardCount := len(ecVolume.ShardLocations)
|
||||
if shardCount < erasure_coding.DataShardsCount &&
|
||||
@@ -230,11 +230,11 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
|
||||
|
||||
glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
|
||||
|
||||
err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
|
||||
err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
||||
req := &master_pb.LookupEcVolumeRequest{
|
||||
VolumeId: uint32(ecVolume.VolumeId),
|
||||
}
|
||||
resp, err := masterClient.LookupEcVolume(ctx, req)
|
||||
resp, err := masterClient.LookupEcVolume(context.Background(), req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
|
||||
}
|
||||
@@ -258,7 +258,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||
func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||
|
||||
if len(sourceDataNodes) == 0 {
|
||||
return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
|
||||
@@ -266,7 +266,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
|
||||
|
||||
for _, sourceDataNode := range sourceDataNodes {
|
||||
glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
|
||||
n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
|
||||
n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
@@ -276,12 +276,12 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||
func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||
|
||||
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
|
||||
// copy data slice
|
||||
shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
|
||||
shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
|
||||
VolumeId: uint32(vid),
|
||||
ShardId: uint32(shardId),
|
||||
Offset: offset,
|
||||
@@ -316,7 +316,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||
func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||
glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
|
||||
|
||||
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
|
||||
@@ -344,7 +344,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId ty
|
||||
go func(shardId erasure_coding.ShardId, locations []string) {
|
||||
defer wg.Done()
|
||||
data := make([]byte, len(buf))
|
||||
nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset)
|
||||
nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
|
||||
if readErr != nil {
|
||||
glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
|
||||
forgetShardId(ecVolume, shardId)
|
||||
|
@@ -14,7 +14,7 @@ import (
|
||||
|
||||
func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
|
||||
|
||||
count, err := s.ReadEcShardNeedle(ctx, ecVolume.VolumeId, n)
|
||||
count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -24,7 +24,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin
|
||||
return 0, fmt.Errorf("unexpected cookie %x", cookie)
|
||||
}
|
||||
|
||||
if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx, ecVolume, n.Id); err != nil {
|
||||
if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin
|
||||
|
||||
}
|
||||
|
||||
func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
|
||||
func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
|
||||
|
||||
_, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
|
||||
|
||||
@@ -43,13 +43,13 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context,
|
||||
shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
|
||||
|
||||
hasDeletionSuccess := false
|
||||
err = s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId)
|
||||
err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
|
||||
if err == nil {
|
||||
hasDeletionSuccess = true
|
||||
}
|
||||
|
||||
for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
|
||||
if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId); parityDeletionError == nil {
|
||||
if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
|
||||
hasDeletionSuccess = true
|
||||
}
|
||||
}
|
||||
@@ -62,7 +62,7 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context,
|
||||
|
||||
}
|
||||
|
||||
func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
|
||||
func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
|
||||
|
||||
ecVolume.ShardLocationsLock.RLock()
|
||||
sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
|
||||
@@ -74,7 +74,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar
|
||||
|
||||
for _, sourceDataNode := range sourceDataNodes {
|
||||
glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
|
||||
err := s.doDeleteNeedleFromRemoteEcShard(ctx, sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
|
||||
err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -85,12 +85,12 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar
|
||||
|
||||
}
|
||||
|
||||
func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
|
||||
func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
|
||||
|
||||
return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||
return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
|
||||
// copy data slice
|
||||
_, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{
|
||||
_, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
|
||||
VolumeId: uint32(vid),
|
||||
Collection: collection,
|
||||
FileKey: uint64(needleId),
|
||||
|
@@ -72,9 +72,9 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
|
||||
|
||||
writeOffset := int64(startFromOffset)
|
||||
|
||||
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
|
||||
stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
|
||||
stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{
|
||||
VolumeId: uint32(v.Id),
|
||||
SinceNs: appendAtNs,
|
||||
})
|
||||
|
Reference in New Issue
Block a user