mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-05-04 05:27:48 +08:00
fix bug due to data racing on VidMap (#3606)
This commit is contained in:
parent
7c277f36ec
commit
bc629665de
@ -27,7 +27,7 @@ type MasterClient struct {
|
|||||||
masters map[string]pb.ServerAddress
|
masters map[string]pb.ServerAddress
|
||||||
grpcDialOption grpc.DialOption
|
grpcDialOption grpc.DialOption
|
||||||
|
|
||||||
vidMap
|
*vidMap
|
||||||
vidMapCacheSize int
|
vidMapCacheSize int
|
||||||
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
|
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
|
||||||
OnPeerUpdateLock sync.RWMutex
|
OnPeerUpdateLock sync.RWMutex
|
||||||
@ -303,9 +303,12 @@ func (mc *MasterClient) resetVidMap() {
|
|||||||
DataCenter: mc.DataCenter,
|
DataCenter: mc.DataCenter,
|
||||||
cache: mc.cache,
|
cache: mc.cache,
|
||||||
}
|
}
|
||||||
mc.vidMap = newVidMap(mc.DataCenter)
|
|
||||||
mc.vidMap.cache = tail
|
|
||||||
|
|
||||||
|
nvm := newVidMap(mc.DataCenter)
|
||||||
|
nvm.cache = tail
|
||||||
|
mc.vidMap = nvm
|
||||||
|
|
||||||
|
//trim
|
||||||
for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ {
|
for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ {
|
||||||
if i == mc.vidMapCacheSize-1 {
|
if i == mc.vidMapCacheSize-1 {
|
||||||
tail.cache = nil
|
tail.cache = nil
|
||||||
|
@ -43,8 +43,8 @@ type vidMap struct {
|
|||||||
cache *vidMap
|
cache *vidMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func newVidMap(dataCenter string) vidMap {
|
func newVidMap(dataCenter string) *vidMap {
|
||||||
return vidMap{
|
return &vidMap{
|
||||||
vid2Locations: make(map[uint32][]Location),
|
vid2Locations: make(map[uint32][]Location),
|
||||||
ecVid2Locations: make(map[uint32][]Location),
|
ecVid2Locations: make(map[uint32][]Location),
|
||||||
DataCenter: dataCenter,
|
DataCenter: dataCenter,
|
||||||
|
@ -1,15 +1,17 @@
|
|||||||
package wdclient
|
package wdclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLocationIndex(t *testing.T) {
|
func TestLocationIndex(t *testing.T) {
|
||||||
vm := vidMap{}
|
vm := &vidMap{}
|
||||||
// test must be failed
|
// test must be failed
|
||||||
mustFailed := func(length int) {
|
mustFailed := func(length int) {
|
||||||
_, err := vm.getLocationIndex(length)
|
_, err := vm.getLocationIndex(length)
|
||||||
@ -132,6 +134,43 @@ func TestLookupFileId(t *testing.T) {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrentGetLocations(t *testing.T) {
|
||||||
|
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil)
|
||||||
|
location := Location{Url: "TestDataRacing"}
|
||||||
|
mc.addLocation(1, location)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
_, found := mc.GetLocations(1)
|
||||||
|
if !found {
|
||||||
|
cancel()
|
||||||
|
t.Error("vid map invalid due to data racing. ")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
//Simulate vidmap reset with cache when leader changes
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
mc.resetVidMap()
|
||||||
|
mc.addLocation(1, location)
|
||||||
|
time.Sleep(1 * time.Microsecond)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkLocationIndex(b *testing.B) {
|
func BenchmarkLocationIndex(b *testing.B) {
|
||||||
b.SetParallelism(8)
|
b.SetParallelism(8)
|
||||||
vm := vidMap{
|
vm := vidMap{
|
||||||
|
Loading…
Reference in New Issue
Block a user