mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-19 14:47:57 +08:00
ADHOC: add read needle meta grpc (#3581)
* ADHOC: add read needle meta grpc * add test * nit Co-authored-by: root <root@HQ-10MSTD3EY.roblox.local>
This commit is contained in:
@@ -210,10 +210,14 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
|
||||
return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
|
||||
}
|
||||
|
||||
shard := ev.Shards[0]
|
||||
intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
|
||||
return
|
||||
}
|
||||
|
||||
func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
|
||||
shard := ev.Shards[0]
|
||||
// calculate the locations in the ec shards
|
||||
intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
|
||||
intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset, types.Size(needle.GetActualSize(size, version)))
|
||||
|
||||
return
|
||||
}
|
||||
|
@@ -209,8 +209,8 @@ func NeedleBodyLength(needleSize Size, version Version) int64 {
|
||||
return int64(needleSize) + NeedleChecksumSize + int64(PaddingLength(needleSize, version))
|
||||
}
|
||||
|
||||
//n should be a needle already read the header
|
||||
//the input stream will read until next file entry
|
||||
// n should be a needle already read the header
|
||||
// the input stream will read until next file entry
|
||||
func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, offset int64, bodyLength int64) (bytes []byte, err error) {
|
||||
|
||||
if bodyLength <= 0 {
|
||||
|
@@ -74,7 +74,6 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
|
||||
return ErrorSizeMismatch
|
||||
}
|
||||
}
|
||||
|
||||
n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
|
||||
|
||||
startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
|
||||
@@ -90,7 +89,6 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var index int
|
||||
index, err = n.readNeedleDataVersion2NonData(metaSlice)
|
||||
|
||||
|
@@ -384,6 +384,14 @@ func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption
|
||||
}
|
||||
return 0, fmt.Errorf("volume %d not found", i)
|
||||
}
|
||||
|
||||
func (s *Store) ReadVolumeNeedleMetaAt(i needle.VolumeId, n *needle.Needle, offset int64, size int32) error {
|
||||
if v := s.findVolume(i); v != nil {
|
||||
return v.readNeedleMetaAt(n, offset, size)
|
||||
}
|
||||
return fmt.Errorf("volume %d not found", i)
|
||||
}
|
||||
|
||||
func (s *Store) ReadVolumeNeedleDataInto(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) error {
|
||||
if v := s.findVolume(i); v != nil {
|
||||
return v.readNeedleDataInto(n, readOption, writer, offset, size)
|
||||
|
@@ -80,6 +80,24 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize
|
||||
return -1, ErrorNotFound
|
||||
}
|
||||
|
||||
// read needle at a specific offset
|
||||
func (v *Volume) readNeedleMetaAt(n *needle.Needle, offset int64, size int32) (err error) {
|
||||
v.dataFileAccessLock.RLock()
|
||||
defer v.dataFileAccessLock.RUnlock()
|
||||
// read deleted meta data
|
||||
if size < 0 {
|
||||
size = -size
|
||||
}
|
||||
err = n.ReadNeedleMeta(v.DataBackend, offset, Size(size), v.Version())
|
||||
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
|
||||
err = n.ReadNeedleMeta(v.DataBackend, offset+int64(MaxPossibleVolumeSize), Size(size), v.Version())
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// read fills in Needle content by looking up n.Id from NeedleMapper
|
||||
func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) {
|
||||
v.dataFileAccessLock.RLock()
|
||||
|
91
weed/storage/volume_read_test.go
Normal file
91
weed/storage/volume_read_test.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
type WriteInfo struct {
|
||||
offset int64
|
||||
size int32
|
||||
}
|
||||
writeInfos := make([]WriteInfo, 30)
|
||||
mockLastUpdateTime := uint64(1000000000000)
|
||||
// initialize 20 needles then update first 10 needles
|
||||
for i := 1; i <= 30; i++ {
|
||||
n := newRandomNeedle(uint64(i % 20))
|
||||
n.Flags = 0x08
|
||||
n.LastModified = mockLastUpdateTime
|
||||
mockLastUpdateTime += 2000
|
||||
offset, _, _, err := v.writeNeedle2(n, true, false)
|
||||
if err != nil {
|
||||
t.Fatalf("write needle %d: %v", i, err)
|
||||
}
|
||||
writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(n.Size)}
|
||||
}
|
||||
expectedLastUpdateTime := uint64(1000000000000)
|
||||
for i := 0; i < 30; i++ {
|
||||
testNeedle := new(needle.Needle)
|
||||
testNeedle.Id = types.Uint64ToNeedleId(uint64(i + 1%20))
|
||||
testNeedle.Flags = 0x08
|
||||
v.readNeedleMetaAt(testNeedle, writeInfos[i].offset, writeInfos[i].size)
|
||||
actualLastModifiedTime := testNeedle.LastModified
|
||||
assert.Equal(t, expectedLastUpdateTime, actualLastModifiedTime, "The two words should be the same.")
|
||||
expectedLastUpdateTime += 2000
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
type WriteInfo struct {
|
||||
offset int64
|
||||
size int32
|
||||
}
|
||||
writeInfos := make([]WriteInfo, 10)
|
||||
mockLastUpdateTime := uint64(1000000000000)
|
||||
for i := 1; i <= 10; i++ {
|
||||
n := newRandomNeedle(uint64(i % 5))
|
||||
n.Flags = 0x08
|
||||
n.LastModified = mockLastUpdateTime
|
||||
mockLastUpdateTime += 2000
|
||||
offset, _, _, err := v.writeNeedle2(n, true, false)
|
||||
if err != nil {
|
||||
t.Fatalf("write needle %d: %v", i, err)
|
||||
}
|
||||
if i < 5 {
|
||||
size, err := v.deleteNeedle2(n)
|
||||
if err != nil {
|
||||
t.Fatalf("delete needle %d: %v", i, err)
|
||||
}
|
||||
writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(size)}
|
||||
} else {
|
||||
writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(n.Size)}
|
||||
}
|
||||
}
|
||||
|
||||
expectedLastUpdateTime := uint64(1000000000000)
|
||||
for i := 0; i < 10; i++ {
|
||||
testNeedle := new(needle.Needle)
|
||||
testNeedle.Id = types.Uint64ToNeedleId(uint64(i + 1%5))
|
||||
testNeedle.Flags = 0x08
|
||||
v.readNeedleMetaAt(testNeedle, writeInfos[i].offset, writeInfos[i].size)
|
||||
actualLastModifiedTime := testNeedle.LastModified
|
||||
assert.Equal(t, expectedLastUpdateTime, actualLastModifiedTime, "The two words should be the same.")
|
||||
expectedLastUpdateTime += 2000
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user