You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

569 lines
14 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/filesys"
  14. "github.com/chrislusf/seaweedfs/weed/operation"
  15. "github.com/chrislusf/seaweedfs/weed/pb"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/pb/pb_cache"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/filer2"
  20. "github.com/chrislusf/seaweedfs/weed/glog"
  21. "github.com/chrislusf/seaweedfs/weed/security"
  22. )
  23. type WebDavOption struct {
  24. Filer string
  25. FilerGrpcAddress string
  26. DomainName string
  27. BucketsPath string
  28. GrpcDialOption grpc.DialOption
  29. Collection string
  30. Uid uint32
  31. Gid uint32
  32. Cipher bool
  33. }
  34. type WebDavServer struct {
  35. option *WebDavOption
  36. secret security.SigningKey
  37. filer *filer2.Filer
  38. grpcDialOption grpc.DialOption
  39. Handler *webdav.Handler
  40. }
  41. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  42. fs, _ := NewWebDavFileSystem(option)
  43. ws = &WebDavServer{
  44. option: option,
  45. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  46. Handler: &webdav.Handler{
  47. FileSystem: fs,
  48. LockSystem: webdav.NewMemLS(),
  49. },
  50. }
  51. return ws, nil
  52. }
  53. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  54. type WebDavFileSystem struct {
  55. option *WebDavOption
  56. secret security.SigningKey
  57. filer *filer2.Filer
  58. grpcDialOption grpc.DialOption
  59. chunkCache *pb_cache.ChunkCache
  60. }
  61. type FileInfo struct {
  62. name string
  63. size int64
  64. mode os.FileMode
  65. modifiledTime time.Time
  66. isDirectory bool
  67. }
  68. func (fi *FileInfo) Name() string { return fi.name }
  69. func (fi *FileInfo) Size() int64 { return fi.size }
  70. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  71. func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
  72. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  73. func (fi *FileInfo) Sys() interface{} { return nil }
  74. type WebDavFile struct {
  75. fs *WebDavFileSystem
  76. name string
  77. isDirectory bool
  78. off int64
  79. entry *filer_pb.Entry
  80. entryViewCache []filer2.VisibleInterval
  81. reader io.ReaderAt
  82. }
  83. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  84. return &WebDavFileSystem{
  85. option: option,
  86. chunkCache: pb_cache.NewChunkCache(1000),
  87. }, nil
  88. }
  89. func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  90. return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  91. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  92. return fn(client)
  93. }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
  94. }
  95. func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
  96. return hostAndPort
  97. }
  98. func clearName(name string) (string, error) {
  99. slashed := strings.HasSuffix(name, "/")
  100. name = path.Clean(name)
  101. if !strings.HasSuffix(name, "/") && slashed {
  102. name += "/"
  103. }
  104. if !strings.HasPrefix(name, "/") {
  105. return "", os.ErrInvalid
  106. }
  107. return name, nil
  108. }
  109. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  110. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  111. if !strings.HasSuffix(fullDirPath, "/") {
  112. fullDirPath += "/"
  113. }
  114. var err error
  115. if fullDirPath, err = clearName(fullDirPath); err != nil {
  116. return err
  117. }
  118. _, err = fs.stat(ctx, fullDirPath)
  119. if err == nil {
  120. return os.ErrExist
  121. }
  122. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  123. dir, name := util.FullPath(fullDirPath).DirAndName()
  124. request := &filer_pb.CreateEntryRequest{
  125. Directory: dir,
  126. Entry: &filer_pb.Entry{
  127. Name: name,
  128. IsDirectory: true,
  129. Attributes: &filer_pb.FuseAttributes{
  130. Mtime: time.Now().Unix(),
  131. Crtime: time.Now().Unix(),
  132. FileMode: uint32(perm | os.ModeDir),
  133. Uid: fs.option.Uid,
  134. Gid: fs.option.Gid,
  135. },
  136. },
  137. }
  138. glog.V(1).Infof("mkdir: %v", request)
  139. if err := filer_pb.CreateEntry(client, request); err != nil {
  140. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  141. }
  142. return nil
  143. })
  144. }
  145. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  146. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  147. var err error
  148. if fullFilePath, err = clearName(fullFilePath); err != nil {
  149. return nil, err
  150. }
  151. if flag&os.O_CREATE != 0 {
  152. // file should not have / suffix.
  153. if strings.HasSuffix(fullFilePath, "/") {
  154. return nil, os.ErrInvalid
  155. }
  156. _, err = fs.stat(ctx, fullFilePath)
  157. if err == nil {
  158. if flag&os.O_EXCL != 0 {
  159. return nil, os.ErrExist
  160. }
  161. fs.removeAll(ctx, fullFilePath)
  162. }
  163. dir, name := util.FullPath(fullFilePath).DirAndName()
  164. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  165. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  166. Directory: dir,
  167. Entry: &filer_pb.Entry{
  168. Name: name,
  169. IsDirectory: perm&os.ModeDir > 0,
  170. Attributes: &filer_pb.FuseAttributes{
  171. Mtime: time.Now().Unix(),
  172. Crtime: time.Now().Unix(),
  173. FileMode: uint32(perm),
  174. Uid: fs.option.Uid,
  175. Gid: fs.option.Gid,
  176. Collection: fs.option.Collection,
  177. Replication: "000",
  178. TtlSec: 0,
  179. },
  180. },
  181. }); err != nil {
  182. return fmt.Errorf("create %s: %v", fullFilePath, err)
  183. }
  184. return nil
  185. })
  186. if err != nil {
  187. return nil, err
  188. }
  189. return &WebDavFile{
  190. fs: fs,
  191. name: fullFilePath,
  192. isDirectory: false,
  193. }, nil
  194. }
  195. fi, err := fs.stat(ctx, fullFilePath)
  196. if err != nil {
  197. return nil, os.ErrNotExist
  198. }
  199. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  200. fullFilePath += "/"
  201. }
  202. return &WebDavFile{
  203. fs: fs,
  204. name: fullFilePath,
  205. isDirectory: false,
  206. }, nil
  207. }
  208. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  209. var err error
  210. if fullFilePath, err = clearName(fullFilePath); err != nil {
  211. return err
  212. }
  213. dir, name := util.FullPath(fullFilePath).DirAndName()
  214. return filer_pb.Remove(fs, dir, name, true, false, false)
  215. }
  216. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  217. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  218. return fs.removeAll(ctx, name)
  219. }
  220. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  221. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  222. var err error
  223. if oldName, err = clearName(oldName); err != nil {
  224. return err
  225. }
  226. if newName, err = clearName(newName); err != nil {
  227. return err
  228. }
  229. of, err := fs.stat(ctx, oldName)
  230. if err != nil {
  231. return os.ErrExist
  232. }
  233. if of.IsDir() {
  234. if strings.HasSuffix(oldName, "/") {
  235. oldName = strings.TrimRight(oldName, "/")
  236. }
  237. if strings.HasSuffix(newName, "/") {
  238. newName = strings.TrimRight(newName, "/")
  239. }
  240. }
  241. _, err = fs.stat(ctx, newName)
  242. if err == nil {
  243. return os.ErrExist
  244. }
  245. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  246. newDir, newBaseName := util.FullPath(newName).DirAndName()
  247. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  248. request := &filer_pb.AtomicRenameEntryRequest{
  249. OldDirectory: oldDir,
  250. OldName: oldBaseName,
  251. NewDirectory: newDir,
  252. NewName: newBaseName,
  253. }
  254. _, err := client.AtomicRenameEntry(ctx, request)
  255. if err != nil {
  256. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  257. }
  258. return nil
  259. })
  260. }
  261. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  262. var err error
  263. if fullFilePath, err = clearName(fullFilePath); err != nil {
  264. return nil, err
  265. }
  266. fullpath := util.FullPath(fullFilePath)
  267. var fi FileInfo
  268. entry, err := filer_pb.GetEntry(fs, fullpath)
  269. if entry == nil {
  270. return nil, os.ErrNotExist
  271. }
  272. if err != nil {
  273. return nil, err
  274. }
  275. fi.size = int64(filer2.TotalSize(entry.GetChunks()))
  276. fi.name = string(fullpath)
  277. fi.mode = os.FileMode(entry.Attributes.FileMode)
  278. fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
  279. fi.isDirectory = entry.IsDirectory
  280. if fi.name == "/" {
  281. fi.modifiledTime = time.Now()
  282. fi.isDirectory = true
  283. }
  284. return &fi, nil
  285. }
  286. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  287. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  288. return fs.stat(ctx, name)
  289. }
  290. func (f *WebDavFile) Write(buf []byte) (int, error) {
  291. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  292. dir, _ := util.FullPath(f.name).DirAndName()
  293. var err error
  294. ctx := context.Background()
  295. if f.entry == nil {
  296. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  297. }
  298. if f.entry == nil {
  299. return 0, err
  300. }
  301. if err != nil {
  302. return 0, err
  303. }
  304. var fileId, host string
  305. var auth security.EncodedJwt
  306. var collection, replication string
  307. if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  308. request := &filer_pb.AssignVolumeRequest{
  309. Count: 1,
  310. Replication: "",
  311. Collection: f.fs.option.Collection,
  312. ParentPath: dir,
  313. }
  314. resp, err := client.AssignVolume(ctx, request)
  315. if err != nil {
  316. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  317. return err
  318. }
  319. if resp.Error != "" {
  320. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  321. }
  322. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  323. collection, replication = resp.Collection, resp.Replication
  324. return nil
  325. }); err != nil {
  326. return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  327. }
  328. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  329. uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth)
  330. if err != nil {
  331. glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
  332. return 0, fmt.Errorf("upload data: %v", err)
  333. }
  334. if uploadResult.Error != "" {
  335. glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
  336. return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
  337. }
  338. chunk := &filer_pb.FileChunk{
  339. FileId: fileId,
  340. Offset: f.off,
  341. Size: uint64(len(buf)),
  342. Mtime: time.Now().UnixNano(),
  343. ETag: uploadResult.ETag,
  344. CipherKey: uploadResult.CipherKey,
  345. IsGzipped: uploadResult.Gzip > 0,
  346. }
  347. f.entry.Chunks = append(f.entry.Chunks, chunk)
  348. err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  349. f.entry.Attributes.Mtime = time.Now().Unix()
  350. f.entry.Attributes.Collection = collection
  351. f.entry.Attributes.Replication = replication
  352. request := &filer_pb.UpdateEntryRequest{
  353. Directory: dir,
  354. Entry: f.entry,
  355. }
  356. if _, err := client.UpdateEntry(ctx, request); err != nil {
  357. return fmt.Errorf("update %s: %v", f.name, err)
  358. }
  359. return nil
  360. })
  361. if err == nil {
  362. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  363. f.off += int64(len(buf))
  364. }
  365. return len(buf), err
  366. }
  367. func (f *WebDavFile) Close() error {
  368. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  369. if f.entry != nil {
  370. f.entry = nil
  371. f.entryViewCache = nil
  372. }
  373. return nil
  374. }
  375. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  376. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  377. if f.entry == nil {
  378. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  379. }
  380. if f.entry == nil {
  381. return 0, err
  382. }
  383. if err != nil {
  384. return 0, err
  385. }
  386. if len(f.entry.Chunks) == 0 {
  387. return 0, io.EOF
  388. }
  389. if f.entryViewCache == nil {
  390. f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
  391. f.reader = nil
  392. }
  393. if f.reader == nil {
  394. chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
  395. f.reader = filesys.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache)
  396. }
  397. readSize, err = f.reader.ReadAt(p, f.off)
  398. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  399. f.off += int64(readSize)
  400. if err != nil {
  401. glog.Errorf("file read %s: %v", f.name, err)
  402. }
  403. return
  404. }
  405. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  406. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  407. dir, _ := util.FullPath(f.name).DirAndName()
  408. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) {
  409. fi := FileInfo{
  410. size: int64(filer2.TotalSize(entry.GetChunks())),
  411. name: entry.Name,
  412. mode: os.FileMode(entry.Attributes.FileMode),
  413. modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
  414. isDirectory: entry.IsDirectory,
  415. }
  416. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  417. fi.name += "/"
  418. }
  419. glog.V(4).Infof("entry: %v", fi.name)
  420. ret = append(ret, &fi)
  421. })
  422. old := f.off
  423. if old >= int64(len(ret)) {
  424. if count > 0 {
  425. return nil, io.EOF
  426. }
  427. return nil, nil
  428. }
  429. if count > 0 {
  430. f.off += int64(count)
  431. if f.off > int64(len(ret)) {
  432. f.off = int64(len(ret))
  433. }
  434. } else {
  435. f.off = int64(len(ret))
  436. old = 0
  437. }
  438. return ret[old:f.off], nil
  439. }
  440. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  441. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  442. ctx := context.Background()
  443. var err error
  444. switch whence {
  445. case 0:
  446. f.off = 0
  447. case 2:
  448. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  449. return 0, err
  450. } else {
  451. f.off = fi.Size()
  452. }
  453. }
  454. f.off += offset
  455. return f.off, err
  456. }
  457. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  458. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  459. ctx := context.Background()
  460. return f.fs.stat(ctx, f.name)
  461. }