You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

610 lines
15 KiB

5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  12. "golang.org/x/net/webdav"
  13. "google.golang.org/grpc"
  14. "github.com/seaweedfs/seaweedfs/weed/operation"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  19. "github.com/seaweedfs/seaweedfs/weed/filer"
  20. "github.com/seaweedfs/seaweedfs/weed/glog"
  21. "github.com/seaweedfs/seaweedfs/weed/security"
  22. )
  23. type WebDavOption struct {
  24. Filer pb.ServerAddress
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Replication string
  30. DiskType string
  31. Uid uint32
  32. Gid uint32
  33. Cipher bool
  34. CacheDir string
  35. CacheSizeMB int64
  36. }
  37. type WebDavServer struct {
  38. option *WebDavOption
  39. secret security.SigningKey
  40. filer *filer.Filer
  41. grpcDialOption grpc.DialOption
  42. Handler *webdav.Handler
  43. }
  // max returns the larger of x and y; y is returned when both are equal.
  func max(x, y int64) int64 {
  	if x > y {
  		return x
  	}
  	return y
  }
  50. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  51. fs, _ := NewWebDavFileSystem(option)
  52. ws = &WebDavServer{
  53. option: option,
  54. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  55. Handler: &webdav.Handler{
  56. FileSystem: fs,
  57. LockSystem: webdav.NewMemLS(),
  58. },
  59. }
  60. return ws, nil
  61. }
  62. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  63. type WebDavFileSystem struct {
  64. option *WebDavOption
  65. secret security.SigningKey
  66. grpcDialOption grpc.DialOption
  67. chunkCache *chunk_cache.TieredChunkCache
  68. signature int32
  69. }
  // FileInfo is a plain value holder whose accessor methods match the
  // os.FileInfo method set.
  type FileInfo struct {
  	name          string
  	size          int64
  	mode          os.FileMode
  	modifiledTime time.Time
  	isDirectory   bool
  }

  // Name returns the stored name.
  func (info *FileInfo) Name() string {
  	return info.name
  }

  // Size returns the stored size in bytes.
  func (info *FileInfo) Size() int64 {
  	return info.size
  }

  // Mode returns the stored file mode bits.
  func (info *FileInfo) Mode() os.FileMode {
  	return info.mode
  }

  // ModTime returns the stored modification time.
  func (info *FileInfo) ModTime() time.Time {
  	return info.modifiledTime
  }

  // IsDir reports whether the value describes a directory.
  func (info *FileInfo) IsDir() bool {
  	return info.isDirectory
  }

  // Sys always returns nil; there is no underlying data source to expose.
  func (info *FileInfo) Sys() interface{} {
  	return nil
  }
  83. type WebDavFile struct {
  84. fs *WebDavFileSystem
  85. name string
  86. isDirectory bool
  87. off int64
  88. entry *filer_pb.Entry
  89. entryViewCache []filer.VisibleInterval
  90. reader io.ReaderAt
  91. bufWriter *buffered_writer.BufferedWriteCloser
  92. }
  93. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  94. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  95. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  96. os.MkdirAll(cacheDir, os.FileMode(0755))
  97. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  98. return &WebDavFileSystem{
  99. option: option,
  100. chunkCache: chunkCache,
  101. signature: util.RandomInt32(),
  102. }, nil
  103. }
  104. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  105. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  106. return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
  107. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  108. return fn(client)
  109. }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
  110. }
  111. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  112. return location.Url
  113. }
  114. func (fs *WebDavFileSystem) GetDataCenter() string {
  115. return ""
  116. }
  // clearName normalizes name with path.Clean while keeping a trailing slash
  // if the input had one. The result must be absolute (start with "/");
  // otherwise "" and os.ErrInvalid are returned.
  func clearName(name string) (string, error) {
  	hadTrailingSlash := strings.HasSuffix(name, "/")

  	cleaned := path.Clean(name)
  	// path.Clean strips the trailing slash (except for the root), so put it
  	// back when the caller supplied one.
  	if hadTrailingSlash && !strings.HasSuffix(cleaned, "/") {
  		cleaned += "/"
  	}

  	if strings.HasPrefix(cleaned, "/") {
  		return cleaned, nil
  	}
  	return "", os.ErrInvalid
  }
  128. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  129. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  130. if !strings.HasSuffix(fullDirPath, "/") {
  131. fullDirPath += "/"
  132. }
  133. var err error
  134. if fullDirPath, err = clearName(fullDirPath); err != nil {
  135. return err
  136. }
  137. _, err = fs.stat(ctx, fullDirPath)
  138. if err == nil {
  139. return os.ErrExist
  140. }
  141. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  142. dir, name := util.FullPath(fullDirPath).DirAndName()
  143. request := &filer_pb.CreateEntryRequest{
  144. Directory: dir,
  145. Entry: &filer_pb.Entry{
  146. Name: name,
  147. IsDirectory: true,
  148. Attributes: &filer_pb.FuseAttributes{
  149. Mtime: time.Now().Unix(),
  150. Crtime: time.Now().Unix(),
  151. FileMode: uint32(perm | os.ModeDir),
  152. Uid: fs.option.Uid,
  153. Gid: fs.option.Gid,
  154. },
  155. },
  156. Signatures: []int32{fs.signature},
  157. }
  158. glog.V(1).Infof("mkdir: %v", request)
  159. if err := filer_pb.CreateEntry(client, request); err != nil {
  160. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  161. }
  162. return nil
  163. })
  164. }
  165. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  166. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  167. var err error
  168. if fullFilePath, err = clearName(fullFilePath); err != nil {
  169. return nil, err
  170. }
  171. if flag&os.O_CREATE != 0 {
  172. // file should not have / suffix.
  173. if strings.HasSuffix(fullFilePath, "/") {
  174. return nil, os.ErrInvalid
  175. }
  176. _, err = fs.stat(ctx, fullFilePath)
  177. if err == nil {
  178. if flag&os.O_EXCL != 0 {
  179. return nil, os.ErrExist
  180. }
  181. fs.removeAll(ctx, fullFilePath)
  182. }
  183. dir, name := util.FullPath(fullFilePath).DirAndName()
  184. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  185. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  186. Directory: dir,
  187. Entry: &filer_pb.Entry{
  188. Name: name,
  189. IsDirectory: perm&os.ModeDir > 0,
  190. Attributes: &filer_pb.FuseAttributes{
  191. Mtime: time.Now().Unix(),
  192. Crtime: time.Now().Unix(),
  193. FileMode: uint32(perm),
  194. Uid: fs.option.Uid,
  195. Gid: fs.option.Gid,
  196. TtlSec: 0,
  197. },
  198. },
  199. Signatures: []int32{fs.signature},
  200. }); err != nil {
  201. return fmt.Errorf("create %s: %v", fullFilePath, err)
  202. }
  203. return nil
  204. })
  205. if err != nil {
  206. return nil, err
  207. }
  208. return &WebDavFile{
  209. fs: fs,
  210. name: fullFilePath,
  211. isDirectory: false,
  212. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  213. }, nil
  214. }
  215. fi, err := fs.stat(ctx, fullFilePath)
  216. if err != nil {
  217. return nil, os.ErrNotExist
  218. }
  219. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  220. fullFilePath += "/"
  221. }
  222. return &WebDavFile{
  223. fs: fs,
  224. name: fullFilePath,
  225. isDirectory: false,
  226. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  227. }, nil
  228. }
  229. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  230. var err error
  231. if fullFilePath, err = clearName(fullFilePath); err != nil {
  232. return err
  233. }
  234. dir, name := util.FullPath(fullFilePath).DirAndName()
  235. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  236. }
  237. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  238. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  239. return fs.removeAll(ctx, name)
  240. }
  241. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  242. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  243. var err error
  244. if oldName, err = clearName(oldName); err != nil {
  245. return err
  246. }
  247. if newName, err = clearName(newName); err != nil {
  248. return err
  249. }
  250. of, err := fs.stat(ctx, oldName)
  251. if err != nil {
  252. return os.ErrExist
  253. }
  254. if of.IsDir() {
  255. if strings.HasSuffix(oldName, "/") {
  256. oldName = strings.TrimRight(oldName, "/")
  257. }
  258. if strings.HasSuffix(newName, "/") {
  259. newName = strings.TrimRight(newName, "/")
  260. }
  261. }
  262. _, err = fs.stat(ctx, newName)
  263. if err == nil {
  264. return os.ErrExist
  265. }
  266. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  267. newDir, newBaseName := util.FullPath(newName).DirAndName()
  268. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  269. request := &filer_pb.AtomicRenameEntryRequest{
  270. OldDirectory: oldDir,
  271. OldName: oldBaseName,
  272. NewDirectory: newDir,
  273. NewName: newBaseName,
  274. }
  275. _, err := client.AtomicRenameEntry(ctx, request)
  276. if err != nil {
  277. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  278. }
  279. return nil
  280. })
  281. }
  282. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  283. var err error
  284. if fullFilePath, err = clearName(fullFilePath); err != nil {
  285. return nil, err
  286. }
  287. fullpath := util.FullPath(fullFilePath)
  288. var fi FileInfo
  289. entry, err := filer_pb.GetEntry(fs, fullpath)
  290. if entry == nil {
  291. return nil, os.ErrNotExist
  292. }
  293. if err != nil {
  294. return nil, err
  295. }
  296. fi.size = int64(filer.FileSize(entry))
  297. fi.name = string(fullpath)
  298. fi.mode = os.FileMode(entry.Attributes.FileMode)
  299. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  300. fi.isDirectory = entry.IsDirectory
  301. if fi.name == "/" {
  302. fi.modifiledTime = time.Now()
  303. fi.isDirectory = true
  304. }
  305. return &fi, nil
  306. }
  307. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  308. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  309. return fs.stat(ctx, name)
  310. }
  311. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
  312. fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
  313. f.fs,
  314. &filer_pb.AssignVolumeRequest{
  315. Count: 1,
  316. Replication: f.fs.option.Replication,
  317. Collection: f.fs.option.Collection,
  318. DiskType: f.fs.option.DiskType,
  319. Path: name,
  320. },
  321. &operation.UploadOption{
  322. Filename: f.name,
  323. Cipher: f.fs.option.Cipher,
  324. IsInputCompressed: false,
  325. MimeType: "",
  326. PairMap: nil,
  327. },
  328. func(host, fileId string) string {
  329. return fmt.Sprintf("http://%s/%s", host, fileId)
  330. },
  331. reader,
  332. )
  333. if flushErr != nil {
  334. glog.V(0).Infof("upload data %v: %v", f.name, flushErr)
  335. return nil, fmt.Errorf("upload data: %v", flushErr)
  336. }
  337. if uploadResult.Error != "" {
  338. glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
  339. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  340. }
  341. return uploadResult.ToPbFileChunk(fileId, offset), nil
  342. }
  343. func (f *WebDavFile) Write(buf []byte) (int, error) {
  344. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  345. dir, _ := util.FullPath(f.name).DirAndName()
  346. var getErr error
  347. ctx := context.Background()
  348. if f.entry == nil {
  349. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  350. }
  351. if f.entry == nil {
  352. return 0, getErr
  353. }
  354. if getErr != nil {
  355. return 0, getErr
  356. }
  357. if f.bufWriter.FlushFunc == nil {
  358. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  359. var chunk *filer_pb.FileChunk
  360. chunk, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  361. if flushErr != nil {
  362. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  363. }
  364. f.entry.Content = nil
  365. f.entry.Chunks = append(f.entry.Chunks, chunk)
  366. return flushErr
  367. }
  368. f.bufWriter.CloseFunc = func() error {
  369. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  370. if manifestErr != nil {
  371. // not good, but should be ok
  372. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  373. } else {
  374. f.entry.Chunks = manifestedChunks
  375. }
  376. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  377. f.entry.Attributes.Mtime = time.Now().Unix()
  378. request := &filer_pb.UpdateEntryRequest{
  379. Directory: dir,
  380. Entry: f.entry,
  381. Signatures: []int32{f.fs.signature},
  382. }
  383. if _, err := client.UpdateEntry(ctx, request); err != nil {
  384. return fmt.Errorf("update %s: %v", f.name, err)
  385. }
  386. return nil
  387. })
  388. return flushErr
  389. }
  390. }
  391. written, err := f.bufWriter.Write(buf)
  392. if err == nil {
  393. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  394. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  395. f.off += int64(written)
  396. }
  397. return written, err
  398. }
  399. func (f *WebDavFile) Close() error {
  400. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  401. err := f.bufWriter.Close()
  402. if f.entry != nil {
  403. f.entry = nil
  404. f.entryViewCache = nil
  405. }
  406. return err
  407. }
  408. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  409. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  410. if f.entry == nil {
  411. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  412. }
  413. if f.entry == nil {
  414. return 0, err
  415. }
  416. if err != nil {
  417. return 0, err
  418. }
  419. fileSize := int64(filer.FileSize(f.entry))
  420. if fileSize == 0 {
  421. return 0, io.EOF
  422. }
  423. if f.entryViewCache == nil {
  424. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
  425. f.reader = nil
  426. }
  427. if f.reader == nil {
  428. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
  429. f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
  430. }
  431. readSize, err = f.reader.ReadAt(p, f.off)
  432. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  433. f.off += int64(readSize)
  434. if err != nil && err != io.EOF {
  435. glog.Errorf("file read %s: %v", f.name, err)
  436. }
  437. return
  438. }
  439. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  440. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  441. dir, _ := util.FullPath(f.name).DirAndName()
  442. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  443. fi := FileInfo{
  444. size: int64(filer.FileSize(entry)),
  445. name: entry.Name,
  446. mode: os.FileMode(entry.Attributes.FileMode),
  447. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  448. isDirectory: entry.IsDirectory,
  449. }
  450. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  451. fi.name += "/"
  452. }
  453. glog.V(4).Infof("entry: %v", fi.name)
  454. ret = append(ret, &fi)
  455. return nil
  456. })
  457. old := f.off
  458. if old >= int64(len(ret)) {
  459. if count > 0 {
  460. return nil, io.EOF
  461. }
  462. return nil, nil
  463. }
  464. if count > 0 {
  465. f.off += int64(count)
  466. if f.off > int64(len(ret)) {
  467. f.off = int64(len(ret))
  468. }
  469. } else {
  470. f.off = int64(len(ret))
  471. old = 0
  472. }
  473. return ret[old:f.off], nil
  474. }
  475. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  476. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  477. ctx := context.Background()
  478. var err error
  479. switch whence {
  480. case io.SeekStart:
  481. f.off = 0
  482. case io.SeekEnd:
  483. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  484. return 0, err
  485. } else {
  486. f.off = fi.Size()
  487. }
  488. }
  489. f.off += offset
  490. return f.off, err
  491. }
  492. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  493. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  494. ctx := context.Background()
  495. return f.fs.stat(ctx, f.name)
  496. }