You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

634 lines
16 KiB

5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
  12. "golang.org/x/net/webdav"
  13. "google.golang.org/grpc"
  14. "github.com/chrislusf/seaweedfs/weed/operation"
  15. "github.com/chrislusf/seaweedfs/weed/pb"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  19. "github.com/chrislusf/seaweedfs/weed/filer"
  20. "github.com/chrislusf/seaweedfs/weed/glog"
  21. "github.com/chrislusf/seaweedfs/weed/security"
  22. )
  23. type WebDavOption struct {
  24. Filer pb.ServerAddress
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Replication string
  30. DiskType string
  31. Uid uint32
  32. Gid uint32
  33. Cipher bool
  34. CacheDir string
  35. CacheSizeMB int64
  36. }
  37. type WebDavServer struct {
  38. option *WebDavOption
  39. secret security.SigningKey
  40. filer *filer.Filer
  41. grpcDialOption grpc.DialOption
  42. Handler *webdav.Handler
  43. }
  44. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  45. fs, _ := NewWebDavFileSystem(option)
  46. ws = &WebDavServer{
  47. option: option,
  48. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  49. Handler: &webdav.Handler{
  50. FileSystem: fs,
  51. LockSystem: webdav.NewMemLS(),
  52. },
  53. }
  54. return ws, nil
  55. }
  56. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  57. type WebDavFileSystem struct {
  58. option *WebDavOption
  59. secret security.SigningKey
  60. filer *filer.Filer
  61. grpcDialOption grpc.DialOption
  62. chunkCache *chunk_cache.TieredChunkCache
  63. signature int32
  64. }
  // FileInfo is a plain value snapshot of a filer entry exposing the accessor
  // set of os.FileInfo.
  type FileInfo struct {
  	name          string
  	size          int64
  	mode          os.FileMode
  	modifiledTime time.Time
  	isDirectory   bool
  }

  // Name returns the recorded name.
  func (info *FileInfo) Name() string {
  	return info.name
  }

  // Size returns the recorded size in bytes.
  func (info *FileInfo) Size() int64 {
  	return info.size
  }

  // Mode returns the recorded file mode bits.
  func (info *FileInfo) Mode() os.FileMode {
  	return info.mode
  }

  // ModTime returns the recorded modification time.
  func (info *FileInfo) ModTime() time.Time {
  	return info.modifiledTime
  }

  // IsDir reports whether the snapshot describes a directory.
  func (info *FileInfo) IsDir() bool {
  	return info.isDirectory
  }

  // Sys always returns nil; no underlying data source is exposed.
  func (info *FileInfo) Sys() interface{} {
  	return nil
  }
  78. type WebDavFile struct {
  79. fs *WebDavFileSystem
  80. name string
  81. isDirectory bool
  82. off int64
  83. entry *filer_pb.Entry
  84. entryViewCache []filer.VisibleInterval
  85. reader io.ReaderAt
  86. bufWriter *buffered_writer.BufferedWriteCloser
  87. collection string
  88. replication string
  89. }
  90. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  91. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  92. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  93. os.MkdirAll(cacheDir, os.FileMode(0755))
  94. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  95. return &WebDavFileSystem{
  96. option: option,
  97. chunkCache: chunkCache,
  98. signature: util.RandomInt32(),
  99. }, nil
  100. }
  101. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  102. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  103. return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
  104. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  105. return fn(client)
  106. }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
  107. }
  108. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  109. return location.Url
  110. }
  // clearName normalizes name with path.Clean while keeping a trailing slash
  // that the caller supplied. It returns os.ErrInvalid when the normalized
  // result is not an absolute (slash-prefixed) path.
  func clearName(name string) (string, error) {
  	cleaned := path.Clean(name)

  	// path.Clean strips the trailing slash from everything but the root, so
  	// put it back when the input had one.
  	if strings.HasSuffix(name, "/") && !strings.HasSuffix(cleaned, "/") {
  		cleaned += "/"
  	}

  	if strings.HasPrefix(cleaned, "/") {
  		return cleaned, nil
  	}
  	return "", os.ErrInvalid
  }
  122. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  123. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  124. if !strings.HasSuffix(fullDirPath, "/") {
  125. fullDirPath += "/"
  126. }
  127. var err error
  128. if fullDirPath, err = clearName(fullDirPath); err != nil {
  129. return err
  130. }
  131. _, err = fs.stat(ctx, fullDirPath)
  132. if err == nil {
  133. return os.ErrExist
  134. }
  135. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  136. dir, name := util.FullPath(fullDirPath).DirAndName()
  137. request := &filer_pb.CreateEntryRequest{
  138. Directory: dir,
  139. Entry: &filer_pb.Entry{
  140. Name: name,
  141. IsDirectory: true,
  142. Attributes: &filer_pb.FuseAttributes{
  143. Mtime: time.Now().Unix(),
  144. Crtime: time.Now().Unix(),
  145. FileMode: uint32(perm | os.ModeDir),
  146. Uid: fs.option.Uid,
  147. Gid: fs.option.Gid,
  148. },
  149. },
  150. Signatures: []int32{fs.signature},
  151. }
  152. glog.V(1).Infof("mkdir: %v", request)
  153. if err := filer_pb.CreateEntry(client, request); err != nil {
  154. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  155. }
  156. return nil
  157. })
  158. }
  159. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  160. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  161. var err error
  162. if fullFilePath, err = clearName(fullFilePath); err != nil {
  163. return nil, err
  164. }
  165. if flag&os.O_CREATE != 0 {
  166. // file should not have / suffix.
  167. if strings.HasSuffix(fullFilePath, "/") {
  168. return nil, os.ErrInvalid
  169. }
  170. _, err = fs.stat(ctx, fullFilePath)
  171. if err == nil {
  172. if flag&os.O_EXCL != 0 {
  173. return nil, os.ErrExist
  174. }
  175. fs.removeAll(ctx, fullFilePath)
  176. }
  177. dir, name := util.FullPath(fullFilePath).DirAndName()
  178. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  179. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  180. Directory: dir,
  181. Entry: &filer_pb.Entry{
  182. Name: name,
  183. IsDirectory: perm&os.ModeDir > 0,
  184. Attributes: &filer_pb.FuseAttributes{
  185. Mtime: time.Now().Unix(),
  186. Crtime: time.Now().Unix(),
  187. FileMode: uint32(perm),
  188. Uid: fs.option.Uid,
  189. Gid: fs.option.Gid,
  190. Collection: fs.option.Collection,
  191. Replication: fs.option.Replication,
  192. TtlSec: 0,
  193. },
  194. },
  195. Signatures: []int32{fs.signature},
  196. }); err != nil {
  197. return fmt.Errorf("create %s: %v", fullFilePath, err)
  198. }
  199. return nil
  200. })
  201. if err != nil {
  202. return nil, err
  203. }
  204. return &WebDavFile{
  205. fs: fs,
  206. name: fullFilePath,
  207. isDirectory: false,
  208. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  209. }, nil
  210. }
  211. fi, err := fs.stat(ctx, fullFilePath)
  212. if err != nil {
  213. return nil, os.ErrNotExist
  214. }
  215. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  216. fullFilePath += "/"
  217. }
  218. return &WebDavFile{
  219. fs: fs,
  220. name: fullFilePath,
  221. isDirectory: false,
  222. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  223. }, nil
  224. }
  225. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  226. var err error
  227. if fullFilePath, err = clearName(fullFilePath); err != nil {
  228. return err
  229. }
  230. dir, name := util.FullPath(fullFilePath).DirAndName()
  231. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  232. }
  233. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  234. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  235. return fs.removeAll(ctx, name)
  236. }
  237. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  238. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  239. var err error
  240. if oldName, err = clearName(oldName); err != nil {
  241. return err
  242. }
  243. if newName, err = clearName(newName); err != nil {
  244. return err
  245. }
  246. of, err := fs.stat(ctx, oldName)
  247. if err != nil {
  248. return os.ErrExist
  249. }
  250. if of.IsDir() {
  251. if strings.HasSuffix(oldName, "/") {
  252. oldName = strings.TrimRight(oldName, "/")
  253. }
  254. if strings.HasSuffix(newName, "/") {
  255. newName = strings.TrimRight(newName, "/")
  256. }
  257. }
  258. _, err = fs.stat(ctx, newName)
  259. if err == nil {
  260. return os.ErrExist
  261. }
  262. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  263. newDir, newBaseName := util.FullPath(newName).DirAndName()
  264. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  265. request := &filer_pb.AtomicRenameEntryRequest{
  266. OldDirectory: oldDir,
  267. OldName: oldBaseName,
  268. NewDirectory: newDir,
  269. NewName: newBaseName,
  270. }
  271. _, err := client.AtomicRenameEntry(ctx, request)
  272. if err != nil {
  273. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  274. }
  275. return nil
  276. })
  277. }
  278. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  279. var err error
  280. if fullFilePath, err = clearName(fullFilePath); err != nil {
  281. return nil, err
  282. }
  283. fullpath := util.FullPath(fullFilePath)
  284. var fi FileInfo
  285. entry, err := filer_pb.GetEntry(fs, fullpath)
  286. if entry == nil {
  287. return nil, os.ErrNotExist
  288. }
  289. if err != nil {
  290. return nil, err
  291. }
  292. fi.size = int64(filer.FileSize(entry))
  293. fi.name = string(fullpath)
  294. fi.mode = os.FileMode(entry.Attributes.FileMode)
  295. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  296. fi.isDirectory = entry.IsDirectory
  297. if fi.name == "/" {
  298. fi.modifiledTime = time.Now()
  299. fi.isDirectory = true
  300. }
  301. return &fi, nil
  302. }
  303. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  304. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  305. return fs.stat(ctx, name)
  306. }
  307. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
  308. var fileId, host string
  309. var auth security.EncodedJwt
  310. if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  311. ctx := context.Background()
  312. assignErr := util.Retry("assignVolume", func() error {
  313. request := &filer_pb.AssignVolumeRequest{
  314. Count: 1,
  315. Replication: f.fs.option.Replication,
  316. Collection: f.fs.option.Collection,
  317. DiskType: f.fs.option.DiskType,
  318. Path: name,
  319. }
  320. resp, err := client.AssignVolume(ctx, request)
  321. if err != nil {
  322. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  323. return err
  324. }
  325. if resp.Error != "" {
  326. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  327. }
  328. fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
  329. f.collection, f.replication = resp.Collection, resp.Replication
  330. return nil
  331. })
  332. if assignErr != nil {
  333. return assignErr
  334. }
  335. return nil
  336. }); flushErr != nil {
  337. return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
  338. }
  339. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  340. uploadOption := &operation.UploadOption{
  341. UploadUrl: fileUrl,
  342. Filename: f.name,
  343. Cipher: f.fs.option.Cipher,
  344. IsInputCompressed: false,
  345. MimeType: "",
  346. PairMap: nil,
  347. Jwt: auth,
  348. }
  349. uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
  350. if flushErr != nil {
  351. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
  352. return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
  353. }
  354. if uploadResult.Error != "" {
  355. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
  356. return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
  357. }
  358. return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
  359. }
  360. func (f *WebDavFile) Write(buf []byte) (int, error) {
  361. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  362. dir, _ := util.FullPath(f.name).DirAndName()
  363. var getErr error
  364. ctx := context.Background()
  365. if f.entry == nil {
  366. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  367. }
  368. if f.entry == nil {
  369. return 0, getErr
  370. }
  371. if getErr != nil {
  372. return 0, getErr
  373. }
  374. if f.bufWriter.FlushFunc == nil {
  375. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  376. var chunk *filer_pb.FileChunk
  377. chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  378. if flushErr != nil {
  379. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  380. }
  381. f.entry.Content = nil
  382. f.entry.Chunks = append(f.entry.Chunks, chunk)
  383. return flushErr
  384. }
  385. f.bufWriter.CloseFunc = func() error {
  386. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  387. if manifestErr != nil {
  388. // not good, but should be ok
  389. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  390. } else {
  391. f.entry.Chunks = manifestedChunks
  392. }
  393. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  394. f.entry.Attributes.Mtime = time.Now().Unix()
  395. f.entry.Attributes.Collection = f.collection
  396. f.entry.Attributes.Replication = f.replication
  397. request := &filer_pb.UpdateEntryRequest{
  398. Directory: dir,
  399. Entry: f.entry,
  400. Signatures: []int32{f.fs.signature},
  401. }
  402. if _, err := client.UpdateEntry(ctx, request); err != nil {
  403. return fmt.Errorf("update %s: %v", f.name, err)
  404. }
  405. return nil
  406. })
  407. return flushErr
  408. }
  409. }
  410. written, err := f.bufWriter.Write(buf)
  411. if err == nil {
  412. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  413. f.off += int64(written)
  414. }
  415. return written, err
  416. }
  417. func (f *WebDavFile) Close() error {
  418. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  419. err := f.bufWriter.Close()
  420. if f.entry != nil {
  421. f.entry = nil
  422. f.entryViewCache = nil
  423. }
  424. return err
  425. }
  426. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  427. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  428. if f.entry == nil {
  429. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  430. }
  431. if f.entry == nil {
  432. return 0, err
  433. }
  434. if err != nil {
  435. return 0, err
  436. }
  437. fileSize := int64(filer.FileSize(f.entry))
  438. if fileSize == 0 {
  439. return 0, io.EOF
  440. }
  441. if f.entryViewCache == nil {
  442. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
  443. f.reader = nil
  444. }
  445. if f.reader == nil {
  446. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
  447. f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
  448. }
  449. readSize, err = f.reader.ReadAt(p, f.off)
  450. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  451. f.off += int64(readSize)
  452. if err != nil && err != io.EOF {
  453. glog.Errorf("file read %s: %v", f.name, err)
  454. }
  455. return
  456. }
  457. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  458. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  459. dir, _ := util.FullPath(f.name).DirAndName()
  460. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  461. fi := FileInfo{
  462. size: int64(filer.FileSize(entry)),
  463. name: entry.Name,
  464. mode: os.FileMode(entry.Attributes.FileMode),
  465. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  466. isDirectory: entry.IsDirectory,
  467. }
  468. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  469. fi.name += "/"
  470. }
  471. glog.V(4).Infof("entry: %v", fi.name)
  472. ret = append(ret, &fi)
  473. return nil
  474. })
  475. old := f.off
  476. if old >= int64(len(ret)) {
  477. if count > 0 {
  478. return nil, io.EOF
  479. }
  480. return nil, nil
  481. }
  482. if count > 0 {
  483. f.off += int64(count)
  484. if f.off > int64(len(ret)) {
  485. f.off = int64(len(ret))
  486. }
  487. } else {
  488. f.off = int64(len(ret))
  489. old = 0
  490. }
  491. return ret[old:f.off], nil
  492. }
  493. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  494. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  495. ctx := context.Background()
  496. var err error
  497. switch whence {
  498. case io.SeekStart:
  499. f.off = 0
  500. case io.SeekEnd:
  501. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  502. return 0, err
  503. } else {
  504. f.off = fi.Size()
  505. }
  506. }
  507. f.off += offset
  508. return f.off, err
  509. }
  510. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  511. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  512. ctx := context.Background()
  513. return f.fs.stat(ctx, f.name)
  514. }