Update foundationdb_store.go

This commit is contained in:
chrislu
2025-08-28 20:26:49 -07:00
parent 10956d77e3
commit 6d7cd2b73d

View File

@@ -7,7 +7,6 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/apple/foundationdb/bindings/go/src/fdb" "github.com/apple/foundationdb/bindings/go/src/fdb"
@@ -61,9 +60,23 @@ type FoundationDBStore struct {
directoryPrefix string directoryPrefix string
timeout time.Duration timeout time.Duration
maxRetryDelay time.Duration maxRetryDelay time.Duration
txMu sync.RWMutex }
isInTransaction bool
currentTx fdb.Transaction // Context key type for storing transactions
type contextKey string
const transactionKey contextKey = "fdb_transaction"
// Helper functions for context-scoped transactions
func (store *FoundationDBStore) getTransactionFromContext(ctx context.Context) (fdb.Transaction, bool) {
if tx, ok := ctx.Value(transactionKey).(fdb.Transaction); ok {
return tx, true
}
return nil, false
}
func (store *FoundationDBStore) setTransactionInContext(ctx context.Context, tx fdb.Transaction) context.Context {
return context.WithValue(ctx, transactionKey, tx)
} }
func (store *FoundationDBStore) GetName() string { func (store *FoundationDBStore) GetName() string {
@@ -132,44 +145,47 @@ func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) e
} }
func (store *FoundationDBStore) BeginTransaction(ctx context.Context) (context.Context, error) { func (store *FoundationDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
store.txMu.Lock() // Check if there's already a transaction in this context
defer store.txMu.Unlock() if _, exists := store.getTransactionFromContext(ctx); exists {
return ctx, fmt.Errorf("transaction already in progress for this context")
if store.isInTransaction {
return ctx, fmt.Errorf("transaction already in progress")
} }
store.currentTx, _ = store.database.CreateTransaction() // Create a new transaction
store.isInTransaction = true tx, err := store.database.CreateTransaction()
if err != nil {
return ctx, fmt.Errorf("failed to create transaction: %v", err)
}
return ctx, nil // Store the transaction in context and return the new context
newCtx := store.setTransactionInContext(ctx, tx)
return newCtx, nil
} }
func (store *FoundationDBStore) CommitTransaction(ctx context.Context) error { func (store *FoundationDBStore) CommitTransaction(ctx context.Context) error {
store.txMu.Lock() // Get transaction from context
defer store.txMu.Unlock() tx, exists := store.getTransactionFromContext(ctx)
if !exists {
if !store.isInTransaction { return fmt.Errorf("no transaction in progress for this context")
return fmt.Errorf("no transaction in progress")
} }
err := store.currentTx.Commit().Get() // Commit the transaction
store.isInTransaction = false err := tx.Commit().Get()
if err != nil {
return fmt.Errorf("failed to commit transaction: %v", err)
}
return err return nil
} }
func (store *FoundationDBStore) RollbackTransaction(ctx context.Context) error { func (store *FoundationDBStore) RollbackTransaction(ctx context.Context) error {
store.txMu.Lock() // Get transaction from context
defer store.txMu.Unlock() tx, exists := store.getTransactionFromContext(ctx)
if !exists {
if !store.isInTransaction { return fmt.Errorf("no transaction in progress for this context")
return fmt.Errorf("no transaction in progress")
} }
store.currentTx.Cancel() // Cancel the transaction
store.isInTransaction = false tx.Cancel()
return nil return nil
} }
@@ -189,11 +205,9 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En
value = util.MaybeGzipData(value) value = util.MaybeGzipData(value)
} }
store.txMu.RLock() // Check if there's a transaction in context
defer store.txMu.RUnlock() if tx, exists := store.getTransactionFromContext(ctx); exists {
tx.Set(key, value)
if store.isInTransaction {
store.currentTx.Set(key, value)
return nil return nil
} }
@@ -213,12 +227,10 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En
func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
key := store.genKey(util.FullPath(fullpath).DirAndName()) key := store.genKey(util.FullPath(fullpath).DirAndName())
store.txMu.RLock()
defer store.txMu.RUnlock()
var data []byte var data []byte
if store.isInTransaction { // Check if there's a transaction in context
data, err = store.currentTx.Get(key).Get() if tx, exists := store.getTransactionFromContext(ctx); exists {
data, err = tx.Get(key).Get()
} else { } else {
result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
return rtr.Get(key).Get() return rtr.Get(key).Get()
@@ -253,11 +265,9 @@ func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.Ful
func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
key := store.genKey(util.FullPath(fullpath).DirAndName()) key := store.genKey(util.FullPath(fullpath).DirAndName())
store.txMu.RLock() // Check if there's a transaction in context
defer store.txMu.RUnlock() if tx, exists := store.getTransactionFromContext(ctx); exists {
tx.Clear(key)
if store.isInTransaction {
store.currentTx.Clear(key)
return nil return nil
} }
@@ -277,12 +287,10 @@ func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.F
func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
directoryPrefix := store.genDirectoryKeyPrefix(string(fullpath), "") directoryPrefix := store.genDirectoryKeyPrefix(string(fullpath), "")
store.txMu.RLock() // Check if there's a transaction in context
defer store.txMu.RUnlock() if tx, exists := store.getTransactionFromContext(ctx); exists {
if store.isInTransaction {
kr := fdb.KeyRange{Begin: directoryPrefix, End: prefixEnd(directoryPrefix)} kr := fdb.KeyRange{Begin: directoryPrefix, End: prefixEnd(directoryPrefix)}
store.currentTx.ClearRange(kr) tx.ClearRange(kr)
return nil return nil
} }
@@ -316,13 +324,11 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context
startKey = append(startKey, 0x00) startKey = append(startKey, 0x00)
} }
store.txMu.RLock()
defer store.txMu.RUnlock()
var kvs []fdb.KeyValue var kvs []fdb.KeyValue
if store.isInTransaction { // Check if there's a transaction in context
if tx, exists := store.getTransactionFromContext(ctx); exists {
kr := fdb.KeyRange{Begin: fdb.Key(startKey), End: prefixEnd(directoryPrefix)} kr := fdb.KeyRange{Begin: fdb.Key(startKey), End: prefixEnd(directoryPrefix)}
kvs = store.currentTx.GetRange(kr, fdb.RangeOptions{Limit: int(limit)}).GetSliceOrPanic() kvs = tx.GetRange(kr, fdb.RangeOptions{Limit: int(limit)}).GetSliceOrPanic()
} else { } else {
result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
kr := fdb.KeyRange{Begin: fdb.Key(startKey), End: prefixEnd(directoryPrefix)} kr := fdb.KeyRange{Begin: fdb.Key(startKey), End: prefixEnd(directoryPrefix)}
@@ -366,11 +372,9 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context
func (store *FoundationDBStore) KvPut(ctx context.Context, key []byte, value []byte) error { func (store *FoundationDBStore) KvPut(ctx context.Context, key []byte, value []byte) error {
fdbKey := store.kvDir.Pack(tuple.Tuple{key}) fdbKey := store.kvDir.Pack(tuple.Tuple{key})
store.txMu.RLock() // Check if there's a transaction in context
defer store.txMu.RUnlock() if tx, exists := store.getTransactionFromContext(ctx); exists {
tx.Set(fdbKey, value)
if store.isInTransaction {
store.currentTx.Set(fdbKey, value)
return nil return nil
} }
@@ -385,14 +389,12 @@ func (store *FoundationDBStore) KvPut(ctx context.Context, key []byte, value []b
func (store *FoundationDBStore) KvGet(ctx context.Context, key []byte) ([]byte, error) { func (store *FoundationDBStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
fdbKey := store.kvDir.Pack(tuple.Tuple{key}) fdbKey := store.kvDir.Pack(tuple.Tuple{key})
store.txMu.RLock()
defer store.txMu.RUnlock()
var data []byte var data []byte
var err error var err error
if store.isInTransaction { // Check if there's a transaction in context
data, err = store.currentTx.Get(fdbKey).Get() if tx, exists := store.getTransactionFromContext(ctx); exists {
data, err = tx.Get(fdbKey).Get()
} else { } else {
result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
return rtr.Get(fdbKey).Get() return rtr.Get(fdbKey).Get()
@@ -414,11 +416,9 @@ func (store *FoundationDBStore) KvGet(ctx context.Context, key []byte) ([]byte,
func (store *FoundationDBStore) KvDelete(ctx context.Context, key []byte) error { func (store *FoundationDBStore) KvDelete(ctx context.Context, key []byte) error {
fdbKey := store.kvDir.Pack(tuple.Tuple{key}) fdbKey := store.kvDir.Pack(tuple.Tuple{key})
store.txMu.RLock() // Check if there's a transaction in context
defer store.txMu.RUnlock() if tx, exists := store.getTransactionFromContext(ctx); exists {
tx.Clear(fdbKey)
if store.isInTransaction {
store.currentTx.Clear(fdbKey)
return nil return nil
} }