mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-08-20 05:10:44 +08:00
make ReadNeedleMeta a thin wrapper
This commit is contained in:
parent
69df7a9182
commit
3cf04eabef
@ -1,6 +1,7 @@
|
||||
package needle
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||
@ -18,7 +19,13 @@ type NeedleReadOptions struct {
|
||||
// ReadFromFile reads the Needle from the backend file according to the specified options.
|
||||
// - If only ReadHeader is true, only the header is read and parsed.
|
||||
// - If ReadData or ReadMeta is true, reads GetActualSize(size, version) bytes from disk (size is the logical body size).
|
||||
func (n *Needle) ReadFromFile(r backend.BackendStorageFile, offset int64, size Size, version Version, opts NeedleReadOptions) error {
|
||||
func (n *Needle) ReadFromFile(r backend.BackendStorageFile, offset int64, size Size, version Version, opts NeedleReadOptions) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("panic occurred: %+v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
if opts.ReadHeader && !opts.ReadData && !opts.ReadMeta {
|
||||
// Only read the header
|
||||
header := make([]byte, NeedleHeaderSize)
|
||||
@ -33,63 +40,54 @@ func (n *Needle) ReadFromFile(r backend.BackendStorageFile, offset int64, size S
|
||||
return nil
|
||||
}
|
||||
if opts.ReadHeader && opts.ReadMeta && !opts.ReadData {
|
||||
// Read header first
|
||||
header := make([]byte, NeedleHeaderSize)
|
||||
count, err := r.ReadAt(header, offset)
|
||||
if err == io.EOF && count == NeedleHeaderSize {
|
||||
// Optimized: Read header and DataSize in one call
|
||||
buf := make([]byte, NeedleHeaderSize+DataSizeSize)
|
||||
count, err := r.ReadAt(buf, offset)
|
||||
if err == io.EOF && count == NeedleHeaderSize+DataSizeSize {
|
||||
err = nil
|
||||
}
|
||||
if count != NeedleHeaderSize || err != nil {
|
||||
if count != NeedleHeaderSize+DataSizeSize || err != nil {
|
||||
return err
|
||||
}
|
||||
n.ParseNeedleHeader(header)
|
||||
n.ParseNeedleHeader(buf[:NeedleHeaderSize])
|
||||
if n.Size != size {
|
||||
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
|
||||
return ErrorSizeMismatch
|
||||
}
|
||||
}
|
||||
|
||||
// Now read meta fields after DataSize+Data
|
||||
metaOffset := offset + int64(NeedleHeaderSize)
|
||||
metaIndex := 0
|
||||
if version == Version2 || version == Version3 {
|
||||
// Read DataSize to know how much to skip
|
||||
dsBuf := make([]byte, 4)
|
||||
count, err := r.ReadAt(dsBuf, metaOffset)
|
||||
if err == io.EOF && count == 4 {
|
||||
err = nil
|
||||
n.DataSize = util.BytesToUint32(buf[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
|
||||
|
||||
startOffset := offset + NeedleHeaderSize
|
||||
if size.IsValid() {
|
||||
startOffset = offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
|
||||
}
|
||||
if count != 4 || err != nil {
|
||||
return err
|
||||
}
|
||||
dataSize := int(util.BytesToUint32(dsBuf))
|
||||
metaIndex = 4 + dataSize
|
||||
// Read meta fields (Flags, Name, Mime, etc.)
|
||||
metaFieldsLen := int(n.Size) - dataSize // upper bound, may be more than needed
|
||||
dataSize := GetActualSize(size, version)
|
||||
stopOffset := offset + dataSize
|
||||
metaFieldsLen := stopOffset - startOffset
|
||||
|
||||
metaFieldsBuf := make([]byte, metaFieldsLen)
|
||||
count, err = r.ReadAt(metaFieldsBuf, metaOffset+int64(metaIndex))
|
||||
if err == io.EOF && count == metaFieldsLen {
|
||||
count, err = r.ReadAt(metaFieldsBuf, startOffset)
|
||||
if err == io.EOF && int64(count) == metaFieldsLen {
|
||||
err = nil
|
||||
}
|
||||
if count <= 0 || err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = n.readNeedleDataVersion2NonData(metaFieldsBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
var index int
|
||||
if size.IsValid() {
|
||||
index, err = n.readNeedleDataVersion2NonData(metaFieldsBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Now read checksum and (for v3) appendAtNs at the end
|
||||
endMetaOffset := offset + int64(NeedleHeaderSize) + int64(n.Size)
|
||||
endMetaLen := NeedleChecksumSize
|
||||
|
||||
n.Checksum = CRC(util.BytesToUint32(metaFieldsBuf[index : index+NeedleChecksumSize]))
|
||||
if version == Version3 {
|
||||
endMetaLen += TimestampSize
|
||||
}
|
||||
endMetaBuf := make([]byte, endMetaLen)
|
||||
count, err = r.ReadAt(endMetaBuf, endMetaOffset)
|
||||
if err == io.EOF && count == endMetaLen {
|
||||
err = nil
|
||||
}
|
||||
if count != endMetaLen || err != nil {
|
||||
return err
|
||||
}
|
||||
n.Checksum = CRC(util.BytesToUint32(endMetaBuf[:NeedleChecksumSize]))
|
||||
if version == Version3 {
|
||||
n.AppendAtNs = util.BytesToUint64(endMetaBuf[NeedleChecksumSize : NeedleChecksumSize+TimestampSize])
|
||||
n.AppendAtNs = util.BytesToUint64(metaFieldsBuf[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -150,6 +150,9 @@ func TestReadFromFile_OptionsMatrix(t *testing.T) {
|
||||
if nMeta.Cookie != n.Cookie || nMeta.Id != n.Id || nMeta.Size != n.Size {
|
||||
t.Errorf("header fields mismatch: got %+v want %+v", nMeta, n)
|
||||
}
|
||||
if nMeta.Checksum != n.Checksum {
|
||||
t.Errorf("checksum mismatch: got %d want %d", nMeta.Checksum, n.Checksum)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ReadHeader+ReadData", func(t *testing.T) {
|
||||
|
@ -1,12 +1,11 @@
|
||||
package needle
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"io"
|
||||
)
|
||||
|
||||
// ReadNeedleData uses a needle without n.Data to read the content
|
||||
@ -33,57 +32,12 @@ func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64
|
||||
}
|
||||
|
||||
// ReadNeedleMeta fills all metadata except the n.Data
|
||||
func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("panic occurred: %+v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
|
||||
|
||||
count, err := r.ReadAt(bytes, offset)
|
||||
if err == io.EOF && count == NeedleHeaderSize+DataSizeSize {
|
||||
err = nil
|
||||
}
|
||||
if count != NeedleHeaderSize+DataSizeSize || err != nil {
|
||||
return err
|
||||
}
|
||||
n.ParseNeedleHeader(bytes)
|
||||
if n.Size != size {
|
||||
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
|
||||
return ErrorSizeMismatch
|
||||
}
|
||||
}
|
||||
n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
|
||||
startOffset := offset + NeedleHeaderSize
|
||||
if size.IsValid() {
|
||||
startOffset = offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
|
||||
}
|
||||
dataSize := GetActualSize(size, version)
|
||||
stopOffset := offset + dataSize
|
||||
metaSize := stopOffset - startOffset
|
||||
metaSlice := make([]byte, int(metaSize))
|
||||
|
||||
count, err = r.ReadAt(metaSlice, startOffset)
|
||||
if err != nil && int64(count) == metaSize {
|
||||
err = nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var index int
|
||||
if size.IsValid() {
|
||||
index, err = n.readNeedleDataVersion2NonData(metaSlice)
|
||||
}
|
||||
|
||||
n.Checksum = CRC(util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize]))
|
||||
if version == Version3 {
|
||||
n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
|
||||
}
|
||||
return err
|
||||
|
||||
func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) error {
|
||||
return n.ReadFromFile(r, offset, size, version, NeedleReadOptions{
|
||||
ReadHeader: true,
|
||||
ReadMeta: true,
|
||||
ReadData: false,
|
||||
})
|
||||
}
|
||||
|
||||
func min(x, y int64) int64 {
|
||||
|
Loading…
Reference in New Issue
Block a user