add timestamp inside lock

This commit is contained in:
Chris Lu 2020-04-10 01:35:59 -07:00
parent 9fa065f600
commit bcf37346ef
2 changed files with 25 additions and 23 deletions

View File

@ -45,11 +45,11 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
notification.Queue.SendMessage(fullpath, eventNotification) notification.Queue.SendMessage(fullpath, eventNotification)
} }
f.logMetaEvent(time.Now(), fullpath, eventNotification) f.logMetaEvent(fullpath, eventNotification)
} }
func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *filer_pb.EventNotification) { func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventNotification) {
dir, _ := util.FullPath(fullpath).DirAndName() dir, _ := util.FullPath(fullpath).DirAndName()
@ -63,7 +63,7 @@ func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *f
return return
} }
f.metaLogBuffer.AddToBuffer(ts, []byte(dir), data) f.metaLogBuffer.AddToBuffer([]byte(dir), data)
} }

View File

@ -46,8 +46,18 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
return lb return lb
} }
func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) { func (m *LogBuffer) AddToBuffer(key, data []byte) {
m.Lock()
defer func() {
m.Unlock()
if m.notifyFn != nil {
m.notifyFn()
}
}()
// need to put the timestamp inside the lock
ts := time.Now()
logEntry := &filer_pb.LogEntry{ logEntry := &filer_pb.LogEntry{
TsNs: ts.UnixNano(), TsNs: ts.UnixNano(),
PartitionKeyHash: util.HashToInt32(key), PartitionKeyHash: util.HashToInt32(key),
@ -58,14 +68,6 @@ func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
size := len(logEntryData) size := len(logEntryData)
m.Lock()
defer func() {
m.Unlock()
if m.notifyFn != nil {
m.notifyFn()
}
}()
if m.pos == 0 { if m.pos == 0 {
m.startTime = ts m.startTime = ts
} }
@ -153,18 +155,18 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
l, h := 0, len(m.idx)-1 l, h := 0, len(m.idx)-1
/* /*
for i, pos := range m.idx { for i, pos := range m.idx {
logEntry, ts := readTs(m.buf, pos) logEntry, ts := readTs(m.buf, pos)
event := &filer_pb.FullEventNotification{} event := &filer_pb.FullEventNotification{}
proto.Unmarshal(logEntry.Data, event) proto.Unmarshal(logEntry.Data, event)
entry := event.EventNotification.OldEntry entry := event.EventNotification.OldEntry
if entry == nil { if entry == nil {
entry = event.EventNotification.NewEntry entry = event.EventNotification.NewEntry
}
fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
} }
fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) fmt.Printf("l=%d, h=%d\n", l, h)
} */
fmt.Printf("l=%d, h=%d\n", l, h)
*/
for l <= h { for l <= h {
mid := (l + h) / 2 mid := (l + h) / 2