From 3cf04eabefa1e29e17d4d23d3f2e6e85ce93852b Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 7 Jun 2025 23:15:23 -0700 Subject: [PATCH] make ReadNeedleMeta a thin wrapper --- weed/storage/needle/needle_read_options.go | 82 +++++++++---------- .../needle/needle_read_options_test.go | 3 + weed/storage/needle/needle_read_page.go | 62 ++------------ 3 files changed, 51 insertions(+), 96 deletions(-) diff --git a/weed/storage/needle/needle_read_options.go b/weed/storage/needle/needle_read_options.go index 77f0f26b8..3e1167aa0 100644 --- a/weed/storage/needle/needle_read_options.go +++ b/weed/storage/needle/needle_read_options.go @@ -1,6 +1,7 @@ package needle import ( + "fmt" "io" "github.com/seaweedfs/seaweedfs/weed/storage/backend" @@ -18,7 +19,13 @@ type NeedleReadOptions struct { // ReadFromFile reads the Needle from the backend file according to the specified options. // - If only ReadHeader is true, only the header is read and parsed. // - If ReadData or ReadMeta is true, reads GetActualSize(size, version) bytes from disk (size is the logical body size). -func (n *Needle) ReadFromFile(r backend.BackendStorageFile, offset int64, size Size, version Version, opts NeedleReadOptions) error { +func (n *Needle) ReadFromFile(r backend.BackendStorageFile, offset int64, size Size, version Version, opts NeedleReadOptions) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic occurred: %+v", r) + } + }() + if opts.ReadHeader && !opts.ReadData && !opts.ReadMeta { // Only read the header header := make([]byte, NeedleHeaderSize) @@ -33,63 +40,54 @@ func (n *Needle) ReadFromFile(r backend.BackendStorageFile, offset int64, size S return nil } if opts.ReadHeader && opts.ReadMeta && !opts.ReadData { - // Read header first - header := make([]byte, NeedleHeaderSize) - count, err := r.ReadAt(header, offset) - if err == io.EOF && count == NeedleHeaderSize { + // Optimized: Read header and DataSize in one call + buf := make([]byte, NeedleHeaderSize+DataSizeSize) + count, err := r.ReadAt(buf, offset) + if err == io.EOF && count == NeedleHeaderSize+DataSizeSize { err = nil } - if count != NeedleHeaderSize || err != nil { + if count != NeedleHeaderSize+DataSizeSize || err != nil { return err } - n.ParseNeedleHeader(header) + n.ParseNeedleHeader(buf[:NeedleHeaderSize]) + if n.Size != size { + if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { + return ErrorSizeMismatch + } + } // Now read meta fields after DataSize+Data - metaOffset := offset + int64(NeedleHeaderSize) - metaIndex := 0 if version == Version2 || version == Version3 { - // Read DataSize to know how much to skip - dsBuf := make([]byte, 4) - count, err := r.ReadAt(dsBuf, metaOffset) - if err == io.EOF && count == 4 { - err = nil + n.DataSize = util.BytesToUint32(buf[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize]) + + startOffset := offset + NeedleHeaderSize + if size.IsValid() { + startOffset = offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize) } - if count != 4 || err != nil { - return err - } - dataSize := int(util.BytesToUint32(dsBuf)) - metaIndex = 4 + dataSize - // Read meta fields (Flags, Name, Mime, etc.) - metaFieldsLen := int(n.Size) - dataSize // upper bound, may be more than needed + dataSize := GetActualSize(size, version) + stopOffset := offset + dataSize + metaFieldsLen := stopOffset - startOffset + metaFieldsBuf := make([]byte, metaFieldsLen) - count, err = r.ReadAt(metaFieldsBuf, metaOffset+int64(metaIndex)) - if err == io.EOF && count == metaFieldsLen { + count, err = r.ReadAt(metaFieldsBuf, startOffset) + if err == io.EOF && int64(count) == metaFieldsLen { err = nil } if count <= 0 || err != nil { return err } - _, err = n.readNeedleDataVersion2NonData(metaFieldsBuf) - if err != nil { - return err + + var index int + if size.IsValid() { + index, err = n.readNeedleDataVersion2NonData(metaFieldsBuf) + if err != nil { + return err + } } - // Now read checksum and (for v3) appendAtNs at the end - endMetaOffset := offset + int64(NeedleHeaderSize) + int64(n.Size) - endMetaLen := NeedleChecksumSize + + n.Checksum = CRC(util.BytesToUint32(metaFieldsBuf[index : index+NeedleChecksumSize])) if version == Version3 { - endMetaLen += TimestampSize - } - endMetaBuf := make([]byte, endMetaLen) - count, err = r.ReadAt(endMetaBuf, endMetaOffset) - if err == io.EOF && count == endMetaLen { - err = nil - } - if count != endMetaLen || err != nil { - return err - } - n.Checksum = CRC(util.BytesToUint32(endMetaBuf[:NeedleChecksumSize])) - if version == Version3 { - n.AppendAtNs = util.BytesToUint64(endMetaBuf[NeedleChecksumSize : NeedleChecksumSize+TimestampSize]) + n.AppendAtNs = util.BytesToUint64(metaFieldsBuf[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize]) } return nil } diff --git a/weed/storage/needle/needle_read_options_test.go b/weed/storage/needle/needle_read_options_test.go index f86bab57a..03452b02a 100644 --- a/weed/storage/needle/needle_read_options_test.go +++ b/weed/storage/needle/needle_read_options_test.go @@ -150,6 +150,9 @@ func TestReadFromFile_OptionsMatrix(t *testing.T) { if nMeta.Cookie != n.Cookie || nMeta.Id != n.Id || nMeta.Size != n.Size { t.Errorf("header fields mismatch: got %+v want %+v", nMeta, n) } + if nMeta.Checksum != n.Checksum { + t.Errorf("checksum mismatch: got %d want %d", nMeta.Checksum, n.Checksum) + } }) t.Run("ReadHeader+ReadData", func(t *testing.T) { diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go index 4e1032de8..fab52c8d9 100644 --- a/weed/storage/needle/needle_read_page.go +++ b/weed/storage/needle/needle_read_page.go @@ -1,12 +1,11 @@ package needle import ( - "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/util" - "io" ) // ReadNeedleData uses a needle without n.Data to read the content @@ -33,57 +32,12 @@ func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64 } // ReadNeedleMeta fills all metadata except the n.Data -func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) { - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("panic occurred: %+v", r) - } - }() - - bytes := make([]byte, NeedleHeaderSize+DataSizeSize) - - count, err := r.ReadAt(bytes, offset) - if err == io.EOF && count == NeedleHeaderSize+DataSizeSize { - err = nil - } - if count != NeedleHeaderSize+DataSizeSize || err != nil { - return err - } - n.ParseNeedleHeader(bytes) - if n.Size != size { - if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { - return ErrorSizeMismatch - } - } - n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize]) - startOffset := offset + NeedleHeaderSize - if size.IsValid() { - startOffset = offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize) - } - dataSize := GetActualSize(size, version) - stopOffset := offset + dataSize - metaSize := stopOffset - startOffset - metaSlice := make([]byte, int(metaSize)) - - count, err = r.ReadAt(metaSlice, startOffset) - if err != nil && int64(count) == metaSize { - err = nil - } - if err != nil { - return err - } - - var index int - if size.IsValid() { - index, err = n.readNeedleDataVersion2NonData(metaSlice) - } - - n.Checksum = CRC(util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize])) - if version == Version3 { - n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize]) - } - return err - +func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) error { + return n.ReadFromFile(r, offset, size, version, NeedleReadOptions{ + ReadHeader: true, + ReadMeta: true, + ReadData: false, + }) } func min(x, y int64) int64 {