You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

576 lines
14 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/spf13/viper"
  17. "github.com/chrislusf/seaweedfs/weed/filer2"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/security"
  20. )
  21. type WebDavOption struct {
  22. Filer string
  23. FilerGrpcAddress string
  24. DomainName string
  25. BucketsPath string
  26. GrpcDialOption grpc.DialOption
  27. Collection string
  28. Uid uint32
  29. Gid uint32
  30. }
  31. type WebDavServer struct {
  32. option *WebDavOption
  33. secret security.SigningKey
  34. filer *filer2.Filer
  35. grpcDialOption grpc.DialOption
  36. Handler *webdav.Handler
  37. }
  38. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  39. fs, _ := NewWebDavFileSystem(option)
  40. ws = &WebDavServer{
  41. option: option,
  42. grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
  43. Handler: &webdav.Handler{
  44. FileSystem: fs,
  45. LockSystem: webdav.NewMemLS(),
  46. },
  47. }
  48. return ws, nil
  49. }
  50. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  51. type WebDavFileSystem struct {
  52. option *WebDavOption
  53. secret security.SigningKey
  54. filer *filer2.Filer
  55. grpcDialOption grpc.DialOption
  56. }
  57. type FileInfo struct {
  58. name string
  59. size int64
  60. mode os.FileMode
  61. modifiledTime time.Time
  62. isDirectory bool
  63. }
  64. func (fi *FileInfo) Name() string { return fi.name }
  65. func (fi *FileInfo) Size() int64 { return fi.size }
  66. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  67. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  68. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  69. func (fi *FileInfo) Sys() interface{} { return nil }
  70. type WebDavFile struct {
  71. fs *WebDavFileSystem
  72. name string
  73. isDirectory bool
  74. off int64
  75. entry *filer_pb.Entry
  76. entryViewCache []filer2.VisibleInterval
  77. }
  78. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  79. return &WebDavFileSystem{
  80. option: option,
  81. }, nil
  82. }
  83. func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
  84. return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
  85. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  86. return fn(client)
  87. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  88. }
  89. func clearName(name string) (string, error) {
  90. slashed := strings.HasSuffix(name, "/")
  91. name = path.Clean(name)
  92. if !strings.HasSuffix(name, "/") && slashed {
  93. name += "/"
  94. }
  95. if !strings.HasPrefix(name, "/") {
  96. return "", os.ErrInvalid
  97. }
  98. return name, nil
  99. }
  100. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  101. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  102. if !strings.HasSuffix(fullDirPath, "/") {
  103. fullDirPath += "/"
  104. }
  105. var err error
  106. if fullDirPath, err = clearName(fullDirPath); err != nil {
  107. return err
  108. }
  109. _, err = fs.stat(ctx, fullDirPath)
  110. if err == nil {
  111. return os.ErrExist
  112. }
  113. return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  114. dir, name := filer2.FullPath(fullDirPath).DirAndName()
  115. request := &filer_pb.CreateEntryRequest{
  116. Directory: dir,
  117. Entry: &filer_pb.Entry{
  118. Name: name,
  119. IsDirectory: true,
  120. Attributes: &filer_pb.FuseAttributes{
  121. Mtime: time.Now().Unix(),
  122. Crtime: time.Now().Unix(),
  123. FileMode: uint32(perm | os.ModeDir),
  124. Uid: fs.option.Uid,
  125. Gid: fs.option.Gid,
  126. },
  127. },
  128. }
  129. glog.V(1).Infof("mkdir: %v", request)
  130. if _, err := client.CreateEntry(ctx, request); err != nil {
  131. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  132. }
  133. return nil
  134. })
  135. }
  136. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  137. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  138. var err error
  139. if fullFilePath, err = clearName(fullFilePath); err != nil {
  140. return nil, err
  141. }
  142. if flag&os.O_CREATE != 0 {
  143. // file should not have / suffix.
  144. if strings.HasSuffix(fullFilePath, "/") {
  145. return nil, os.ErrInvalid
  146. }
  147. _, err = fs.stat(ctx, fullFilePath)
  148. if err == nil {
  149. if flag&os.O_EXCL != 0 {
  150. return nil, os.ErrExist
  151. }
  152. fs.removeAll(ctx, fullFilePath)
  153. }
  154. dir, name := filer2.FullPath(fullFilePath).DirAndName()
  155. err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  156. if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
  157. Directory: dir,
  158. Entry: &filer_pb.Entry{
  159. Name: name,
  160. IsDirectory: perm&os.ModeDir > 0,
  161. Attributes: &filer_pb.FuseAttributes{
  162. Mtime: time.Now().Unix(),
  163. Crtime: time.Now().Unix(),
  164. FileMode: uint32(perm),
  165. Uid: fs.option.Uid,
  166. Gid: fs.option.Gid,
  167. Collection: fs.option.Collection,
  168. Replication: "000",
  169. TtlSec: 0,
  170. },
  171. },
  172. }); err != nil {
  173. return fmt.Errorf("create %s: %v", fullFilePath, err)
  174. }
  175. return nil
  176. })
  177. if err != nil {
  178. return nil, err
  179. }
  180. return &WebDavFile{
  181. fs: fs,
  182. name: fullFilePath,
  183. isDirectory: false,
  184. }, nil
  185. }
  186. fi, err := fs.stat(ctx, fullFilePath)
  187. if err != nil {
  188. return nil, os.ErrNotExist
  189. }
  190. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  191. fullFilePath += "/"
  192. }
  193. return &WebDavFile{
  194. fs: fs,
  195. name: fullFilePath,
  196. isDirectory: false,
  197. }, nil
  198. }
  199. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  200. var err error
  201. if fullFilePath, err = clearName(fullFilePath); err != nil {
  202. return err
  203. }
  204. fi, err := fs.stat(ctx, fullFilePath)
  205. if err != nil {
  206. return err
  207. }
  208. if fi.IsDir() {
  209. //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`)
  210. } else {
  211. //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
  212. }
  213. dir, name := filer2.FullPath(fullFilePath).DirAndName()
  214. err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  215. request := &filer_pb.DeleteEntryRequest{
  216. Directory: dir,
  217. Name: name,
  218. IsDeleteData: true,
  219. }
  220. glog.V(3).Infof("removing entry: %v", request)
  221. _, err := client.DeleteEntry(ctx, request)
  222. if err != nil {
  223. return fmt.Errorf("remove %s: %v", fullFilePath, err)
  224. }
  225. return nil
  226. })
  227. return err
  228. }
  229. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  230. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  231. return fs.removeAll(ctx, name)
  232. }
  233. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  234. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  235. var err error
  236. if oldName, err = clearName(oldName); err != nil {
  237. return err
  238. }
  239. if newName, err = clearName(newName); err != nil {
  240. return err
  241. }
  242. of, err := fs.stat(ctx, oldName)
  243. if err != nil {
  244. return os.ErrExist
  245. }
  246. if of.IsDir() {
  247. if strings.HasSuffix(oldName, "/") {
  248. oldName = strings.TrimRight(oldName, "/")
  249. }
  250. if strings.HasSuffix(newName, "/") {
  251. newName = strings.TrimRight(newName, "/")
  252. }
  253. }
  254. _, err = fs.stat(ctx, newName)
  255. if err == nil {
  256. return os.ErrExist
  257. }
  258. oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
  259. newDir, newBaseName := filer2.FullPath(newName).DirAndName()
  260. return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  261. request := &filer_pb.AtomicRenameEntryRequest{
  262. OldDirectory: oldDir,
  263. OldName: oldBaseName,
  264. NewDirectory: newDir,
  265. NewName: newBaseName,
  266. }
  267. _, err := client.AtomicRenameEntry(ctx, request)
  268. if err != nil {
  269. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  270. }
  271. return nil
  272. })
  273. }
  274. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  275. var err error
  276. if fullFilePath, err = clearName(fullFilePath); err != nil {
  277. return nil, err
  278. }
  279. fullpath := filer2.FullPath(fullFilePath)
  280. var fi FileInfo
  281. entry, err := filer2.GetEntry(ctx, fs, fullpath)
  282. if entry == nil {
  283. return nil, os.ErrNotExist
  284. }
  285. if err != nil {
  286. return nil, err
  287. }
  288. fi.size = int64(filer2.TotalSize(entry.GetChunks()))
  289. fi.name = string(fullpath)
  290. fi.mode = os.FileMode(entry.Attributes.FileMode)
  291. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  292. fi.isDirectory = entry.IsDirectory
  293. if fi.name == "/" {
  294. fi.modifiledTime = time.Now()
  295. fi.isDirectory = true
  296. }
  297. return &fi, nil
  298. }
  299. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  300. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  301. return fs.stat(ctx, name)
  302. }
  303. func (f *WebDavFile) Write(buf []byte) (int, error) {
  304. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  305. var err error
  306. ctx := context.Background()
  307. if f.entry == nil {
  308. f.entry, err = filer2.GetEntry(ctx, f.fs, filer2.FullPath(f.name))
  309. }
  310. if f.entry == nil {
  311. return 0, err
  312. }
  313. if err != nil {
  314. return 0, err
  315. }
  316. var fileId, host string
  317. var auth security.EncodedJwt
  318. if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  319. request := &filer_pb.AssignVolumeRequest{
  320. Count: 1,
  321. Replication: "000",
  322. Collection: f.fs.option.Collection,
  323. }
  324. resp, err := client.AssignVolume(ctx, request)
  325. if err != nil {
  326. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  327. return err
  328. }
  329. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  330. return nil
  331. }); err != nil {
  332. return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  333. }
  334. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  335. bufReader := bytes.NewReader(buf)
  336. uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth)
  337. if err != nil {
  338. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
  339. return 0, fmt.Errorf("upload data: %v", err)
  340. }
  341. if uploadResult.Error != "" {
  342. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
  343. return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
  344. }
  345. chunk := &filer_pb.FileChunk{
  346. FileId: fileId,
  347. Offset: f.off,
  348. Size: uint64(len(buf)),
  349. Mtime: time.Now().UnixNano(),
  350. ETag: uploadResult.ETag,
  351. }
  352. f.entry.Chunks = append(f.entry.Chunks, chunk)
  353. dir, _ := filer2.FullPath(f.name).DirAndName()
  354. err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  355. f.entry.Attributes.Mtime = time.Now().Unix()
  356. request := &filer_pb.UpdateEntryRequest{
  357. Directory: dir,
  358. Entry: f.entry,
  359. }
  360. if _, err := client.UpdateEntry(ctx, request); err != nil {
  361. return fmt.Errorf("update %s: %v", f.name, err)
  362. }
  363. return nil
  364. })
  365. if err == nil {
  366. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  367. f.off += int64(len(buf))
  368. }
  369. return len(buf), err
  370. }
  371. func (f *WebDavFile) Close() error {
  372. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  373. if f.entry != nil {
  374. f.entry = nil
  375. f.entryViewCache = nil
  376. }
  377. return nil
  378. }
  379. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  380. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  381. ctx := context.Background()
  382. if f.entry == nil {
  383. f.entry, err = filer2.GetEntry(ctx, f.fs, filer2.FullPath(f.name))
  384. }
  385. if f.entry == nil {
  386. return 0, err
  387. }
  388. if err != nil {
  389. return 0, err
  390. }
  391. if len(f.entry.Chunks) == 0 {
  392. return 0, io.EOF
  393. }
  394. if f.entryViewCache == nil {
  395. f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
  396. }
  397. chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p))
  398. totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, filer2.FullPath(f.name), p, chunkViews, f.off)
  399. if err != nil {
  400. return 0, err
  401. }
  402. readSize = int(totalRead)
  403. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead)
  404. f.off += totalRead
  405. if readSize == 0 {
  406. return 0, io.EOF
  407. }
  408. return
  409. }
  410. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  411. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  412. ctx := context.Background()
  413. dir, _ := filer2.FullPath(f.name).DirAndName()
  414. err = filer2.ReadDirAllEntries(ctx, f.fs, filer2.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) {
  415. fi := FileInfo{
  416. size: int64(filer2.TotalSize(entry.GetChunks())),
  417. name: entry.Name,
  418. mode: os.FileMode(entry.Attributes.FileMode),
  419. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  420. isDirectory: entry.IsDirectory,
  421. }
  422. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  423. fi.name += "/"
  424. }
  425. glog.V(4).Infof("entry: %v", fi.name)
  426. ret = append(ret, &fi)
  427. })
  428. old := f.off
  429. if old >= int64(len(ret)) {
  430. if count > 0 {
  431. return nil, io.EOF
  432. }
  433. return nil, nil
  434. }
  435. if count > 0 {
  436. f.off += int64(count)
  437. if f.off > int64(len(ret)) {
  438. f.off = int64(len(ret))
  439. }
  440. } else {
  441. f.off = int64(len(ret))
  442. old = 0
  443. }
  444. return ret[old:f.off], nil
  445. }
  446. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  447. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  448. ctx := context.Background()
  449. var err error
  450. switch whence {
  451. case 0:
  452. f.off = 0
  453. case 2:
  454. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  455. return 0, err
  456. } else {
  457. f.off = fi.Size()
  458. }
  459. }
  460. f.off += offset
  461. return f.off, err
  462. }
  463. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  464. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  465. ctx := context.Background()
  466. return f.fs.stat(ctx, f.name)
  467. }