You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

571 lines
14 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  18. "github.com/chrislusf/seaweedfs/weed/filer"
  19. "github.com/chrislusf/seaweedfs/weed/glog"
  20. "github.com/chrislusf/seaweedfs/weed/security"
  21. )
  22. type WebDavOption struct {
  23. Filer string
  24. FilerGrpcAddress string
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Uid uint32
  30. Gid uint32
  31. Cipher bool
  32. CacheDir string
  33. CacheSizeMB int64
  34. }
  35. type WebDavServer struct {
  36. option *WebDavOption
  37. secret security.SigningKey
  38. filer *filer.Filer
  39. grpcDialOption grpc.DialOption
  40. Handler *webdav.Handler
  41. }
  42. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  43. fs, _ := NewWebDavFileSystem(option)
  44. ws = &WebDavServer{
  45. option: option,
  46. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  47. Handler: &webdav.Handler{
  48. FileSystem: fs,
  49. LockSystem: webdav.NewMemLS(),
  50. },
  51. }
  52. return ws, nil
  53. }
  54. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  55. type WebDavFileSystem struct {
  56. option *WebDavOption
  57. secret security.SigningKey
  58. filer *filer.Filer
  59. grpcDialOption grpc.DialOption
  60. chunkCache *chunk_cache.TieredChunkCache
  61. signature int32
  62. }
  63. type FileInfo struct {
  64. name string
  65. size int64
  66. mode os.FileMode
  67. modifiledTime time.Time
  68. isDirectory bool
  69. }
  70. func (fi *FileInfo) Name() string { return fi.name }
  71. func (fi *FileInfo) Size() int64 { return fi.size }
  72. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  73. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  74. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  75. func (fi *FileInfo) Sys() interface{} { return nil }
  76. type WebDavFile struct {
  77. fs *WebDavFileSystem
  78. name string
  79. isDirectory bool
  80. off int64
  81. entry *filer_pb.Entry
  82. entryViewCache []filer.VisibleInterval
  83. reader io.ReaderAt
  84. }
  85. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  86. chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024)
  87. return &WebDavFileSystem{
  88. option: option,
  89. chunkCache: chunkCache,
  90. signature: util.RandomInt32(),
  91. }, nil
  92. }
  93. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  94. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  95. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  96. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  97. return fn(client)
  98. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  99. }
  100. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  101. return location.Url
  102. }
  103. func clearName(name string) (string, error) {
  104. slashed := strings.HasSuffix(name, "/")
  105. name = path.Clean(name)
  106. if !strings.HasSuffix(name, "/") && slashed {
  107. name += "/"
  108. }
  109. if !strings.HasPrefix(name, "/") {
  110. return "", os.ErrInvalid
  111. }
  112. return name, nil
  113. }
  114. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  115. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  116. if !strings.HasSuffix(fullDirPath, "/") {
  117. fullDirPath += "/"
  118. }
  119. var err error
  120. if fullDirPath, err = clearName(fullDirPath); err != nil {
  121. return err
  122. }
  123. _, err = fs.stat(ctx, fullDirPath)
  124. if err == nil {
  125. return os.ErrExist
  126. }
  127. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  128. dir, name := util.FullPath(fullDirPath).DirAndName()
  129. request := &filer_pb.CreateEntryRequest{
  130. Directory: dir,
  131. Entry: &filer_pb.Entry{
  132. Name: name,
  133. IsDirectory: true,
  134. Attributes: &filer_pb.FuseAttributes{
  135. Mtime: time.Now().Unix(),
  136. Crtime: time.Now().Unix(),
  137. FileMode: uint32(perm | os.ModeDir),
  138. Uid: fs.option.Uid,
  139. Gid: fs.option.Gid,
  140. },
  141. },
  142. Signatures: []int32{fs.signature},
  143. }
  144. glog.V(1).Infof("mkdir: %v", request)
  145. if err := filer_pb.CreateEntry(client, request); err != nil {
  146. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  147. }
  148. return nil
  149. })
  150. }
  151. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  152. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  153. var err error
  154. if fullFilePath, err = clearName(fullFilePath); err != nil {
  155. return nil, err
  156. }
  157. if flag&os.O_CREATE != 0 {
  158. // file should not have / suffix.
  159. if strings.HasSuffix(fullFilePath, "/") {
  160. return nil, os.ErrInvalid
  161. }
  162. _, err = fs.stat(ctx, fullFilePath)
  163. if err == nil {
  164. if flag&os.O_EXCL != 0 {
  165. return nil, os.ErrExist
  166. }
  167. fs.removeAll(ctx, fullFilePath)
  168. }
  169. dir, name := util.FullPath(fullFilePath).DirAndName()
  170. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  171. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  172. Directory: dir,
  173. Entry: &filer_pb.Entry{
  174. Name: name,
  175. IsDirectory: perm&os.ModeDir > 0,
  176. Attributes: &filer_pb.FuseAttributes{
  177. Mtime: time.Now().Unix(),
  178. Crtime: time.Now().Unix(),
  179. FileMode: uint32(perm),
  180. Uid: fs.option.Uid,
  181. Gid: fs.option.Gid,
  182. Collection: fs.option.Collection,
  183. Replication: "000",
  184. TtlSec: 0,
  185. },
  186. },
  187. Signatures: []int32{fs.signature},
  188. }); err != nil {
  189. return fmt.Errorf("create %s: %v", fullFilePath, err)
  190. }
  191. return nil
  192. })
  193. if err != nil {
  194. return nil, err
  195. }
  196. return &WebDavFile{
  197. fs: fs,
  198. name: fullFilePath,
  199. isDirectory: false,
  200. }, nil
  201. }
  202. fi, err := fs.stat(ctx, fullFilePath)
  203. if err != nil {
  204. return nil, os.ErrNotExist
  205. }
  206. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  207. fullFilePath += "/"
  208. }
  209. return &WebDavFile{
  210. fs: fs,
  211. name: fullFilePath,
  212. isDirectory: false,
  213. }, nil
  214. }
  215. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  216. var err error
  217. if fullFilePath, err = clearName(fullFilePath); err != nil {
  218. return err
  219. }
  220. dir, name := util.FullPath(fullFilePath).DirAndName()
  221. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  222. }
  223. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  224. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  225. return fs.removeAll(ctx, name)
  226. }
  227. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  228. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  229. var err error
  230. if oldName, err = clearName(oldName); err != nil {
  231. return err
  232. }
  233. if newName, err = clearName(newName); err != nil {
  234. return err
  235. }
  236. of, err := fs.stat(ctx, oldName)
  237. if err != nil {
  238. return os.ErrExist
  239. }
  240. if of.IsDir() {
  241. if strings.HasSuffix(oldName, "/") {
  242. oldName = strings.TrimRight(oldName, "/")
  243. }
  244. if strings.HasSuffix(newName, "/") {
  245. newName = strings.TrimRight(newName, "/")
  246. }
  247. }
  248. _, err = fs.stat(ctx, newName)
  249. if err == nil {
  250. return os.ErrExist
  251. }
  252. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  253. newDir, newBaseName := util.FullPath(newName).DirAndName()
  254. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  255. request := &filer_pb.AtomicRenameEntryRequest{
  256. OldDirectory: oldDir,
  257. OldName: oldBaseName,
  258. NewDirectory: newDir,
  259. NewName: newBaseName,
  260. }
  261. _, err := client.AtomicRenameEntry(ctx, request)
  262. if err != nil {
  263. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  264. }
  265. return nil
  266. })
  267. }
  268. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  269. var err error
  270. if fullFilePath, err = clearName(fullFilePath); err != nil {
  271. return nil, err
  272. }
  273. fullpath := util.FullPath(fullFilePath)
  274. var fi FileInfo
  275. entry, err := filer_pb.GetEntry(fs, fullpath)
  276. if entry == nil {
  277. return nil, os.ErrNotExist
  278. }
  279. if err != nil {
  280. return nil, err
  281. }
  282. fi.size = int64(filer.FileSize(entry))
  283. fi.name = string(fullpath)
  284. fi.mode = os.FileMode(entry.Attributes.FileMode)
  285. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  286. fi.isDirectory = entry.IsDirectory
  287. if fi.name == "/" {
  288. fi.modifiledTime = time.Now()
  289. fi.isDirectory = true
  290. }
  291. return &fi, nil
  292. }
  293. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  294. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  295. return fs.stat(ctx, name)
  296. }
  297. func (f *WebDavFile) Write(buf []byte) (int, error) {
  298. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  299. dir, _ := util.FullPath(f.name).DirAndName()
  300. var err error
  301. ctx := context.Background()
  302. if f.entry == nil {
  303. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  304. }
  305. if f.entry == nil {
  306. return 0, err
  307. }
  308. if err != nil {
  309. return 0, err
  310. }
  311. var fileId, host string
  312. var auth security.EncodedJwt
  313. var collection, replication string
  314. if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  315. request := &filer_pb.AssignVolumeRequest{
  316. Count: 1,
  317. Replication: "",
  318. Collection: f.fs.option.Collection,
  319. ParentPath: dir,
  320. }
  321. resp, err := client.AssignVolume(ctx, request)
  322. if err != nil {
  323. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  324. return err
  325. }
  326. if resp.Error != "" {
  327. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  328. }
  329. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  330. collection, replication = resp.Collection, resp.Replication
  331. return nil
  332. }); err != nil {
  333. return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  334. }
  335. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  336. uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth)
  337. if err != nil {
  338. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
  339. return 0, fmt.Errorf("upload data: %v", err)
  340. }
  341. if uploadResult.Error != "" {
  342. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
  343. return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
  344. }
  345. f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off))
  346. err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  347. f.entry.Attributes.Mtime = time.Now().Unix()
  348. f.entry.Attributes.Collection = collection
  349. f.entry.Attributes.Replication = replication
  350. request := &filer_pb.UpdateEntryRequest{
  351. Directory: dir,
  352. Entry: f.entry,
  353. Signatures: []int32{f.fs.signature},
  354. }
  355. if _, err := client.UpdateEntry(ctx, request); err != nil {
  356. return fmt.Errorf("update %s: %v", f.name, err)
  357. }
  358. return nil
  359. })
  360. if err == nil {
  361. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  362. f.off += int64(len(buf))
  363. }
  364. return len(buf), err
  365. }
  366. func (f *WebDavFile) Close() error {
  367. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  368. if f.entry != nil {
  369. f.entry = nil
  370. f.entryViewCache = nil
  371. }
  372. return nil
  373. }
  374. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  375. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  376. if f.entry == nil {
  377. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  378. }
  379. if f.entry == nil {
  380. return 0, err
  381. }
  382. if err != nil {
  383. return 0, err
  384. }
  385. fileSize := int64(filer.FileSize(f.entry))
  386. if fileSize == 0 {
  387. return 0, io.EOF
  388. }
  389. if f.entryViewCache == nil {
  390. f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
  391. f.reader = nil
  392. }
  393. if f.reader == nil {
  394. chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
  395. f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
  396. }
  397. readSize, err = f.reader.ReadAt(p, f.off)
  398. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  399. f.off += int64(readSize)
  400. if err != nil && err != io.EOF {
  401. glog.Errorf("file read %s: %v", f.name, err)
  402. }
  403. return
  404. }
  405. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  406. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  407. dir, _ := util.FullPath(f.name).DirAndName()
  408. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  409. fi := FileInfo{
  410. size: int64(filer.FileSize(entry)),
  411. name: entry.Name,
  412. mode: os.FileMode(entry.Attributes.FileMode),
  413. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  414. isDirectory: entry.IsDirectory,
  415. }
  416. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  417. fi.name += "/"
  418. }
  419. glog.V(4).Infof("entry: %v", fi.name)
  420. ret = append(ret, &fi)
  421. return nil
  422. })
  423. old := f.off
  424. if old >= int64(len(ret)) {
  425. if count > 0 {
  426. return nil, io.EOF
  427. }
  428. return nil, nil
  429. }
  430. if count > 0 {
  431. f.off += int64(count)
  432. if f.off > int64(len(ret)) {
  433. f.off = int64(len(ret))
  434. }
  435. } else {
  436. f.off = int64(len(ret))
  437. old = 0
  438. }
  439. return ret[old:f.off], nil
  440. }
  441. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  442. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  443. ctx := context.Background()
  444. var err error
  445. switch whence {
  446. case io.SeekStart:
  447. f.off = 0
  448. case io.SeekEnd:
  449. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  450. return 0, err
  451. } else {
  452. f.off = fi.Size()
  453. }
  454. }
  455. f.off += offset
  456. return f.off, err
  457. }
  458. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  459. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  460. ctx := context.Background()
  461. return f.fs.stat(ctx, f.name)
  462. }