Files
seaweedfs/weed/storage/needle/needle_write.go

109 lines
3.0 KiB
Go
Raw Normal View History

2022-06-03 23:31:14 -07:00
package needle
import (
2025-06-06 01:35:48 -07:00
"bytes"
2022-06-03 23:31:14 -07:00
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
2025-06-06 01:35:48 -07:00
"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
2022-06-03 23:31:14 -07:00
)
func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
2025-06-06 01:35:48 -07:00
end, _, e := w.GetStat()
if e != nil {
err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
return
}
offset = uint64(end)
if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
return
}
bytesBuffer := buffer_pool.SyncPoolGetBuffer()
defer func() {
if err != nil {
if te := w.Truncate(end); te != nil {
// handle error or log
}
}
buffer_pool.SyncPoolPutBuffer(bytesBuffer)
}()
2022-06-03 23:31:14 -07:00
switch version {
case Version1:
2025-06-06 08:55:32 -07:00
size, actualSize, err = writeNeedleV1(n, offset, bytesBuffer)
case Version2:
2025-06-06 08:55:32 -07:00
size, actualSize, err = writeNeedleV2(n, offset, bytesBuffer)
case Version3:
2025-06-06 08:55:32 -07:00
size, actualSize, err = writeNeedleV3(n, offset, bytesBuffer)
default:
err = fmt.Errorf("unsupported version: %d", version)
2022-06-03 23:31:14 -07:00
return
}
2025-06-06 08:55:32 -07:00
if err == nil {
_, err = w.WriteAt(bytesBuffer.Bytes(), int64(offset))
if err != nil {
err = fmt.Errorf("failed to write %d bytes to %s at offset %d: %w", actualSize, w.Name(), offset, err)
}
}
return offset, size, actualSize, err
2022-06-03 23:31:14 -07:00
}
// WriteNeedleBlob writes an already serialized needle, dataSlice, to w at the
// position reported by w.GetStat() and returns that position as offset.
//
// For Version3 the appendAtNs value is first stored into dataSlice in place,
// at byte offset NeedleHeaderSize+size+NeedleChecksumSize, so the caller's
// slice is modified. A dataSlice too short to hold that timestamp is rejected
// with an error instead of panicking on the slice expression.
//
// If the write fails, w is truncated back to the position observed at entry
// on a best-effort basis; a failure of that truncate is only logged.
func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) {
	end, _, e := w.GetStat()
	if e != nil {
		// Wrap with %w so callers can inspect the underlying cause.
		err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
		return
	}
	offset = uint64(end)

	if version == Version3 {
		tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
		// Validate before slicing: nothing has been written yet, so there is
		// nothing to roll back on this path.
		if tsOffset < 0 || int(tsOffset)+int(TimestampSize) > len(dataSlice) {
			err = fmt.Errorf("needle blob of %d bytes is too short for size %d: no room for the append timestamp", len(dataSlice), size)
			return
		}
		util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs)
	}

	if _, err = w.WriteAt(dataSlice, int64(offset)); err != nil {
		if te := w.Truncate(end); te != nil {
			glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
		}
	}
	return
}
2025-06-06 01:35:48 -07:00
// prepareNeedleWrite encapsulates the common beginning logic for all versioned writeNeedle functions.
//
// It reads the current position of w via GetStat, rejects the write when that
// position has reached MaxPossibleVolumeSize and the needle carries data, and
// otherwise takes a buffer from the buffer pool.
//
// On success it returns the write offset, the pooled buffer and a cleanup
// function. cleanup must be called with the final error of the write: when
// that error is non-nil it truncates w back to the position observed here
// (logging, not returning, any truncate failure), and in every case it hands
// the buffer back to the pool.
//
// On failure err is set and both bytesBuffer and cleanup are nil, so callers
// must check err before using or deferring cleanup.
func prepareNeedleWrite(w backend.BackendStorageFile, n *Needle) (offset uint64, bytesBuffer *bytes.Buffer, cleanup func(err error), err error) {
	end, _, e := w.GetStat()
	if e != nil {
		err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
		return
	}
	offset = uint64(end)
	if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
		err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
		return
	}

	bytesBuffer = buffer_pool.SyncPoolGetBuffer()
	cleanup = func(err error) {
		if err != nil {
			// Best-effort rollback; surface a failed truncate in the logs
			// rather than dropping it silently.
			if te := w.Truncate(end); te != nil {
				glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
			}
		}
		buffer_pool.SyncPoolPutBuffer(bytesBuffer)
	}
	return
}