mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 16:53:14 +08:00
delete in ecj, add logs
This commit is contained in:
@@ -469,7 +469,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
|
|||||||
|
|
||||||
func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
|
func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
|
||||||
|
|
||||||
glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
|
glog.Infof("🔍 GRPC EC BLOB DELETE: volume %d, needle %d", req.VolumeId, req.FileKey)
|
||||||
|
|
||||||
resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
|
resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
|
||||||
|
|
||||||
@@ -481,14 +481,18 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
|
|||||||
return nil, fmt.Errorf("locate in local ec volume: %w", err)
|
return nil, fmt.Errorf("locate in local ec volume: %w", err)
|
||||||
}
|
}
|
||||||
if size.IsDeleted() {
|
if size.IsDeleted() {
|
||||||
|
glog.Infof("✅ GRPC EC DELETE: needle %d already deleted", req.FileKey)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
glog.Infof("📝 GRPC EC DELETE: recording needle %d in .ecj", req.FileKey)
|
||||||
err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
|
err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf("❌ GRPC EC DELETE: failed to record needle %d: %v", req.FileKey, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
glog.Infof("✅ GRPC EC DELETE: successfully recorded needle %d", req.FileKey)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -88,10 +88,15 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
|
ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
|
||||||
|
|
||||||
|
glog.Infof("🔍 DELETE REQUEST: volume %d, needle %d, hasEcVolume=%t", volumeId, n.Id, hasEcVolume)
|
||||||
|
|
||||||
if hasEcVolume {
|
if hasEcVolume {
|
||||||
|
glog.Infof("🎯 ROUTING TO EC DELETION: volume %d, needle %d", volumeId, n.Id)
|
||||||
count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie)
|
count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie)
|
||||||
writeDeleteResult(err, count, w, r)
|
writeDeleteResult(err, count, w, r)
|
||||||
return
|
return
|
||||||
|
} else {
|
||||||
|
glog.Infof("🎯 ROUTING TO REGULAR DELETION: volume %d, needle %d", volumeId, n.Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
|
_, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
@@ -30,20 +31,48 @@ func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == NotFoundError {
|
if err == NotFoundError {
|
||||||
|
glog.Infof("❓ EC NEEDLE NOT FOUND: needle %d not in .ecx index for volume %d generation %d - skipping .ecj recording",
|
||||||
|
needleId, ev.VolumeId, ev.Generation)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
glog.Errorf("❌ EC INDEX SEARCH ERROR: needle %d volume %d generation %d: %v", needleId, ev.VolumeId, ev.Generation, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Needle found and marked deleted in .ecx, now record in .ecj
|
||||||
|
glog.Infof("📝 EC NEEDLE FOUND: recording needle %d in .ecj for volume %d generation %d",
|
||||||
|
needleId, ev.VolumeId, ev.Generation)
|
||||||
|
|
||||||
b := make([]byte, types.NeedleIdSize)
|
b := make([]byte, types.NeedleIdSize)
|
||||||
types.NeedleIdToBytes(b, needleId)
|
types.NeedleIdToBytes(b, needleId)
|
||||||
|
|
||||||
ev.ecjFileAccessLock.Lock()
|
ev.ecjFileAccessLock.Lock()
|
||||||
|
defer ev.ecjFileAccessLock.Unlock()
|
||||||
|
|
||||||
ev.ecjFile.Seek(0, io.SeekEnd)
|
if ev.ecjFile == nil {
|
||||||
ev.ecjFile.Write(b)
|
glog.Errorf("EC deletion: .ecj file is nil for volume %d generation %d", ev.VolumeId, ev.Generation)
|
||||||
|
return fmt.Errorf("ecjFile is nil")
|
||||||
|
}
|
||||||
|
|
||||||
ev.ecjFileAccessLock.Unlock()
|
_, err = ev.ecjFile.Seek(0, io.SeekEnd)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("EC deletion: failed to seek .ecj file for volume %d generation %d: %v", ev.VolumeId, ev.Generation, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := ev.ecjFile.Write(b)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("EC deletion: failed to write to .ecj file for volume %d generation %d: %v", ev.VolumeId, ev.Generation, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if n != len(b) {
|
||||||
|
glog.Errorf("EC deletion: partial write to .ecj file for volume %d generation %d: wrote %d bytes, expected %d",
|
||||||
|
ev.VolumeId, ev.Generation, n, len(b))
|
||||||
|
return fmt.Errorf("partial write: wrote %d bytes, expected %d", n, len(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Infof("✅ EC JOURNAL WRITE SUCCESS: wrote %d bytes to .ecj for volume %d generation %d", n, ev.VolumeId, ev.Generation)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package storage
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
@@ -15,6 +16,8 @@ import (
|
|||||||
|
|
||||||
func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
|
func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
|
||||||
|
|
||||||
|
glog.Infof("🚀 EC DELETE SHARD NEEDLE: starting deletion for needle %d volume %d", n.Id, ecVolume.VolumeId)
|
||||||
|
|
||||||
count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
|
count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -29,6 +32,16 @@ func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle
|
|||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record the deletion locally in the .ecj journal file
|
||||||
|
glog.Infof("🔍 EC DELETION: Recording needle %d in volume %d generation %d",
|
||||||
|
n.Id, ecVolume.VolumeId, ecVolume.Generation)
|
||||||
|
if err = ecVolume.DeleteNeedleFromEcx(n.Id); err != nil {
|
||||||
|
glog.Errorf("❌ Failed to record EC deletion in journal for needle %d: %v", n.Id, err)
|
||||||
|
// Continue even if journal write fails - the remote deletion succeeded
|
||||||
|
} else {
|
||||||
|
glog.Infof("✅ EC deletion recording completed for needle %d", n.Id)
|
||||||
|
}
|
||||||
|
|
||||||
return int64(count), nil
|
return int64(count), nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user