You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

590 lines
14 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "github.com/chrislusf/seaweedfs/weed/filer2"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/security"
  20. )
  21. type WebDavOption struct {
  22. Filer string
  23. FilerGrpcAddress string
  24. DomainName string
  25. BucketsPath string
  26. GrpcDialOption grpc.DialOption
  27. Collection string
  28. Uid uint32
  29. Gid uint32
  30. Cipher bool
  31. }
  32. type WebDavServer struct {
  33. option *WebDavOption
  34. secret security.SigningKey
  35. filer *filer2.Filer
  36. grpcDialOption grpc.DialOption
  37. Handler *webdav.Handler
  38. }
  39. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  40. fs, _ := NewWebDavFileSystem(option)
  41. ws = &WebDavServer{
  42. option: option,
  43. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  44. Handler: &webdav.Handler{
  45. FileSystem: fs,
  46. LockSystem: webdav.NewMemLS(),
  47. },
  48. }
  49. return ws, nil
  50. }
  51. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  52. type WebDavFileSystem struct {
  53. option *WebDavOption
  54. secret security.SigningKey
  55. filer *filer2.Filer
  56. grpcDialOption grpc.DialOption
  57. }
  58. type FileInfo struct {
  59. name string
  60. size int64
  61. mode os.FileMode
  62. modifiledTime time.Time
  63. isDirectory bool
  64. }
  65. func (fi *FileInfo) Name() string { return fi.name }
  66. func (fi *FileInfo) Size() int64 { return fi.size }
  67. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  68. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  69. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  70. func (fi *FileInfo) Sys() interface{} { return nil }
  71. type WebDavFile struct {
  72. fs *WebDavFileSystem
  73. name string
  74. isDirectory bool
  75. off int64
  76. entry *filer_pb.Entry
  77. entryViewCache []filer2.VisibleInterval
  78. reader io.ReadSeeker
  79. }
  80. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  81. return &WebDavFileSystem{
  82. option: option,
  83. }, nil
  84. }
  85. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  86. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  87. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  88. return fn(client)
  89. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  90. }
  91. func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
  92. return hostAndPort
  93. }
  94. func clearName(name string) (string, error) {
  95. slashed := strings.HasSuffix(name, "/")
  96. name = path.Clean(name)
  97. if !strings.HasSuffix(name, "/") && slashed {
  98. name += "/"
  99. }
  100. if !strings.HasPrefix(name, "/") {
  101. return "", os.ErrInvalid
  102. }
  103. return name, nil
  104. }
  105. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  106. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  107. if !strings.HasSuffix(fullDirPath, "/") {
  108. fullDirPath += "/"
  109. }
  110. var err error
  111. if fullDirPath, err = clearName(fullDirPath); err != nil {
  112. return err
  113. }
  114. _, err = fs.stat(ctx, fullDirPath)
  115. if err == nil {
  116. return os.ErrExist
  117. }
  118. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  119. dir, name := util.FullPath(fullDirPath).DirAndName()
  120. request := &filer_pb.CreateEntryRequest{
  121. Directory: dir,
  122. Entry: &filer_pb.Entry{
  123. Name: name,
  124. IsDirectory: true,
  125. Attributes: &filer_pb.FuseAttributes{
  126. Mtime: time.Now().Unix(),
  127. Crtime: time.Now().Unix(),
  128. FileMode: uint32(perm | os.ModeDir),
  129. Uid: fs.option.Uid,
  130. Gid: fs.option.Gid,
  131. },
  132. },
  133. }
  134. glog.V(1).Infof("mkdir: %v", request)
  135. if err := filer_pb.CreateEntry(client, request); err != nil {
  136. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  137. }
  138. return nil
  139. })
  140. }
  141. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  142. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  143. var err error
  144. if fullFilePath, err = clearName(fullFilePath); err != nil {
  145. return nil, err
  146. }
  147. if flag&os.O_CREATE != 0 {
  148. // file should not have / suffix.
  149. if strings.HasSuffix(fullFilePath, "/") {
  150. return nil, os.ErrInvalid
  151. }
  152. _, err = fs.stat(ctx, fullFilePath)
  153. if err == nil {
  154. if flag&os.O_EXCL != 0 {
  155. return nil, os.ErrExist
  156. }
  157. fs.removeAll(ctx, fullFilePath)
  158. }
  159. dir, name := util.FullPath(fullFilePath).DirAndName()
  160. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  161. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  162. Directory: dir,
  163. Entry: &filer_pb.Entry{
  164. Name: name,
  165. IsDirectory: perm&os.ModeDir > 0,
  166. Attributes: &filer_pb.FuseAttributes{
  167. Mtime: time.Now().Unix(),
  168. Crtime: time.Now().Unix(),
  169. FileMode: uint32(perm),
  170. Uid: fs.option.Uid,
  171. Gid: fs.option.Gid,
  172. Collection: fs.option.Collection,
  173. Replication: "000",
  174. TtlSec: 0,
  175. },
  176. },
  177. }); err != nil {
  178. return fmt.Errorf("create %s: %v", fullFilePath, err)
  179. }
  180. return nil
  181. })
  182. if err != nil {
  183. return nil, err
  184. }
  185. return &WebDavFile{
  186. fs: fs,
  187. name: fullFilePath,
  188. isDirectory: false,
  189. }, nil
  190. }
  191. fi, err := fs.stat(ctx, fullFilePath)
  192. if err != nil {
  193. return nil, os.ErrNotExist
  194. }
  195. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  196. fullFilePath += "/"
  197. }
  198. return &WebDavFile{
  199. fs: fs,
  200. name: fullFilePath,
  201. isDirectory: false,
  202. }, nil
  203. }
  204. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  205. var err error
  206. if fullFilePath, err = clearName(fullFilePath); err != nil {
  207. return err
  208. }
  209. fi, err := fs.stat(ctx, fullFilePath)
  210. if err != nil {
  211. return err
  212. }
  213. if fi.IsDir() {
  214. //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`)
  215. } else {
  216. //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
  217. }
  218. dir, name := util.FullPath(fullFilePath).DirAndName()
  219. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  220. request := &filer_pb.DeleteEntryRequest{
  221. Directory: dir,
  222. Name: name,
  223. IsDeleteData: true,
  224. }
  225. glog.V(3).Infof("removing entry: %v", request)
  226. _, err := client.DeleteEntry(ctx, request)
  227. if err != nil {
  228. return fmt.Errorf("remove %s: %v", fullFilePath, err)
  229. }
  230. return nil
  231. })
  232. return err
  233. }
  234. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  235. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  236. return fs.removeAll(ctx, name)
  237. }
  238. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  239. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  240. var err error
  241. if oldName, err = clearName(oldName); err != nil {
  242. return err
  243. }
  244. if newName, err = clearName(newName); err != nil {
  245. return err
  246. }
  247. of, err := fs.stat(ctx, oldName)
  248. if err != nil {
  249. return os.ErrExist
  250. }
  251. if of.IsDir() {
  252. if strings.HasSuffix(oldName, "/") {
  253. oldName = strings.TrimRight(oldName, "/")
  254. }
  255. if strings.HasSuffix(newName, "/") {
  256. newName = strings.TrimRight(newName, "/")
  257. }
  258. }
  259. _, err = fs.stat(ctx, newName)
  260. if err == nil {
  261. return os.ErrExist
  262. }
  263. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  264. newDir, newBaseName := util.FullPath(newName).DirAndName()
  265. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  266. request := &filer_pb.AtomicRenameEntryRequest{
  267. OldDirectory: oldDir,
  268. OldName: oldBaseName,
  269. NewDirectory: newDir,
  270. NewName: newBaseName,
  271. }
  272. _, err := client.AtomicRenameEntry(ctx, request)
  273. if err != nil {
  274. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  275. }
  276. return nil
  277. })
  278. }
  279. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  280. var err error
  281. if fullFilePath, err = clearName(fullFilePath); err != nil {
  282. return nil, err
  283. }
  284. fullpath := util.FullPath(fullFilePath)
  285. var fi FileInfo
  286. entry, err := filer_pb.GetEntry(fs, fullpath)
  287. if entry == nil {
  288. return nil, os.ErrNotExist
  289. }
  290. if err != nil {
  291. return nil, err
  292. }
  293. fi.size = int64(filer2.TotalSize(entry.GetChunks()))
  294. fi.name = string(fullpath)
  295. fi.mode = os.FileMode(entry.Attributes.FileMode)
  296. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  297. fi.isDirectory = entry.IsDirectory
  298. if fi.name == "/" {
  299. fi.modifiledTime = time.Now()
  300. fi.isDirectory = true
  301. }
  302. return &fi, nil
  303. }
  304. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  305. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  306. return fs.stat(ctx, name)
  307. }
  308. func (f *WebDavFile) Write(buf []byte) (int, error) {
  309. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  310. dir, _ := util.FullPath(f.name).DirAndName()
  311. var err error
  312. ctx := context.Background()
  313. if f.entry == nil {
  314. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  315. }
  316. if f.entry == nil {
  317. return 0, err
  318. }
  319. if err != nil {
  320. return 0, err
  321. }
  322. var fileId, host string
  323. var auth security.EncodedJwt
  324. var collection, replication string
  325. if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  326. request := &filer_pb.AssignVolumeRequest{
  327. Count: 1,
  328. Replication: "",
  329. Collection: f.fs.option.Collection,
  330. ParentPath: dir,
  331. }
  332. resp, err := client.AssignVolume(ctx, request)
  333. if err != nil {
  334. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  335. return err
  336. }
  337. if resp.Error != "" {
  338. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  339. }
  340. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  341. collection, replication = resp.Collection, resp.Replication
  342. return nil
  343. }); err != nil {
  344. return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  345. }
  346. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  347. uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth)
  348. if err != nil {
  349. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
  350. return 0, fmt.Errorf("upload data: %v", err)
  351. }
  352. if uploadResult.Error != "" {
  353. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
  354. return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
  355. }
  356. chunk := &filer_pb.FileChunk{
  357. FileId: fileId,
  358. Offset: f.off,
  359. Size: uint64(len(buf)),
  360. Mtime: time.Now().UnixNano(),
  361. ETag: uploadResult.ETag,
  362. CipherKey: uploadResult.CipherKey,
  363. IsGzipped: uploadResult.Gzip > 0,
  364. }
  365. f.entry.Chunks = append(f.entry.Chunks, chunk)
  366. err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  367. f.entry.Attributes.Mtime = time.Now().Unix()
  368. f.entry.Attributes.Collection = collection
  369. f.entry.Attributes.Replication = replication
  370. request := &filer_pb.UpdateEntryRequest{
  371. Directory: dir,
  372. Entry: f.entry,
  373. }
  374. if _, err := client.UpdateEntry(ctx, request); err != nil {
  375. return fmt.Errorf("update %s: %v", f.name, err)
  376. }
  377. return nil
  378. })
  379. if err == nil {
  380. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  381. f.off += int64(len(buf))
  382. }
  383. return len(buf), err
  384. }
  385. func (f *WebDavFile) Close() error {
  386. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  387. if f.entry != nil {
  388. f.entry = nil
  389. f.entryViewCache = nil
  390. }
  391. return nil
  392. }
  393. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  394. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  395. if f.entry == nil {
  396. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  397. }
  398. if f.entry == nil {
  399. return 0, err
  400. }
  401. if err != nil {
  402. return 0, err
  403. }
  404. if len(f.entry.Chunks) == 0 {
  405. return 0, io.EOF
  406. }
  407. if f.entryViewCache == nil {
  408. f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
  409. f.reader = nil
  410. }
  411. if f.reader == nil {
  412. chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
  413. f.reader = filer2.NewChunkStreamReaderFromClient(f.fs, chunkViews)
  414. }
  415. f.reader.Seek(f.off, io.SeekStart)
  416. readSize, err = f.reader.Read(p)
  417. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  418. f.off += int64(readSize)
  419. if err != nil {
  420. glog.Errorf("file read %s: %v", f.name, err)
  421. }
  422. return
  423. }
  424. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  425. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  426. dir, _ := util.FullPath(f.name).DirAndName()
  427. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) {
  428. fi := FileInfo{
  429. size: int64(filer2.TotalSize(entry.GetChunks())),
  430. name: entry.Name,
  431. mode: os.FileMode(entry.Attributes.FileMode),
  432. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  433. isDirectory: entry.IsDirectory,
  434. }
  435. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  436. fi.name += "/"
  437. }
  438. glog.V(4).Infof("entry: %v", fi.name)
  439. ret = append(ret, &fi)
  440. })
  441. old := f.off
  442. if old >= int64(len(ret)) {
  443. if count > 0 {
  444. return nil, io.EOF
  445. }
  446. return nil, nil
  447. }
  448. if count > 0 {
  449. f.off += int64(count)
  450. if f.off > int64(len(ret)) {
  451. f.off = int64(len(ret))
  452. }
  453. } else {
  454. f.off = int64(len(ret))
  455. old = 0
  456. }
  457. return ret[old:f.off], nil
  458. }
  459. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  460. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  461. ctx := context.Background()
  462. var err error
  463. switch whence {
  464. case 0:
  465. f.off = 0
  466. case 2:
  467. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  468. return 0, err
  469. } else {
  470. f.off = fi.Size()
  471. }
  472. }
  473. f.off += offset
  474. return f.off, err
  475. }
  476. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  477. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  478. ctx := context.Background()
  479. return f.fs.stat(ctx, f.name)
  480. }