avoid crashes Galera Cluster

https://github.com/chrislusf/seaweedfs/issues/2125
This commit is contained in:
Konstantin Lebedev
2021-06-15 18:12:39 +05:00
parent ebe971da2e
commit 6aa1a56ec8
18 changed files with 36 additions and 29 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"strings"
"sync"
"time"
)
type SqlGenerator interface {
@@ -261,7 +262,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu
return nil
}
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) error {
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
if err != nil {
@@ -279,18 +280,23 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
}
}
glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
for {
glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath), limit)
if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
}
res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
rowCount, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
}
if rowCount < limit {
break
}
// to give the Galera Cluster a chance to breath
time.Sleep(time.Second)
}
_, err = res.RowsAffected()
if err != nil {
return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
}
return nil
}