You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

640 lines
16 KiB

5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  12. "golang.org/x/net/webdav"
  13. "google.golang.org/grpc"
  14. "github.com/seaweedfs/seaweedfs/weed/operation"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  19. "github.com/seaweedfs/seaweedfs/weed/filer"
  20. "github.com/seaweedfs/seaweedfs/weed/glog"
  21. "github.com/seaweedfs/seaweedfs/weed/security"
  22. )
  23. type WebDavOption struct {
  24. Filer pb.ServerAddress
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Replication string
  30. DiskType string
  31. Uid uint32
  32. Gid uint32
  33. Cipher bool
  34. CacheDir string
  35. CacheSizeMB int64
  36. }
  37. type WebDavServer struct {
  38. option *WebDavOption
  39. secret security.SigningKey
  40. filer *filer.Filer
  41. grpcDialOption grpc.DialOption
  42. Handler *webdav.Handler
  43. }
  44. func max(x, y int64) int64 {
  45. if x <= y {
  46. return y
  47. }
  48. return x
  49. }
  50. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  51. fs, _ := NewWebDavFileSystem(option)
  52. ws = &WebDavServer{
  53. option: option,
  54. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  55. Handler: &webdav.Handler{
  56. FileSystem: fs,
  57. LockSystem: webdav.NewMemLS(),
  58. },
  59. }
  60. return ws, nil
  61. }
  62. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  63. type WebDavFileSystem struct {
  64. option *WebDavOption
  65. secret security.SigningKey
  66. grpcDialOption grpc.DialOption
  67. chunkCache *chunk_cache.TieredChunkCache
  68. signature int32
  69. }
  70. type FileInfo struct {
  71. name string
  72. size int64
  73. mode os.FileMode
  74. modifiledTime time.Time
  75. isDirectory bool
  76. }
  77. func (fi *FileInfo) Name() string { return fi.name }
  78. func (fi *FileInfo) Size() int64 { return fi.size }
  79. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  80. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  81. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  82. func (fi *FileInfo) Sys() interface{} { return nil }
  83. type WebDavFile struct {
  84. fs *WebDavFileSystem
  85. name string
  86. isDirectory bool
  87. off int64
  88. entry *filer_pb.Entry
  89. entryViewCache []filer.VisibleInterval
  90. reader io.ReaderAt
  91. bufWriter *buffered_writer.BufferedWriteCloser
  92. collection string
  93. replication string
  94. }
  95. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  96. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  97. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  98. os.MkdirAll(cacheDir, os.FileMode(0755))
  99. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  100. return &WebDavFileSystem{
  101. option: option,
  102. chunkCache: chunkCache,
  103. signature: util.RandomInt32(),
  104. }, nil
  105. }
  106. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  107. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  108. return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
  109. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  110. return fn(client)
  111. }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
  112. }
  113. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  114. return location.Url
  115. }
  116. func (fs *WebDavFileSystem) GetDataCenter() string {
  117. return ""
  118. }
  119. func clearName(name string) (string, error) {
  120. slashed := strings.HasSuffix(name, "/")
  121. name = path.Clean(name)
  122. if !strings.HasSuffix(name, "/") && slashed {
  123. name += "/"
  124. }
  125. if !strings.HasPrefix(name, "/") {
  126. return "", os.ErrInvalid
  127. }
  128. return name, nil
  129. }
  130. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  131. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  132. if !strings.HasSuffix(fullDirPath, "/") {
  133. fullDirPath += "/"
  134. }
  135. var err error
  136. if fullDirPath, err = clearName(fullDirPath); err != nil {
  137. return err
  138. }
  139. _, err = fs.stat(ctx, fullDirPath)
  140. if err == nil {
  141. return os.ErrExist
  142. }
  143. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  144. dir, name := util.FullPath(fullDirPath).DirAndName()
  145. request := &filer_pb.CreateEntryRequest{
  146. Directory: dir,
  147. Entry: &filer_pb.Entry{
  148. Name: name,
  149. IsDirectory: true,
  150. Attributes: &filer_pb.FuseAttributes{
  151. Mtime: time.Now().Unix(),
  152. Crtime: time.Now().Unix(),
  153. FileMode: uint32(perm | os.ModeDir),
  154. Uid: fs.option.Uid,
  155. Gid: fs.option.Gid,
  156. },
  157. },
  158. Signatures: []int32{fs.signature},
  159. }
  160. glog.V(1).Infof("mkdir: %v", request)
  161. if err := filer_pb.CreateEntry(client, request); err != nil {
  162. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  163. }
  164. return nil
  165. })
  166. }
  167. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  168. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  169. var err error
  170. if fullFilePath, err = clearName(fullFilePath); err != nil {
  171. return nil, err
  172. }
  173. if flag&os.O_CREATE != 0 {
  174. // file should not have / suffix.
  175. if strings.HasSuffix(fullFilePath, "/") {
  176. return nil, os.ErrInvalid
  177. }
  178. _, err = fs.stat(ctx, fullFilePath)
  179. if err == nil {
  180. if flag&os.O_EXCL != 0 {
  181. return nil, os.ErrExist
  182. }
  183. fs.removeAll(ctx, fullFilePath)
  184. }
  185. dir, name := util.FullPath(fullFilePath).DirAndName()
  186. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  187. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  188. Directory: dir,
  189. Entry: &filer_pb.Entry{
  190. Name: name,
  191. IsDirectory: perm&os.ModeDir > 0,
  192. Attributes: &filer_pb.FuseAttributes{
  193. Mtime: time.Now().Unix(),
  194. Crtime: time.Now().Unix(),
  195. FileMode: uint32(perm),
  196. Uid: fs.option.Uid,
  197. Gid: fs.option.Gid,
  198. TtlSec: 0,
  199. },
  200. },
  201. Signatures: []int32{fs.signature},
  202. }); err != nil {
  203. return fmt.Errorf("create %s: %v", fullFilePath, err)
  204. }
  205. return nil
  206. })
  207. if err != nil {
  208. return nil, err
  209. }
  210. return &WebDavFile{
  211. fs: fs,
  212. name: fullFilePath,
  213. isDirectory: false,
  214. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  215. }, nil
  216. }
  217. fi, err := fs.stat(ctx, fullFilePath)
  218. if err != nil {
  219. return nil, os.ErrNotExist
  220. }
  221. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  222. fullFilePath += "/"
  223. }
  224. return &WebDavFile{
  225. fs: fs,
  226. name: fullFilePath,
  227. isDirectory: false,
  228. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  229. }, nil
  230. }
  231. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  232. var err error
  233. if fullFilePath, err = clearName(fullFilePath); err != nil {
  234. return err
  235. }
  236. dir, name := util.FullPath(fullFilePath).DirAndName()
  237. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  238. }
  239. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  240. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  241. return fs.removeAll(ctx, name)
  242. }
  243. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  244. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  245. var err error
  246. if oldName, err = clearName(oldName); err != nil {
  247. return err
  248. }
  249. if newName, err = clearName(newName); err != nil {
  250. return err
  251. }
  252. of, err := fs.stat(ctx, oldName)
  253. if err != nil {
  254. return os.ErrExist
  255. }
  256. if of.IsDir() {
  257. if strings.HasSuffix(oldName, "/") {
  258. oldName = strings.TrimRight(oldName, "/")
  259. }
  260. if strings.HasSuffix(newName, "/") {
  261. newName = strings.TrimRight(newName, "/")
  262. }
  263. }
  264. _, err = fs.stat(ctx, newName)
  265. if err == nil {
  266. return os.ErrExist
  267. }
  268. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  269. newDir, newBaseName := util.FullPath(newName).DirAndName()
  270. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  271. request := &filer_pb.AtomicRenameEntryRequest{
  272. OldDirectory: oldDir,
  273. OldName: oldBaseName,
  274. NewDirectory: newDir,
  275. NewName: newBaseName,
  276. }
  277. _, err := client.AtomicRenameEntry(ctx, request)
  278. if err != nil {
  279. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  280. }
  281. return nil
  282. })
  283. }
  284. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  285. var err error
  286. if fullFilePath, err = clearName(fullFilePath); err != nil {
  287. return nil, err
  288. }
  289. fullpath := util.FullPath(fullFilePath)
  290. var fi FileInfo
  291. entry, err := filer_pb.GetEntry(fs, fullpath)
  292. if entry == nil {
  293. return nil, os.ErrNotExist
  294. }
  295. if err != nil {
  296. return nil, err
  297. }
  298. fi.size = int64(filer.FileSize(entry))
  299. fi.name = string(fullpath)
  300. fi.mode = os.FileMode(entry.Attributes.FileMode)
  301. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  302. fi.isDirectory = entry.IsDirectory
  303. if fi.name == "/" {
  304. fi.modifiledTime = time.Now()
  305. fi.isDirectory = true
  306. }
  307. return &fi, nil
  308. }
  309. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  310. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  311. return fs.stat(ctx, name)
  312. }
  313. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
  314. var fileId, host string
  315. var auth security.EncodedJwt
  316. if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  317. ctx := context.Background()
  318. assignErr := util.Retry("assignVolume", func() error {
  319. request := &filer_pb.AssignVolumeRequest{
  320. Count: 1,
  321. Replication: f.fs.option.Replication,
  322. Collection: f.fs.option.Collection,
  323. DiskType: f.fs.option.DiskType,
  324. Path: name,
  325. }
  326. resp, err := client.AssignVolume(ctx, request)
  327. if err != nil {
  328. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  329. return err
  330. }
  331. if resp.Error != "" {
  332. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  333. }
  334. fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
  335. f.collection, f.replication = resp.Collection, resp.Replication
  336. return nil
  337. })
  338. if assignErr != nil {
  339. return assignErr
  340. }
  341. return nil
  342. }); flushErr != nil {
  343. return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
  344. }
  345. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  346. uploadOption := &operation.UploadOption{
  347. UploadUrl: fileUrl,
  348. Filename: f.name,
  349. Cipher: f.fs.option.Cipher,
  350. IsInputCompressed: false,
  351. MimeType: "",
  352. PairMap: nil,
  353. Jwt: auth,
  354. }
  355. uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
  356. if flushErr != nil {
  357. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
  358. return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
  359. }
  360. if uploadResult.Error != "" {
  361. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
  362. return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
  363. }
  364. return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
  365. }
  366. func (f *WebDavFile) Write(buf []byte) (int, error) {
  367. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  368. dir, _ := util.FullPath(f.name).DirAndName()
  369. var getErr error
  370. ctx := context.Background()
  371. if f.entry == nil {
  372. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  373. }
  374. if f.entry == nil {
  375. return 0, getErr
  376. }
  377. if getErr != nil {
  378. return 0, getErr
  379. }
  380. if f.bufWriter.FlushFunc == nil {
  381. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  382. var chunk *filer_pb.FileChunk
  383. chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  384. if flushErr != nil {
  385. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  386. }
  387. f.entry.Content = nil
  388. f.entry.Chunks = append(f.entry.Chunks, chunk)
  389. return flushErr
  390. }
  391. f.bufWriter.CloseFunc = func() error {
  392. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  393. if manifestErr != nil {
  394. // not good, but should be ok
  395. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  396. } else {
  397. f.entry.Chunks = manifestedChunks
  398. }
  399. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  400. f.entry.Attributes.Mtime = time.Now().Unix()
  401. request := &filer_pb.UpdateEntryRequest{
  402. Directory: dir,
  403. Entry: f.entry,
  404. Signatures: []int32{f.fs.signature},
  405. }
  406. if _, err := client.UpdateEntry(ctx, request); err != nil {
  407. return fmt.Errorf("update %s: %v", f.name, err)
  408. }
  409. return nil
  410. })
  411. return flushErr
  412. }
  413. }
  414. written, err := f.bufWriter.Write(buf)
  415. if err == nil {
  416. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  417. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  418. f.off += int64(written)
  419. }
  420. return written, err
  421. }
  422. func (f *WebDavFile) Close() error {
  423. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  424. err := f.bufWriter.Close()
  425. if f.entry != nil {
  426. f.entry = nil
  427. f.entryViewCache = nil
  428. }
  429. return err
  430. }
  431. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  432. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  433. if f.entry == nil {
  434. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  435. }
  436. if f.entry == nil {
  437. return 0, err
  438. }
  439. if err != nil {
  440. return 0, err
  441. }
  442. fileSize := int64(filer.FileSize(f.entry))
  443. if fileSize == 0 {
  444. return 0, io.EOF
  445. }
  446. if f.entryViewCache == nil {
  447. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
  448. f.reader = nil
  449. }
  450. if f.reader == nil {
  451. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
  452. f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
  453. }
  454. readSize, err = f.reader.ReadAt(p, f.off)
  455. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  456. f.off += int64(readSize)
  457. if err != nil && err != io.EOF {
  458. glog.Errorf("file read %s: %v", f.name, err)
  459. }
  460. return
  461. }
  462. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  463. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  464. dir, _ := util.FullPath(f.name).DirAndName()
  465. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  466. fi := FileInfo{
  467. size: int64(filer.FileSize(entry)),
  468. name: entry.Name,
  469. mode: os.FileMode(entry.Attributes.FileMode),
  470. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  471. isDirectory: entry.IsDirectory,
  472. }
  473. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  474. fi.name += "/"
  475. }
  476. glog.V(4).Infof("entry: %v", fi.name)
  477. ret = append(ret, &fi)
  478. return nil
  479. })
  480. old := f.off
  481. if old >= int64(len(ret)) {
  482. if count > 0 {
  483. return nil, io.EOF
  484. }
  485. return nil, nil
  486. }
  487. if count > 0 {
  488. f.off += int64(count)
  489. if f.off > int64(len(ret)) {
  490. f.off = int64(len(ret))
  491. }
  492. } else {
  493. f.off = int64(len(ret))
  494. old = 0
  495. }
  496. return ret[old:f.off], nil
  497. }
  498. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  499. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  500. ctx := context.Background()
  501. var err error
  502. switch whence {
  503. case io.SeekStart:
  504. f.off = 0
  505. case io.SeekEnd:
  506. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  507. return 0, err
  508. } else {
  509. f.off = fi.Size()
  510. }
  511. }
  512. f.off += offset
  513. return f.off, err
  514. }
  515. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  516. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  517. ctx := context.Background()
  518. return f.fs.stat(ctx, f.name)
  519. }