You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

232 lines
7.2 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
1 year ago
3 years ago
3 years ago
3 years ago
1 year ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
1 year ago
3 years ago
1 year ago
Fix dead lock (#5815) * reduce locks to avoid dead lock Flush->FlushData->uplloadPipeline.FluahAll uploaderCount>0 goroutine 1 [sync.Cond.Wait, 71 minutes]: sync.runtime_notifyListWait(0xc0007ae4d0, 0x0) /usr/local/go/src/runtime/sema.go:569 +0x159 sync.(*Cond).Wait(0xc001a59290?) /usr/local/go/src/sync/cond.go:70 +0x85 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).waitForCurrentWritersToComplete(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline_lock.go:58 +0x32 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).FlushAll(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline.go:151 +0x25 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).FlushData(0xc00087e840) /github/workspace/weed/mount/dirty_pages_chunked.go:54 +0x29 github.com/seaweedfs/seaweedfs/weed/mount.(*PageWriter).FlushData(...) /github/workspace/weed/mount/page_writer.go:50 github.com/seaweedfs/seaweedfs/weed/mount.(*WFS).doFlush(0xc0006ad600, 0xc00030d380, 0x0, 0x0) /github/workspace/weed/mount/weedfs_file_sync.go:101 +0x169 github.com/seaweedfs/seaweedfs/weed/mount.(*WFS).Flush(0xc0006ad600, 0xc001a594a8?, 0xc0004c1ca0) /github/workspace/weed/mount/weedfs_file_sync.go:59 +0x48 github.com/hanwen/go-fuse/v2/fuse.doFlush(0xc0000da870?, 0xc0004c1b08) SaveContent -> MemChunk.RLock -> ChunkedDirtyPages.saveChunkedFileIntervalToStorage pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) fh.entryLock.Lock() sync.(*RWMutex).Lock(0x0?) /usr/local/go/src/sync/rwmutex.go:146 +0x31 github.com/seaweedfs/seaweedfs/weed/mount.(*FileHandle).AddChunks(0xc00030d380, {0xc00028bdc8, 0x1, 0x1}) /github/workspace/weed/mount/filehandle.go:93 +0x45 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).saveChunkedFileIntervalToStorage(0xc00087e840, {0x2be7ac0, 0xc00018d9e0}, 0x0, 0x121, 0x17e3c624565ace45, 0x1?) 
/github/workspace/weed/mount/dirty_pages_chunked.go:80 +0x2d4 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*MemChunk).SaveContent(0xc0008d9130, 0xc0008093e0) /github/workspace/weed/mount/page_writer/page_chunk_mem.go:115 +0x112 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).moveToSealed.func1() /github/workspace/weed/mount/page_writer/upload_pipeline.go:187 +0x55 github.com/seaweedfs/seaweedfs/weed/util.(*LimitedConcurrentExecutor).Execute.func1() /github/workspace/weed/util/limited_executor.go:38 +0x62 created by github.com/seaweedfs/seaweedfs/weed/util.(*LimitedConcurrentExecutor).Execute in goroutine 1 /github/workspace/weed/util/limited_executor.go:33 +0x97 On metadata update fh.entryLock.Lock() fh.dirtyPages.Destroy() up.chunksLock.Lock => each sealed chunk.FreeReference => MemChunk.Lock goroutine 134 [sync.RWMutex.Lock, 71 minutes]: sync.runtime_SemacquireRWMutex(0xc0007c3558?, 0xea?, 0x3fb0800?) /usr/local/go/src/runtime/sema.go:87 +0x25 sync.(*RWMutex).Lock(0xc0007c35a8?) /usr/local/go/src/sync/rwmutex.go:151 +0x6a github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*MemChunk).FreeResource(0xc0008d9130) /github/workspace/weed/mount/page_writer/page_chunk_mem.go:38 +0x2a github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*SealedChunk).FreeReference(0xc00071cdb0, {0xc0006ba1a0, 0x20}) /github/workspace/weed/mount/page_writer/upload_pipeline.go:38 +0xb7 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).Shutdown(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline.go:220 +0x185 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).Destroy(0xc0008cea40?) /github/workspace/weed/mount/dirty_pages_chunked.go:87 +0x17 github.com/seaweedfs/seaweedfs/weed/mount.(*PageWriter).Destroy(...) /github/workspace/weed/mount/page_writer.go:78 github.com/seaweedfs/seaweedfs/weed/mount.NewSeaweedFileSystem.func3({0xc00069a6c0, 0x30}, 0x6?) 
/github/workspace/weed/mount/weedfs.go:119 +0x17a github.com/seaweedfs/seaweedfs/weed/mount/meta_cache.NewMetaCache.func1({0xc00069a6c0?, 0xc00069a480?}, 0x4015b40?) /github/workspace/weed/mount/meta_cache/meta_cache.go:37 +0x1c github.com/seaweedfs/seaweedfs/weed/mount/meta_cache.SubscribeMetaEvents.func1(0xc000661810) /github/workspace/weed/mount/meta_cache/meta_cache_subscribe.go:43 +0x570 * use locked entry everywhere * modifiable remote entry * skip locking after getting lock from fhLockTable
6 months ago
3 years ago
1 year ago
1 year ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
1 year ago
1 year ago
3 years ago
1 year ago
3 years ago
  1. package mount
  2. import (
  3. "context"
  4. "math/rand"
  5. "os"
  6. "path"
  7. "path/filepath"
  8. "sync/atomic"
  9. "time"
  10. "github.com/hanwen/go-fuse/v2/fuse"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/filer"
  13. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  20. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  21. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  22. "github.com/hanwen/go-fuse/v2/fs"
  23. )
  24. type Option struct {
  25. filerIndex int32 // align memory for atomic read/write
  26. FilerAddresses []pb.ServerAddress
  27. MountDirectory string
  28. GrpcDialOption grpc.DialOption
  29. FilerMountRootPath string
  30. Collection string
  31. Replication string
  32. TtlSec int32
  33. DiskType types.DiskType
  34. ChunkSizeLimit int64
  35. ConcurrentWriters int
  36. CacheDirForRead string
  37. CacheSizeMBForRead int64
  38. CacheDirForWrite string
  39. DataCenter string
  40. Umask os.FileMode
  41. Quota int64
  42. DisableXAttr bool
  43. WriteOnceReadMany bool
  44. MountUid uint32
  45. MountGid uint32
  46. MountMode os.FileMode
  47. MountCtime time.Time
  48. MountMtime time.Time
  49. MountParentInode uint64
  50. VolumeServerAccess string // how to access volume servers
  51. Cipher bool // whether encrypt data on volume server
  52. UidGidMapper *meta_cache.UidGidMapper
  53. uniqueCacheDirForRead string
  54. uniqueCacheDirForWrite string
  55. }
  56. type WFS struct {
  57. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  58. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  59. fuse.RawFileSystem
  60. mount_pb.UnimplementedSeaweedMountServer
  61. fs.Inode
  62. option *Option
  63. metaCache *meta_cache.MetaCache
  64. stats statsCache
  65. chunkCache *chunk_cache.TieredChunkCache
  66. signature int32
  67. concurrentWriters *util.LimitedConcurrentExecutor
  68. inodeToPath *InodeToPath
  69. fhmap *FileHandleToInode
  70. dhmap *DirectoryHandleToInode
  71. fuseServer *fuse.Server
  72. IsOverQuota bool
  73. fhLockTable *util.LockTable[FileHandleId]
  74. }
  75. func NewSeaweedFileSystem(option *Option) *WFS {
  76. wfs := &WFS{
  77. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  78. option: option,
  79. signature: util.RandomInt32(),
  80. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  81. fhmap: NewFileHandleToInode(),
  82. dhmap: NewDirectoryHandleToInode(),
  83. fhLockTable: util.NewLockTable[FileHandleId](),
  84. }
  85. wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
  86. wfs.option.setupUniqueCacheDirectory()
  87. if option.CacheSizeMBForRead > 0 {
  88. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
  89. }
  90. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
  91. util.FullPath(option.FilerMountRootPath),
  92. func(path util.FullPath) {
  93. wfs.inodeToPath.MarkChildrenCached(path)
  94. }, func(path util.FullPath) bool {
  95. return wfs.inodeToPath.IsChildrenCached(path)
  96. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  97. // Find inode if it is not a deleted path
  98. if inode, inode_found := wfs.inodeToPath.GetInode(filePath); inode_found {
  99. // Find open file handle
  100. if fh, fh_found := wfs.fhmap.FindFileHandle(inode); fh_found {
  101. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
  102. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  103. // Recreate dirty pages
  104. fh.dirtyPages.Destroy()
  105. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  106. // Update handle entry
  107. newentry, status := wfs.maybeLoadEntry(filePath)
  108. if status == fuse.OK {
  109. if fh.GetEntry().GetEntry() != newentry {
  110. fh.SetEntry(newentry)
  111. }
  112. }
  113. }
  114. }
  115. })
  116. grace.OnInterrupt(func() {
  117. wfs.metaCache.Shutdown()
  118. os.RemoveAll(option.getUniqueCacheDirForWrite())
  119. os.RemoveAll(option.getUniqueCacheDirForRead())
  120. })
  121. if wfs.option.ConcurrentWriters > 0 {
  122. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  123. }
  124. return wfs
  125. }
  126. func (wfs *WFS) StartBackgroundTasks() {
  127. startTime := time.Now()
  128. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  129. go wfs.loopCheckQuota()
  130. }
  131. func (wfs *WFS) String() string {
  132. return "seaweedfs"
  133. }
  134. func (wfs *WFS) Init(server *fuse.Server) {
  135. wfs.fuseServer = server
  136. }
  137. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  138. path, status = wfs.inodeToPath.GetPath(inode)
  139. if status != fuse.OK {
  140. return
  141. }
  142. var found bool
  143. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  144. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  145. if entry != nil && fh.entry.Attributes == nil {
  146. entry.Attributes = &filer_pb.FuseAttributes{}
  147. }
  148. })
  149. } else {
  150. entry, status = wfs.maybeLoadEntry(path)
  151. }
  152. return
  153. }
  154. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  155. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  156. dir, name := fullpath.DirAndName()
  157. // return a valid entry for the mount root
  158. if string(fullpath) == wfs.option.FilerMountRootPath {
  159. return &filer_pb.Entry{
  160. Name: name,
  161. IsDirectory: true,
  162. Attributes: &filer_pb.FuseAttributes{
  163. Mtime: wfs.option.MountMtime.Unix(),
  164. FileMode: uint32(wfs.option.MountMode),
  165. Uid: wfs.option.MountUid,
  166. Gid: wfs.option.MountGid,
  167. Crtime: wfs.option.MountCtime.Unix(),
  168. },
  169. }, fuse.OK
  170. }
  171. // read from async meta cache
  172. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  173. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  174. if cacheErr == filer_pb.ErrNotFound {
  175. return nil, fuse.ENOENT
  176. }
  177. return cachedEntry.ToProtoEntry(), fuse.OK
  178. }
  179. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  180. if wfs.option.VolumeServerAccess == "filerProxy" {
  181. return func(fileId string) (targetUrls []string, err error) {
  182. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  183. }
  184. }
  185. return filer.LookupFn(wfs)
  186. }
  187. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  188. i := atomic.LoadInt32(&wfs.option.filerIndex)
  189. return wfs.option.FilerAddresses[i]
  190. }
  191. func (option *Option) setupUniqueCacheDirectory() {
  192. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  193. option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
  194. os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
  195. option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
  196. os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
  197. }
  198. func (option *Option) getUniqueCacheDirForWrite() string {
  199. return option.uniqueCacheDirForWrite
  200. }
  201. func (option *Option) getUniqueCacheDirForRead() string {
  202. return option.uniqueCacheDirForRead
  203. }