You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

609 lines
15 KiB

5 years ago
5 years ago
3 years ago
4 years ago
2 years ago
2 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
2 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
2 years ago
2 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/operation"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  18. "github.com/seaweedfs/seaweedfs/weed/filer"
  19. "github.com/seaweedfs/seaweedfs/weed/glog"
  20. "github.com/seaweedfs/seaweedfs/weed/security"
  21. )
  22. type WebDavOption struct {
  23. Filer pb.ServerAddress
  24. DomainName string
  25. BucketsPath string
  26. GrpcDialOption grpc.DialOption
  27. Collection string
  28. Replication string
  29. DiskType string
  30. Uid uint32
  31. Gid uint32
  32. Cipher bool
  33. CacheDir string
  34. CacheSizeMB int64
  35. }
  36. type WebDavServer struct {
  37. option *WebDavOption
  38. secret security.SigningKey
  39. filer *filer.Filer
  40. grpcDialOption grpc.DialOption
  41. Handler *webdav.Handler
  42. }
  43. func max(x, y int64) int64 {
  44. if x <= y {
  45. return y
  46. }
  47. return x
  48. }
  49. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  50. fs, _ := NewWebDavFileSystem(option)
  51. ws = &WebDavServer{
  52. option: option,
  53. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  54. Handler: &webdav.Handler{
  55. FileSystem: fs,
  56. LockSystem: webdav.NewMemLS(),
  57. },
  58. }
  59. return ws, nil
  60. }
  61. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  62. type WebDavFileSystem struct {
  63. option *WebDavOption
  64. secret security.SigningKey
  65. grpcDialOption grpc.DialOption
  66. chunkCache *chunk_cache.TieredChunkCache
  67. signature int32
  68. }
  69. type FileInfo struct {
  70. name string
  71. size int64
  72. mode os.FileMode
  73. modifiedTime time.Time
  74. isDirectory bool
  75. }
  76. func (fi *FileInfo) Name() string { return fi.name }
  77. func (fi *FileInfo) Size() int64 { return fi.size }
  78. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  79. func (fi *FileInfo) ModTime() time.Time { return fi.modifiedTime }
  80. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  81. func (fi *FileInfo) Sys() interface{} { return nil }
  82. type WebDavFile struct {
  83. fs *WebDavFileSystem
  84. name string
  85. isDirectory bool
  86. off int64
  87. entry *filer_pb.Entry
  88. entryViewCache []filer.VisibleInterval
  89. reader io.ReaderAt
  90. bufWriter *buffered_writer.BufferedWriteCloser
  91. }
  92. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  93. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  94. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  95. os.MkdirAll(cacheDir, os.FileMode(0755))
  96. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  97. return &WebDavFileSystem{
  98. option: option,
  99. chunkCache: chunkCache,
  100. signature: util.RandomInt32(),
  101. }, nil
  102. }
  103. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  104. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  105. return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
  106. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  107. return fn(client)
  108. }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption)
  109. }
  110. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  111. return location.Url
  112. }
  113. func (fs *WebDavFileSystem) GetDataCenter() string {
  114. return ""
  115. }
  116. func clearName(name string) (string, error) {
  117. slashed := strings.HasSuffix(name, "/")
  118. name = path.Clean(name)
  119. if !strings.HasSuffix(name, "/") && slashed {
  120. name += "/"
  121. }
  122. if !strings.HasPrefix(name, "/") {
  123. return "", os.ErrInvalid
  124. }
  125. return name, nil
  126. }
  127. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  128. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  129. if !strings.HasSuffix(fullDirPath, "/") {
  130. fullDirPath += "/"
  131. }
  132. var err error
  133. if fullDirPath, err = clearName(fullDirPath); err != nil {
  134. return err
  135. }
  136. _, err = fs.stat(ctx, fullDirPath)
  137. if err == nil {
  138. return os.ErrExist
  139. }
  140. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  141. dir, name := util.FullPath(fullDirPath).DirAndName()
  142. request := &filer_pb.CreateEntryRequest{
  143. Directory: dir,
  144. Entry: &filer_pb.Entry{
  145. Name: name,
  146. IsDirectory: true,
  147. Attributes: &filer_pb.FuseAttributes{
  148. Mtime: time.Now().Unix(),
  149. Crtime: time.Now().Unix(),
  150. FileMode: uint32(perm | os.ModeDir),
  151. Uid: fs.option.Uid,
  152. Gid: fs.option.Gid,
  153. },
  154. },
  155. Signatures: []int32{fs.signature},
  156. }
  157. glog.V(1).Infof("mkdir: %v", request)
  158. if err := filer_pb.CreateEntry(client, request); err != nil {
  159. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  160. }
  161. return nil
  162. })
  163. }
  164. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  165. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  166. var err error
  167. if fullFilePath, err = clearName(fullFilePath); err != nil {
  168. return nil, err
  169. }
  170. if flag&os.O_CREATE != 0 {
  171. // file should not have / suffix.
  172. if strings.HasSuffix(fullFilePath, "/") {
  173. return nil, os.ErrInvalid
  174. }
  175. _, err = fs.stat(ctx, fullFilePath)
  176. if err == nil {
  177. if flag&os.O_EXCL != 0 {
  178. return nil, os.ErrExist
  179. }
  180. fs.removeAll(ctx, fullFilePath)
  181. }
  182. dir, name := util.FullPath(fullFilePath).DirAndName()
  183. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  184. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  185. Directory: dir,
  186. Entry: &filer_pb.Entry{
  187. Name: name,
  188. IsDirectory: perm&os.ModeDir > 0,
  189. Attributes: &filer_pb.FuseAttributes{
  190. Mtime: time.Now().Unix(),
  191. Crtime: time.Now().Unix(),
  192. FileMode: uint32(perm),
  193. Uid: fs.option.Uid,
  194. Gid: fs.option.Gid,
  195. TtlSec: 0,
  196. },
  197. },
  198. Signatures: []int32{fs.signature},
  199. }); err != nil {
  200. return fmt.Errorf("create %s: %v", fullFilePath, err)
  201. }
  202. return nil
  203. })
  204. if err != nil {
  205. return nil, err
  206. }
  207. return &WebDavFile{
  208. fs: fs,
  209. name: fullFilePath,
  210. isDirectory: false,
  211. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  212. }, nil
  213. }
  214. fi, err := fs.stat(ctx, fullFilePath)
  215. if err != nil {
  216. return nil, os.ErrNotExist
  217. }
  218. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  219. fullFilePath += "/"
  220. }
  221. return &WebDavFile{
  222. fs: fs,
  223. name: fullFilePath,
  224. isDirectory: false,
  225. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  226. }, nil
  227. }
  228. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  229. var err error
  230. if fullFilePath, err = clearName(fullFilePath); err != nil {
  231. return err
  232. }
  233. dir, name := util.FullPath(fullFilePath).DirAndName()
  234. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  235. }
  236. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  237. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  238. return fs.removeAll(ctx, name)
  239. }
  240. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  241. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  242. var err error
  243. if oldName, err = clearName(oldName); err != nil {
  244. return err
  245. }
  246. if newName, err = clearName(newName); err != nil {
  247. return err
  248. }
  249. of, err := fs.stat(ctx, oldName)
  250. if err != nil {
  251. return os.ErrExist
  252. }
  253. if of.IsDir() {
  254. if strings.HasSuffix(oldName, "/") {
  255. oldName = strings.TrimRight(oldName, "/")
  256. }
  257. if strings.HasSuffix(newName, "/") {
  258. newName = strings.TrimRight(newName, "/")
  259. }
  260. }
  261. _, err = fs.stat(ctx, newName)
  262. if err == nil {
  263. return os.ErrExist
  264. }
  265. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  266. newDir, newBaseName := util.FullPath(newName).DirAndName()
  267. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  268. request := &filer_pb.AtomicRenameEntryRequest{
  269. OldDirectory: oldDir,
  270. OldName: oldBaseName,
  271. NewDirectory: newDir,
  272. NewName: newBaseName,
  273. }
  274. _, err := client.AtomicRenameEntry(ctx, request)
  275. if err != nil {
  276. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  277. }
  278. return nil
  279. })
  280. }
  281. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  282. var err error
  283. if fullFilePath, err = clearName(fullFilePath); err != nil {
  284. return nil, err
  285. }
  286. fullpath := util.FullPath(fullFilePath)
  287. var fi FileInfo
  288. entry, err := filer_pb.GetEntry(fs, fullpath)
  289. if entry == nil {
  290. return nil, os.ErrNotExist
  291. }
  292. if err != nil {
  293. return nil, err
  294. }
  295. fi.size = int64(filer.FileSize(entry))
  296. fi.name = string(fullpath)
  297. fi.mode = os.FileMode(entry.Attributes.FileMode)
  298. fi.modifiedTime = time.Unix(entry.Attributes.Mtime, 0)
  299. fi.isDirectory = entry.IsDirectory
  300. if fi.name == "/" {
  301. fi.modifiedTime = time.Now()
  302. fi.isDirectory = true
  303. }
  304. return &fi, nil
  305. }
  306. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  307. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  308. return fs.stat(ctx, name)
  309. }
  310. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
  311. fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
  312. f.fs,
  313. &filer_pb.AssignVolumeRequest{
  314. Count: 1,
  315. Replication: f.fs.option.Replication,
  316. Collection: f.fs.option.Collection,
  317. DiskType: f.fs.option.DiskType,
  318. Path: name,
  319. },
  320. &operation.UploadOption{
  321. Filename: f.name,
  322. Cipher: f.fs.option.Cipher,
  323. IsInputCompressed: false,
  324. MimeType: "",
  325. PairMap: nil,
  326. },
  327. func(host, fileId string) string {
  328. return fmt.Sprintf("http://%s/%s", host, fileId)
  329. },
  330. reader,
  331. )
  332. if flushErr != nil {
  333. glog.V(0).Infof("upload data %v: %v", f.name, flushErr)
  334. return nil, fmt.Errorf("upload data: %v", flushErr)
  335. }
  336. if uploadResult.Error != "" {
  337. glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
  338. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  339. }
  340. return uploadResult.ToPbFileChunk(fileId, offset), nil
  341. }
  342. func (f *WebDavFile) Write(buf []byte) (int, error) {
  343. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  344. dir, _ := util.FullPath(f.name).DirAndName()
  345. var getErr error
  346. ctx := context.Background()
  347. if f.entry == nil {
  348. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  349. }
  350. if f.entry == nil {
  351. return 0, getErr
  352. }
  353. if getErr != nil {
  354. return 0, getErr
  355. }
  356. if f.bufWriter.FlushFunc == nil {
  357. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  358. var chunk *filer_pb.FileChunk
  359. chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset)
  360. if flushErr != nil {
  361. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  362. }
  363. f.entry.Content = nil
  364. f.entry.Chunks = append(f.entry.Chunks, chunk)
  365. return flushErr
  366. }
  367. f.bufWriter.CloseFunc = func() error {
  368. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  369. if manifestErr != nil {
  370. // not good, but should be ok
  371. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  372. } else {
  373. f.entry.Chunks = manifestedChunks
  374. }
  375. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  376. f.entry.Attributes.Mtime = time.Now().Unix()
  377. request := &filer_pb.UpdateEntryRequest{
  378. Directory: dir,
  379. Entry: f.entry,
  380. Signatures: []int32{f.fs.signature},
  381. }
  382. if _, err := client.UpdateEntry(ctx, request); err != nil {
  383. return fmt.Errorf("update %s: %v", f.name, err)
  384. }
  385. return nil
  386. })
  387. return flushErr
  388. }
  389. }
  390. written, err := f.bufWriter.Write(buf)
  391. if err == nil {
  392. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  393. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  394. f.off += int64(written)
  395. }
  396. return written, err
  397. }
  398. func (f *WebDavFile) Close() error {
  399. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  400. err := f.bufWriter.Close()
  401. if f.entry != nil {
  402. f.entry = nil
  403. f.entryViewCache = nil
  404. }
  405. return err
  406. }
  407. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  408. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  409. if f.entry == nil {
  410. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  411. }
  412. if f.entry == nil {
  413. return 0, err
  414. }
  415. if err != nil {
  416. return 0, err
  417. }
  418. fileSize := int64(filer.FileSize(f.entry))
  419. if fileSize == 0 {
  420. return 0, io.EOF
  421. }
  422. if f.entryViewCache == nil {
  423. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
  424. f.reader = nil
  425. }
  426. if f.reader == nil {
  427. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
  428. f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
  429. }
  430. readSize, err = f.reader.ReadAt(p, f.off)
  431. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  432. f.off += int64(readSize)
  433. if err != nil && err != io.EOF {
  434. glog.Errorf("file read %s: %v", f.name, err)
  435. }
  436. return
  437. }
  438. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  439. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  440. dir, _ := util.FullPath(f.name).DirAndName()
  441. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  442. fi := FileInfo{
  443. size: int64(filer.FileSize(entry)),
  444. name: entry.Name,
  445. mode: os.FileMode(entry.Attributes.FileMode),
  446. modifiedTime: time.Unix(entry.Attributes.Mtime, 0),
  447. isDirectory: entry.IsDirectory,
  448. }
  449. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  450. fi.name += "/"
  451. }
  452. glog.V(4).Infof("entry: %v", fi.name)
  453. ret = append(ret, &fi)
  454. return nil
  455. })
  456. old := f.off
  457. if old >= int64(len(ret)) {
  458. if count > 0 {
  459. return nil, io.EOF
  460. }
  461. return nil, nil
  462. }
  463. if count > 0 {
  464. f.off += int64(count)
  465. if f.off > int64(len(ret)) {
  466. f.off = int64(len(ret))
  467. }
  468. } else {
  469. f.off = int64(len(ret))
  470. old = 0
  471. }
  472. return ret[old:f.off], nil
  473. }
  474. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  475. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  476. ctx := context.Background()
  477. var err error
  478. switch whence {
  479. case io.SeekStart:
  480. f.off = 0
  481. case io.SeekEnd:
  482. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  483. return 0, err
  484. } else {
  485. f.off = fi.Size()
  486. }
  487. }
  488. f.off += offset
  489. return f.off, err
  490. }
  491. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  492. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  493. ctx := context.Background()
  494. return f.fs.stat(ctx, f.name)
  495. }