Browse Source
Revert "Merge branch 'track-mount-e2e' of https://github.com/seaweedfs/seaweedfs into track-mount-e2e"
Revert "Merge branch 'track-mount-e2e' of https://github.com/seaweedfs/seaweedfs into track-mount-e2e"
This reverts committrack-mount-e2e900abd44c1
, reversing changes made todf027fdddf
.
chrislu
1 year ago
23 changed files with 52 additions and 275 deletions
-
8.github/workflows/binaries_dev.yml
-
4.github/workflows/binaries_release0.yml
-
4.github/workflows/binaries_release1.yml
-
4.github/workflows/binaries_release2.yml
-
4.github/workflows/binaries_release3.yml
-
4.github/workflows/binaries_release4.yml
-
6.github/workflows/e2e.yml
-
2docker/Makefile
-
2weed/Makefile
-
26weed/iamapi/iamapi_management_handlers.go
-
6weed/mount/filehandle.go
-
2weed/mount/weedfs.go
-
9weed/mount/weedfs_file_copy_range.go
-
5weed/mount/weedfs_file_lseek.go
-
5weed/mount/weedfs_file_read.go
-
6weed/mount/weedfs_file_sync.go
-
5weed/mount/weedfs_file_write.go
-
8weed/s3api/auth_credentials_test.go
-
12weed/s3api/s3_constants/s3_actions.go
-
2weed/s3api/s3_constants/s3_config.go
-
8weed/s3api/s3api_server.go
-
153weed/util/lock_table.go
-
42weed/util/lock_table_test.go
@ -1,153 +0,0 @@ |
|||||
package util |
|
||||
|
|
||||
import ( |
|
||||
"fmt" |
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
||||
"sync" |
|
||||
"sync/atomic" |
|
||||
) |
|
||||
|
|
||||
// LockTable is a table of locks that can be acquired.
|
|
||||
// Locks are acquired in order of request.
|
|
||||
type LockTable[T comparable] struct { |
|
||||
mu sync.Mutex |
|
||||
locks map[T]*LockEntry |
|
||||
lockIdSeq int64 |
|
||||
} |
|
||||
|
|
||||
type LockEntry struct { |
|
||||
mu sync.Mutex |
|
||||
waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
|
|
||||
activeLockOwnerCount int32 |
|
||||
lockType LockType |
|
||||
cond *sync.Cond |
|
||||
} |
|
||||
|
|
||||
type LockType int |
|
||||
|
|
||||
const ( |
|
||||
SharedLock LockType = iota |
|
||||
ExclusiveLock |
|
||||
) |
|
||||
|
|
||||
type ActiveLock struct { |
|
||||
ID int64 |
|
||||
isDeleted bool |
|
||||
intention string // for debugging
|
|
||||
} |
|
||||
|
|
||||
func NewLockTable[T comparable]() *LockTable[T] { |
|
||||
return &LockTable[T]{ |
|
||||
locks: make(map[T]*LockEntry), |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock { |
|
||||
id := atomic.AddInt64(<.lockIdSeq, 1) |
|
||||
l := &ActiveLock{ID: id, intention: intention} |
|
||||
return l |
|
||||
} |
|
||||
|
|
||||
func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) { |
|
||||
lt.mu.Lock() |
|
||||
// Get or create the lock entry for the key
|
|
||||
entry, exists := lt.locks[key] |
|
||||
if !exists { |
|
||||
entry = &LockEntry{} |
|
||||
entry.cond = sync.NewCond(&entry.mu) |
|
||||
lt.locks[key] = entry |
|
||||
} |
|
||||
lt.mu.Unlock() |
|
||||
|
|
||||
lock = lt.NewActiveLock(intention) |
|
||||
|
|
||||
// If the lock is held exclusively, wait
|
|
||||
entry.mu.Lock() |
|
||||
if len(entry.waiters) > 0 || lockType == ExclusiveLock { |
|
||||
if glog.V(4) { |
|
||||
fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount) |
|
||||
if len(entry.waiters) > 0 { |
|
||||
for _, waiter := range entry.waiters { |
|
||||
fmt.Printf(" %d", waiter.ID) |
|
||||
} |
|
||||
fmt.Printf("\n") |
|
||||
} |
|
||||
} |
|
||||
entry.waiters = append(entry.waiters, lock) |
|
||||
if lockType == ExclusiveLock { |
|
||||
for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) { |
|
||||
entry.cond.Wait() |
|
||||
} |
|
||||
} else { |
|
||||
for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) { |
|
||||
entry.cond.Wait() |
|
||||
} |
|
||||
} |
|
||||
// Remove the transaction from the waiters list
|
|
||||
if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID { |
|
||||
entry.waiters = entry.waiters[1:] |
|
||||
entry.cond.Broadcast() |
|
||||
} |
|
||||
} |
|
||||
entry.activeLockOwnerCount++ |
|
||||
|
|
||||
// Otherwise, grant the lock
|
|
||||
entry.lockType = lockType |
|
||||
if glog.V(4) { |
|
||||
fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount) |
|
||||
if len(entry.waiters) > 0 { |
|
||||
for _, waiter := range entry.waiters { |
|
||||
fmt.Printf(" %d", waiter.ID) |
|
||||
} |
|
||||
fmt.Printf("\n") |
|
||||
} |
|
||||
} |
|
||||
entry.mu.Unlock() |
|
||||
|
|
||||
return lock |
|
||||
} |
|
||||
|
|
||||
func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) { |
|
||||
lt.mu.Lock() |
|
||||
defer lt.mu.Unlock() |
|
||||
|
|
||||
entry, exists := lt.locks[key] |
|
||||
if !exists { |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
entry.mu.Lock() |
|
||||
defer entry.mu.Unlock() |
|
||||
|
|
||||
// Remove the transaction from the waiters list
|
|
||||
for i, waiter := range entry.waiters { |
|
||||
if waiter == lock { |
|
||||
waiter.isDeleted = true |
|
||||
entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...) |
|
||||
break |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
// If there are no waiters, release the lock
|
|
||||
if len(entry.waiters) == 0 { |
|
||||
delete(lt.locks, key) |
|
||||
} |
|
||||
|
|
||||
if glog.V(4) { |
|
||||
fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount) |
|
||||
if len(entry.waiters) > 0 { |
|
||||
for _, waiter := range entry.waiters { |
|
||||
fmt.Printf(" %d", waiter.ID) |
|
||||
} |
|
||||
fmt.Printf("\n") |
|
||||
} |
|
||||
} |
|
||||
entry.activeLockOwnerCount-- |
|
||||
|
|
||||
// Notify the next waiter
|
|
||||
entry.cond.Broadcast() |
|
||||
} |
|
||||
|
|
||||
func main() { |
|
||||
|
|
||||
} |
|
@ -1,42 +0,0 @@ |
|||||
package util |
|
||||
|
|
||||
import ( |
|
||||
"fmt" |
|
||||
"math/rand" |
|
||||
"sync" |
|
||||
"testing" |
|
||||
"time" |
|
||||
) |
|
||||
|
|
||||
func TestOrderedLock(t *testing.T) { |
|
||||
lt := NewLockTable[string]() |
|
||||
|
|
||||
var wg sync.WaitGroup |
|
||||
// Simulate transactions requesting locks
|
|
||||
for i := 1; i <= 50; i++ { |
|
||||
wg.Add(1) |
|
||||
go func(i int) { |
|
||||
defer wg.Done() |
|
||||
key := "resource" |
|
||||
lockType := SharedLock |
|
||||
if i%5 == 0 { |
|
||||
lockType = ExclusiveLock |
|
||||
} |
|
||||
|
|
||||
// Simulate attempting to acquire the lock
|
|
||||
lock := lt.AcquireLock("", key, lockType) |
|
||||
|
|
||||
// Lock acquired, perform some work
|
|
||||
fmt.Printf("ActiveLock %d acquired lock %v\n", lock.ID, lockType) |
|
||||
|
|
||||
// Simulate some work
|
|
||||
time.Sleep(time.Duration(rand.Int31n(10)*10) * time.Millisecond) |
|
||||
|
|
||||
// Release the lock
|
|
||||
lt.ReleaseLock(key, lock) |
|
||||
fmt.Printf("ActiveLock %d released lock %v\n", lock.ID, lockType) |
|
||||
}(i) |
|
||||
} |
|
||||
|
|
||||
wg.Wait() |
|
||||
} |
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue