You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

641 lines
16 KiB

5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  12. "golang.org/x/net/webdav"
  13. "google.golang.org/grpc"
  14. "github.com/seaweedfs/seaweedfs/weed/operation"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  19. "github.com/seaweedfs/seaweedfs/weed/filer"
  20. "github.com/seaweedfs/seaweedfs/weed/glog"
  21. "github.com/seaweedfs/seaweedfs/weed/security"
  22. )
  23. type WebDavOption struct {
  24. Filer pb.ServerAddress
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Replication string
  30. DiskType string
  31. Uid uint32
  32. Gid uint32
  33. Cipher bool
  34. CacheDir string
  35. CacheSizeMB int64
  36. }
  37. type WebDavServer struct {
  38. option *WebDavOption
  39. secret security.SigningKey
  40. filer *filer.Filer
  41. grpcDialOption grpc.DialOption
  42. Handler *webdav.Handler
  43. }
  44. func max(x, y int64) int64 {
  45. if x <= y {
  46. return y
  47. }
  48. return x
  49. }
  50. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  51. fs, _ := NewWebDavFileSystem(option)
  52. ws = &WebDavServer{
  53. option: option,
  54. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  55. Handler: &webdav.Handler{
  56. FileSystem: fs,
  57. LockSystem: webdav.NewMemLS(),
  58. },
  59. }
  60. return ws, nil
  61. }
  62. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  63. type WebDavFileSystem struct {
  64. option *WebDavOption
  65. secret security.SigningKey
  66. filer *filer.Filer
  67. grpcDialOption grpc.DialOption
  68. chunkCache *chunk_cache.TieredChunkCache
  69. signature int32
  70. }
  71. type FileInfo struct {
  72. name string
  73. size int64
  74. mode os.FileMode
  75. modifiledTime time.Time
  76. isDirectory bool
  77. }
  78. func (fi *FileInfo) Name() string { return fi.name }
  79. func (fi *FileInfo) Size() int64 { return fi.size }
  80. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  81. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  82. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  83. func (fi *FileInfo) Sys() interface{} { return nil }
  84. type WebDavFile struct {
  85. fs *WebDavFileSystem
  86. name string
  87. isDirectory bool
  88. off int64
  89. entry *filer_pb.Entry
  90. entryViewCache []filer.VisibleInterval
  91. reader io.ReaderAt
  92. bufWriter *buffered_writer.BufferedWriteCloser
  93. collection string
  94. replication string
  95. }
  96. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  97. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  98. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  99. os.MkdirAll(cacheDir, os.FileMode(0755))
  100. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  101. return &WebDavFileSystem{
  102. option: option,
  103. chunkCache: chunkCache,
  104. signature: util.RandomInt32(),
  105. }, nil
  106. }
  107. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  108. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  109. return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
  110. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  111. return fn(client)
  112. }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
  113. }
  114. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  115. return location.Url
  116. }
  117. func (fs *WebDavFileSystem) GetDataCenter() string {
  118. return fs.filer.MasterClient.DataCenter
  119. }
  120. func clearName(name string) (string, error) {
  121. slashed := strings.HasSuffix(name, "/")
  122. name = path.Clean(name)
  123. if !strings.HasSuffix(name, "/") && slashed {
  124. name += "/"
  125. }
  126. if !strings.HasPrefix(name, "/") {
  127. return "", os.ErrInvalid
  128. }
  129. return name, nil
  130. }
  131. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  132. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  133. if !strings.HasSuffix(fullDirPath, "/") {
  134. fullDirPath += "/"
  135. }
  136. var err error
  137. if fullDirPath, err = clearName(fullDirPath); err != nil {
  138. return err
  139. }
  140. _, err = fs.stat(ctx, fullDirPath)
  141. if err == nil {
  142. return os.ErrExist
  143. }
  144. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  145. dir, name := util.FullPath(fullDirPath).DirAndName()
  146. request := &filer_pb.CreateEntryRequest{
  147. Directory: dir,
  148. Entry: &filer_pb.Entry{
  149. Name: name,
  150. IsDirectory: true,
  151. Attributes: &filer_pb.FuseAttributes{
  152. Mtime: time.Now().Unix(),
  153. Crtime: time.Now().Unix(),
  154. FileMode: uint32(perm | os.ModeDir),
  155. Uid: fs.option.Uid,
  156. Gid: fs.option.Gid,
  157. },
  158. },
  159. Signatures: []int32{fs.signature},
  160. }
  161. glog.V(1).Infof("mkdir: %v", request)
  162. if err := filer_pb.CreateEntry(client, request); err != nil {
  163. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  164. }
  165. return nil
  166. })
  167. }
  168. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  169. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  170. var err error
  171. if fullFilePath, err = clearName(fullFilePath); err != nil {
  172. return nil, err
  173. }
  174. if flag&os.O_CREATE != 0 {
  175. // file should not have / suffix.
  176. if strings.HasSuffix(fullFilePath, "/") {
  177. return nil, os.ErrInvalid
  178. }
  179. _, err = fs.stat(ctx, fullFilePath)
  180. if err == nil {
  181. if flag&os.O_EXCL != 0 {
  182. return nil, os.ErrExist
  183. }
  184. fs.removeAll(ctx, fullFilePath)
  185. }
  186. dir, name := util.FullPath(fullFilePath).DirAndName()
  187. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  188. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  189. Directory: dir,
  190. Entry: &filer_pb.Entry{
  191. Name: name,
  192. IsDirectory: perm&os.ModeDir > 0,
  193. Attributes: &filer_pb.FuseAttributes{
  194. Mtime: time.Now().Unix(),
  195. Crtime: time.Now().Unix(),
  196. FileMode: uint32(perm),
  197. Uid: fs.option.Uid,
  198. Gid: fs.option.Gid,
  199. TtlSec: 0,
  200. },
  201. },
  202. Signatures: []int32{fs.signature},
  203. }); err != nil {
  204. return fmt.Errorf("create %s: %v", fullFilePath, err)
  205. }
  206. return nil
  207. })
  208. if err != nil {
  209. return nil, err
  210. }
  211. return &WebDavFile{
  212. fs: fs,
  213. name: fullFilePath,
  214. isDirectory: false,
  215. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  216. }, nil
  217. }
  218. fi, err := fs.stat(ctx, fullFilePath)
  219. if err != nil {
  220. return nil, os.ErrNotExist
  221. }
  222. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  223. fullFilePath += "/"
  224. }
  225. return &WebDavFile{
  226. fs: fs,
  227. name: fullFilePath,
  228. isDirectory: false,
  229. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  230. }, nil
  231. }
  232. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  233. var err error
  234. if fullFilePath, err = clearName(fullFilePath); err != nil {
  235. return err
  236. }
  237. dir, name := util.FullPath(fullFilePath).DirAndName()
  238. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  239. }
  240. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  241. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  242. return fs.removeAll(ctx, name)
  243. }
  244. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  245. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  246. var err error
  247. if oldName, err = clearName(oldName); err != nil {
  248. return err
  249. }
  250. if newName, err = clearName(newName); err != nil {
  251. return err
  252. }
  253. of, err := fs.stat(ctx, oldName)
  254. if err != nil {
  255. return os.ErrExist
  256. }
  257. if of.IsDir() {
  258. if strings.HasSuffix(oldName, "/") {
  259. oldName = strings.TrimRight(oldName, "/")
  260. }
  261. if strings.HasSuffix(newName, "/") {
  262. newName = strings.TrimRight(newName, "/")
  263. }
  264. }
  265. _, err = fs.stat(ctx, newName)
  266. if err == nil {
  267. return os.ErrExist
  268. }
  269. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  270. newDir, newBaseName := util.FullPath(newName).DirAndName()
  271. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  272. request := &filer_pb.AtomicRenameEntryRequest{
  273. OldDirectory: oldDir,
  274. OldName: oldBaseName,
  275. NewDirectory: newDir,
  276. NewName: newBaseName,
  277. }
  278. _, err := client.AtomicRenameEntry(ctx, request)
  279. if err != nil {
  280. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  281. }
  282. return nil
  283. })
  284. }
  285. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  286. var err error
  287. if fullFilePath, err = clearName(fullFilePath); err != nil {
  288. return nil, err
  289. }
  290. fullpath := util.FullPath(fullFilePath)
  291. var fi FileInfo
  292. entry, err := filer_pb.GetEntry(fs, fullpath)
  293. if entry == nil {
  294. return nil, os.ErrNotExist
  295. }
  296. if err != nil {
  297. return nil, err
  298. }
  299. fi.size = int64(filer.FileSize(entry))
  300. fi.name = string(fullpath)
  301. fi.mode = os.FileMode(entry.Attributes.FileMode)
  302. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  303. fi.isDirectory = entry.IsDirectory
  304. if fi.name == "/" {
  305. fi.modifiledTime = time.Now()
  306. fi.isDirectory = true
  307. }
  308. return &fi, nil
  309. }
  310. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  311. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  312. return fs.stat(ctx, name)
  313. }
  314. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
  315. var fileId, host string
  316. var auth security.EncodedJwt
  317. if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  318. ctx := context.Background()
  319. assignErr := util.Retry("assignVolume", func() error {
  320. request := &filer_pb.AssignVolumeRequest{
  321. Count: 1,
  322. Replication: f.fs.option.Replication,
  323. Collection: f.fs.option.Collection,
  324. DiskType: f.fs.option.DiskType,
  325. Path: name,
  326. }
  327. resp, err := client.AssignVolume(ctx, request)
  328. if err != nil {
  329. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  330. return err
  331. }
  332. if resp.Error != "" {
  333. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  334. }
  335. fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
  336. f.collection, f.replication = resp.Collection, resp.Replication
  337. return nil
  338. })
  339. if assignErr != nil {
  340. return assignErr
  341. }
  342. return nil
  343. }); flushErr != nil {
  344. return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
  345. }
  346. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  347. uploadOption := &operation.UploadOption{
  348. UploadUrl: fileUrl,
  349. Filename: f.name,
  350. Cipher: f.fs.option.Cipher,
  351. IsInputCompressed: false,
  352. MimeType: "",
  353. PairMap: nil,
  354. Jwt: auth,
  355. }
  356. uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
  357. if flushErr != nil {
  358. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
  359. return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
  360. }
  361. if uploadResult.Error != "" {
  362. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
  363. return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
  364. }
  365. return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
  366. }
  367. func (f *WebDavFile) Write(buf []byte) (int, error) {
  368. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  369. dir, _ := util.FullPath(f.name).DirAndName()
  370. var getErr error
  371. ctx := context.Background()
  372. if f.entry == nil {
  373. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  374. }
  375. if f.entry == nil {
  376. return 0, getErr
  377. }
  378. if getErr != nil {
  379. return 0, getErr
  380. }
  381. if f.bufWriter.FlushFunc == nil {
  382. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  383. var chunk *filer_pb.FileChunk
  384. chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  385. if flushErr != nil {
  386. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  387. }
  388. f.entry.Content = nil
  389. f.entry.Chunks = append(f.entry.Chunks, chunk)
  390. return flushErr
  391. }
  392. f.bufWriter.CloseFunc = func() error {
  393. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  394. if manifestErr != nil {
  395. // not good, but should be ok
  396. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  397. } else {
  398. f.entry.Chunks = manifestedChunks
  399. }
  400. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  401. f.entry.Attributes.Mtime = time.Now().Unix()
  402. request := &filer_pb.UpdateEntryRequest{
  403. Directory: dir,
  404. Entry: f.entry,
  405. Signatures: []int32{f.fs.signature},
  406. }
  407. if _, err := client.UpdateEntry(ctx, request); err != nil {
  408. return fmt.Errorf("update %s: %v", f.name, err)
  409. }
  410. return nil
  411. })
  412. return flushErr
  413. }
  414. }
  415. written, err := f.bufWriter.Write(buf)
  416. if err == nil {
  417. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  418. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  419. f.off += int64(written)
  420. }
  421. return written, err
  422. }
  423. func (f *WebDavFile) Close() error {
  424. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  425. err := f.bufWriter.Close()
  426. if f.entry != nil {
  427. f.entry = nil
  428. f.entryViewCache = nil
  429. }
  430. return err
  431. }
  432. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  433. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  434. if f.entry == nil {
  435. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  436. }
  437. if f.entry == nil {
  438. return 0, err
  439. }
  440. if err != nil {
  441. return 0, err
  442. }
  443. fileSize := int64(filer.FileSize(f.entry))
  444. if fileSize == 0 {
  445. return 0, io.EOF
  446. }
  447. if f.entryViewCache == nil {
  448. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
  449. f.reader = nil
  450. }
  451. if f.reader == nil {
  452. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
  453. f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
  454. }
  455. readSize, err = f.reader.ReadAt(p, f.off)
  456. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  457. f.off += int64(readSize)
  458. if err != nil && err != io.EOF {
  459. glog.Errorf("file read %s: %v", f.name, err)
  460. }
  461. return
  462. }
  463. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  464. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  465. dir, _ := util.FullPath(f.name).DirAndName()
  466. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  467. fi := FileInfo{
  468. size: int64(filer.FileSize(entry)),
  469. name: entry.Name,
  470. mode: os.FileMode(entry.Attributes.FileMode),
  471. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  472. isDirectory: entry.IsDirectory,
  473. }
  474. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  475. fi.name += "/"
  476. }
  477. glog.V(4).Infof("entry: %v", fi.name)
  478. ret = append(ret, &fi)
  479. return nil
  480. })
  481. old := f.off
  482. if old >= int64(len(ret)) {
  483. if count > 0 {
  484. return nil, io.EOF
  485. }
  486. return nil, nil
  487. }
  488. if count > 0 {
  489. f.off += int64(count)
  490. if f.off > int64(len(ret)) {
  491. f.off = int64(len(ret))
  492. }
  493. } else {
  494. f.off = int64(len(ret))
  495. old = 0
  496. }
  497. return ret[old:f.off], nil
  498. }
  499. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  500. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  501. ctx := context.Background()
  502. var err error
  503. switch whence {
  504. case io.SeekStart:
  505. f.off = 0
  506. case io.SeekEnd:
  507. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  508. return 0, err
  509. } else {
  510. f.off = fi.Size()
  511. }
  512. }
  513. f.off += offset
  514. return f.off, err
  515. }
  516. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  517. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  518. ctx := context.Background()
  519. return f.fs.stat(ctx, f.name)
  520. }