You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

635 lines
16 KiB

5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "path"
  10. "strings"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
  13. "golang.org/x/net/webdav"
  14. "google.golang.org/grpc"
  15. "github.com/chrislusf/seaweedfs/weed/operation"
  16. "github.com/chrislusf/seaweedfs/weed/pb"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  20. "github.com/chrislusf/seaweedfs/weed/filer"
  21. "github.com/chrislusf/seaweedfs/weed/glog"
  22. "github.com/chrislusf/seaweedfs/weed/security"
  23. )
  24. type WebDavOption struct {
  25. Filer pb.ServerAddress
  26. DomainName string
  27. BucketsPath string
  28. GrpcDialOption grpc.DialOption
  29. Collection string
  30. Replication string
  31. DiskType string
  32. Uid uint32
  33. Gid uint32
  34. Cipher bool
  35. CacheDir string
  36. CacheSizeMB int64
  37. }
  38. type WebDavServer struct {
  39. option *WebDavOption
  40. secret security.SigningKey
  41. filer *filer.Filer
  42. grpcDialOption grpc.DialOption
  43. Handler *webdav.Handler
  44. }
  45. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  46. fs, _ := NewWebDavFileSystem(option)
  47. ws = &WebDavServer{
  48. option: option,
  49. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  50. Handler: &webdav.Handler{
  51. FileSystem: fs,
  52. LockSystem: webdav.NewMemLS(),
  53. },
  54. }
  55. return ws, nil
  56. }
  57. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  58. type WebDavFileSystem struct {
  59. option *WebDavOption
  60. secret security.SigningKey
  61. filer *filer.Filer
  62. grpcDialOption grpc.DialOption
  63. chunkCache *chunk_cache.TieredChunkCache
  64. signature int32
  65. }
  66. type FileInfo struct {
  67. name string
  68. size int64
  69. mode os.FileMode
  70. modifiledTime time.Time
  71. isDirectory bool
  72. }
  73. func (fi *FileInfo) Name() string { return fi.name }
  74. func (fi *FileInfo) Size() int64 { return fi.size }
  75. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  76. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  77. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  78. func (fi *FileInfo) Sys() interface{} { return nil }
  79. type WebDavFile struct {
  80. fs *WebDavFileSystem
  81. name string
  82. isDirectory bool
  83. off int64
  84. entry *filer_pb.Entry
  85. entryViewCache []filer.VisibleInterval
  86. reader io.ReaderAt
  87. bufWriter *buffered_writer.BufferedWriteCloser
  88. collection string
  89. replication string
  90. }
  91. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  92. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  93. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  94. os.MkdirAll(cacheDir, os.FileMode(0755))
  95. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  96. return &WebDavFileSystem{
  97. option: option,
  98. chunkCache: chunkCache,
  99. signature: util.RandomInt32(),
  100. }, nil
  101. }
  102. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  103. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  104. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  105. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  106. return fn(client)
  107. }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
  108. }
  109. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  110. return location.Url
  111. }
  112. func clearName(name string) (string, error) {
  113. slashed := strings.HasSuffix(name, "/")
  114. name = path.Clean(name)
  115. if !strings.HasSuffix(name, "/") && slashed {
  116. name += "/"
  117. }
  118. if !strings.HasPrefix(name, "/") {
  119. return "", os.ErrInvalid
  120. }
  121. return name, nil
  122. }
  123. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  124. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  125. if !strings.HasSuffix(fullDirPath, "/") {
  126. fullDirPath += "/"
  127. }
  128. var err error
  129. if fullDirPath, err = clearName(fullDirPath); err != nil {
  130. return err
  131. }
  132. _, err = fs.stat(ctx, fullDirPath)
  133. if err == nil {
  134. return os.ErrExist
  135. }
  136. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  137. dir, name := util.FullPath(fullDirPath).DirAndName()
  138. request := &filer_pb.CreateEntryRequest{
  139. Directory: dir,
  140. Entry: &filer_pb.Entry{
  141. Name: name,
  142. IsDirectory: true,
  143. Attributes: &filer_pb.FuseAttributes{
  144. Mtime: time.Now().Unix(),
  145. Crtime: time.Now().Unix(),
  146. FileMode: uint32(perm | os.ModeDir),
  147. Uid: fs.option.Uid,
  148. Gid: fs.option.Gid,
  149. },
  150. },
  151. Signatures: []int32{fs.signature},
  152. }
  153. glog.V(1).Infof("mkdir: %v", request)
  154. if err := filer_pb.CreateEntry(client, request); err != nil {
  155. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  156. }
  157. return nil
  158. })
  159. }
  160. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  161. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  162. var err error
  163. if fullFilePath, err = clearName(fullFilePath); err != nil {
  164. return nil, err
  165. }
  166. if flag&os.O_CREATE != 0 {
  167. // file should not have / suffix.
  168. if strings.HasSuffix(fullFilePath, "/") {
  169. return nil, os.ErrInvalid
  170. }
  171. _, err = fs.stat(ctx, fullFilePath)
  172. if err == nil {
  173. if flag&os.O_EXCL != 0 {
  174. return nil, os.ErrExist
  175. }
  176. fs.removeAll(ctx, fullFilePath)
  177. }
  178. dir, name := util.FullPath(fullFilePath).DirAndName()
  179. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  180. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  181. Directory: dir,
  182. Entry: &filer_pb.Entry{
  183. Name: name,
  184. IsDirectory: perm&os.ModeDir > 0,
  185. Attributes: &filer_pb.FuseAttributes{
  186. Mtime: time.Now().Unix(),
  187. Crtime: time.Now().Unix(),
  188. FileMode: uint32(perm),
  189. Uid: fs.option.Uid,
  190. Gid: fs.option.Gid,
  191. Collection: fs.option.Collection,
  192. Replication: fs.option.Replication,
  193. TtlSec: 0,
  194. },
  195. },
  196. Signatures: []int32{fs.signature},
  197. }); err != nil {
  198. return fmt.Errorf("create %s: %v", fullFilePath, err)
  199. }
  200. return nil
  201. })
  202. if err != nil {
  203. return nil, err
  204. }
  205. return &WebDavFile{
  206. fs: fs,
  207. name: fullFilePath,
  208. isDirectory: false,
  209. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  210. }, nil
  211. }
  212. fi, err := fs.stat(ctx, fullFilePath)
  213. if err != nil {
  214. return nil, os.ErrNotExist
  215. }
  216. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  217. fullFilePath += "/"
  218. }
  219. return &WebDavFile{
  220. fs: fs,
  221. name: fullFilePath,
  222. isDirectory: false,
  223. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  224. }, nil
  225. }
  226. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  227. var err error
  228. if fullFilePath, err = clearName(fullFilePath); err != nil {
  229. return err
  230. }
  231. dir, name := util.FullPath(fullFilePath).DirAndName()
  232. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  233. }
  234. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  235. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  236. return fs.removeAll(ctx, name)
  237. }
  238. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  239. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  240. var err error
  241. if oldName, err = clearName(oldName); err != nil {
  242. return err
  243. }
  244. if newName, err = clearName(newName); err != nil {
  245. return err
  246. }
  247. of, err := fs.stat(ctx, oldName)
  248. if err != nil {
  249. return os.ErrExist
  250. }
  251. if of.IsDir() {
  252. if strings.HasSuffix(oldName, "/") {
  253. oldName = strings.TrimRight(oldName, "/")
  254. }
  255. if strings.HasSuffix(newName, "/") {
  256. newName = strings.TrimRight(newName, "/")
  257. }
  258. }
  259. _, err = fs.stat(ctx, newName)
  260. if err == nil {
  261. return os.ErrExist
  262. }
  263. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  264. newDir, newBaseName := util.FullPath(newName).DirAndName()
  265. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  266. request := &filer_pb.AtomicRenameEntryRequest{
  267. OldDirectory: oldDir,
  268. OldName: oldBaseName,
  269. NewDirectory: newDir,
  270. NewName: newBaseName,
  271. }
  272. _, err := client.AtomicRenameEntry(ctx, request)
  273. if err != nil {
  274. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  275. }
  276. return nil
  277. })
  278. }
  279. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  280. var err error
  281. if fullFilePath, err = clearName(fullFilePath); err != nil {
  282. return nil, err
  283. }
  284. fullpath := util.FullPath(fullFilePath)
  285. var fi FileInfo
  286. entry, err := filer_pb.GetEntry(fs, fullpath)
  287. if entry == nil {
  288. return nil, os.ErrNotExist
  289. }
  290. if err != nil {
  291. return nil, err
  292. }
  293. fi.size = int64(filer.FileSize(entry))
  294. fi.name = string(fullpath)
  295. fi.mode = os.FileMode(entry.Attributes.FileMode)
  296. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  297. fi.isDirectory = entry.IsDirectory
  298. if fi.name == "/" {
  299. fi.modifiledTime = time.Now()
  300. fi.isDirectory = true
  301. }
  302. return &fi, nil
  303. }
  304. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  305. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  306. return fs.stat(ctx, name)
  307. }
  308. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
  309. var fileId, host string
  310. var auth security.EncodedJwt
  311. if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  312. ctx := context.Background()
  313. assignErr := util.Retry("assignVolume", func() error {
  314. request := &filer_pb.AssignVolumeRequest{
  315. Count: 1,
  316. Replication: f.fs.option.Replication,
  317. Collection: f.fs.option.Collection,
  318. DiskType: f.fs.option.DiskType,
  319. Path: name,
  320. }
  321. resp, err := client.AssignVolume(ctx, request)
  322. if err != nil {
  323. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  324. return err
  325. }
  326. if resp.Error != "" {
  327. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  328. }
  329. fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
  330. f.collection, f.replication = resp.Collection, resp.Replication
  331. return nil
  332. })
  333. if assignErr != nil {
  334. return assignErr
  335. }
  336. return nil
  337. }); flushErr != nil {
  338. return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
  339. }
  340. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  341. uploadOption := &operation.UploadOption{
  342. UploadUrl: fileUrl,
  343. Filename: f.name,
  344. Cipher: f.fs.option.Cipher,
  345. IsInputCompressed: false,
  346. MimeType: "",
  347. PairMap: nil,
  348. Jwt: auth,
  349. }
  350. uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
  351. if flushErr != nil {
  352. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
  353. return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
  354. }
  355. if uploadResult.Error != "" {
  356. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
  357. return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
  358. }
  359. return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
  360. }
  361. func (f *WebDavFile) Write(buf []byte) (int, error) {
  362. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  363. dir, _ := util.FullPath(f.name).DirAndName()
  364. var getErr error
  365. ctx := context.Background()
  366. if f.entry == nil {
  367. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  368. }
  369. if f.entry == nil {
  370. return 0, getErr
  371. }
  372. if getErr != nil {
  373. return 0, getErr
  374. }
  375. if f.bufWriter.FlushFunc == nil {
  376. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  377. var chunk *filer_pb.FileChunk
  378. chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  379. if flushErr != nil {
  380. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  381. }
  382. f.entry.Content = nil
  383. f.entry.Chunks = append(f.entry.Chunks, chunk)
  384. return flushErr
  385. }
  386. f.bufWriter.CloseFunc = func() error {
  387. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  388. if manifestErr != nil {
  389. // not good, but should be ok
  390. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  391. } else {
  392. f.entry.Chunks = manifestedChunks
  393. }
  394. flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  395. f.entry.Attributes.Mtime = time.Now().Unix()
  396. f.entry.Attributes.Collection = f.collection
  397. f.entry.Attributes.Replication = f.replication
  398. request := &filer_pb.UpdateEntryRequest{
  399. Directory: dir,
  400. Entry: f.entry,
  401. Signatures: []int32{f.fs.signature},
  402. }
  403. if _, err := client.UpdateEntry(ctx, request); err != nil {
  404. return fmt.Errorf("update %s: %v", f.name, err)
  405. }
  406. return nil
  407. })
  408. return flushErr
  409. }
  410. }
  411. written, err := f.bufWriter.Write(buf)
  412. if err == nil {
  413. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  414. f.off += int64(written)
  415. }
  416. return written, err
  417. }
  418. func (f *WebDavFile) Close() error {
  419. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  420. err := f.bufWriter.Close()
  421. if f.entry != nil {
  422. f.entry = nil
  423. f.entryViewCache = nil
  424. }
  425. return err
  426. }
  427. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  428. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  429. if f.entry == nil {
  430. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  431. }
  432. if f.entry == nil {
  433. return 0, err
  434. }
  435. if err != nil {
  436. return 0, err
  437. }
  438. fileSize := int64(filer.FileSize(f.entry))
  439. if fileSize == 0 {
  440. return 0, io.EOF
  441. }
  442. if f.entryViewCache == nil {
  443. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64)
  444. f.reader = nil
  445. }
  446. if f.reader == nil {
  447. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
  448. f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
  449. }
  450. readSize, err = f.reader.ReadAt(p, f.off)
  451. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  452. f.off += int64(readSize)
  453. if err != nil && err != io.EOF {
  454. glog.Errorf("file read %s: %v", f.name, err)
  455. }
  456. return
  457. }
  458. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  459. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  460. dir, _ := util.FullPath(f.name).DirAndName()
  461. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  462. fi := FileInfo{
  463. size: int64(filer.FileSize(entry)),
  464. name: entry.Name,
  465. mode: os.FileMode(entry.Attributes.FileMode),
  466. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  467. isDirectory: entry.IsDirectory,
  468. }
  469. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  470. fi.name += "/"
  471. }
  472. glog.V(4).Infof("entry: %v", fi.name)
  473. ret = append(ret, &fi)
  474. return nil
  475. })
  476. old := f.off
  477. if old >= int64(len(ret)) {
  478. if count > 0 {
  479. return nil, io.EOF
  480. }
  481. return nil, nil
  482. }
  483. if count > 0 {
  484. f.off += int64(count)
  485. if f.off > int64(len(ret)) {
  486. f.off = int64(len(ret))
  487. }
  488. } else {
  489. f.off = int64(len(ret))
  490. old = 0
  491. }
  492. return ret[old:f.off], nil
  493. }
  494. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  495. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  496. ctx := context.Background()
  497. var err error
  498. switch whence {
  499. case io.SeekStart:
  500. f.off = 0
  501. case io.SeekEnd:
  502. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  503. return 0, err
  504. } else {
  505. f.off = fi.Size()
  506. }
  507. }
  508. f.off += offset
  509. return f.off, err
  510. }
  511. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  512. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  513. ctx := context.Background()
  514. return f.fs.stat(ctx, f.name)
  515. }