You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

270 lines
6.2 KiB

  1. package rclone_backend
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/rclone/rclone/fs/config/configfile"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "io"
  8. "os"
  9. "time"
  10. "github.com/google/uuid"
  11. _ "github.com/rclone/rclone/backend/all"
  12. "github.com/rclone/rclone/fs"
  13. "github.com/rclone/rclone/fs/accounting"
  14. "github.com/rclone/rclone/fs/object"
  15. "github.com/seaweedfs/seaweedfs/weed/glog"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  18. )
  19. func init() {
  20. backend.BackendStorageFactories["rclone"] = &RcloneBackendFactory{}
  21. configfile.Install()
  22. }
  23. type RcloneBackendFactory struct {
  24. }
  25. func (factory *RcloneBackendFactory) StorageType() backend.StorageType {
  26. return "rclone"
  27. }
  28. func (factory *RcloneBackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
  29. return newRcloneBackendStorage(configuration, configPrefix, id)
  30. }
  31. type RcloneBackendStorage struct {
  32. id string
  33. remoteName string
  34. fs fs.Fs
  35. }
  36. func newRcloneBackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *RcloneBackendStorage, err error) {
  37. s = &RcloneBackendStorage{}
  38. s.id = id
  39. s.remoteName = configuration.GetString(configPrefix + "remote_name")
  40. ctx := context.TODO()
  41. accounting.Start(ctx)
  42. fsPath := fmt.Sprintf("%s:", s.remoteName)
  43. s.fs, err = fs.NewFs(ctx, fsPath)
  44. if err != nil {
  45. glog.Errorf("failed to instantiate Rclone filesystem: %s", err)
  46. return
  47. }
  48. glog.V(0).Infof("created backend storage rclone.%s for remote name %s", s.id, s.remoteName)
  49. return
  50. }
  51. func (s *RcloneBackendStorage) ToProperties() map[string]string {
  52. m := make(map[string]string)
  53. m["remote_name"] = s.remoteName
  54. return m
  55. }
  56. func (s *RcloneBackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
  57. f := &RcloneBackendStorageFile{
  58. backendStorage: s,
  59. key: key,
  60. tierInfo: tierInfo,
  61. }
  62. return f
  63. }
  64. func (s *RcloneBackendStorage) CopyFile(f *os.File, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
  65. randomUuid, err := uuid.NewRandom()
  66. if err != nil {
  67. return key, 0, err
  68. }
  69. key = randomUuid.String()
  70. glog.V(1).Infof("copy dat file of %s to remote rclone.%s as %s", f.Name(), s.id, key)
  71. util.Retry("upload via Rclone", func() error {
  72. size, err = uploadViaRclone(s.fs, f.Name(), key, fn)
  73. return err
  74. })
  75. return
  76. }
  77. func uploadViaRclone(rfs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  78. ctx := context.TODO()
  79. file, err := os.Open(filename)
  80. defer func(file *os.File) {
  81. err := file.Close()
  82. if err != nil {
  83. return
  84. }
  85. }(file)
  86. if err != nil {
  87. return 0, err
  88. }
  89. stat, err := file.Stat()
  90. if err != nil {
  91. return 0, err
  92. }
  93. info := object.NewStaticObjectInfo(key, stat.ModTime(), stat.Size(), true, nil, rfs)
  94. tr := accounting.NewStats(ctx).NewTransfer(info)
  95. defer tr.Done(ctx, err)
  96. acc := tr.Account(ctx, file)
  97. pr := ProgressReader{acc: acc, tr: tr, fn: fn}
  98. obj, err := rfs.Put(ctx, &pr, info)
  99. if err != nil {
  100. return 0, err
  101. }
  102. return obj.Size(), err
  103. }
  104. func (s *RcloneBackendStorage) DownloadFile(filename string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
  105. glog.V(1).Infof("download dat file of %s from remote rclone.%s as %s", filename, s.id, key)
  106. util.Retry("download via Rclone", func() error {
  107. size, err = downloadViaRclone(s.fs, filename, key, fn)
  108. return err
  109. })
  110. return
  111. }
  112. func downloadViaRclone(fs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  113. ctx := context.TODO()
  114. obj, err := fs.NewObject(ctx, key)
  115. if err != nil {
  116. return 0, err
  117. }
  118. rc, err := obj.Open(ctx)
  119. defer func(rc io.ReadCloser) {
  120. err := rc.Close()
  121. if err != nil {
  122. return
  123. }
  124. }(rc)
  125. if err != nil {
  126. return 0, err
  127. }
  128. file, err := os.Create(filename)
  129. defer func(file *os.File) {
  130. err := file.Close()
  131. if err != nil {
  132. return
  133. }
  134. }(file)
  135. tr := accounting.NewStats(ctx).NewTransfer(obj)
  136. defer tr.Done(ctx, err)
  137. acc := tr.Account(ctx, rc)
  138. pr := ProgressReader{acc: acc, tr: tr, fn: fn}
  139. written, err := io.Copy(file, &pr)
  140. if err != nil {
  141. return 0, err
  142. }
  143. return written, nil
  144. }
  145. func (s *RcloneBackendStorage) DeleteFile(key string) (err error) {
  146. glog.V(1).Infof("delete dat file %s from remote", key)
  147. util.Retry("delete via Rclone", func() error {
  148. err = deleteViaRclone(s.fs, key)
  149. return err
  150. })
  151. return
  152. }
  153. func deleteViaRclone(fs fs.Fs, key string) (err error) {
  154. ctx := context.TODO()
  155. obj, err := fs.NewObject(ctx, key)
  156. if err != nil {
  157. return err
  158. }
  159. return obj.Remove(ctx)
  160. }
  161. type RcloneBackendStorageFile struct {
  162. backendStorage *RcloneBackendStorage
  163. key string
  164. tierInfo *volume_server_pb.VolumeInfo
  165. }
  166. func (rcloneBackendStorageFile RcloneBackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
  167. ctx := context.TODO()
  168. obj, err := rcloneBackendStorageFile.backendStorage.fs.NewObject(ctx, rcloneBackendStorageFile.key)
  169. if err != nil {
  170. return 0, err
  171. }
  172. opt := fs.RangeOption{Start: off, End: off + int64(len(p)) - 1}
  173. rc, err := obj.Open(ctx, &opt)
  174. defer func(rc io.ReadCloser) {
  175. err := rc.Close()
  176. if err != nil {
  177. return
  178. }
  179. }(rc)
  180. if err != nil {
  181. return 0, err
  182. }
  183. return io.ReadFull(rc, p)
  184. }
  185. func (rcloneBackendStorageFile RcloneBackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
  186. panic("not implemented")
  187. }
  188. func (rcloneBackendStorageFile RcloneBackendStorageFile) Truncate(off int64) error {
  189. panic("not implemented")
  190. }
  191. func (rcloneBackendStorageFile RcloneBackendStorageFile) Close() error {
  192. return nil
  193. }
  194. func (rcloneBackendStorageFile RcloneBackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
  195. files := rcloneBackendStorageFile.tierInfo.GetFiles()
  196. if len(files) == 0 {
  197. err = fmt.Errorf("remote file info not found")
  198. return
  199. }
  200. datSize = int64(files[0].FileSize)
  201. modTime = time.Unix(int64(files[0].ModifiedTime), 0)
  202. return
  203. }
  204. func (rcloneBackendStorageFile RcloneBackendStorageFile) Name() string {
  205. return rcloneBackendStorageFile.key
  206. }
  207. func (rcloneBackendStorageFile RcloneBackendStorageFile) Sync() error {
  208. return nil
  209. }