You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

574 lines
14 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer2"
  17. "github.com/chrislusf/seaweedfs/weed/glog"
  18. "github.com/chrislusf/seaweedfs/weed/security"
  19. )
  20. type WebDavOption struct {
  21. Filer string
  22. FilerGrpcAddress string
  23. DomainName string
  24. BucketsPath string
  25. GrpcDialOption grpc.DialOption
  26. Collection string
  27. Uid uint32
  28. Gid uint32
  29. }
// WebDavServer bundles the WebDAV HTTP handler with the options it was
// built from.
type WebDavServer struct {
	// option is the configuration passed to NewWebDavServer.
	option *WebDavOption
	// secret is not assigned by NewWebDavServer in this file.
	secret security.SigningKey
	// filer is not assigned by NewWebDavServer in this file.
	filer *filer2.Filer
	// grpcDialOption is loaded from the "grpc.filer" client TLS configuration.
	grpcDialOption grpc.DialOption
	// Handler serves WebDAV requests against a WebDavFileSystem with an
	// in-memory lock system.
	Handler *webdav.Handler
}
  37. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  38. fs, _ := NewWebDavFileSystem(option)
  39. ws = &WebDavServer{
  40. option: option,
  41. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  42. Handler: &webdav.Handler{
  43. FileSystem: fs,
  44. LockSystem: webdav.NewMemLS(),
  45. },
  46. }
  47. return ws, nil
  48. }
// adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go

// WebDavFileSystem is the webdav.FileSystem implementation in this file; its
// methods reach the filer through WithFilerClient.
type WebDavFileSystem struct {
	// option supplies the filer gRPC address, dial option, collection and
	// owner ids used by the methods below.
	option *WebDavOption
	// secret is not assigned by NewWebDavFileSystem in this file.
	secret security.SigningKey
	// filer is not assigned by NewWebDavFileSystem in this file.
	filer *filer2.Filer
	// grpcDialOption is not assigned by NewWebDavFileSystem in this file;
	// WithFilerClient uses option.GrpcDialOption instead.
	grpcDialOption grpc.DialOption
}
  56. type FileInfo struct {
  57. name string
  58. size int64
  59. mode os.FileMode
  60. modifiledTime time.Time
  61. isDirectory bool
  62. }
  63. func (fi *FileInfo) Name() string { return fi.name }
  64. func (fi *FileInfo) Size() int64 { return fi.size }
  65. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  66. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  67. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  68. func (fi *FileInfo) Sys() interface{} { return nil }
// WebDavFile is the handle returned by WebDavFileSystem.OpenFile.
type WebDavFile struct {
	// fs is the file system that opened this handle; it is used for every
	// filer call the handle makes.
	fs *WebDavFileSystem
	// name is the cleaned full path; OpenFile appends a trailing "/" when an
	// existing directory is opened.
	name string
	// isDirectory is set by OpenFile; no method in this file reads it.
	isDirectory bool
	// off is the current position: a byte offset for Read, Write and Seek,
	// and an index into the listing for Readdir.
	off int64
	// entry is the filer entry, fetched on first Read or Write and dropped
	// by Close.
	entry *filer_pb.Entry
	// entryViewCache holds the visible intervals computed from entry.Chunks
	// on first Read.
	entryViewCache []filer2.VisibleInterval
}
  77. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  78. return &WebDavFileSystem{
  79. option: option,
  80. }, nil
  81. }
  82. func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
  83. return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
  84. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  85. return fn(ctx2, client)
  86. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  87. }
  88. func clearName(name string) (string, error) {
  89. slashed := strings.HasSuffix(name, "/")
  90. name = path.Clean(name)
  91. if !strings.HasSuffix(name, "/") && slashed {
  92. name += "/"
  93. }
  94. if !strings.HasPrefix(name, "/") {
  95. return "", os.ErrInvalid
  96. }
  97. return name, nil
  98. }
  99. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  100. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  101. if !strings.HasSuffix(fullDirPath, "/") {
  102. fullDirPath += "/"
  103. }
  104. var err error
  105. if fullDirPath, err = clearName(fullDirPath); err != nil {
  106. return err
  107. }
  108. _, err = fs.stat(ctx, fullDirPath)
  109. if err == nil {
  110. return os.ErrExist
  111. }
  112. return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  113. dir, name := filer2.FullPath(fullDirPath).DirAndName()
  114. request := &filer_pb.CreateEntryRequest{
  115. Directory: dir,
  116. Entry: &filer_pb.Entry{
  117. Name: name,
  118. IsDirectory: true,
  119. Attributes: &filer_pb.FuseAttributes{
  120. Mtime: time.Now().Unix(),
  121. Crtime: time.Now().Unix(),
  122. FileMode: uint32(perm | os.ModeDir),
  123. Uid: fs.option.Uid,
  124. Gid: fs.option.Gid,
  125. },
  126. },
  127. }
  128. glog.V(1).Infof("mkdir: %v", request)
  129. if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
  130. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  131. }
  132. return nil
  133. })
  134. }
  135. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  136. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  137. var err error
  138. if fullFilePath, err = clearName(fullFilePath); err != nil {
  139. return nil, err
  140. }
  141. if flag&os.O_CREATE != 0 {
  142. // file should not have / suffix.
  143. if strings.HasSuffix(fullFilePath, "/") {
  144. return nil, os.ErrInvalid
  145. }
  146. _, err = fs.stat(ctx, fullFilePath)
  147. if err == nil {
  148. if flag&os.O_EXCL != 0 {
  149. return nil, os.ErrExist
  150. }
  151. fs.removeAll(ctx, fullFilePath)
  152. }
  153. dir, name := filer2.FullPath(fullFilePath).DirAndName()
  154. err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  155. if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{
  156. Directory: dir,
  157. Entry: &filer_pb.Entry{
  158. Name: name,
  159. IsDirectory: perm&os.ModeDir > 0,
  160. Attributes: &filer_pb.FuseAttributes{
  161. Mtime: time.Now().Unix(),
  162. Crtime: time.Now().Unix(),
  163. FileMode: uint32(perm),
  164. Uid: fs.option.Uid,
  165. Gid: fs.option.Gid,
  166. Collection: fs.option.Collection,
  167. Replication: "000",
  168. TtlSec: 0,
  169. },
  170. },
  171. }); err != nil {
  172. return fmt.Errorf("create %s: %v", fullFilePath, err)
  173. }
  174. return nil
  175. })
  176. if err != nil {
  177. return nil, err
  178. }
  179. return &WebDavFile{
  180. fs: fs,
  181. name: fullFilePath,
  182. isDirectory: false,
  183. }, nil
  184. }
  185. fi, err := fs.stat(ctx, fullFilePath)
  186. if err != nil {
  187. return nil, os.ErrNotExist
  188. }
  189. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  190. fullFilePath += "/"
  191. }
  192. return &WebDavFile{
  193. fs: fs,
  194. name: fullFilePath,
  195. isDirectory: false,
  196. }, nil
  197. }
  198. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  199. var err error
  200. if fullFilePath, err = clearName(fullFilePath); err != nil {
  201. return err
  202. }
  203. fi, err := fs.stat(ctx, fullFilePath)
  204. if err != nil {
  205. return err
  206. }
  207. if fi.IsDir() {
  208. //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`)
  209. } else {
  210. //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
  211. }
  212. dir, name := filer2.FullPath(fullFilePath).DirAndName()
  213. err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  214. request := &filer_pb.DeleteEntryRequest{
  215. Directory: dir,
  216. Name: name,
  217. IsDeleteData: true,
  218. }
  219. glog.V(3).Infof("removing entry: %v", request)
  220. _, err := client.DeleteEntry(ctx, request)
  221. if err != nil {
  222. return fmt.Errorf("remove %s: %v", fullFilePath, err)
  223. }
  224. return nil
  225. })
  226. return err
  227. }
  228. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  229. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  230. return fs.removeAll(ctx, name)
  231. }
  232. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  233. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  234. var err error
  235. if oldName, err = clearName(oldName); err != nil {
  236. return err
  237. }
  238. if newName, err = clearName(newName); err != nil {
  239. return err
  240. }
  241. of, err := fs.stat(ctx, oldName)
  242. if err != nil {
  243. return os.ErrExist
  244. }
  245. if of.IsDir() {
  246. if strings.HasSuffix(oldName, "/") {
  247. oldName = strings.TrimRight(oldName, "/")
  248. }
  249. if strings.HasSuffix(newName, "/") {
  250. newName = strings.TrimRight(newName, "/")
  251. }
  252. }
  253. _, err = fs.stat(ctx, newName)
  254. if err == nil {
  255. return os.ErrExist
  256. }
  257. oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
  258. newDir, newBaseName := filer2.FullPath(newName).DirAndName()
  259. return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  260. request := &filer_pb.AtomicRenameEntryRequest{
  261. OldDirectory: oldDir,
  262. OldName: oldBaseName,
  263. NewDirectory: newDir,
  264. NewName: newBaseName,
  265. }
  266. _, err := client.AtomicRenameEntry(ctx, request)
  267. if err != nil {
  268. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  269. }
  270. return nil
  271. })
  272. }
  273. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  274. var err error
  275. if fullFilePath, err = clearName(fullFilePath); err != nil {
  276. return nil, err
  277. }
  278. fullpath := filer2.FullPath(fullFilePath)
  279. var fi FileInfo
  280. entry, err := filer2.GetEntry(ctx, fs, fullpath)
  281. if entry == nil {
  282. return nil, os.ErrNotExist
  283. }
  284. if err != nil {
  285. return nil, err
  286. }
  287. fi.size = int64(filer2.TotalSize(entry.GetChunks()))
  288. fi.name = string(fullpath)
  289. fi.mode = os.FileMode(entry.Attributes.FileMode)
  290. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  291. fi.isDirectory = entry.IsDirectory
  292. if fi.name == "/" {
  293. fi.modifiledTime = time.Now()
  294. fi.isDirectory = true
  295. }
  296. return &fi, nil
  297. }
  298. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  299. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  300. return fs.stat(ctx, name)
  301. }
  302. func (f *WebDavFile) Write(buf []byte) (int, error) {
  303. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  304. var err error
  305. ctx := context.Background()
  306. if f.entry == nil {
  307. f.entry, err = filer2.GetEntry(ctx, f.fs, filer2.FullPath(f.name))
  308. }
  309. if f.entry == nil {
  310. return 0, err
  311. }
  312. if err != nil {
  313. return 0, err
  314. }
  315. var fileId, host string
  316. var auth security.EncodedJwt
  317. if err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  318. request := &filer_pb.AssignVolumeRequest{
  319. Count: 1,
  320. Replication: "000",
  321. Collection: f.fs.option.Collection,
  322. }
  323. resp, err := client.AssignVolume(ctx, request)
  324. if err != nil {
  325. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  326. return err
  327. }
  328. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  329. return nil
  330. }); err != nil {
  331. return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  332. }
  333. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  334. bufReader := bytes.NewReader(buf)
  335. uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth)
  336. if err != nil {
  337. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
  338. return 0, fmt.Errorf("upload data: %v", err)
  339. }
  340. if uploadResult.Error != "" {
  341. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
  342. return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
  343. }
  344. chunk := &filer_pb.FileChunk{
  345. FileId: fileId,
  346. Offset: f.off,
  347. Size: uint64(len(buf)),
  348. Mtime: time.Now().UnixNano(),
  349. ETag: uploadResult.ETag,
  350. }
  351. f.entry.Chunks = append(f.entry.Chunks, chunk)
  352. dir, _ := filer2.FullPath(f.name).DirAndName()
  353. err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  354. f.entry.Attributes.Mtime = time.Now().Unix()
  355. request := &filer_pb.UpdateEntryRequest{
  356. Directory: dir,
  357. Entry: f.entry,
  358. }
  359. if _, err := client.UpdateEntry(ctx, request); err != nil {
  360. return fmt.Errorf("update %s: %v", f.name, err)
  361. }
  362. return nil
  363. })
  364. if err == nil {
  365. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  366. f.off += int64(len(buf))
  367. }
  368. return len(buf), err
  369. }
  370. func (f *WebDavFile) Close() error {
  371. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  372. if f.entry != nil {
  373. f.entry = nil
  374. f.entryViewCache = nil
  375. }
  376. return nil
  377. }
  378. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  379. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  380. ctx := context.Background()
  381. if f.entry == nil {
  382. f.entry, err = filer2.GetEntry(ctx, f.fs, filer2.FullPath(f.name))
  383. }
  384. if f.entry == nil {
  385. return 0, err
  386. }
  387. if err != nil {
  388. return 0, err
  389. }
  390. if len(f.entry.Chunks) == 0 {
  391. return 0, io.EOF
  392. }
  393. if f.entryViewCache == nil {
  394. f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
  395. }
  396. chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p))
  397. totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, filer2.FullPath(f.name), p, chunkViews, f.off)
  398. if err != nil {
  399. return 0, err
  400. }
  401. readSize = int(totalRead)
  402. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead)
  403. f.off += totalRead
  404. if readSize == 0 {
  405. return 0, io.EOF
  406. }
  407. return
  408. }
  409. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  410. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  411. ctx := context.Background()
  412. dir, _ := filer2.FullPath(f.name).DirAndName()
  413. err = filer2.ReadDirAllEntries(ctx, f.fs, filer2.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) {
  414. fi := FileInfo{
  415. size: int64(filer2.TotalSize(entry.GetChunks())),
  416. name: entry.Name,
  417. mode: os.FileMode(entry.Attributes.FileMode),
  418. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  419. isDirectory: entry.IsDirectory,
  420. }
  421. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  422. fi.name += "/"
  423. }
  424. glog.V(4).Infof("entry: %v", fi.name)
  425. ret = append(ret, &fi)
  426. })
  427. old := f.off
  428. if old >= int64(len(ret)) {
  429. if count > 0 {
  430. return nil, io.EOF
  431. }
  432. return nil, nil
  433. }
  434. if count > 0 {
  435. f.off += int64(count)
  436. if f.off > int64(len(ret)) {
  437. f.off = int64(len(ret))
  438. }
  439. } else {
  440. f.off = int64(len(ret))
  441. old = 0
  442. }
  443. return ret[old:f.off], nil
  444. }
  445. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  446. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  447. ctx := context.Background()
  448. var err error
  449. switch whence {
  450. case 0:
  451. f.off = 0
  452. case 2:
  453. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  454. return 0, err
  455. } else {
  456. f.off = fi.Size()
  457. }
  458. }
  459. f.off += offset
  460. return f.off, err
  461. }
  462. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  463. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  464. ctx := context.Background()
  465. return f.fs.stat(ctx, f.name)
  466. }