fix needle map entry size

This commit is contained in:
Chris Lu
2019-04-19 00:39:34 -07:00
parent e5506152c0
commit ac2727853f
18 changed files with 195 additions and 184 deletions

View File

@@ -54,8 +54,9 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM
mutex.Lock()
l.volumes[vid] = v
mutex.Unlock()
size, _, _ := v.FileStat()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}

View File

@@ -45,7 +45,7 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32
}
switch version {
case Version1:
header := make([]byte, NeedleEntrySize)
header := make([]byte, NeedleHeaderSize)
CookieToBytes(header[0:CookieSize], n.Cookie)
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
n.Size = uint32(len(n.Data))
@@ -57,13 +57,13 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32
if _, err = w.Write(n.Data); err != nil {
return
}
actualSize = NeedleEntrySize + int64(n.Size)
actualSize = NeedleHeaderSize + int64(n.Size)
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
return
case Version2, Version3:
header := make([]byte, NeedleEntrySize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
CookieToBytes(header[0:CookieSize], n.Cookie)
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
if len(n.Name) >= math.MaxUint8 {
@@ -94,7 +94,7 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32
}
size = n.DataSize
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
if _, err = w.Write(header[0:NeedleEntrySize]); err != nil {
if _, err = w.Write(header[0:NeedleHeaderSize]); err != nil {
return
}
if n.DataSize > 0 {
@@ -181,21 +181,21 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version
}
switch version {
case Version1:
n.Data = bytes[NeedleEntrySize : NeedleEntrySize+size]
n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
case Version2, Version3:
err = n.readNeedleDataVersion2(bytes[NeedleEntrySize : NeedleEntrySize+int(n.Size)])
err = n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
}
if size == 0 || err != nil {
return err
}
checksum := util.BytesToUint32(bytes[NeedleEntrySize+size : NeedleEntrySize+size+NeedleChecksumSize])
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
newChecksum := NewCRC(n.Data)
if checksum != newChecksum.Value() {
return errors.New("CRC error! Data On Disk Corrupted")
}
n.Checksum = newChecksum
if version == Version3 {
tsOffset := NeedleEntrySize + size + NeedleChecksumSize
tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize])
}
return nil
@@ -204,7 +204,7 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version
func (n *Needle) ParseNeedleHeader(bytes []byte) {
n.Cookie = BytesToCookie(bytes[0:CookieSize])
n.Id = BytesToNeedleId(bytes[CookieSize : CookieSize+NeedleIdSize])
n.Size = util.BytesToUint32(bytes[CookieSize+NeedleIdSize : NeedleEntrySize])
n.Size = util.BytesToUint32(bytes[CookieSize+NeedleIdSize : NeedleHeaderSize])
}
func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
@@ -271,7 +271,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {
n = new(Needle)
if version == Version1 || version == Version2 || version == Version3 {
bytes = make([]byte, NeedleEntrySize)
bytes = make([]byte, NeedleHeaderSize)
var count int
count, err = r.ReadAt(bytes, offset)
if count <= 0 || err != nil {
@@ -286,9 +286,9 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, byt
func PaddingLength(needleSize uint32, version Version) uint32 {
if version == Version3 {
// this is same value as version2, but just listed here for clarity
return NeedlePaddingSize - ((NeedleEntrySize + needleSize + NeedleChecksumSize + TimestampSize) % NeedlePaddingSize)
return NeedlePaddingSize - ((NeedleHeaderSize + needleSize + NeedleChecksumSize + TimestampSize) % NeedlePaddingSize)
}
return NeedlePaddingSize - ((NeedleEntrySize + needleSize + NeedleChecksumSize) % NeedlePaddingSize)
return NeedlePaddingSize - ((NeedleHeaderSize + needleSize + NeedleChecksumSize) % NeedlePaddingSize)
}
func NeedleBodyLength(needleSize uint32, version Version) int64 {
@@ -386,6 +386,6 @@ func (n *Needle) SetHasPairs() {
}
func getActualSize(size uint32, version Version) int64 {
return NeedleEntrySize + NeedleBodyLength(size, version)
return NeedleHeaderSize + NeedleBodyLength(size, version)
}

View File

@@ -52,11 +52,11 @@ func TestMemoryUsage(t *testing.T) {
func loadNewNeedleMap(file *os.File) (*CompactMap, uint64) {
m := NewCompactMap()
bytes := make([]byte, NeedleEntrySize)
bytes := make([]byte, NeedleMapEntrySize)
rowCount := uint64(0)
count, e := file.Read(bytes)
for count > 0 && e == nil {
for i := 0; i < count; i += NeedleEntrySize {
for i := 0; i < count; i += NeedleMapEntrySize {
rowCount++
key := BytesToNeedleId(bytes[i : i+NeedleIdSize])
offset := BytesToOffset(bytes[i+NeedleIdSize : i+NeedleIdSize+OffsetSize])

View File

@@ -73,7 +73,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
// stops with the error returned by the fn function
func WalkIndexFile(r *os.File, fn func(key NeedleId, offset Offset, size uint32) error) error {
var readerOffset int64
bytes := make([]byte, NeedleEntrySize*RowsToRead)
bytes := make([]byte, NeedleMapEntrySize*RowsToRead)
count, e := r.ReadAt(bytes, readerOffset)
glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
readerOffset += int64(count)
@@ -85,8 +85,8 @@ func WalkIndexFile(r *os.File, fn func(key NeedleId, offset Offset, size uint32)
)
for count > 0 && e == nil || e == io.EOF {
for i = 0; i+NeedleEntrySize <= count; i += NeedleEntrySize {
key, offset, size = IdxFileEntry(bytes[i : i+NeedleEntrySize])
for i = 0; i+NeedleMapEntrySize <= count; i += NeedleMapEntrySize {
key, offset, size = IdxFileEntry(bytes[i : i+NeedleMapEntrySize])
if e = fn(key, offset, size); e != nil {
return e
}

View File

@@ -96,16 +96,16 @@ func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key
return fmt.Errorf("file %s stat error: %v", r.Name(), err)
}
fileSize := fi.Size()
if fileSize%NeedleEntrySize != 0 {
if fileSize%NeedleMapEntrySize != 0 {
return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize)
}
entryCount := fileSize / NeedleEntrySize
entryCount := fileSize / NeedleMapEntrySize
initFn(entryCount)
batchSize := int64(1024 * 4)
bytes := make([]byte, NeedleEntrySize*batchSize)
bytes := make([]byte, NeedleMapEntrySize*batchSize)
nextBatchSize := entryCount % batchSize
if nextBatchSize == 0 {
nextBatchSize = batchSize
@@ -113,13 +113,13 @@ func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key
remainingCount := entryCount - nextBatchSize
for remainingCount >= 0 {
_, e := r.ReadAt(bytes[:NeedleEntrySize*nextBatchSize], NeedleEntrySize*remainingCount)
// glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleEntrySize*remainingCount, "count", count, "e", e)
_, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
// glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e)
if e != nil {
return e
}
for i := int(nextBatchSize) - 1; i >= 0; i-- {
key, offset, size := IdxFileEntry(bytes[i*NeedleEntrySize : i*NeedleEntrySize+NeedleEntrySize])
key, offset, size := IdxFileEntry(bytes[i*NeedleMapEntrySize : i*NeedleMapEntrySize+NeedleMapEntrySize])
if e = fn(key, offset, size); e != nil {
return e
}

View File

@@ -22,12 +22,13 @@ type OffsetLower struct {
type Cookie uint32
const (
SizeSize = 4 // uint32 size
NeedleEntrySize = CookieSize + NeedleIdSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8
TombstoneFileSize = math.MaxUint32
CookieSize = 4
SizeSize = 4 // uint32 size
NeedleHeaderSize = CookieSize + NeedleIdSize + SizeSize
NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8
TombstoneFileSize = math.MaxUint32
CookieSize = 4
)
func CookieToBytes(bytes []byte, cookie Cookie) {

View File

@@ -27,8 +27,9 @@ type Volume struct {
SuperBlock
dataFileAccessLock sync.Mutex
lastModifiedTime uint64 //unix time in seconds
dataFileAccessLock sync.Mutex
lastModifiedTsSeconds uint64 //unix time in seconds
lastAppendAtNs uint64 //unix time in nanoseconds
lastCompactIndexOffset uint64
lastCompactRevision uint16
@@ -66,37 +67,26 @@ func (v *Volume) Version() needle.Version {
return v.SuperBlock.Version()
}
func (v *Volume) Size() int64 {
func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
if v.dataFile == nil {
return 0
return
}
stat, e := v.dataFile.Stat()
if e == nil {
return stat.Size()
return uint64(stat.Size()), v.nm.IndexFileSize(), stat.ModTime()
}
glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
return 0 // -1 causes integer overflow and the volume to become unwritable.
return // -1 causes integer overflow and the volume to become unwritable.
}
func (v *Volume) IndexFileSize() uint64 {
return v.nm.IndexFileSize()
}
func (v *Volume) DataFileSize() uint64 {
return uint64(v.Size())
}
/**
unix time in seconds
*/
func (v *Volume) LastModifiedTime() uint64 {
return v.lastModifiedTime
}
func (v *Volume) FileCount() uint64 {
return uint64(v.nm.FileCount())
}
@@ -138,8 +128,8 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
if int64(v.Ttl.Minutes()) < livedMinutes {
return true
@@ -157,16 +147,17 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
removalDelay = maxDelayMinutes
}
if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTsSeconds < uint64(time.Now().Unix()) {
return true
}
return false
}
func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
size, _, _ := v.FileStat()
return &master_pb.VolumeInformationMessage{
Id: uint32(v.Id),
Size: uint64(v.Size()),
Size: size,
Collection: v.Collection,
FileCount: uint64(v.nm.FileCount()),
DeleteCount: uint64(v.nm.DeletedCount()),

View File

@@ -60,7 +60,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
ctx := context.Background()
startFromOffset := v.Size()
startFromOffset, _, _ := v.FileStat()
appendAtNs, err := v.findLastAppendAtNs()
if err != nil {
return err
@@ -76,7 +76,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
return err
}
v.dataFile.Seek(startFromOffset, io.SeekStart)
v.dataFile.Seek(int64(startFromOffset), io.SeekStart)
for {
resp, recvErr := stream.Recv()
@@ -103,7 +103,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
}
// add to needle map
return ScanVolumeFileFrom(v.version, v.dataFile, startFromOffset, &VolumeFileScanner4GenIdx{v: v})
return ScanVolumeFileFrom(v.version, v.dataFile, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
}
@@ -130,16 +130,16 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
return Offset{}, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
}
fileSize := fi.Size()
if fileSize%NeedleEntrySize != 0 {
if fileSize%NeedleMapEntrySize != 0 {
return Offset{}, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
}
if fileSize == 0 {
return Offset{}, nil
}
bytes := make([]byte, NeedleEntrySize)
n, e := indexFile.ReadAt(bytes, fileSize-NeedleEntrySize)
if n != NeedleEntrySize {
bytes := make([]byte, NeedleMapEntrySize)
n, e := indexFile.ReadAt(bytes, fileSize-NeedleMapEntrySize)
if n != NeedleMapEntrySize {
return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
}
_, offset, _ := IdxFileEntry(bytes)
@@ -153,7 +153,7 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
if err != nil {
return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
}
_, err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleEntrySize), bodyLength)
_, err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
if err != nil {
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
}
@@ -176,13 +176,13 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
return
}
fileSize := fi.Size()
if fileSize%NeedleEntrySize != 0 {
if fileSize%NeedleMapEntrySize != 0 {
err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
return
}
bytes := make([]byte, NeedleEntrySize)
entryCount := fileSize / NeedleEntrySize
bytes := make([]byte, NeedleMapEntrySize)
entryCount := fileSize / NeedleMapEntrySize
l := int64(0)
h := entryCount
@@ -225,9 +225,9 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
}
// bytes is of size NeedleEntrySize
// bytes is of size NeedleMapEntrySize
func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
if _, readErr := indexFile.ReadAt(bytes, m*NeedleEntrySize); readErr != nil && readErr != io.EOF {
if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF {
return Offset{}, readErr
}
_, offset, _ := IdxFileEntry(bytes)

View File

@@ -19,7 +19,7 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
return nil
}
var lastIdxEntry []byte
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleEntrySize); e != nil {
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
}
key, offset, size := IdxFileEntry(lastIdxEntry)
@@ -35,7 +35,7 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
if indexSize, err = util.GetFileSize(indexFile); err == nil {
if indexSize%NeedleEntrySize != 0 {
if indexSize%NeedleMapEntrySize != 0 {
err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize)
}
}
@@ -47,7 +47,7 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
err = fmt.Errorf("offset %d for index file is invalid", offset)
return
}
bytes = make([]byte, NeedleEntrySize)
bytes = make([]byte, NeedleMapEntrySize)
_, err = indexFile.ReadAt(bytes, offset)
return
}

View File

@@ -30,7 +30,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
v.lastModifiedTime = uint64(modifiedTime.Unix())
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")

View File

@@ -95,6 +95,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err
if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
return
}
v.lastAppendAtNs = n.AppendAtNs
nv, ok := v.nm.Get(n.Id)
if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
@@ -102,8 +103,8 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
}
if v.lastModifiedTime < n.LastModified {
v.lastModifiedTime = n.LastModified
if v.lastModifiedTsSeconds < n.LastModified {
v.lastModifiedTsSeconds = n.LastModified
}
return
}
@@ -125,6 +126,7 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) {
if err != nil {
return size, err
}
v.lastAppendAtNs = n.AppendAtNs
if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
return size, err
}
@@ -205,7 +207,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
}
for n != nil {
if volumeFileScanner.ReadNeedleBody() {
if _, err = n.ReadNeedleBody(dataFile, version, offset+NeedleEntrySize, rest); err != nil {
if _, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err)
//return
@@ -218,7 +220,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
if err != nil {
glog.V(0).Infof("visit needle error: %v", err)
}
offset += NeedleEntrySize + rest
offset += NeedleHeaderSize + rest
glog.V(4).Infof("==> new entry offset %d", offset)
if n, _, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil {
if err == io.EOF {
@@ -241,7 +243,7 @@ func ScanVolumeFileNeedleFrom(version needle.Version, dataFile *os.File, offset
}
for n != nil {
var needleBody []byte
if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleEntrySize, rest); err != nil {
if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err)
//return
@@ -251,7 +253,7 @@ func ScanVolumeFileNeedleFrom(version needle.Version, dataFile *os.File, offset
glog.V(0).Infof("visit needle error: %v", err)
return
}
offset += NeedleEntrySize + rest
offset += NeedleHeaderSize + rest
glog.V(4).Infof("==> new entry offset %d", offset)
if n, nh, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil {
if err == io.EOF {

View File

@@ -138,7 +138,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
incrementedHasUpdatedIndexEntry := make(map[NeedleId]keyField)
for idxOffset := indexSize - NeedleEntrySize; uint64(idxOffset) >= v.lastCompactIndexOffset; idxOffset -= NeedleEntrySize {
for idxOffset := indexSize - NeedleMapEntrySize; uint64(idxOffset) >= v.lastCompactIndexOffset; idxOffset -= NeedleMapEntrySize {
var IdxEntry []byte
if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idxOffset); err != nil {
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idxOffset, err)