You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

636 lines
16 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "path"
  10. "strings"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
  13. "golang.org/x/net/webdav"
  14. "google.golang.org/grpc"
  15. "github.com/chrislusf/seaweedfs/weed/operation"
  16. "github.com/chrislusf/seaweedfs/weed/pb"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  20. "github.com/chrislusf/seaweedfs/weed/filer"
  21. "github.com/chrislusf/seaweedfs/weed/glog"
  22. "github.com/chrislusf/seaweedfs/weed/security"
  23. )
  24. type WebDavOption struct {
  25. Filer string
  26. FilerGrpcAddress string
  27. DomainName string
  28. BucketsPath string
  29. GrpcDialOption grpc.DialOption
  30. Collection string
  31. Replication string
  32. DiskType string
  33. Uid uint32
  34. Gid uint32
  35. Cipher bool
  36. CacheDir string
  37. CacheSizeMB int64
  38. }
  39. type WebDavServer struct {
  40. option *WebDavOption
  41. secret security.SigningKey
  42. filer *filer.Filer
  43. grpcDialOption grpc.DialOption
  44. Handler *webdav.Handler
  45. }
  46. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  47. fs, _ := NewWebDavFileSystem(option)
  48. ws = &WebDavServer{
  49. option: option,
  50. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  51. Handler: &webdav.Handler{
  52. FileSystem: fs,
  53. LockSystem: webdav.NewMemLS(),
  54. },
  55. }
  56. return ws, nil
  57. }
  58. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  59. type WebDavFileSystem struct {
  60. option *WebDavOption
  61. secret security.SigningKey
  62. filer *filer.Filer
  63. grpcDialOption grpc.DialOption
  64. chunkCache *chunk_cache.TieredChunkCache
  65. signature int32
  66. }
  67. type FileInfo struct {
  68. name string
  69. size int64
  70. mode os.FileMode
  71. modifiledTime time.Time
  72. isDirectory bool
  73. }
  74. func (fi *FileInfo) Name() string { return fi.name }
  75. func (fi *FileInfo) Size() int64 { return fi.size }
  76. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  77. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  78. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  79. func (fi *FileInfo) Sys() interface{} { return nil }
  80. type WebDavFile struct {
  81. fs *WebDavFileSystem
  82. name string
  83. isDirectory bool
  84. off int64
  85. entry *filer_pb.Entry
  86. entryViewCache []filer.VisibleInterval
  87. reader io.ReaderAt
  88. bufWriter *buffered_writer.BufferedWriteCloser
  89. collection string
  90. replication string
  91. }
  92. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  93. cacheUniqueId := util.Md5String([]byte("webdav" + option.FilerGrpcAddress + util.Version()))[0:8]
  94. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  95. os.MkdirAll(cacheDir, os.FileMode(0755))
  96. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  97. return &WebDavFileSystem{
  98. option: option,
  99. chunkCache: chunkCache,
  100. signature: util.RandomInt32(),
  101. }, nil
  102. }
  103. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  104. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  105. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  106. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  107. return fn(client)
  108. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  109. }
  110. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  111. return location.Url
  112. }
  113. func clearName(name string) (string, error) {
  114. slashed := strings.HasSuffix(name, "/")
  115. name = path.Clean(name)
  116. if !strings.HasSuffix(name, "/") && slashed {
  117. name += "/"
  118. }
  119. if !strings.HasPrefix(name, "/") {
  120. return "", os.ErrInvalid
  121. }
  122. return name, nil
  123. }
  124. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  125. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  126. if !strings.HasSuffix(fullDirPath, "/") {
  127. fullDirPath += "/"
  128. }
  129. var err error
  130. if fullDirPath, err = clearName(fullDirPath); err != nil {
  131. return err
  132. }
  133. _, err = fs.stat(ctx, fullDirPath)
  134. if err == nil {
  135. return os.ErrExist
  136. }
  137. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  138. dir, name := util.FullPath(fullDirPath).DirAndName()
  139. request := &filer_pb.CreateEntryRequest{
  140. Directory: dir,
  141. Entry: &filer_pb.Entry{
  142. Name: name,
  143. IsDirectory: true,
  144. Attributes: &filer_pb.FuseAttributes{
  145. Mtime: time.Now().Unix(),
  146. Crtime: time.Now().Unix(),
  147. FileMode: uint32(perm | os.ModeDir),
  148. Uid: fs.option.Uid,
  149. Gid: fs.option.Gid,
  150. },
  151. },
  152. Signatures: []int32{fs.signature},
  153. }
  154. glog.V(1).Infof("mkdir: %v", request)
  155. if err := filer_pb.CreateEntry(client, request); err != nil {
  156. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  157. }
  158. return nil
  159. })
  160. }
  161. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  162. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  163. var err error
  164. if fullFilePath, err = clearName(fullFilePath); err != nil {
  165. return nil, err
  166. }
  167. if flag&os.O_CREATE != 0 {
  168. // file should not have / suffix.
  169. if strings.HasSuffix(fullFilePath, "/") {
  170. return nil, os.ErrInvalid
  171. }
  172. _, err = fs.stat(ctx, fullFilePath)
  173. if err == nil {
  174. if flag&os.O_EXCL != 0 {
  175. return nil, os.ErrExist
  176. }
  177. fs.removeAll(ctx, fullFilePath)
  178. }
  179. dir, name := util.FullPath(fullFilePath).DirAndName()
  180. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  181. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  182. Directory: dir,
  183. Entry: &filer_pb.Entry{
  184. Name: name,
  185. IsDirectory: perm&os.ModeDir > 0,
  186. Attributes: &filer_pb.FuseAttributes{
  187. Mtime: time.Now().Unix(),
  188. Crtime: time.Now().Unix(),
  189. FileMode: uint32(perm),
  190. Uid: fs.option.Uid,
  191. Gid: fs.option.Gid,
  192. Collection: fs.option.Collection,
  193. Replication: fs.option.Replication,
  194. TtlSec: 0,
  195. },
  196. },
  197. Signatures: []int32{fs.signature},
  198. }); err != nil {
  199. return fmt.Errorf("create %s: %v", fullFilePath, err)
  200. }
  201. return nil
  202. })
  203. if err != nil {
  204. return nil, err
  205. }
  206. return &WebDavFile{
  207. fs: fs,
  208. name: fullFilePath,
  209. isDirectory: false,
  210. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  211. }, nil
  212. }
  213. fi, err := fs.stat(ctx, fullFilePath)
  214. if err != nil {
  215. return nil, os.ErrNotExist
  216. }
  217. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  218. fullFilePath += "/"
  219. }
  220. return &WebDavFile{
  221. fs: fs,
  222. name: fullFilePath,
  223. isDirectory: false,
  224. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  225. }, nil
  226. }
  227. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  228. var err error
  229. if fullFilePath, err = clearName(fullFilePath); err != nil {
  230. return err
  231. }
  232. dir, name := util.FullPath(fullFilePath).DirAndName()
  233. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  234. }
  235. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  236. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  237. return fs.removeAll(ctx, name)
  238. }
  239. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  240. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  241. var err error
  242. if oldName, err = clearName(oldName); err != nil {
  243. return err
  244. }
  245. if newName, err = clearName(newName); err != nil {
  246. return err
  247. }
  248. of, err := fs.stat(ctx, oldName)
  249. if err != nil {
  250. return os.ErrExist
  251. }
  252. if of.IsDir() {
  253. if strings.HasSuffix(oldName, "/") {
  254. oldName = strings.TrimRight(oldName, "/")
  255. }
  256. if strings.HasSuffix(newName, "/") {
  257. newName = strings.TrimRight(newName, "/")
  258. }
  259. }
  260. _, err = fs.stat(ctx, newName)
  261. if err == nil {
  262. return os.ErrExist
  263. }
  264. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  265. newDir, newBaseName := util.FullPath(newName).DirAndName()
  266. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  267. request := &filer_pb.AtomicRenameEntryRequest{
  268. OldDirectory: oldDir,
  269. OldName: oldBaseName,
  270. NewDirectory: newDir,
  271. NewName: newBaseName,
  272. }
  273. _, err := client.AtomicRenameEntry(ctx, request)
  274. if err != nil {
  275. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  276. }
  277. return nil
  278. })
  279. }
  280. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  281. var err error
  282. if fullFilePath, err = clearName(fullFilePath); err != nil {
  283. return nil, err
  284. }
  285. fullpath := util.FullPath(fullFilePath)
  286. var fi FileInfo
  287. entry, err := filer_pb.GetEntry(fs, fullpath)
  288. if entry == nil {
  289. return nil, os.ErrNotExist
  290. }
  291. if err != nil {
  292. return nil, err
  293. }
  294. fi.size = int64(filer.FileSize(entry))
  295. fi.name = string(fullpath)
  296. fi.mode = os.FileMode(entry.Attributes.FileMode)
  297. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  298. fi.isDirectory = entry.IsDirectory
  299. if fi.name == "/" {
  300. fi.modifiledTime = time.Now()
  301. fi.isDirectory = true
  302. }
  303. return &fi, nil
  304. }
  305. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  306. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  307. return fs.stat(ctx, name)
  308. }
  309. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
  310. var fileId, host string
  311. var auth security.EncodedJwt
  312. if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  313. ctx := context.Background()
  314. assignErr := util.Retry("assignVolume", func() error {
  315. request := &filer_pb.AssignVolumeRequest{
  316. Count: 1,
  317. Replication: f.fs.option.Replication,
  318. Collection: f.fs.option.Collection,
  319. DiskType: f.fs.option.DiskType,
  320. Path: name,
  321. }
  322. resp, err := client.AssignVolume(ctx, request)
  323. if err != nil {
  324. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  325. return err
  326. }
  327. if resp.Error != "" {
  328. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  329. }
  330. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  331. f.collection, f.replication = resp.Collection, resp.Replication
  332. return nil
  333. })
  334. if assignErr != nil {
  335. return assignErr
  336. }
  337. return nil
  338. }); flushErr != nil {
  339. return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
  340. }
  341. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  342. uploadOption := &operation.UploadOption{
  343. UploadUrl: fileUrl,
  344. Filename: f.name,
  345. Cipher: f.fs.option.Cipher,
  346. IsInputCompressed: false,
  347. MimeType: "",
  348. PairMap: nil,
  349. Jwt: auth,
  350. }
  351. uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
  352. if flushErr != nil {
  353. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
  354. return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
  355. }
  356. if uploadResult.Error != "" {
  357. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
  358. return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
  359. }
  360. return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
  361. }
  362. func (f *WebDavFile) Write(buf []byte) (int, error) {
  363. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  364. dir, _ := util.FullPath(f.name).DirAndName()
  365. var getErr error
  366. ctx := context.Background()
  367. if f.entry == nil {
  368. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  369. }
  370. if f.entry == nil {
  371. return 0, getErr
  372. }
  373. if getErr != nil {
  374. return 0, getErr
  375. }
  376. if f.bufWriter.FlushFunc == nil {
  377. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  378. var chunk *filer_pb.FileChunk
  379. chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  380. if flushErr != nil {
  381. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  382. }
  383. f.entry.Content = nil
  384. f.entry.Chunks = append(f.entry.Chunks, chunk)
  385. return flushErr
  386. }
  387. f.bufWriter.CloseFunc = func() error {
  388. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  389. if manifestErr != nil {
  390. // not good, but should be ok
  391. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  392. } else {
  393. f.entry.Chunks = manifestedChunks
  394. }
  395. flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  396. f.entry.Attributes.Mtime = time.Now().Unix()
  397. f.entry.Attributes.Collection = f.collection
  398. f.entry.Attributes.Replication = f.replication
  399. request := &filer_pb.UpdateEntryRequest{
  400. Directory: dir,
  401. Entry: f.entry,
  402. Signatures: []int32{f.fs.signature},
  403. }
  404. if _, err := client.UpdateEntry(ctx, request); err != nil {
  405. return fmt.Errorf("update %s: %v", f.name, err)
  406. }
  407. return nil
  408. })
  409. return flushErr
  410. }
  411. }
  412. written, err := f.bufWriter.Write(buf)
  413. if err == nil {
  414. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  415. f.off += int64(written)
  416. }
  417. return written, err
  418. }
  419. func (f *WebDavFile) Close() error {
  420. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  421. err := f.bufWriter.Close()
  422. if f.entry != nil {
  423. f.entry = nil
  424. f.entryViewCache = nil
  425. }
  426. return err
  427. }
  428. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  429. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  430. if f.entry == nil {
  431. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  432. }
  433. if f.entry == nil {
  434. return 0, err
  435. }
  436. if err != nil {
  437. return 0, err
  438. }
  439. fileSize := int64(filer.FileSize(f.entry))
  440. if fileSize == 0 {
  441. return 0, io.EOF
  442. }
  443. if f.entryViewCache == nil {
  444. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64)
  445. f.reader = nil
  446. }
  447. if f.reader == nil {
  448. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
  449. f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
  450. }
  451. readSize, err = f.reader.ReadAt(p, f.off)
  452. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  453. f.off += int64(readSize)
  454. if err != nil && err != io.EOF {
  455. glog.Errorf("file read %s: %v", f.name, err)
  456. }
  457. return
  458. }
  459. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  460. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  461. dir, _ := util.FullPath(f.name).DirAndName()
  462. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  463. fi := FileInfo{
  464. size: int64(filer.FileSize(entry)),
  465. name: entry.Name,
  466. mode: os.FileMode(entry.Attributes.FileMode),
  467. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  468. isDirectory: entry.IsDirectory,
  469. }
  470. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  471. fi.name += "/"
  472. }
  473. glog.V(4).Infof("entry: %v", fi.name)
  474. ret = append(ret, &fi)
  475. return nil
  476. })
  477. old := f.off
  478. if old >= int64(len(ret)) {
  479. if count > 0 {
  480. return nil, io.EOF
  481. }
  482. return nil, nil
  483. }
  484. if count > 0 {
  485. f.off += int64(count)
  486. if f.off > int64(len(ret)) {
  487. f.off = int64(len(ret))
  488. }
  489. } else {
  490. f.off = int64(len(ret))
  491. old = 0
  492. }
  493. return ret[old:f.off], nil
  494. }
  495. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  496. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  497. ctx := context.Background()
  498. var err error
  499. switch whence {
  500. case io.SeekStart:
  501. f.off = 0
  502. case io.SeekEnd:
  503. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  504. return 0, err
  505. } else {
  506. f.off = fi.Size()
  507. }
  508. }
  509. f.off += offset
  510. return f.off, err
  511. }
  512. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  513. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  514. ctx := context.Background()
  515. return f.fs.stat(ctx, f.name)
  516. }