async chan write read, no write for closed chan

This commit is contained in:
Chris Lu
2020-05-10 03:48:35 -07:00
parent 78afb8bf46
commit 6bf3eb69cb
5 changed files with 34 additions and 5 deletions

View File

@@ -98,13 +98,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}
func (m *LogBuffer) Shutdown() {
m.Lock()
defer m.Unlock()
if m.isStopping {
return
}
m.isStopping = true
m.Lock()
toFlush := m.copyToFlush()
m.Unlock()
m.flushChan <- toFlush
close(m.flushChan)
}
@@ -123,10 +124,14 @@ func (m *LogBuffer) loopInterval() {
for !m.isStopping {
time.Sleep(m.flushInterval)
m.Lock()
if m.isStopping {
m.Unlock()
return
}
// println("loop interval")
toFlush := m.copyToFlush()
m.Unlock()
m.flushChan <- toFlush
m.Unlock()
}
}