You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

638 lines
16 KiB

5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  12. "golang.org/x/net/webdav"
  13. "google.golang.org/grpc"
  14. "github.com/seaweedfs/seaweedfs/weed/operation"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  19. "github.com/seaweedfs/seaweedfs/weed/filer"
  20. "github.com/seaweedfs/seaweedfs/weed/glog"
  21. "github.com/seaweedfs/seaweedfs/weed/security"
  22. )
  23. type WebDavOption struct {
  24. Filer pb.ServerAddress
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Replication string
  30. DiskType string
  31. Uid uint32
  32. Gid uint32
  33. Cipher bool
  34. CacheDir string
  35. CacheSizeMB int64
  36. }
  37. type WebDavServer struct {
  38. option *WebDavOption
  39. secret security.SigningKey
  40. filer *filer.Filer
  41. grpcDialOption grpc.DialOption
  42. Handler *webdav.Handler
  43. }
  44. func max(x, y int64) int64 {
  45. if x <= y {
  46. return y
  47. }
  48. return x
  49. }
  50. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  51. fs, _ := NewWebDavFileSystem(option)
  52. ws = &WebDavServer{
  53. option: option,
  54. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  55. Handler: &webdav.Handler{
  56. FileSystem: fs,
  57. LockSystem: webdav.NewMemLS(),
  58. },
  59. }
  60. return ws, nil
  61. }
  62. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  63. type WebDavFileSystem struct {
  64. option *WebDavOption
  65. secret security.SigningKey
  66. filer *filer.Filer
  67. grpcDialOption grpc.DialOption
  68. chunkCache *chunk_cache.TieredChunkCache
  69. signature int32
  70. }
  71. type FileInfo struct {
  72. name string
  73. size int64
  74. mode os.FileMode
  75. modifiledTime time.Time
  76. isDirectory bool
  77. }
  78. func (fi *FileInfo) Name() string { return fi.name }
  79. func (fi *FileInfo) Size() int64 { return fi.size }
  80. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  81. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  82. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  83. func (fi *FileInfo) Sys() interface{} { return nil }
  84. type WebDavFile struct {
  85. fs *WebDavFileSystem
  86. name string
  87. isDirectory bool
  88. off int64
  89. entry *filer_pb.Entry
  90. entryViewCache []filer.VisibleInterval
  91. reader io.ReaderAt
  92. bufWriter *buffered_writer.BufferedWriteCloser
  93. collection string
  94. replication string
  95. }
  96. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  97. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  98. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  99. os.MkdirAll(cacheDir, os.FileMode(0755))
  100. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  101. return &WebDavFileSystem{
  102. option: option,
  103. chunkCache: chunkCache,
  104. signature: util.RandomInt32(),
  105. }, nil
  106. }
  107. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  108. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  109. return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
  110. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  111. return fn(client)
  112. }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
  113. }
  114. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  115. return location.Url
  116. }
  117. func clearName(name string) (string, error) {
  118. slashed := strings.HasSuffix(name, "/")
  119. name = path.Clean(name)
  120. if !strings.HasSuffix(name, "/") && slashed {
  121. name += "/"
  122. }
  123. if !strings.HasPrefix(name, "/") {
  124. return "", os.ErrInvalid
  125. }
  126. return name, nil
  127. }
  128. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  129. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  130. if !strings.HasSuffix(fullDirPath, "/") {
  131. fullDirPath += "/"
  132. }
  133. var err error
  134. if fullDirPath, err = clearName(fullDirPath); err != nil {
  135. return err
  136. }
  137. _, err = fs.stat(ctx, fullDirPath)
  138. if err == nil {
  139. return os.ErrExist
  140. }
  141. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  142. dir, name := util.FullPath(fullDirPath).DirAndName()
  143. request := &filer_pb.CreateEntryRequest{
  144. Directory: dir,
  145. Entry: &filer_pb.Entry{
  146. Name: name,
  147. IsDirectory: true,
  148. Attributes: &filer_pb.FuseAttributes{
  149. Mtime: time.Now().Unix(),
  150. Crtime: time.Now().Unix(),
  151. FileMode: uint32(perm | os.ModeDir),
  152. Uid: fs.option.Uid,
  153. Gid: fs.option.Gid,
  154. },
  155. },
  156. Signatures: []int32{fs.signature},
  157. }
  158. glog.V(1).Infof("mkdir: %v", request)
  159. if err := filer_pb.CreateEntry(client, request); err != nil {
  160. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  161. }
  162. return nil
  163. })
  164. }
  165. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  166. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  167. var err error
  168. if fullFilePath, err = clearName(fullFilePath); err != nil {
  169. return nil, err
  170. }
  171. if flag&os.O_CREATE != 0 {
  172. // file should not have / suffix.
  173. if strings.HasSuffix(fullFilePath, "/") {
  174. return nil, os.ErrInvalid
  175. }
  176. _, err = fs.stat(ctx, fullFilePath)
  177. if err == nil {
  178. if flag&os.O_EXCL != 0 {
  179. return nil, os.ErrExist
  180. }
  181. fs.removeAll(ctx, fullFilePath)
  182. }
  183. dir, name := util.FullPath(fullFilePath).DirAndName()
  184. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  185. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  186. Directory: dir,
  187. Entry: &filer_pb.Entry{
  188. Name: name,
  189. IsDirectory: perm&os.ModeDir > 0,
  190. Attributes: &filer_pb.FuseAttributes{
  191. Mtime: time.Now().Unix(),
  192. Crtime: time.Now().Unix(),
  193. FileMode: uint32(perm),
  194. Uid: fs.option.Uid,
  195. Gid: fs.option.Gid,
  196. TtlSec: 0,
  197. },
  198. },
  199. Signatures: []int32{fs.signature},
  200. }); err != nil {
  201. return fmt.Errorf("create %s: %v", fullFilePath, err)
  202. }
  203. return nil
  204. })
  205. if err != nil {
  206. return nil, err
  207. }
  208. return &WebDavFile{
  209. fs: fs,
  210. name: fullFilePath,
  211. isDirectory: false,
  212. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  213. }, nil
  214. }
  215. fi, err := fs.stat(ctx, fullFilePath)
  216. if err != nil {
  217. return nil, os.ErrNotExist
  218. }
  219. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  220. fullFilePath += "/"
  221. }
  222. return &WebDavFile{
  223. fs: fs,
  224. name: fullFilePath,
  225. isDirectory: false,
  226. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  227. }, nil
  228. }
  229. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  230. var err error
  231. if fullFilePath, err = clearName(fullFilePath); err != nil {
  232. return err
  233. }
  234. dir, name := util.FullPath(fullFilePath).DirAndName()
  235. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  236. }
  237. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  238. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  239. return fs.removeAll(ctx, name)
  240. }
  241. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  242. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  243. var err error
  244. if oldName, err = clearName(oldName); err != nil {
  245. return err
  246. }
  247. if newName, err = clearName(newName); err != nil {
  248. return err
  249. }
  250. of, err := fs.stat(ctx, oldName)
  251. if err != nil {
  252. return os.ErrExist
  253. }
  254. if of.IsDir() {
  255. if strings.HasSuffix(oldName, "/") {
  256. oldName = strings.TrimRight(oldName, "/")
  257. }
  258. if strings.HasSuffix(newName, "/") {
  259. newName = strings.TrimRight(newName, "/")
  260. }
  261. }
  262. _, err = fs.stat(ctx, newName)
  263. if err == nil {
  264. return os.ErrExist
  265. }
  266. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  267. newDir, newBaseName := util.FullPath(newName).DirAndName()
  268. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  269. request := &filer_pb.AtomicRenameEntryRequest{
  270. OldDirectory: oldDir,
  271. OldName: oldBaseName,
  272. NewDirectory: newDir,
  273. NewName: newBaseName,
  274. }
  275. _, err := client.AtomicRenameEntry(ctx, request)
  276. if err != nil {
  277. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  278. }
  279. return nil
  280. })
  281. }
  282. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  283. var err error
  284. if fullFilePath, err = clearName(fullFilePath); err != nil {
  285. return nil, err
  286. }
  287. fullpath := util.FullPath(fullFilePath)
  288. var fi FileInfo
  289. entry, err := filer_pb.GetEntry(fs, fullpath)
  290. if entry == nil {
  291. return nil, os.ErrNotExist
  292. }
  293. if err != nil {
  294. return nil, err
  295. }
  296. fi.size = int64(filer.FileSize(entry))
  297. fi.name = string(fullpath)
  298. fi.mode = os.FileMode(entry.Attributes.FileMode)
  299. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  300. fi.isDirectory = entry.IsDirectory
  301. if fi.name == "/" {
  302. fi.modifiledTime = time.Now()
  303. fi.isDirectory = true
  304. }
  305. return &fi, nil
  306. }
  307. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  308. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  309. return fs.stat(ctx, name)
  310. }
  311. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
  312. var fileId, host string
  313. var auth security.EncodedJwt
  314. if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  315. ctx := context.Background()
  316. assignErr := util.Retry("assignVolume", func() error {
  317. request := &filer_pb.AssignVolumeRequest{
  318. Count: 1,
  319. Replication: f.fs.option.Replication,
  320. Collection: f.fs.option.Collection,
  321. DiskType: f.fs.option.DiskType,
  322. Path: name,
  323. }
  324. resp, err := client.AssignVolume(ctx, request)
  325. if err != nil {
  326. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  327. return err
  328. }
  329. if resp.Error != "" {
  330. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  331. }
  332. fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
  333. f.collection, f.replication = resp.Collection, resp.Replication
  334. return nil
  335. })
  336. if assignErr != nil {
  337. return assignErr
  338. }
  339. return nil
  340. }); flushErr != nil {
  341. return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
  342. }
  343. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  344. uploadOption := &operation.UploadOption{
  345. UploadUrl: fileUrl,
  346. Filename: f.name,
  347. Cipher: f.fs.option.Cipher,
  348. IsInputCompressed: false,
  349. MimeType: "",
  350. PairMap: nil,
  351. Jwt: auth,
  352. }
  353. uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
  354. if flushErr != nil {
  355. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
  356. return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
  357. }
  358. if uploadResult.Error != "" {
  359. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
  360. return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
  361. }
  362. return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
  363. }
  364. func (f *WebDavFile) Write(buf []byte) (int, error) {
  365. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  366. dir, _ := util.FullPath(f.name).DirAndName()
  367. var getErr error
  368. ctx := context.Background()
  369. if f.entry == nil {
  370. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  371. }
  372. if f.entry == nil {
  373. return 0, getErr
  374. }
  375. if getErr != nil {
  376. return 0, getErr
  377. }
  378. if f.bufWriter.FlushFunc == nil {
  379. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  380. var chunk *filer_pb.FileChunk
  381. chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  382. if flushErr != nil {
  383. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  384. }
  385. f.entry.Content = nil
  386. f.entry.Chunks = append(f.entry.Chunks, chunk)
  387. return flushErr
  388. }
  389. f.bufWriter.CloseFunc = func() error {
  390. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  391. if manifestErr != nil {
  392. // not good, but should be ok
  393. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  394. } else {
  395. f.entry.Chunks = manifestedChunks
  396. }
  397. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  398. f.entry.Attributes.Mtime = time.Now().Unix()
  399. request := &filer_pb.UpdateEntryRequest{
  400. Directory: dir,
  401. Entry: f.entry,
  402. Signatures: []int32{f.fs.signature},
  403. }
  404. if _, err := client.UpdateEntry(ctx, request); err != nil {
  405. return fmt.Errorf("update %s: %v", f.name, err)
  406. }
  407. return nil
  408. })
  409. return flushErr
  410. }
  411. }
  412. written, err := f.bufWriter.Write(buf)
  413. if err == nil {
  414. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  415. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  416. f.off += int64(written)
  417. }
  418. return written, err
  419. }
  420. func (f *WebDavFile) Close() error {
  421. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  422. err := f.bufWriter.Close()
  423. if f.entry != nil {
  424. f.entry = nil
  425. f.entryViewCache = nil
  426. }
  427. return err
  428. }
  429. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  430. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  431. if f.entry == nil {
  432. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  433. }
  434. if f.entry == nil {
  435. return 0, err
  436. }
  437. if err != nil {
  438. return 0, err
  439. }
  440. fileSize := int64(filer.FileSize(f.entry))
  441. if fileSize == 0 {
  442. return 0, io.EOF
  443. }
  444. if f.entryViewCache == nil {
  445. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
  446. f.reader = nil
  447. }
  448. if f.reader == nil {
  449. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
  450. f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
  451. }
  452. readSize, err = f.reader.ReadAt(p, f.off)
  453. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  454. f.off += int64(readSize)
  455. if err != nil && err != io.EOF {
  456. glog.Errorf("file read %s: %v", f.name, err)
  457. }
  458. return
  459. }
  460. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  461. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  462. dir, _ := util.FullPath(f.name).DirAndName()
  463. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  464. fi := FileInfo{
  465. size: int64(filer.FileSize(entry)),
  466. name: entry.Name,
  467. mode: os.FileMode(entry.Attributes.FileMode),
  468. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  469. isDirectory: entry.IsDirectory,
  470. }
  471. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  472. fi.name += "/"
  473. }
  474. glog.V(4).Infof("entry: %v", fi.name)
  475. ret = append(ret, &fi)
  476. return nil
  477. })
  478. old := f.off
  479. if old >= int64(len(ret)) {
  480. if count > 0 {
  481. return nil, io.EOF
  482. }
  483. return nil, nil
  484. }
  485. if count > 0 {
  486. f.off += int64(count)
  487. if f.off > int64(len(ret)) {
  488. f.off = int64(len(ret))
  489. }
  490. } else {
  491. f.off = int64(len(ret))
  492. old = 0
  493. }
  494. return ret[old:f.off], nil
  495. }
  496. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  497. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  498. ctx := context.Background()
  499. var err error
  500. switch whence {
  501. case io.SeekStart:
  502. f.off = 0
  503. case io.SeekEnd:
  504. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  505. return 0, err
  506. } else {
  507. f.off = fi.Size()
  508. }
  509. }
  510. f.off += offset
  511. return f.off, err
  512. }
  513. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  514. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  515. ctx := context.Background()
  516. return f.fs.stat(ctx, f.name)
  517. }