You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

230 lines
7.2 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
1 year ago
3 years ago
3 years ago
1 year ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
1 year ago
3 years ago
1 year ago
Fix dead lock (#5815) * reduce locks to avoid dead lock Flush->FlushData->uplloadPipeline.FluahAll uploaderCount>0 goroutine 1 [sync.Cond.Wait, 71 minutes]: sync.runtime_notifyListWait(0xc0007ae4d0, 0x0) /usr/local/go/src/runtime/sema.go:569 +0x159 sync.(*Cond).Wait(0xc001a59290?) /usr/local/go/src/sync/cond.go:70 +0x85 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).waitForCurrentWritersToComplete(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline_lock.go:58 +0x32 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).FlushAll(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline.go:151 +0x25 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).FlushData(0xc00087e840) /github/workspace/weed/mount/dirty_pages_chunked.go:54 +0x29 github.com/seaweedfs/seaweedfs/weed/mount.(*PageWriter).FlushData(...) /github/workspace/weed/mount/page_writer.go:50 github.com/seaweedfs/seaweedfs/weed/mount.(*WFS).doFlush(0xc0006ad600, 0xc00030d380, 0x0, 0x0) /github/workspace/weed/mount/weedfs_file_sync.go:101 +0x169 github.com/seaweedfs/seaweedfs/weed/mount.(*WFS).Flush(0xc0006ad600, 0xc001a594a8?, 0xc0004c1ca0) /github/workspace/weed/mount/weedfs_file_sync.go:59 +0x48 github.com/hanwen/go-fuse/v2/fuse.doFlush(0xc0000da870?, 0xc0004c1b08) SaveContent -> MemChunk.RLock -> ChunkedDirtyPages.saveChunkedFileIntervalToStorage pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) fh.entryLock.Lock() sync.(*RWMutex).Lock(0x0?) /usr/local/go/src/sync/rwmutex.go:146 +0x31 github.com/seaweedfs/seaweedfs/weed/mount.(*FileHandle).AddChunks(0xc00030d380, {0xc00028bdc8, 0x1, 0x1}) /github/workspace/weed/mount/filehandle.go:93 +0x45 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).saveChunkedFileIntervalToStorage(0xc00087e840, {0x2be7ac0, 0xc00018d9e0}, 0x0, 0x121, 0x17e3c624565ace45, 0x1?) 
/github/workspace/weed/mount/dirty_pages_chunked.go:80 +0x2d4 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*MemChunk).SaveContent(0xc0008d9130, 0xc0008093e0) /github/workspace/weed/mount/page_writer/page_chunk_mem.go:115 +0x112 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).moveToSealed.func1() /github/workspace/weed/mount/page_writer/upload_pipeline.go:187 +0x55 github.com/seaweedfs/seaweedfs/weed/util.(*LimitedConcurrentExecutor).Execute.func1() /github/workspace/weed/util/limited_executor.go:38 +0x62 created by github.com/seaweedfs/seaweedfs/weed/util.(*LimitedConcurrentExecutor).Execute in goroutine 1 /github/workspace/weed/util/limited_executor.go:33 +0x97 On metadata update fh.entryLock.Lock() fh.dirtyPages.Destroy() up.chunksLock.Lock => each sealed chunk.FreeReference => MemChunk.Lock goroutine 134 [sync.RWMutex.Lock, 71 minutes]: sync.runtime_SemacquireRWMutex(0xc0007c3558?, 0xea?, 0x3fb0800?) /usr/local/go/src/runtime/sema.go:87 +0x25 sync.(*RWMutex).Lock(0xc0007c35a8?) /usr/local/go/src/sync/rwmutex.go:151 +0x6a github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*MemChunk).FreeResource(0xc0008d9130) /github/workspace/weed/mount/page_writer/page_chunk_mem.go:38 +0x2a github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*SealedChunk).FreeReference(0xc00071cdb0, {0xc0006ba1a0, 0x20}) /github/workspace/weed/mount/page_writer/upload_pipeline.go:38 +0xb7 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).Shutdown(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline.go:220 +0x185 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).Destroy(0xc0008cea40?) /github/workspace/weed/mount/dirty_pages_chunked.go:87 +0x17 github.com/seaweedfs/seaweedfs/weed/mount.(*PageWriter).Destroy(...) /github/workspace/weed/mount/page_writer.go:78 github.com/seaweedfs/seaweedfs/weed/mount.NewSeaweedFileSystem.func3({0xc00069a6c0, 0x30}, 0x6?) 
/github/workspace/weed/mount/weedfs.go:119 +0x17a github.com/seaweedfs/seaweedfs/weed/mount/meta_cache.NewMetaCache.func1({0xc00069a6c0?, 0xc00069a480?}, 0x4015b40?) /github/workspace/weed/mount/meta_cache/meta_cache.go:37 +0x1c github.com/seaweedfs/seaweedfs/weed/mount/meta_cache.SubscribeMetaEvents.func1(0xc000661810) /github/workspace/weed/mount/meta_cache/meta_cache_subscribe.go:43 +0x570 * use locked entry everywhere * modifiable remote entry * skip locking after getting lock from fhLockTable
6 months ago
3 years ago
1 year ago
1 year ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
1 year ago
1 year ago
3 years ago
1 year ago
3 years ago
  1. package mount
  2. import (
  3. "context"
  4. "math/rand"
  5. "os"
  6. "path"
  7. "path/filepath"
  8. "sync/atomic"
  9. "time"
  10. "github.com/hanwen/go-fuse/v2/fuse"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/filer"
  13. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  20. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  21. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  22. "github.com/hanwen/go-fuse/v2/fs"
  23. )
  24. type Option struct {
  25. filerIndex int32 // align memory for atomic read/write
  26. FilerAddresses []pb.ServerAddress
  27. MountDirectory string
  28. GrpcDialOption grpc.DialOption
  29. FilerMountRootPath string
  30. Collection string
  31. Replication string
  32. TtlSec int32
  33. DiskType types.DiskType
  34. ChunkSizeLimit int64
  35. ConcurrentWriters int
  36. CacheDirForRead string
  37. CacheSizeMBForRead int64
  38. CacheDirForWrite string
  39. DataCenter string
  40. Umask os.FileMode
  41. Quota int64
  42. DisableXAttr bool
  43. MountUid uint32
  44. MountGid uint32
  45. MountMode os.FileMode
  46. MountCtime time.Time
  47. MountMtime time.Time
  48. MountParentInode uint64
  49. VolumeServerAccess string // how to access volume servers
  50. Cipher bool // whether encrypt data on volume server
  51. UidGidMapper *meta_cache.UidGidMapper
  52. uniqueCacheDirForRead string
  53. uniqueCacheDirForWrite string
  54. }
  55. type WFS struct {
  56. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  57. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  58. fuse.RawFileSystem
  59. mount_pb.UnimplementedSeaweedMountServer
  60. fs.Inode
  61. option *Option
  62. metaCache *meta_cache.MetaCache
  63. stats statsCache
  64. chunkCache *chunk_cache.TieredChunkCache
  65. signature int32
  66. concurrentWriters *util.LimitedConcurrentExecutor
  67. inodeToPath *InodeToPath
  68. fhmap *FileHandleToInode
  69. dhmap *DirectoryHandleToInode
  70. fuseServer *fuse.Server
  71. IsOverQuota bool
  72. fhLockTable *util.LockTable[FileHandleId]
  73. }
  74. func NewSeaweedFileSystem(option *Option) *WFS {
  75. wfs := &WFS{
  76. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  77. option: option,
  78. signature: util.RandomInt32(),
  79. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  80. fhmap: NewFileHandleToInode(),
  81. dhmap: NewDirectoryHandleToInode(),
  82. fhLockTable: util.NewLockTable[FileHandleId](),
  83. }
  84. wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
  85. wfs.option.setupUniqueCacheDirectory()
  86. if option.CacheSizeMBForRead > 0 {
  87. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
  88. }
  89. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
  90. util.FullPath(option.FilerMountRootPath),
  91. func(path util.FullPath) {
  92. wfs.inodeToPath.MarkChildrenCached(path)
  93. }, func(path util.FullPath) bool {
  94. return wfs.inodeToPath.IsChildrenCached(path)
  95. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  96. // Find inode if it is not a deleted path
  97. if inode, inode_found := wfs.inodeToPath.GetInode(filePath); inode_found {
  98. // Find open file handle
  99. if fh, fh_found := wfs.fhmap.FindFileHandle(inode); fh_found {
  100. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
  101. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  102. // Recreate dirty pages
  103. fh.dirtyPages.Destroy()
  104. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  105. // Update handle entry
  106. newentry, status := wfs.maybeLoadEntry(filePath)
  107. if status == fuse.OK {
  108. if fh.GetEntry().GetEntry() != newentry {
  109. fh.SetEntry(newentry)
  110. }
  111. }
  112. }
  113. }
  114. })
  115. grace.OnInterrupt(func() {
  116. wfs.metaCache.Shutdown()
  117. os.RemoveAll(option.getUniqueCacheDirForWrite())
  118. os.RemoveAll(option.getUniqueCacheDirForRead())
  119. })
  120. if wfs.option.ConcurrentWriters > 0 {
  121. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  122. }
  123. return wfs
  124. }
  125. func (wfs *WFS) StartBackgroundTasks() {
  126. startTime := time.Now()
  127. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  128. go wfs.loopCheckQuota()
  129. }
  130. func (wfs *WFS) String() string {
  131. return "seaweedfs"
  132. }
  133. func (wfs *WFS) Init(server *fuse.Server) {
  134. wfs.fuseServer = server
  135. }
  136. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  137. path, status = wfs.inodeToPath.GetPath(inode)
  138. if status != fuse.OK {
  139. return
  140. }
  141. var found bool
  142. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  143. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  144. if entry != nil && fh.entry.Attributes == nil {
  145. entry.Attributes = &filer_pb.FuseAttributes{}
  146. }
  147. })
  148. } else {
  149. entry, status = wfs.maybeLoadEntry(path)
  150. }
  151. return
  152. }
  153. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  154. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  155. dir, name := fullpath.DirAndName()
  156. // return a valid entry for the mount root
  157. if string(fullpath) == wfs.option.FilerMountRootPath {
  158. return &filer_pb.Entry{
  159. Name: name,
  160. IsDirectory: true,
  161. Attributes: &filer_pb.FuseAttributes{
  162. Mtime: wfs.option.MountMtime.Unix(),
  163. FileMode: uint32(wfs.option.MountMode),
  164. Uid: wfs.option.MountUid,
  165. Gid: wfs.option.MountGid,
  166. Crtime: wfs.option.MountCtime.Unix(),
  167. },
  168. }, fuse.OK
  169. }
  170. // read from async meta cache
  171. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  172. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  173. if cacheErr == filer_pb.ErrNotFound {
  174. return nil, fuse.ENOENT
  175. }
  176. return cachedEntry.ToProtoEntry(), fuse.OK
  177. }
  178. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  179. if wfs.option.VolumeServerAccess == "filerProxy" {
  180. return func(fileId string) (targetUrls []string, err error) {
  181. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  182. }
  183. }
  184. return filer.LookupFn(wfs)
  185. }
  186. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  187. i := atomic.LoadInt32(&wfs.option.filerIndex)
  188. return wfs.option.FilerAddresses[i]
  189. }
  190. func (option *Option) setupUniqueCacheDirectory() {
  191. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  192. option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
  193. os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
  194. option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
  195. os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
  196. }
  197. func (option *Option) getUniqueCacheDirForWrite() string {
  198. return option.uniqueCacheDirForWrite
  199. }
  200. func (option *Option) getUniqueCacheDirForRead() string {
  201. return option.uniqueCacheDirForRead
  202. }