ec deletion works

Chris Lu
2019-06-21 01:14:10 -07:00
parent 613a2e8060
commit f88a8bda7b
8 changed files with 189 additions and 147 deletions

View File

@@ -172,9 +172,9 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) {
// find the needle from ecx file
- offset, size, err = ev.findNeedleFromEcx(needleId)
+ offset, size, err = ev.FindNeedleFromEcx(needleId)
if err != nil {
- return types.Offset{}, 0, nil, fmt.Errorf("findNeedleFromEcx: %v", err)
+ return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
}
shard := ev.Shards[0]
@@ -185,7 +185,7 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
return
}
- func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+ func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
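
Note: this commit exports findNeedleFromEcx as FindNeedleFromEcx. A minimal sketch of an out-of-package caller, assuming the weed/storage/erasure_coding and weed/storage/types import paths of this repository layout; the helper name is hypothetical.

package example

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

// locateNeedle is a hypothetical caller of the newly exported ecx lookup.
func locateNeedle(ev *erasure_coding.EcVolume, id types.NeedleId) error {
	offset, size, err := ev.FindNeedleFromEcx(id)
	if err != nil {
		return err // assumption: a missing entry surfaces as an error from the ecx search
	}
	fmt.Printf("needle %d found at offset %v, size %d\n", id, offset, size)
	return nil
}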

View File

@@ -158,7 +158,7 @@ func (n *Needle) ParsePath(fid string) (err error) {
}
if delta != "" {
if d, e := strconv.ParseUint(delta, 10, 64); e == nil {
- n.Id += NeedleId(d)
+ n.Id += Uint64ToNeedleId(d)
} else {
return e
}
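
Note: the bare NeedleId(d) cast becomes the typed helper Uint64ToNeedleId. A sketch of the shape such a helper usually has (assumed, not taken from this commit); funneling every uint64-to-NeedleId conversion through one function means a later change to the key width only touches that helper.

// assumed shape of the conversion helper referenced above
type NeedleId uint64

func Uint64ToNeedleId(key uint64) NeedleId {
	return NeedleId(key)
}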

View File

@@ -85,7 +85,7 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o
}
offset := BytesToOffset(data[0:OffsetSize])
size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
- return &needle_map.NeedleValue{Key: NeedleId(key), Offset: offset, Size: size}, true
+ return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
}
func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {

View File

@@ -53,7 +53,7 @@ func (mm *mapMetric) DeletedCount() int {
}
func (mm *mapMetric) MaxFileKey() NeedleId {
t := uint64(mm.MaximumFileKey)
- return NeedleId(t)
+ return Uint64ToNeedleId(t)
}
func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
if key > mm.MaxFileKey() {

View File

@@ -135,15 +135,18 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
return 0, fmt.Errorf("entry %s is deleted", n.Id)
}
- glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
+ glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
if len(intervals) > 1 {
- glog.V(4).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
+ glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
}
- bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, intervals)
+ bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals)
if err != nil {
return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
}
+ if isDeleted {
+ return 0, fmt.Errorf("ec entry %s is deleted", n.Id)
+ }
err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
if err != nil {
@@ -165,23 +168,26 @@ func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ec
IsLargeBlock: true, // it could be large block, but ok in this place
LargeBlockRowsCount: 0,
}
- data, err := s.readEcShardIntervals(ctx, vid, ecVolume, []erasure_coding.Interval{interval})
+ data, _, err := s.readEcShardIntervals(ctx, vid, 0, ecVolume, []erasure_coding.Interval{interval})
if err == nil {
ecVolume.Version = needle.Version(data[0])
}
return
}
- func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, err error) {
+ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
- return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
+ return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
}
for i, interval := range intervals {
- if d, e := s.readOneEcShardInterval(ctx, ecVolume, interval); e != nil {
- return nil, e
+ if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil {
+ return nil, isDeleted, e
} else {
+ if isDeleted {
+ is_deleted = true
+ }
if i == 0 {
data = d
} else {
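
Note: readEcShardIntervals now threads the needle id into every interval read and ORs the per-interval deleted flags, so a tombstone seen in any interval marks the whole needle as deleted. A sketch of that aggregation pattern in isolation; readOne stands in for readOneEcShardInterval, and the append in the else branch is an assumption about the truncated hunk.

// sketch: OR-ing a per-interval deleted flag while concatenating the data
func readAllIntervals(intervals []erasure_coding.Interval,
	readOne func(erasure_coding.Interval) (data []byte, isDeleted bool, err error)) ([]byte, bool, error) {
	var all []byte
	var anyDeleted bool
	for i, interval := range intervals {
		d, isDeleted, err := readOne(interval)
		if err != nil {
			return nil, isDeleted, err
		}
		if isDeleted {
			anyDeleted = true // one tombstoned interval is enough
		}
		if i == 0 {
			all = d
		} else {
			all = append(all, d...) // assumed continuation of the hunk above
		}
	}
	return all, anyDeleted, nil
}
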
@@ -192,7 +198,7 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, e
return
}
- func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, err error) {
+ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
data = make([]byte, interval.Size)
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
@@ -207,7 +213,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_co
// try reading directly
if hasShardIdLocation {
- _, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
+ _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
if err == nil {
return
}
@@ -216,7 +222,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_co
}
// try reading by recovering from other shards
- _, err = s.recoverOneRemoteEcShardInterval(ctx, ecVolume, shardId, data, actualOffset)
+ _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset)
if err == nil {
return
}
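
Note: readOneEcShardInterval has three ways to satisfy a read, tried in order, and after this commit each of them also reports whether the needle is deleted: the local shard file, a direct remote read of the same shard, then reconstruction from the other shards. A compressed sketch of that fallback order; the closure names are hypothetical stand-ins for the calls shown in the hunks above.

// sketch: fallback order for reading one EC shard interval
func readIntervalWithFallback(
	haveLocalShard, haveRemoteLocation bool,
	readLocal, readRemote, recoverFromOtherShards func() (data []byte, isDeleted bool, err error),
) ([]byte, bool, error) {
	if haveLocalShard {
		return readLocal() // 1. local .ec shard file
	}
	if haveRemoteLocation {
		if d, isDeleted, err := readRemote(); err == nil {
			return d, isDeleted, nil // 2. same shard on a remote volume server
		}
	}
	return recoverFromOtherShards() // 3. rebuild this shard from the remaining shards
}
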
@@ -236,11 +242,11 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
shardCount := len(ecVolume.ShardLocations)
if shardCount < erasure_coding.DataShardsCount &&
- ecVolume.ShardLocationsRefreshTime.Add(11*time.Second).After(time.Now()) ||
+ ecVolume.ShardLocationsRefreshTime.Add(11 * time.Second).After(time.Now()) ||
shardCount == erasure_coding.TotalShardsCount &&
- ecVolume.ShardLocationsRefreshTime.Add(37*time.Minute).After(time.Now()) ||
+ ecVolume.ShardLocationsRefreshTime.Add(37 * time.Minute).After(time.Now()) ||
shardCount >= erasure_coding.DataShardsCount &&
- ecVolume.ShardLocationsRefreshTime.Add(7*time.Minute).After(time.Now()) {
+ ecVolume.ShardLocationsRefreshTime.Add(7 * time.Minute).After(time.Now()) {
// still fresh
return nil
}
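
Note: the freshness check mixes && and || without parentheses; since && binds tighter, it reads as three TTL tiers keyed off the shard count. A restatement with explicit parentheses (hypothetical helper name, constants from the erasure_coding package, imports elided):

// sketch: the same freshness rule, parenthesized for readability
func shardLocationsStillFresh(shardCount int, refreshTime time.Time) bool {
	return (shardCount < erasure_coding.DataShardsCount && refreshTime.Add(11*time.Second).After(time.Now())) ||
		(shardCount == erasure_coding.TotalShardsCount && refreshTime.Add(37*time.Minute).After(time.Now())) ||
		(shardCount >= erasure_coding.DataShardsCount && refreshTime.Add(7*time.Minute).After(time.Now()))
}
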
@@ -275,15 +281,15 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
return
}
- func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
+ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
if len(sourceDataNodes) == 0 {
- return 0, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
+ return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
}
for _, sourceDataNode := range sourceDataNodes {
glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
- n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset)
+ n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
if err == nil {
return
}
@@ -293,7 +299,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
return
}
- func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
+ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
@@ -303,6 +309,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
ShardId: uint32(shardId),
Offset: offset,
Size: int64(len(buf)),
+ FileKey: uint64(needleId),
})
if err != nil {
return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
@@ -316,6 +323,9 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
if receiveErr != nil {
return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
}
+ if resp.IsDeleted {
+ is_deleted = true
+ }
copy(buf[n:n+len(resp.Data)], resp.Data)
n += len(resp.Data)
}
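
Note: the remote read now sends the needle's FileKey with the VolumeEcShardRead request, and the server may set IsDeleted on any response chunk; the client ORs that into its own flag while copying the data. A sketch of the receive loop around the fragments above; the stream variable name and the io.EOF handling are assumptions, not shown in this hunk.

// sketch: receive loop with deletion detection (ecShardReadClient name assumed)
for {
	resp, receiveErr := ecShardReadClient.Recv()
	if receiveErr == io.EOF {
		break
	}
	if receiveErr != nil {
		return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, receiveErr)
	}
	if resp.IsDeleted {
		is_deleted = true // any chunk can carry the tombstone signal for FileKey
	}
	copy(buf[n:n+len(resp.Data)], resp.Data)
	n += len(resp.Data)
}
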
@@ -323,18 +333,18 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
return nil
})
if err != nil {
- return 0, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
+ return 0, is_deleted, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
}
return
}
- func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
+ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
glog.V(4).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
if err != nil {
- return 0, fmt.Errorf("failed to create encoder: %v", err)
+ return 0, false, fmt.Errorf("failed to create encoder: %v", err)
}
bufs := make([][]byte, erasure_coding.TotalShardsCount)
@@ -357,11 +367,14 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *e
go func(shardId erasure_coding.ShardId, locations []string) {
defer wg.Done()
data := make([]byte, len(buf))
- nRead, readErr := s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
+ nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset)
if readErr != nil {
glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
forgetShardId(ecVolume, shardId)
}
+ if isDeleted {
+ is_deleted = true
+ }
if nRead == len(buf) {
bufs[shardId] = data
}
@@ -373,13 +386,13 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *e
if err = enc.ReconstructData(bufs); err != nil {
glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
- return 0, err
+ return 0, false, err
}
glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
copy(buf, bufs[shardIdToRecover])
- return len(buf), nil
+ return len(buf), is_deleted, nil
}
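
Note: recovery collects whatever shards it can fetch into a slice indexed by shard id, leaves unreadable shards nil, and asks the Reed-Solomon encoder to rebuild only the data shards. A self-contained sketch of that reconstruction step with the same klauspost/reedsolomon API; the 10+4 split matches erasure_coding.DataShardsCount and ParityShardsCount.

package example

import "github.com/klauspost/reedsolomon"

// reconstructMissing rebuilds nil data shards in place from the shards that were read.
func reconstructMissing(bufs [][]byte) error {
	enc, err := reedsolomon.New(10, 4) // 10 data shards + 4 parity shards
	if err != nil {
		return err
	}
	// nil entries mark shards that could not be fetched; ReconstructData
	// fills in the missing data shards without recomputing parity.
	return enc.ReconstructData(bufs)
}
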
func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) {