You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

570 lines
14 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  18. "github.com/chrislusf/seaweedfs/weed/filer2"
  19. "github.com/chrislusf/seaweedfs/weed/glog"
  20. "github.com/chrislusf/seaweedfs/weed/security"
  21. )
  22. type WebDavOption struct {
  23. Filer string
  24. FilerGrpcAddress string
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Uid uint32
  30. Gid uint32
  31. Cipher bool
  32. CacheDir string
  33. CacheSizeMB int64
  34. }
  35. type WebDavServer struct {
  36. option *WebDavOption
  37. secret security.SigningKey
  38. filer *filer2.Filer
  39. grpcDialOption grpc.DialOption
  40. Handler *webdav.Handler
  41. }
  42. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  43. fs, _ := NewWebDavFileSystem(option)
  44. ws = &WebDavServer{
  45. option: option,
  46. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  47. Handler: &webdav.Handler{
  48. FileSystem: fs,
  49. LockSystem: webdav.NewMemLS(),
  50. },
  51. }
  52. return ws, nil
  53. }
  54. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  55. type WebDavFileSystem struct {
  56. option *WebDavOption
  57. secret security.SigningKey
  58. filer *filer2.Filer
  59. grpcDialOption grpc.DialOption
  60. chunkCache *chunk_cache.TieredChunkCache
  61. }
  62. type FileInfo struct {
  63. name string
  64. size int64
  65. mode os.FileMode
  66. modifiledTime time.Time
  67. isDirectory bool
  68. }
  69. func (fi *FileInfo) Name() string { return fi.name }
  70. func (fi *FileInfo) Size() int64 { return fi.size }
  71. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  72. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  73. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  74. func (fi *FileInfo) Sys() interface{} { return nil }
  75. type WebDavFile struct {
  76. fs *WebDavFileSystem
  77. name string
  78. isDirectory bool
  79. off int64
  80. entry *filer_pb.Entry
  81. entryViewCache []filer2.VisibleInterval
  82. reader io.ReaderAt
  83. }
  84. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  85. chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB)
  86. return &WebDavFileSystem{
  87. option: option,
  88. chunkCache: chunkCache,
  89. }, nil
  90. }
  91. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  92. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  93. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  94. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  95. return fn(client)
  96. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  97. }
  98. func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
  99. return hostAndPort
  100. }
  101. func clearName(name string) (string, error) {
  102. slashed := strings.HasSuffix(name, "/")
  103. name = path.Clean(name)
  104. if !strings.HasSuffix(name, "/") && slashed {
  105. name += "/"
  106. }
  107. if !strings.HasPrefix(name, "/") {
  108. return "", os.ErrInvalid
  109. }
  110. return name, nil
  111. }
  112. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  113. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  114. if !strings.HasSuffix(fullDirPath, "/") {
  115. fullDirPath += "/"
  116. }
  117. var err error
  118. if fullDirPath, err = clearName(fullDirPath); err != nil {
  119. return err
  120. }
  121. _, err = fs.stat(ctx, fullDirPath)
  122. if err == nil {
  123. return os.ErrExist
  124. }
  125. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  126. dir, name := util.FullPath(fullDirPath).DirAndName()
  127. request := &filer_pb.CreateEntryRequest{
  128. Directory: dir,
  129. Entry: &filer_pb.Entry{
  130. Name: name,
  131. IsDirectory: true,
  132. Attributes: &filer_pb.FuseAttributes{
  133. Mtime: time.Now().Unix(),
  134. Crtime: time.Now().Unix(),
  135. FileMode: uint32(perm | os.ModeDir),
  136. Uid: fs.option.Uid,
  137. Gid: fs.option.Gid,
  138. },
  139. },
  140. }
  141. glog.V(1).Infof("mkdir: %v", request)
  142. if err := filer_pb.CreateEntry(client, request); err != nil {
  143. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  144. }
  145. return nil
  146. })
  147. }
  148. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  149. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  150. var err error
  151. if fullFilePath, err = clearName(fullFilePath); err != nil {
  152. return nil, err
  153. }
  154. if flag&os.O_CREATE != 0 {
  155. // file should not have / suffix.
  156. if strings.HasSuffix(fullFilePath, "/") {
  157. return nil, os.ErrInvalid
  158. }
  159. _, err = fs.stat(ctx, fullFilePath)
  160. if err == nil {
  161. if flag&os.O_EXCL != 0 {
  162. return nil, os.ErrExist
  163. }
  164. fs.removeAll(ctx, fullFilePath)
  165. }
  166. dir, name := util.FullPath(fullFilePath).DirAndName()
  167. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  168. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  169. Directory: dir,
  170. Entry: &filer_pb.Entry{
  171. Name: name,
  172. IsDirectory: perm&os.ModeDir > 0,
  173. Attributes: &filer_pb.FuseAttributes{
  174. Mtime: time.Now().Unix(),
  175. Crtime: time.Now().Unix(),
  176. FileMode: uint32(perm),
  177. Uid: fs.option.Uid,
  178. Gid: fs.option.Gid,
  179. Collection: fs.option.Collection,
  180. Replication: "000",
  181. TtlSec: 0,
  182. },
  183. },
  184. }); err != nil {
  185. return fmt.Errorf("create %s: %v", fullFilePath, err)
  186. }
  187. return nil
  188. })
  189. if err != nil {
  190. return nil, err
  191. }
  192. return &WebDavFile{
  193. fs: fs,
  194. name: fullFilePath,
  195. isDirectory: false,
  196. }, nil
  197. }
  198. fi, err := fs.stat(ctx, fullFilePath)
  199. if err != nil {
  200. return nil, os.ErrNotExist
  201. }
  202. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  203. fullFilePath += "/"
  204. }
  205. return &WebDavFile{
  206. fs: fs,
  207. name: fullFilePath,
  208. isDirectory: false,
  209. }, nil
  210. }
  211. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  212. var err error
  213. if fullFilePath, err = clearName(fullFilePath); err != nil {
  214. return err
  215. }
  216. dir, name := util.FullPath(fullFilePath).DirAndName()
  217. return filer_pb.Remove(fs, dir, name, true, false, false, false)
  218. }
  219. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  220. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  221. return fs.removeAll(ctx, name)
  222. }
  223. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  224. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  225. var err error
  226. if oldName, err = clearName(oldName); err != nil {
  227. return err
  228. }
  229. if newName, err = clearName(newName); err != nil {
  230. return err
  231. }
  232. of, err := fs.stat(ctx, oldName)
  233. if err != nil {
  234. return os.ErrExist
  235. }
  236. if of.IsDir() {
  237. if strings.HasSuffix(oldName, "/") {
  238. oldName = strings.TrimRight(oldName, "/")
  239. }
  240. if strings.HasSuffix(newName, "/") {
  241. newName = strings.TrimRight(newName, "/")
  242. }
  243. }
  244. _, err = fs.stat(ctx, newName)
  245. if err == nil {
  246. return os.ErrExist
  247. }
  248. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  249. newDir, newBaseName := util.FullPath(newName).DirAndName()
  250. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  251. request := &filer_pb.AtomicRenameEntryRequest{
  252. OldDirectory: oldDir,
  253. OldName: oldBaseName,
  254. NewDirectory: newDir,
  255. NewName: newBaseName,
  256. }
  257. _, err := client.AtomicRenameEntry(ctx, request)
  258. if err != nil {
  259. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  260. }
  261. return nil
  262. })
  263. }
  264. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  265. var err error
  266. if fullFilePath, err = clearName(fullFilePath); err != nil {
  267. return nil, err
  268. }
  269. fullpath := util.FullPath(fullFilePath)
  270. var fi FileInfo
  271. entry, err := filer_pb.GetEntry(fs, fullpath)
  272. if entry == nil {
  273. return nil, os.ErrNotExist
  274. }
  275. if err != nil {
  276. return nil, err
  277. }
  278. fi.size = int64(filer2.FileSize(entry))
  279. fi.name = string(fullpath)
  280. fi.mode = os.FileMode(entry.Attributes.FileMode)
  281. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  282. fi.isDirectory = entry.IsDirectory
  283. if fi.name == "/" {
  284. fi.modifiledTime = time.Now()
  285. fi.isDirectory = true
  286. }
  287. return &fi, nil
  288. }
  289. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  290. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  291. return fs.stat(ctx, name)
  292. }
  293. func (f *WebDavFile) Write(buf []byte) (int, error) {
  294. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  295. dir, _ := util.FullPath(f.name).DirAndName()
  296. var err error
  297. ctx := context.Background()
  298. if f.entry == nil {
  299. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  300. }
  301. if f.entry == nil {
  302. return 0, err
  303. }
  304. if err != nil {
  305. return 0, err
  306. }
  307. var fileId, host string
  308. var auth security.EncodedJwt
  309. var collection, replication string
  310. if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  311. request := &filer_pb.AssignVolumeRequest{
  312. Count: 1,
  313. Replication: "",
  314. Collection: f.fs.option.Collection,
  315. ParentPath: dir,
  316. }
  317. resp, err := client.AssignVolume(ctx, request)
  318. if err != nil {
  319. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  320. return err
  321. }
  322. if resp.Error != "" {
  323. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  324. }
  325. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  326. collection, replication = resp.Collection, resp.Replication
  327. return nil
  328. }); err != nil {
  329. return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  330. }
  331. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  332. uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth)
  333. if err != nil {
  334. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
  335. return 0, fmt.Errorf("upload data: %v", err)
  336. }
  337. if uploadResult.Error != "" {
  338. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
  339. return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
  340. }
  341. f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off))
  342. err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  343. f.entry.Attributes.Mtime = time.Now().Unix()
  344. f.entry.Attributes.Collection = collection
  345. f.entry.Attributes.Replication = replication
  346. request := &filer_pb.UpdateEntryRequest{
  347. Directory: dir,
  348. Entry: f.entry,
  349. }
  350. if _, err := client.UpdateEntry(ctx, request); err != nil {
  351. return fmt.Errorf("update %s: %v", f.name, err)
  352. }
  353. return nil
  354. })
  355. if err == nil {
  356. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  357. f.off += int64(len(buf))
  358. }
  359. return len(buf), err
  360. }
  361. func (f *WebDavFile) Close() error {
  362. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  363. if f.entry != nil {
  364. f.entry = nil
  365. f.entryViewCache = nil
  366. }
  367. return nil
  368. }
  369. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  370. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  371. if f.entry == nil {
  372. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  373. }
  374. if f.entry == nil {
  375. return 0, err
  376. }
  377. if err != nil {
  378. return 0, err
  379. }
  380. fileSize := int64(filer2.FileSize(f.entry))
  381. if fileSize == 0 {
  382. return 0, io.EOF
  383. }
  384. if f.entryViewCache == nil {
  385. f.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(f.fs), f.entry.Chunks)
  386. f.reader = nil
  387. }
  388. if f.reader == nil {
  389. chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
  390. f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
  391. }
  392. readSize, err = f.reader.ReadAt(p, f.off)
  393. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  394. f.off += int64(readSize)
  395. if err == io.EOF {
  396. err = nil
  397. }
  398. if err != nil {
  399. glog.Errorf("file read %s: %v", f.name, err)
  400. }
  401. return
  402. }
  403. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  404. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  405. dir, _ := util.FullPath(f.name).DirAndName()
  406. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  407. fi := FileInfo{
  408. size: int64(filer2.FileSize(entry)),
  409. name: entry.Name,
  410. mode: os.FileMode(entry.Attributes.FileMode),
  411. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  412. isDirectory: entry.IsDirectory,
  413. }
  414. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  415. fi.name += "/"
  416. }
  417. glog.V(4).Infof("entry: %v", fi.name)
  418. ret = append(ret, &fi)
  419. return nil
  420. })
  421. old := f.off
  422. if old >= int64(len(ret)) {
  423. if count > 0 {
  424. return nil, io.EOF
  425. }
  426. return nil, nil
  427. }
  428. if count > 0 {
  429. f.off += int64(count)
  430. if f.off > int64(len(ret)) {
  431. f.off = int64(len(ret))
  432. }
  433. } else {
  434. f.off = int64(len(ret))
  435. old = 0
  436. }
  437. return ret[old:f.off], nil
  438. }
  439. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  440. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  441. ctx := context.Background()
  442. var err error
  443. switch whence {
  444. case 0:
  445. f.off = 0
  446. case 2:
  447. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  448. return 0, err
  449. } else {
  450. f.off = fi.Size()
  451. }
  452. }
  453. f.off += offset
  454. return f.off, err
  455. }
  456. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  457. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  458. ctx := context.Background()
  459. return f.fs.stat(ctx, f.name)
  460. }