Browse Source

refactor(exclusive_locker): `Interval` readability batch of updates (#3668)

* refactor(filechunk_manifest): `localProcesed` -> `localProcessed`

Signed-off-by: Ryan Russell <git@ryanrussell.org>

* refactor: `saveChunkedFileIntevalToStorage` -> `saveChunkedFileIntervalToStorage`

Signed-off-by: Ryan Russell <git@ryanrussell.org>

* refactor: `SafeRenewInteval` -> `SafeRenewInterval`

Signed-off-by: Ryan Russell <git@ryanrussell.org>

* refactor: `InitLockInteval` -> `InitLockInterval`

Signed-off-by: Ryan Russell <git@ryanrussell.org>

* refactor: `RenewInteval` -> `RenewInterval`

Signed-off-by: Ryan Russell <git@ryanrussell.org>

Signed-off-by: Ryan Russell <git@ryanrussell.org>
pull/3669/head
Ryan Russell 2 years ago
committed by GitHub
parent
commit
72d8a9f9a8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      weed/filer/filechunk_manifest.go
  2. 4
      weed/mount/dirty_pages_chunked.go
  3. 12
      weed/wdclient/exclusive_locks/exclusive_locker.go

12
weed/filer/filechunk_manifest.go

@ -169,19 +169,19 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKe
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings { for _, urlString := range urlStrings {
var localProcesed int
var localProcessed int
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
if totalWritten > localProcesed {
toBeSkipped := totalWritten - localProcesed
if totalWritten > localProcessed {
toBeSkipped := totalWritten - localProcessed
if len(data) <= toBeSkipped { if len(data) <= toBeSkipped {
localProcesed += len(data)
localProcessed += len(data)
return // skip if already processed return // skip if already processed
} }
data = data[toBeSkipped:] data = data[toBeSkipped:]
localProcesed += toBeSkipped
localProcessed += toBeSkipped
} }
writer.Write(data) writer.Write(data)
localProcesed += len(data)
localProcessed += len(data)
totalWritten += len(data) totalWritten += len(data)
}) })
if !shouldRetry { if !shouldRetry {

4
weed/mount/dirty_pages_chunked.go

@ -33,7 +33,7 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
swapFileDir := fh.wfs.option.getTempFilePageDir() swapFileDir := fh.wfs.option.getTempFilePageDir()
dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize, dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize,
dirtyPages.saveChunkedFileIntevalToStorage, fh.wfs.option.ConcurrentWriters, swapFileDir)
dirtyPages.saveChunkedFileIntervalToStorage, fh.wfs.option.ConcurrentWriters, swapFileDir)
return dirtyPages return dirtyPages
} }
@ -65,7 +65,7 @@ func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64)
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
} }
func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
mtime := time.Now().UnixNano() mtime := time.Now().UnixNano()
defer cleanupFn() defer cleanupFn()

12
weed/wdclient/exclusive_locks/exclusive_locker.go

@ -11,9 +11,9 @@ import (
) )
const ( const (
RenewInteval = 4 * time.Second
SafeRenewInteval = 3 * time.Second
InitLockInteval = 1 * time.Second
RenewInterval = 4 * time.Second
SafeRenewInterval = 3 * time.Second
InitLockInterval = 1 * time.Second
) )
type ExclusiveLocker struct { type ExclusiveLocker struct {
@ -37,7 +37,7 @@ func (l *ExclusiveLocker) IsLocked() bool {
} }
func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInterval).Before(time.Now()) {
// wait until now is within the safe lock period, no immediate renewal to change the token // wait until now is within the safe lock period, no immediate renewal to change the token
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
@ -68,7 +68,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
return err return err
}); err != nil { }); err != nil {
println("lock:", err.Error()) println("lock:", err.Error())
time.Sleep(InitLockInteval)
time.Sleep(InitLockInterval)
} else { } else {
break break
} }
@ -101,7 +101,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
l.isLocked = false l.isLocked = false
return return
} else { } else {
time.Sleep(RenewInteval)
time.Sleep(RenewInterval)
} }
} }

Loading…
Cancel
Save