mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 18:17:23 +08:00
Move memory_map_windows to new folder, intercept most of the read and write calls to volume dat files
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/joeslay/seaweedfs/weed/storage/memory_map"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -29,39 +30,25 @@ func (n *Needle) DiskSize(version Version) int64 {
|
||||
return GetActualSize(n.Size, version)
|
||||
}
|
||||
|
||||
func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
|
||||
if end, e := w.Seek(0, io.SeekEnd); e == nil {
|
||||
defer func(w *os.File, off int64) {
|
||||
if err != nil {
|
||||
if te := w.Truncate(end); te != nil {
|
||||
glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
|
||||
}
|
||||
}
|
||||
}(w, end)
|
||||
offset = uint64(end)
|
||||
} else {
|
||||
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
|
||||
return
|
||||
}
|
||||
func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, error) {
|
||||
|
||||
writeBytes := make([]byte, 0)
|
||||
|
||||
switch version {
|
||||
case Version1:
|
||||
header := make([]byte, NeedleHeaderSize)
|
||||
CookieToBytes(header[0:CookieSize], n.Cookie)
|
||||
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
|
||||
n.Size = uint32(len(n.Data))
|
||||
size = n.Size
|
||||
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
|
||||
if _, err = w.Write(header); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Data); err != nil {
|
||||
return
|
||||
}
|
||||
actualSize = NeedleHeaderSize + int64(n.Size)
|
||||
size := n.Size
|
||||
actualSize := NeedleHeaderSize + int64(n.Size)
|
||||
writeBytes = append(writeBytes, header...)
|
||||
writeBytes = append(writeBytes, n.Data...)
|
||||
padding := PaddingLength(n.Size, version)
|
||||
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
|
||||
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
|
||||
return
|
||||
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
|
||||
return writeBytes, size, actualSize, nil
|
||||
case Version2, Version3:
|
||||
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
|
||||
CookieToBytes(header[0:CookieSize], n.Cookie)
|
||||
@@ -92,82 +79,103 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32
|
||||
} else {
|
||||
n.Size = 0
|
||||
}
|
||||
size = n.DataSize
|
||||
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
|
||||
if _, err = w.Write(header[0:NeedleHeaderSize]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...)
|
||||
if n.DataSize > 0 {
|
||||
util.Uint32toBytes(header[0:4], n.DataSize)
|
||||
if _, err = w.Write(header[0:4]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Data); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:4]...)
|
||||
writeBytes = append(writeBytes, n.Data...)
|
||||
util.Uint8toBytes(header[0:1], n.Flags)
|
||||
if _, err = w.Write(header[0:1]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:1]...)
|
||||
if n.HasName() {
|
||||
util.Uint8toBytes(header[0:1], n.NameSize)
|
||||
if _, err = w.Write(header[0:1]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Name[:n.NameSize]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:1]...)
|
||||
writeBytes = append(writeBytes, n.Name[:n.NameSize]...)
|
||||
}
|
||||
if n.HasMime() {
|
||||
util.Uint8toBytes(header[0:1], n.MimeSize)
|
||||
if _, err = w.Write(header[0:1]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Mime); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:1]...)
|
||||
writeBytes = append(writeBytes, n.Mime...)
|
||||
}
|
||||
if n.HasLastModifiedDate() {
|
||||
util.Uint64toBytes(header[0:8], n.LastModified)
|
||||
if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[8-LastModifiedBytesLength:8]...)
|
||||
}
|
||||
if n.HasTtl() && n.Ttl != nil {
|
||||
n.Ttl.ToBytes(header[0:TtlBytesLength])
|
||||
if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:TtlBytesLength]...)
|
||||
}
|
||||
if n.HasPairs() {
|
||||
util.Uint16toBytes(header[0:2], n.PairsSize)
|
||||
if _, err = w.Write(header[0:2]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Pairs); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:2]...)
|
||||
writeBytes = append(writeBytes, n.Pairs...)
|
||||
}
|
||||
}
|
||||
padding := PaddingLength(n.Size, version)
|
||||
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
|
||||
if version == Version2 {
|
||||
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
|
||||
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
|
||||
} else {
|
||||
// version3
|
||||
util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs)
|
||||
_, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding])
|
||||
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...)
|
||||
}
|
||||
|
||||
return offset, n.DataSize, GetActualSize(n.Size, version), err
|
||||
return writeBytes, n.DataSize, GetActualSize(n.Size, version), nil
|
||||
}
|
||||
return 0, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
||||
|
||||
return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
||||
}
|
||||
|
||||
func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
|
||||
|
||||
mem_map, exists := memory_map.FileMemoryMap[w.Name()]
|
||||
if !exists {
|
||||
if end, e := w.Seek(0, io.SeekEnd); e == nil {
|
||||
defer func(w *os.File, off int64) {
|
||||
if err != nil {
|
||||
if te := w.Truncate(end); te != nil {
|
||||
glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
|
||||
}
|
||||
}
|
||||
}(w, end)
|
||||
offset = uint64(end)
|
||||
} else {
|
||||
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
offset = uint64(mem_map.End_Of_File + 1)
|
||||
}
|
||||
|
||||
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
|
||||
|
||||
if err == nil {
|
||||
if exists {
|
||||
memory_map.WriteMemory(mem_map, offset, uint64(len(bytesToWrite)), bytesToWrite)
|
||||
} else {
|
||||
_, err = w.Write(bytesToWrite)
|
||||
}
|
||||
}
|
||||
|
||||
return offset, size, actualSize, err
|
||||
}
|
||||
|
||||
func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
|
||||
dataSlice = make([]byte, int(GetActualSize(size, version)))
|
||||
_, err = r.ReadAt(dataSlice, offset)
|
||||
return dataSlice, err
|
||||
|
||||
dataSize := GetActualSize(size, version)
|
||||
dataSlice = make([]byte, dataSize)
|
||||
|
||||
mem_map, exists := memory_map.FileMemoryMap[r.Name()]
|
||||
if exists {
|
||||
mem_buffer, err := memory_map.ReadMemory(mem_map, uint64(offset), uint64(dataSize))
|
||||
copy(dataSlice, mem_buffer.Buffer)
|
||||
memory_map.ReleaseMemory(mem_buffer)
|
||||
return dataSlice, err
|
||||
} else {
|
||||
_, err = r.ReadAt(dataSlice, offset)
|
||||
return dataSlice, err
|
||||
}
|
||||
}
|
||||
|
||||
// ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set.
|
||||
@@ -280,14 +288,27 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, byt
|
||||
n = new(Needle)
|
||||
if version == Version1 || version == Version2 || version == Version3 {
|
||||
bytes = make([]byte, NeedleHeaderSize)
|
||||
var count int
|
||||
count, err = r.ReadAt(bytes, offset)
|
||||
if count <= 0 || err != nil {
|
||||
return nil, bytes, 0, err
|
||||
|
||||
mem_map, exists := memory_map.FileMemoryMap[r.Name()]
|
||||
if exists {
|
||||
mem_buffer, err := memory_map.ReadMemory(mem_map, uint64(offset), NeedleHeaderSize)
|
||||
copy(bytes, mem_buffer.Buffer)
|
||||
memory_map.ReleaseMemory(mem_buffer)
|
||||
|
||||
if err != nil {
|
||||
return nil, bytes, 0, err
|
||||
}
|
||||
} else {
|
||||
var count int
|
||||
count, err = r.ReadAt(bytes, offset)
|
||||
if count <= 0 || err != nil {
|
||||
return nil, bytes, 0, err
|
||||
}
|
||||
}
|
||||
n.ParseNeedleHeader(bytes)
|
||||
bodyLength = NeedleBodyLength(n.Size, version)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user