You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

587 lines
14 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "github.com/chrislusf/seaweedfs/weed/filer2"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/security"
  20. )
  21. type WebDavOption struct {
  22. Filer string
  23. FilerGrpcAddress string
  24. DomainName string
  25. BucketsPath string
  26. GrpcDialOption grpc.DialOption
  27. Collection string
  28. Uid uint32
  29. Gid uint32
  30. Cipher bool
  31. }
  32. type WebDavServer struct {
  33. option *WebDavOption
  34. secret security.SigningKey
  35. filer *filer2.Filer
  36. grpcDialOption grpc.DialOption
  37. Handler *webdav.Handler
  38. }
  39. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  40. fs, _ := NewWebDavFileSystem(option)
  41. ws = &WebDavServer{
  42. option: option,
  43. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  44. Handler: &webdav.Handler{
  45. FileSystem: fs,
  46. LockSystem: webdav.NewMemLS(),
  47. },
  48. }
  49. return ws, nil
  50. }
  51. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  52. type WebDavFileSystem struct {
  53. option *WebDavOption
  54. secret security.SigningKey
  55. filer *filer2.Filer
  56. grpcDialOption grpc.DialOption
  57. }
  58. type FileInfo struct {
  59. name string
  60. size int64
  61. mode os.FileMode
  62. modifiledTime time.Time
  63. isDirectory bool
  64. }
  65. func (fi *FileInfo) Name() string { return fi.name }
  66. func (fi *FileInfo) Size() int64 { return fi.size }
  67. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  68. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  69. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  70. func (fi *FileInfo) Sys() interface{} { return nil }
  71. type WebDavFile struct {
  72. fs *WebDavFileSystem
  73. name string
  74. isDirectory bool
  75. off int64
  76. entry *filer_pb.Entry
  77. entryViewCache []filer2.VisibleInterval
  78. }
  79. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  80. return &WebDavFileSystem{
  81. option: option,
  82. }, nil
  83. }
  84. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  85. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  86. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  87. return fn(client)
  88. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  89. }
  90. func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
  91. return hostAndPort
  92. }
  93. func clearName(name string) (string, error) {
  94. slashed := strings.HasSuffix(name, "/")
  95. name = path.Clean(name)
  96. if !strings.HasSuffix(name, "/") && slashed {
  97. name += "/"
  98. }
  99. if !strings.HasPrefix(name, "/") {
  100. return "", os.ErrInvalid
  101. }
  102. return name, nil
  103. }
  104. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  105. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  106. if !strings.HasSuffix(fullDirPath, "/") {
  107. fullDirPath += "/"
  108. }
  109. var err error
  110. if fullDirPath, err = clearName(fullDirPath); err != nil {
  111. return err
  112. }
  113. _, err = fs.stat(ctx, fullDirPath)
  114. if err == nil {
  115. return os.ErrExist
  116. }
  117. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  118. dir, name := filer2.FullPath(fullDirPath).DirAndName()
  119. request := &filer_pb.CreateEntryRequest{
  120. Directory: dir,
  121. Entry: &filer_pb.Entry{
  122. Name: name,
  123. IsDirectory: true,
  124. Attributes: &filer_pb.FuseAttributes{
  125. Mtime: time.Now().Unix(),
  126. Crtime: time.Now().Unix(),
  127. FileMode: uint32(perm | os.ModeDir),
  128. Uid: fs.option.Uid,
  129. Gid: fs.option.Gid,
  130. },
  131. },
  132. }
  133. glog.V(1).Infof("mkdir: %v", request)
  134. if err := filer_pb.CreateEntry(client, request); err != nil {
  135. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  136. }
  137. return nil
  138. })
  139. }
  140. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  141. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  142. var err error
  143. if fullFilePath, err = clearName(fullFilePath); err != nil {
  144. return nil, err
  145. }
  146. if flag&os.O_CREATE != 0 {
  147. // file should not have / suffix.
  148. if strings.HasSuffix(fullFilePath, "/") {
  149. return nil, os.ErrInvalid
  150. }
  151. _, err = fs.stat(ctx, fullFilePath)
  152. if err == nil {
  153. if flag&os.O_EXCL != 0 {
  154. return nil, os.ErrExist
  155. }
  156. fs.removeAll(ctx, fullFilePath)
  157. }
  158. dir, name := filer2.FullPath(fullFilePath).DirAndName()
  159. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  160. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  161. Directory: dir,
  162. Entry: &filer_pb.Entry{
  163. Name: name,
  164. IsDirectory: perm&os.ModeDir > 0,
  165. Attributes: &filer_pb.FuseAttributes{
  166. Mtime: time.Now().Unix(),
  167. Crtime: time.Now().Unix(),
  168. FileMode: uint32(perm),
  169. Uid: fs.option.Uid,
  170. Gid: fs.option.Gid,
  171. Collection: fs.option.Collection,
  172. Replication: "000",
  173. TtlSec: 0,
  174. },
  175. },
  176. }); err != nil {
  177. return fmt.Errorf("create %s: %v", fullFilePath, err)
  178. }
  179. return nil
  180. })
  181. if err != nil {
  182. return nil, err
  183. }
  184. return &WebDavFile{
  185. fs: fs,
  186. name: fullFilePath,
  187. isDirectory: false,
  188. }, nil
  189. }
  190. fi, err := fs.stat(ctx, fullFilePath)
  191. if err != nil {
  192. return nil, os.ErrNotExist
  193. }
  194. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  195. fullFilePath += "/"
  196. }
  197. return &WebDavFile{
  198. fs: fs,
  199. name: fullFilePath,
  200. isDirectory: false,
  201. }, nil
  202. }
  203. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  204. var err error
  205. if fullFilePath, err = clearName(fullFilePath); err != nil {
  206. return err
  207. }
  208. fi, err := fs.stat(ctx, fullFilePath)
  209. if err != nil {
  210. return err
  211. }
  212. if fi.IsDir() {
  213. //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`)
  214. } else {
  215. //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
  216. }
  217. dir, name := filer2.FullPath(fullFilePath).DirAndName()
  218. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  219. request := &filer_pb.DeleteEntryRequest{
  220. Directory: dir,
  221. Name: name,
  222. IsDeleteData: true,
  223. }
  224. glog.V(3).Infof("removing entry: %v", request)
  225. _, err := client.DeleteEntry(ctx, request)
  226. if err != nil {
  227. return fmt.Errorf("remove %s: %v", fullFilePath, err)
  228. }
  229. return nil
  230. })
  231. return err
  232. }
  233. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  234. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  235. return fs.removeAll(ctx, name)
  236. }
  237. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  238. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  239. var err error
  240. if oldName, err = clearName(oldName); err != nil {
  241. return err
  242. }
  243. if newName, err = clearName(newName); err != nil {
  244. return err
  245. }
  246. of, err := fs.stat(ctx, oldName)
  247. if err != nil {
  248. return os.ErrExist
  249. }
  250. if of.IsDir() {
  251. if strings.HasSuffix(oldName, "/") {
  252. oldName = strings.TrimRight(oldName, "/")
  253. }
  254. if strings.HasSuffix(newName, "/") {
  255. newName = strings.TrimRight(newName, "/")
  256. }
  257. }
  258. _, err = fs.stat(ctx, newName)
  259. if err == nil {
  260. return os.ErrExist
  261. }
  262. oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
  263. newDir, newBaseName := filer2.FullPath(newName).DirAndName()
  264. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  265. request := &filer_pb.AtomicRenameEntryRequest{
  266. OldDirectory: oldDir,
  267. OldName: oldBaseName,
  268. NewDirectory: newDir,
  269. NewName: newBaseName,
  270. }
  271. _, err := client.AtomicRenameEntry(ctx, request)
  272. if err != nil {
  273. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  274. }
  275. return nil
  276. })
  277. }
  278. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  279. var err error
  280. if fullFilePath, err = clearName(fullFilePath); err != nil {
  281. return nil, err
  282. }
  283. fullpath := filer2.FullPath(fullFilePath)
  284. var fi FileInfo
  285. entry, err := filer2.GetEntry(fs, fullpath)
  286. if entry == nil {
  287. return nil, os.ErrNotExist
  288. }
  289. if err != nil {
  290. return nil, err
  291. }
  292. fi.size = int64(filer2.TotalSize(entry.GetChunks()))
  293. fi.name = string(fullpath)
  294. fi.mode = os.FileMode(entry.Attributes.FileMode)
  295. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  296. fi.isDirectory = entry.IsDirectory
  297. if fi.name == "/" {
  298. fi.modifiledTime = time.Now()
  299. fi.isDirectory = true
  300. }
  301. return &fi, nil
  302. }
  303. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  304. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  305. return fs.stat(ctx, name)
  306. }
  307. func (f *WebDavFile) Write(buf []byte) (int, error) {
  308. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  309. dir, _ := filer2.FullPath(f.name).DirAndName()
  310. var err error
  311. ctx := context.Background()
  312. if f.entry == nil {
  313. f.entry, err = filer2.GetEntry(f.fs, filer2.FullPath(f.name))
  314. }
  315. if f.entry == nil {
  316. return 0, err
  317. }
  318. if err != nil {
  319. return 0, err
  320. }
  321. var fileId, host string
  322. var auth security.EncodedJwt
  323. var collection, replication string
  324. if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  325. request := &filer_pb.AssignVolumeRequest{
  326. Count: 1,
  327. Replication: "",
  328. Collection: f.fs.option.Collection,
  329. ParentPath: dir,
  330. }
  331. resp, err := client.AssignVolume(ctx, request)
  332. if err != nil {
  333. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  334. return err
  335. }
  336. if resp.Error != "" {
  337. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  338. }
  339. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  340. collection, replication = resp.Collection, resp.Replication
  341. return nil
  342. }); err != nil {
  343. return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  344. }
  345. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  346. bufReader := bytes.NewReader(buf)
  347. uploadResult, err := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, bufReader, false, "", nil, auth)
  348. if err != nil {
  349. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
  350. return 0, fmt.Errorf("upload data: %v", err)
  351. }
  352. if uploadResult.Error != "" {
  353. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
  354. return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
  355. }
  356. chunk := &filer_pb.FileChunk{
  357. FileId: fileId,
  358. Offset: f.off,
  359. Size: uint64(len(buf)),
  360. Mtime: time.Now().UnixNano(),
  361. ETag: uploadResult.ETag,
  362. CipherKey: uploadResult.CipherKey,
  363. }
  364. f.entry.Chunks = append(f.entry.Chunks, chunk)
  365. err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  366. f.entry.Attributes.Mtime = time.Now().Unix()
  367. f.entry.Attributes.Collection = collection
  368. f.entry.Attributes.Replication = replication
  369. request := &filer_pb.UpdateEntryRequest{
  370. Directory: dir,
  371. Entry: f.entry,
  372. }
  373. if _, err := client.UpdateEntry(ctx, request); err != nil {
  374. return fmt.Errorf("update %s: %v", f.name, err)
  375. }
  376. return nil
  377. })
  378. if err == nil {
  379. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  380. f.off += int64(len(buf))
  381. }
  382. return len(buf), err
  383. }
  384. func (f *WebDavFile) Close() error {
  385. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  386. if f.entry != nil {
  387. f.entry = nil
  388. f.entryViewCache = nil
  389. }
  390. return nil
  391. }
  392. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  393. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  394. if f.entry == nil {
  395. f.entry, err = filer2.GetEntry(f.fs, filer2.FullPath(f.name))
  396. }
  397. if f.entry == nil {
  398. return 0, err
  399. }
  400. if err != nil {
  401. return 0, err
  402. }
  403. if len(f.entry.Chunks) == 0 {
  404. return 0, io.EOF
  405. }
  406. if f.entryViewCache == nil {
  407. f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
  408. }
  409. chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p))
  410. totalRead, err := filer2.ReadIntoBuffer(f.fs, filer2.FullPath(f.name), p, chunkViews, f.off)
  411. if err != nil {
  412. return 0, err
  413. }
  414. readSize = int(totalRead)
  415. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead)
  416. f.off += totalRead
  417. if readSize == 0 {
  418. return 0, io.EOF
  419. }
  420. return
  421. }
  422. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  423. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  424. dir, _ := filer2.FullPath(f.name).DirAndName()
  425. err = filer2.ReadDirAllEntries(f.fs, filer2.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) {
  426. fi := FileInfo{
  427. size: int64(filer2.TotalSize(entry.GetChunks())),
  428. name: entry.Name,
  429. mode: os.FileMode(entry.Attributes.FileMode),
  430. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  431. isDirectory: entry.IsDirectory,
  432. }
  433. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  434. fi.name += "/"
  435. }
  436. glog.V(4).Infof("entry: %v", fi.name)
  437. ret = append(ret, &fi)
  438. })
  439. old := f.off
  440. if old >= int64(len(ret)) {
  441. if count > 0 {
  442. return nil, io.EOF
  443. }
  444. return nil, nil
  445. }
  446. if count > 0 {
  447. f.off += int64(count)
  448. if f.off > int64(len(ret)) {
  449. f.off = int64(len(ret))
  450. }
  451. } else {
  452. f.off = int64(len(ret))
  453. old = 0
  454. }
  455. return ret[old:f.off], nil
  456. }
  457. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  458. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  459. ctx := context.Background()
  460. var err error
  461. switch whence {
  462. case 0:
  463. f.off = 0
  464. case 2:
  465. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  466. return 0, err
  467. } else {
  468. f.off = fi.Size()
  469. }
  470. }
  471. f.off += offset
  472. return f.off, err
  473. }
  474. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  475. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  476. ctx := context.Background()
  477. return f.fs.stat(ctx, f.name)
  478. }