You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

273 lines
8.6 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. //go:build linux || darwin
  2. // +build linux darwin
  3. package command
  4. import (
  5. "context"
  6. "fmt"
  7. "github.com/hanwen/go-fuse/v2/fuse"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/mount"
  10. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  11. "github.com/seaweedfs/seaweedfs/weed/mount/unmount"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/security"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  17. "google.golang.org/grpc/reflection"
  18. "net"
  19. "net/http"
  20. "os"
  21. "os/user"
  22. "runtime"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/seaweedfs/seaweedfs/weed/util"
  27. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  28. )
  29. func runMount(cmd *Command, args []string) bool {
  30. if *mountOptions.debug {
  31. go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil)
  32. }
  33. grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
  34. if *mountReadRetryTime < time.Second {
  35. *mountReadRetryTime = time.Second
  36. }
  37. util.RetryWaitTime = *mountReadRetryTime
  38. umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
  39. if umaskErr != nil {
  40. fmt.Printf("can not parse umask %s", *mountOptions.umaskString)
  41. return false
  42. }
  43. if len(args) > 0 {
  44. return false
  45. }
  46. return RunMount(&mountOptions, os.FileMode(umask))
  47. }
  48. func RunMount(option *MountOptions, umask os.FileMode) bool {
  49. // basic checks
  50. chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
  51. if chunkSizeLimitMB <= 0 {
  52. fmt.Printf("Please specify a reasonable buffer size.")
  53. return false
  54. }
  55. // try to connect to filer
  56. filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
  57. util.LoadConfiguration("security", false)
  58. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  59. var cipher bool
  60. var err error
  61. for i := 0; i < 10; i++ {
  62. err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  63. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  64. if err != nil {
  65. return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err)
  66. }
  67. cipher = resp.Cipher
  68. return nil
  69. })
  70. if err != nil {
  71. glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
  72. glog.V(0).Infof("wait for %d seconds ...", i+1)
  73. time.Sleep(time.Duration(i+1) * time.Second)
  74. }
  75. }
  76. if err != nil {
  77. glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
  78. return true
  79. }
  80. filerMountRootPath := *option.filerMountRootPath
  81. // clean up mount point
  82. dir := util.ResolvePath(*option.dir)
  83. if dir == "" {
  84. fmt.Printf("Please specify the mount directory via \"-dir\"")
  85. return false
  86. }
  87. unmount.Unmount(dir)
  88. // start on local unix socket
  89. if *option.localSocket == "" {
  90. mountDirHash := util.HashToInt32([]byte(dir))
  91. if mountDirHash < 0 {
  92. mountDirHash = -mountDirHash
  93. }
  94. *option.localSocket = fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", mountDirHash)
  95. }
  96. if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
  97. glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
  98. }
  99. montSocketListener, err := net.Listen("unix", *option.localSocket)
  100. if err != nil {
  101. glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err)
  102. }
  103. // detect mount folder mode
  104. if *option.dirAutoCreate {
  105. os.MkdirAll(dir, os.FileMode(0777)&^umask)
  106. }
  107. fileInfo, err := os.Stat(dir)
  108. // collect uid, gid
  109. uid, gid := uint32(0), uint32(0)
  110. mountMode := os.ModeDir | 0777
  111. if err == nil {
  112. mountMode = os.ModeDir | os.FileMode(0777)&^umask
  113. uid, gid = util.GetFileUidGid(fileInfo)
  114. fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode)
  115. } else {
  116. fmt.Printf("can not stat %s\n", dir)
  117. return false
  118. }
  119. // detect uid, gid
  120. if uid == 0 {
  121. if u, err := user.Current(); err == nil {
  122. if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
  123. uid = uint32(parsedId)
  124. }
  125. if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
  126. gid = uint32(parsedId)
  127. }
  128. fmt.Printf("current uid=%d gid=%d\n", uid, gid)
  129. }
  130. }
  131. // mapping uid, gid
  132. uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
  133. if err != nil {
  134. fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
  135. return false
  136. }
  137. // Ensure target mount point availability
  138. if isValid := checkMountPointAvailable(dir); !isValid {
  139. glog.Fatalf("Target mount point is not available: %s, please check!", dir)
  140. return true
  141. }
  142. serverFriendlyName := strings.ReplaceAll(*option.filer, ",", "+")
  143. // mount fuse
  144. fuseMountOptions := &fuse.MountOptions{
  145. AllowOther: *option.allowOthers,
  146. Options: option.extraOptions,
  147. MaxBackground: 128,
  148. MaxWrite: 1024 * 1024 * 2,
  149. MaxReadAhead: 1024 * 1024 * 2,
  150. IgnoreSecurityLabels: false,
  151. RememberInodes: false,
  152. FsName: serverFriendlyName + ":" + filerMountRootPath,
  153. Name: "seaweedfs",
  154. SingleThreaded: false,
  155. DisableXAttrs: *option.disableXAttr,
  156. Debug: *option.debug,
  157. EnableLocks: false,
  158. ExplicitDataCacheControl: false,
  159. DirectMount: true,
  160. DirectMountFlags: 0,
  161. //SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
  162. EnableAcl: true,
  163. }
  164. if *option.nonempty {
  165. fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty")
  166. }
  167. if *option.readOnly {
  168. if runtime.GOOS == "darwin" {
  169. fuseMountOptions.Options = append(fuseMountOptions.Options, "rdonly")
  170. } else {
  171. fuseMountOptions.Options = append(fuseMountOptions.Options, "ro")
  172. }
  173. }
  174. if runtime.GOOS == "darwin" {
  175. // https://github-wiki-see.page/m/macfuse/macfuse/wiki/Mount-Options
  176. ioSizeMB := 1
  177. for ioSizeMB*2 <= *option.chunkSizeLimitMB && ioSizeMB*2 <= 32 {
  178. ioSizeMB *= 2
  179. }
  180. fuseMountOptions.Options = append(fuseMountOptions.Options, "daemon_timeout=600")
  181. fuseMountOptions.Options = append(fuseMountOptions.Options, "noapplexattr")
  182. // fuseMountOptions.Options = append(fuseMountOptions.Options, "novncache") // need to test effectiveness
  183. fuseMountOptions.Options = append(fuseMountOptions.Options, "slow_statfs")
  184. fuseMountOptions.Options = append(fuseMountOptions.Options, "volname="+serverFriendlyName)
  185. fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024))
  186. }
  187. // find mount point
  188. mountRoot := filerMountRootPath
  189. if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
  190. mountRoot = mountRoot[0 : len(mountRoot)-1]
  191. }
  192. seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
  193. MountDirectory: dir,
  194. FilerAddresses: filerAddresses,
  195. GrpcDialOption: grpcDialOption,
  196. FilerMountRootPath: mountRoot,
  197. Collection: *option.collection,
  198. Replication: *option.replication,
  199. TtlSec: int32(*option.ttlSec),
  200. DiskType: types.ToDiskType(*option.diskType),
  201. ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
  202. ConcurrentWriters: *option.concurrentWriters,
  203. CacheDir: *option.cacheDir,
  204. CacheSizeMB: *option.cacheSizeMB,
  205. DataCenter: *option.dataCenter,
  206. Quota: int64(*option.collectionQuota) * 1024 * 1024,
  207. MountUid: uid,
  208. MountGid: gid,
  209. MountMode: mountMode,
  210. MountCtime: fileInfo.ModTime(),
  211. MountMtime: time.Now(),
  212. Umask: umask,
  213. VolumeServerAccess: *mountOptions.volumeServerAccess,
  214. Cipher: cipher,
  215. UidGidMapper: uidGidMapper,
  216. DisableXAttr: *option.disableXAttr,
  217. })
  218. // create mount root
  219. mountRootPath := util.FullPath(mountRoot)
  220. mountRootParent, mountDir := mountRootPath.DirAndName()
  221. if err = filer_pb.Mkdir(seaweedFileSystem, mountRootParent, mountDir, nil); err != nil {
  222. fmt.Printf("failed to create dir %s on filer %s: %v\n", mountRoot, filerAddresses, err)
  223. return false
  224. }
  225. server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
  226. if err != nil {
  227. glog.Fatalf("Mount fail: %v", err)
  228. }
  229. grace.OnInterrupt(func() {
  230. unmount.Unmount(dir)
  231. })
  232. grpcS := pb.NewGrpcServer()
  233. mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem)
  234. reflection.Register(grpcS)
  235. go grpcS.Serve(montSocketListener)
  236. seaweedFileSystem.StartBackgroundTasks()
  237. glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
  238. glog.V(0).Infof("This is SeaweedFS version %s %s %s", util.Version(), runtime.GOOS, runtime.GOARCH)
  239. server.Serve()
  240. return true
  241. }