You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

613 lines
15 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "path"
  10. "strings"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
  13. "golang.org/x/net/webdav"
  14. "google.golang.org/grpc"
  15. "github.com/chrislusf/seaweedfs/weed/operation"
  16. "github.com/chrislusf/seaweedfs/weed/pb"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  20. "github.com/chrislusf/seaweedfs/weed/filer"
  21. "github.com/chrislusf/seaweedfs/weed/glog"
  22. "github.com/chrislusf/seaweedfs/weed/security"
  23. )
// WebDavOption carries the configuration used to build the WebDAV server and
// its backing file system.
type WebDavOption struct {
	// Filer is not read in this file; presumably the filer HTTP address — TODO confirm.
	Filer string
	// FilerGrpcAddress is the address WithFilerClient dials for filer gRPC calls.
	FilerGrpcAddress string
	// DomainName is not read in this file — TODO confirm its consumer.
	DomainName string
	// BucketsPath is not read in this file — TODO confirm its consumer.
	BucketsPath string
	// GrpcDialOption is passed along with FilerGrpcAddress when obtaining the
	// cached gRPC connection.
	GrpcDialOption grpc.DialOption
	// Collection is sent with volume-assignment requests and stored on newly
	// created file entries.
	Collection string
	// Uid and Gid are recorded as the owner of entries created through WebDAV.
	Uid uint32
	Gid uint32
	// Cipher is forwarded to operation.Upload when chunk data is uploaded.
	Cipher bool
	// CacheDir and CacheSizeMB are handed to the tiered chunk cache constructor.
	CacheDir    string
	CacheSizeMB int64
}
// WebDavServer bundles the WebDAV options with the webdav.Handler that serves
// requests.
type WebDavServer struct {
	// option is the configuration the server was created with.
	option *WebDavOption
	// secret is not assigned by the constructor in this file — TODO confirm usage.
	secret security.SigningKey
	// filer is not assigned by the constructor in this file — TODO confirm usage.
	filer *filer.Filer
	// grpcDialOption is the client TLS dial option loaded for "grpc.filer".
	grpcDialOption grpc.DialOption
	// Handler is the golang.org/x/net/webdav handler backed by WebDavFileSystem
	// and an in-memory lock system.
	Handler *webdav.Handler
}
  44. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  45. fs, _ := NewWebDavFileSystem(option)
  46. ws = &WebDavServer{
  47. option: option,
  48. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  49. Handler: &webdav.Handler{
  50. FileSystem: fs,
  51. LockSystem: webdav.NewMemLS(),
  52. },
  53. }
  54. return ws, nil
  55. }
// WebDavFileSystem implements webdav.FileSystem by translating WebDAV
// operations into filer gRPC calls.
//
// adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
type WebDavFileSystem struct {
	// option is the configuration supplied at construction time.
	option *WebDavOption
	// secret is not assigned by the constructor in this file — TODO confirm usage.
	secret security.SigningKey
	// filer is not assigned by the constructor in this file — TODO confirm usage.
	filer *filer.Filer
	// grpcDialOption is not assigned by the constructor in this file; gRPC calls
	// use option.GrpcDialOption instead.
	grpcDialOption grpc.DialOption
	// chunkCache is the tiered chunk cache handed to chunk readers.
	chunkCache *chunk_cache.TieredChunkCache
	// signature is a random value attached to every mutating filer request
	// issued by this file system.
	signature int32
}
  65. type FileInfo struct {
  66. name string
  67. size int64
  68. mode os.FileMode
  69. modifiledTime time.Time
  70. isDirectory bool
  71. }
  72. func (fi *FileInfo) Name() string { return fi.name }
  73. func (fi *FileInfo) Size() int64 { return fi.size }
  74. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  75. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  76. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  77. func (fi *FileInfo) Sys() interface{} { return nil }
// WebDavFile is the open-file handle returned by WebDavFileSystem.OpenFile.
type WebDavFile struct {
	// fs is the file system that opened this handle.
	fs *WebDavFileSystem
	// name is the cleaned full path of the file or directory.
	name string
	// isDirectory is set by OpenFile; it is not read in this file.
	isDirectory bool
	// off is the current position: a byte offset for Read/Write/Seek and an
	// entry index for Readdir.
	off int64
	// entry is the filer entry, fetched lazily on first Read or Write and
	// cleared on Close.
	entry *filer_pb.Entry
	// entryViewCache caches the visible chunk intervals computed by Read.
	entryViewCache []filer.VisibleInterval
	// reader is the chunk reader built lazily by Read.
	reader io.ReaderAt
	// bufWriter buffers written data; its flush and close callbacks are
	// installed on first Write.
	bufWriter *buffered_writer.BufferedWriteCloser
	// collection and replication record the values returned by the most recent
	// volume assignment and are stored on the entry at close time.
	collection  string
	replication string
}
  90. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  91. chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024)
  92. return &WebDavFileSystem{
  93. option: option,
  94. chunkCache: chunkCache,
  95. signature: util.RandomInt32(),
  96. }, nil
  97. }
// Compile-time check that *WebDavFileSystem can be used as a filer_pb.FilerClient.
var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  99. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  100. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  101. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  102. return fn(client)
  103. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  104. }
  105. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  106. return location.Url
  107. }
  108. func clearName(name string) (string, error) {
  109. slashed := strings.HasSuffix(name, "/")
  110. name = path.Clean(name)
  111. if !strings.HasSuffix(name, "/") && slashed {
  112. name += "/"
  113. }
  114. if !strings.HasPrefix(name, "/") {
  115. return "", os.ErrInvalid
  116. }
  117. return name, nil
  118. }
  119. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  120. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  121. if !strings.HasSuffix(fullDirPath, "/") {
  122. fullDirPath += "/"
  123. }
  124. var err error
  125. if fullDirPath, err = clearName(fullDirPath); err != nil {
  126. return err
  127. }
  128. _, err = fs.stat(ctx, fullDirPath)
  129. if err == nil {
  130. return os.ErrExist
  131. }
  132. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  133. dir, name := util.FullPath(fullDirPath).DirAndName()
  134. request := &filer_pb.CreateEntryRequest{
  135. Directory: dir,
  136. Entry: &filer_pb.Entry{
  137. Name: name,
  138. IsDirectory: true,
  139. Attributes: &filer_pb.FuseAttributes{
  140. Mtime: time.Now().Unix(),
  141. Crtime: time.Now().Unix(),
  142. FileMode: uint32(perm | os.ModeDir),
  143. Uid: fs.option.Uid,
  144. Gid: fs.option.Gid,
  145. },
  146. },
  147. Signatures: []int32{fs.signature},
  148. }
  149. glog.V(1).Infof("mkdir: %v", request)
  150. if err := filer_pb.CreateEntry(client, request); err != nil {
  151. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  152. }
  153. return nil
  154. })
  155. }
  156. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  157. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  158. var err error
  159. if fullFilePath, err = clearName(fullFilePath); err != nil {
  160. return nil, err
  161. }
  162. if flag&os.O_CREATE != 0 {
  163. // file should not have / suffix.
  164. if strings.HasSuffix(fullFilePath, "/") {
  165. return nil, os.ErrInvalid
  166. }
  167. _, err = fs.stat(ctx, fullFilePath)
  168. if err == nil {
  169. if flag&os.O_EXCL != 0 {
  170. return nil, os.ErrExist
  171. }
  172. fs.removeAll(ctx, fullFilePath)
  173. }
  174. dir, name := util.FullPath(fullFilePath).DirAndName()
  175. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  176. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  177. Directory: dir,
  178. Entry: &filer_pb.Entry{
  179. Name: name,
  180. IsDirectory: perm&os.ModeDir > 0,
  181. Attributes: &filer_pb.FuseAttributes{
  182. Mtime: time.Now().Unix(),
  183. Crtime: time.Now().Unix(),
  184. FileMode: uint32(perm),
  185. Uid: fs.option.Uid,
  186. Gid: fs.option.Gid,
  187. Collection: fs.option.Collection,
  188. Replication: "000",
  189. TtlSec: 0,
  190. },
  191. },
  192. Signatures: []int32{fs.signature},
  193. }); err != nil {
  194. return fmt.Errorf("create %s: %v", fullFilePath, err)
  195. }
  196. return nil
  197. })
  198. if err != nil {
  199. return nil, err
  200. }
  201. return &WebDavFile{
  202. fs: fs,
  203. name: fullFilePath,
  204. isDirectory: false,
  205. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  206. }, nil
  207. }
  208. fi, err := fs.stat(ctx, fullFilePath)
  209. if err != nil {
  210. return nil, os.ErrNotExist
  211. }
  212. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  213. fullFilePath += "/"
  214. }
  215. return &WebDavFile{
  216. fs: fs,
  217. name: fullFilePath,
  218. isDirectory: false,
  219. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  220. }, nil
  221. }
  222. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  223. var err error
  224. if fullFilePath, err = clearName(fullFilePath); err != nil {
  225. return err
  226. }
  227. dir, name := util.FullPath(fullFilePath).DirAndName()
  228. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  229. }
  230. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  231. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  232. return fs.removeAll(ctx, name)
  233. }
  234. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  235. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  236. var err error
  237. if oldName, err = clearName(oldName); err != nil {
  238. return err
  239. }
  240. if newName, err = clearName(newName); err != nil {
  241. return err
  242. }
  243. of, err := fs.stat(ctx, oldName)
  244. if err != nil {
  245. return os.ErrExist
  246. }
  247. if of.IsDir() {
  248. if strings.HasSuffix(oldName, "/") {
  249. oldName = strings.TrimRight(oldName, "/")
  250. }
  251. if strings.HasSuffix(newName, "/") {
  252. newName = strings.TrimRight(newName, "/")
  253. }
  254. }
  255. _, err = fs.stat(ctx, newName)
  256. if err == nil {
  257. return os.ErrExist
  258. }
  259. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  260. newDir, newBaseName := util.FullPath(newName).DirAndName()
  261. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  262. request := &filer_pb.AtomicRenameEntryRequest{
  263. OldDirectory: oldDir,
  264. OldName: oldBaseName,
  265. NewDirectory: newDir,
  266. NewName: newBaseName,
  267. }
  268. _, err := client.AtomicRenameEntry(ctx, request)
  269. if err != nil {
  270. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  271. }
  272. return nil
  273. })
  274. }
  275. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  276. var err error
  277. if fullFilePath, err = clearName(fullFilePath); err != nil {
  278. return nil, err
  279. }
  280. fullpath := util.FullPath(fullFilePath)
  281. var fi FileInfo
  282. entry, err := filer_pb.GetEntry(fs, fullpath)
  283. if entry == nil {
  284. return nil, os.ErrNotExist
  285. }
  286. if err != nil {
  287. return nil, err
  288. }
  289. fi.size = int64(filer.FileSize(entry))
  290. fi.name = string(fullpath)
  291. fi.mode = os.FileMode(entry.Attributes.FileMode)
  292. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  293. fi.isDirectory = entry.IsDirectory
  294. if fi.name == "/" {
  295. fi.modifiledTime = time.Now()
  296. fi.isDirectory = true
  297. }
  298. return &fi, nil
  299. }
  300. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  301. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  302. return fs.stat(ctx, name)
  303. }
  304. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
  305. var fileId, host string
  306. var auth security.EncodedJwt
  307. if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  308. ctx := context.Background()
  309. request := &filer_pb.AssignVolumeRequest{
  310. Count: 1,
  311. Replication: "",
  312. Collection: f.fs.option.Collection,
  313. Path: name,
  314. }
  315. resp, err := client.AssignVolume(ctx, request)
  316. if err != nil {
  317. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  318. return err
  319. }
  320. if resp.Error != "" {
  321. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  322. }
  323. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  324. f.collection, f.replication = resp.Collection, resp.Replication
  325. return nil
  326. }); flushErr != nil {
  327. return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
  328. }
  329. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  330. uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
  331. if flushErr != nil {
  332. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
  333. return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
  334. }
  335. if uploadResult.Error != "" {
  336. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
  337. return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
  338. }
  339. return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
  340. }
  341. func (f *WebDavFile) Write(buf []byte) (int, error) {
  342. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  343. dir, _ := util.FullPath(f.name).DirAndName()
  344. var getErr error
  345. ctx := context.Background()
  346. if f.entry == nil {
  347. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  348. }
  349. if f.entry == nil {
  350. return 0, getErr
  351. }
  352. if getErr != nil {
  353. return 0, getErr
  354. }
  355. if f.bufWriter.FlushFunc == nil {
  356. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  357. var chunk *filer_pb.FileChunk
  358. chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
  359. if flushErr != nil {
  360. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  361. }
  362. f.entry.Content = nil
  363. f.entry.Chunks = append(f.entry.Chunks, chunk)
  364. return flushErr
  365. }
  366. f.bufWriter.CloseFunc = func() error {
  367. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
  368. if manifestErr != nil {
  369. // not good, but should be ok
  370. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  371. } else {
  372. f.entry.Chunks = manifestedChunks
  373. }
  374. flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  375. f.entry.Attributes.Mtime = time.Now().Unix()
  376. f.entry.Attributes.Collection = f.collection
  377. f.entry.Attributes.Replication = f.replication
  378. request := &filer_pb.UpdateEntryRequest{
  379. Directory: dir,
  380. Entry: f.entry,
  381. Signatures: []int32{f.fs.signature},
  382. }
  383. if _, err := client.UpdateEntry(ctx, request); err != nil {
  384. return fmt.Errorf("update %s: %v", f.name, err)
  385. }
  386. return nil
  387. })
  388. return flushErr
  389. }
  390. }
  391. written, err := f.bufWriter.Write(buf)
  392. if err == nil {
  393. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  394. f.off += int64(written)
  395. }
  396. return written, err
  397. }
  398. func (f *WebDavFile) Close() error {
  399. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  400. err := f.bufWriter.Close()
  401. if f.entry != nil {
  402. f.entry = nil
  403. f.entryViewCache = nil
  404. }
  405. return err
  406. }
  407. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  408. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  409. if f.entry == nil {
  410. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  411. }
  412. if f.entry == nil {
  413. return 0, err
  414. }
  415. if err != nil {
  416. return 0, err
  417. }
  418. fileSize := int64(filer.FileSize(f.entry))
  419. if fileSize == 0 {
  420. return 0, io.EOF
  421. }
  422. if f.entryViewCache == nil {
  423. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
  424. f.reader = nil
  425. }
  426. if f.reader == nil {
  427. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
  428. f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
  429. }
  430. readSize, err = f.reader.ReadAt(p, f.off)
  431. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  432. f.off += int64(readSize)
  433. if err != nil && err != io.EOF {
  434. glog.Errorf("file read %s: %v", f.name, err)
  435. }
  436. return
  437. }
  438. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  439. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  440. dir, _ := util.FullPath(f.name).DirAndName()
  441. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  442. fi := FileInfo{
  443. size: int64(filer.FileSize(entry)),
  444. name: entry.Name,
  445. mode: os.FileMode(entry.Attributes.FileMode),
  446. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  447. isDirectory: entry.IsDirectory,
  448. }
  449. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  450. fi.name += "/"
  451. }
  452. glog.V(4).Infof("entry: %v", fi.name)
  453. ret = append(ret, &fi)
  454. return nil
  455. })
  456. old := f.off
  457. if old >= int64(len(ret)) {
  458. if count > 0 {
  459. return nil, io.EOF
  460. }
  461. return nil, nil
  462. }
  463. if count > 0 {
  464. f.off += int64(count)
  465. if f.off > int64(len(ret)) {
  466. f.off = int64(len(ret))
  467. }
  468. } else {
  469. f.off = int64(len(ret))
  470. old = 0
  471. }
  472. return ret[old:f.off], nil
  473. }
  474. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  475. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  476. ctx := context.Background()
  477. var err error
  478. switch whence {
  479. case io.SeekStart:
  480. f.off = 0
  481. case io.SeekEnd:
  482. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  483. return 0, err
  484. } else {
  485. f.off = fi.Size()
  486. }
  487. }
  488. f.off += offset
  489. return f.off, err
  490. }
  491. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  492. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  493. ctx := context.Background()
  494. return f.fs.stat(ctx, f.name)
  495. }