You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

615 lines
15 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "path"
  10. "strings"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
  13. "golang.org/x/net/webdav"
  14. "google.golang.org/grpc"
  15. "github.com/chrislusf/seaweedfs/weed/operation"
  16. "github.com/chrislusf/seaweedfs/weed/pb"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  20. "github.com/chrislusf/seaweedfs/weed/filer"
  21. "github.com/chrislusf/seaweedfs/weed/glog"
  22. "github.com/chrislusf/seaweedfs/weed/security"
  23. )
  24. type WebDavOption struct {
  25. Filer string
  26. FilerGrpcAddress string
  27. DomainName string
  28. BucketsPath string
  29. GrpcDialOption grpc.DialOption
  30. Collection string
  31. DiskType string
  32. Uid uint32
  33. Gid uint32
  34. Cipher bool
  35. CacheDir string
  36. CacheSizeMB int64
  37. }
  38. type WebDavServer struct {
  39. option *WebDavOption
  40. secret security.SigningKey
  41. filer *filer.Filer
  42. grpcDialOption grpc.DialOption
  43. Handler *webdav.Handler
  44. }
  45. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  46. fs, _ := NewWebDavFileSystem(option)
  47. ws = &WebDavServer{
  48. option: option,
  49. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  50. Handler: &webdav.Handler{
  51. FileSystem: fs,
  52. LockSystem: webdav.NewMemLS(),
  53. },
  54. }
  55. return ws, nil
  56. }
  57. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  58. type WebDavFileSystem struct {
  59. option *WebDavOption
  60. secret security.SigningKey
  61. filer *filer.Filer
  62. grpcDialOption grpc.DialOption
  63. chunkCache *chunk_cache.TieredChunkCache
  64. signature int32
  65. }
  66. type FileInfo struct {
  67. name string
  68. size int64
  69. mode os.FileMode
  70. modifiledTime time.Time
  71. isDirectory bool
  72. }
  73. func (fi *FileInfo) Name() string { return fi.name }
  74. func (fi *FileInfo) Size() int64 { return fi.size }
  75. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  76. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  77. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  78. func (fi *FileInfo) Sys() interface{} { return nil }
  79. type WebDavFile struct {
  80. fs *WebDavFileSystem
  81. name string
  82. isDirectory bool
  83. off int64
  84. entry *filer_pb.Entry
  85. entryViewCache []filer.VisibleInterval
  86. reader io.ReaderAt
  87. bufWriter *buffered_writer.BufferedWriteCloser
  88. collection string
  89. replication string
  90. }
  91. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  92. chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024)
  93. return &WebDavFileSystem{
  94. option: option,
  95. chunkCache: chunkCache,
  96. signature: util.RandomInt32(),
  97. }, nil
  98. }
  99. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  100. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  101. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  102. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  103. return fn(client)
  104. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  105. }
  106. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  107. return location.Url
  108. }
  109. func clearName(name string) (string, error) {
  110. slashed := strings.HasSuffix(name, "/")
  111. name = path.Clean(name)
  112. if !strings.HasSuffix(name, "/") && slashed {
  113. name += "/"
  114. }
  115. if !strings.HasPrefix(name, "/") {
  116. return "", os.ErrInvalid
  117. }
  118. return name, nil
  119. }
  120. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  121. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  122. if !strings.HasSuffix(fullDirPath, "/") {
  123. fullDirPath += "/"
  124. }
  125. var err error
  126. if fullDirPath, err = clearName(fullDirPath); err != nil {
  127. return err
  128. }
  129. _, err = fs.stat(ctx, fullDirPath)
  130. if err == nil {
  131. return os.ErrExist
  132. }
  133. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  134. dir, name := util.FullPath(fullDirPath).DirAndName()
  135. request := &filer_pb.CreateEntryRequest{
  136. Directory: dir,
  137. Entry: &filer_pb.Entry{
  138. Name: name,
  139. IsDirectory: true,
  140. Attributes: &filer_pb.FuseAttributes{
  141. Mtime: time.Now().Unix(),
  142. Crtime: time.Now().Unix(),
  143. FileMode: uint32(perm | os.ModeDir),
  144. Uid: fs.option.Uid,
  145. Gid: fs.option.Gid,
  146. },
  147. },
  148. Signatures: []int32{fs.signature},
  149. }
  150. glog.V(1).Infof("mkdir: %v", request)
  151. if err := filer_pb.CreateEntry(client, request); err != nil {
  152. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  153. }
  154. return nil
  155. })
  156. }
  157. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  158. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  159. var err error
  160. if fullFilePath, err = clearName(fullFilePath); err != nil {
  161. return nil, err
  162. }
  163. if flag&os.O_CREATE != 0 {
  164. // file should not have / suffix.
  165. if strings.HasSuffix(fullFilePath, "/") {
  166. return nil, os.ErrInvalid
  167. }
  168. _, err = fs.stat(ctx, fullFilePath)
  169. if err == nil {
  170. if flag&os.O_EXCL != 0 {
  171. return nil, os.ErrExist
  172. }
  173. fs.removeAll(ctx, fullFilePath)
  174. }
  175. dir, name := util.FullPath(fullFilePath).DirAndName()
  176. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  177. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  178. Directory: dir,
  179. Entry: &filer_pb.Entry{
  180. Name: name,
  181. IsDirectory: perm&os.ModeDir > 0,
  182. Attributes: &filer_pb.FuseAttributes{
  183. Mtime: time.Now().Unix(),
  184. Crtime: time.Now().Unix(),
  185. FileMode: uint32(perm),
  186. Uid: fs.option.Uid,
  187. Gid: fs.option.Gid,
  188. Collection: fs.option.Collection,
  189. Replication: "000",
  190. TtlSec: 0,
  191. },
  192. },
  193. Signatures: []int32{fs.signature},
  194. }); err != nil {
  195. return fmt.Errorf("create %s: %v", fullFilePath, err)
  196. }
  197. return nil
  198. })
  199. if err != nil {
  200. return nil, err
  201. }
  202. return &WebDavFile{
  203. fs: fs,
  204. name: fullFilePath,
  205. isDirectory: false,
  206. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  207. }, nil
  208. }
  209. fi, err := fs.stat(ctx, fullFilePath)
  210. if err != nil {
  211. return nil, os.ErrNotExist
  212. }
  213. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  214. fullFilePath += "/"
  215. }
  216. return &WebDavFile{
  217. fs: fs,
  218. name: fullFilePath,
  219. isDirectory: false,
  220. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  221. }, nil
  222. }
  223. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  224. var err error
  225. if fullFilePath, err = clearName(fullFilePath); err != nil {
  226. return err
  227. }
  228. dir, name := util.FullPath(fullFilePath).DirAndName()
  229. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  230. }
  231. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  232. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  233. return fs.removeAll(ctx, name)
  234. }
  235. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  236. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  237. var err error
  238. if oldName, err = clearName(oldName); err != nil {
  239. return err
  240. }
  241. if newName, err = clearName(newName); err != nil {
  242. return err
  243. }
  244. of, err := fs.stat(ctx, oldName)
  245. if err != nil {
  246. return os.ErrExist
  247. }
  248. if of.IsDir() {
  249. if strings.HasSuffix(oldName, "/") {
  250. oldName = strings.TrimRight(oldName, "/")
  251. }
  252. if strings.HasSuffix(newName, "/") {
  253. newName = strings.TrimRight(newName, "/")
  254. }
  255. }
  256. _, err = fs.stat(ctx, newName)
  257. if err == nil {
  258. return os.ErrExist
  259. }
  260. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  261. newDir, newBaseName := util.FullPath(newName).DirAndName()
  262. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  263. request := &filer_pb.AtomicRenameEntryRequest{
  264. OldDirectory: oldDir,
  265. OldName: oldBaseName,
  266. NewDirectory: newDir,
  267. NewName: newBaseName,
  268. }
  269. _, err := client.AtomicRenameEntry(ctx, request)
  270. if err != nil {
  271. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  272. }
  273. return nil
  274. })
  275. }
  276. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  277. var err error
  278. if fullFilePath, err = clearName(fullFilePath); err != nil {
  279. return nil, err
  280. }
  281. fullpath := util.FullPath(fullFilePath)
  282. var fi FileInfo
  283. entry, err := filer_pb.GetEntry(fs, fullpath)
  284. if entry == nil {
  285. return nil, os.ErrNotExist
  286. }
  287. if err != nil {
  288. return nil, err
  289. }
  290. fi.size = int64(filer.FileSize(entry))
  291. fi.name = string(fullpath)
  292. fi.mode = os.FileMode(entry.Attributes.FileMode)
  293. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  294. fi.isDirectory = entry.IsDirectory
  295. if fi.name == "/" {
  296. fi.modifiledTime = time.Now()
  297. fi.isDirectory = true
  298. }
  299. return &fi, nil
  300. }
  301. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  302. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  303. return fs.stat(ctx, name)
  304. }
  305. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
  306. var fileId, host string
  307. var auth security.EncodedJwt
  308. if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  309. ctx := context.Background()
  310. request := &filer_pb.AssignVolumeRequest{
  311. Count: 1,
  312. Replication: "",
  313. Collection: f.fs.option.Collection,
  314. DiskType: f.fs.option.DiskType,
  315. Path: name,
  316. }
  317. resp, err := client.AssignVolume(ctx, request)
  318. if err != nil {
  319. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  320. return err
  321. }
  322. if resp.Error != "" {
  323. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  324. }
  325. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  326. f.collection, f.replication = resp.Collection, resp.Replication
  327. return nil
  328. }); flushErr != nil {
  329. return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
  330. }
  331. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  332. uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
  333. if flushErr != nil {
  334. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
  335. return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
  336. }
  337. if uploadResult.Error != "" {
  338. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
  339. return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
  340. }
  341. return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
  342. }
  343. func (f *WebDavFile) Write(buf []byte) (int, error) {
  344. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  345. dir, _ := util.FullPath(f.name).DirAndName()
  346. var getErr error
  347. ctx := context.Background()
  348. if f.entry == nil {
  349. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  350. }
  351. if f.entry == nil {
  352. return 0, getErr
  353. }
  354. if getErr != nil {
  355. return 0, getErr
  356. }
  357. if f.bufWriter.FlushFunc == nil {
  358. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  359. var chunk *filer_pb.FileChunk
  360. chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  361. if flushErr != nil {
  362. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  363. }
  364. f.entry.Content = nil
  365. f.entry.Chunks = append(f.entry.Chunks, chunk)
  366. return flushErr
  367. }
  368. f.bufWriter.CloseFunc = func() error {
  369. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  370. if manifestErr != nil {
  371. // not good, but should be ok
  372. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  373. } else {
  374. f.entry.Chunks = manifestedChunks
  375. }
  376. flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  377. f.entry.Attributes.Mtime = time.Now().Unix()
  378. f.entry.Attributes.Collection = f.collection
  379. f.entry.Attributes.Replication = f.replication
  380. request := &filer_pb.UpdateEntryRequest{
  381. Directory: dir,
  382. Entry: f.entry,
  383. Signatures: []int32{f.fs.signature},
  384. }
  385. if _, err := client.UpdateEntry(ctx, request); err != nil {
  386. return fmt.Errorf("update %s: %v", f.name, err)
  387. }
  388. return nil
  389. })
  390. return flushErr
  391. }
  392. }
  393. written, err := f.bufWriter.Write(buf)
  394. if err == nil {
  395. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  396. f.off += int64(written)
  397. }
  398. return written, err
  399. }
  400. func (f *WebDavFile) Close() error {
  401. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  402. err := f.bufWriter.Close()
  403. if f.entry != nil {
  404. f.entry = nil
  405. f.entryViewCache = nil
  406. }
  407. return err
  408. }
  409. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  410. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  411. if f.entry == nil {
  412. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  413. }
  414. if f.entry == nil {
  415. return 0, err
  416. }
  417. if err != nil {
  418. return 0, err
  419. }
  420. fileSize := int64(filer.FileSize(f.entry))
  421. if fileSize == 0 {
  422. return 0, io.EOF
  423. }
  424. if f.entryViewCache == nil {
  425. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
  426. f.reader = nil
  427. }
  428. if f.reader == nil {
  429. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
  430. f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
  431. }
  432. readSize, err = f.reader.ReadAt(p, f.off)
  433. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  434. f.off += int64(readSize)
  435. if err != nil && err != io.EOF {
  436. glog.Errorf("file read %s: %v", f.name, err)
  437. }
  438. return
  439. }
  440. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  441. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  442. dir, _ := util.FullPath(f.name).DirAndName()
  443. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  444. fi := FileInfo{
  445. size: int64(filer.FileSize(entry)),
  446. name: entry.Name,
  447. mode: os.FileMode(entry.Attributes.FileMode),
  448. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  449. isDirectory: entry.IsDirectory,
  450. }
  451. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  452. fi.name += "/"
  453. }
  454. glog.V(4).Infof("entry: %v", fi.name)
  455. ret = append(ret, &fi)
  456. return nil
  457. })
  458. old := f.off
  459. if old >= int64(len(ret)) {
  460. if count > 0 {
  461. return nil, io.EOF
  462. }
  463. return nil, nil
  464. }
  465. if count > 0 {
  466. f.off += int64(count)
  467. if f.off > int64(len(ret)) {
  468. f.off = int64(len(ret))
  469. }
  470. } else {
  471. f.off = int64(len(ret))
  472. old = 0
  473. }
  474. return ret[old:f.off], nil
  475. }
  476. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  477. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  478. ctx := context.Background()
  479. var err error
  480. switch whence {
  481. case io.SeekStart:
  482. f.off = 0
  483. case io.SeekEnd:
  484. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  485. return 0, err
  486. } else {
  487. f.off = fi.Size()
  488. }
  489. }
  490. f.off += offset
  491. return f.off, err
  492. }
  493. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  494. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  495. ctx := context.Background()
  496. return f.fs.stat(ctx, f.name)
  497. }