You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
3.1 KiB

  1. package localsink
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/filer"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
  7. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  8. "github.com/chrislusf/seaweedfs/weed/replication/source"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "io/ioutil"
  11. "os"
  12. "path/filepath"
  13. "strings"
  14. "time"
  15. )
  16. type LocalSink struct {
  17. dir string
  18. todaysDateFormat string
  19. filerSource *source.FilerSource
  20. }
  21. func init() {
  22. sink.Sinks = append(sink.Sinks, &LocalSink{})
  23. }
  24. func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
  25. localsink.filerSource = s
  26. }
  27. func (localsink *LocalSink) GetName() string {
  28. return "local"
  29. }
  30. func (localsink *LocalSink) isMultiPartEntry(key string) bool {
  31. return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
  32. }
  33. func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error {
  34. localsink.dir = dir
  35. localsink.todaysDateFormat = todaysDateFormat
  36. return nil
  37. }
  38. func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
  39. dir := configuration.GetString(prefix + "directory")
  40. todaysDateFormat := configuration.GetString(prefix + "todays_date_format")
  41. glog.V(4).Infof("sink.local.directory: %v", dir)
  42. return localsink.initialize(dir, todaysDateFormat)
  43. }
  44. func (localsink *LocalSink) GetSinkToDirectory() string {
  45. if localsink.todaysDateFormat != "" {
  46. return filepath.Join(localsink.dir, time.Now().Format(localsink.todaysDateFormat))
  47. }
  48. return localsink.dir
  49. }
  50. func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
  51. if localsink.isMultiPartEntry(key) {
  52. return nil
  53. }
  54. glog.V(4).Infof("Delete Entry key: %s", key)
  55. if err := os.Remove(key); err != nil {
  56. glog.V(0).Infof("remove entry key %s: %s", key, err)
  57. }
  58. return nil
  59. }
  60. func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
  61. if entry.IsDirectory || localsink.isMultiPartEntry(key) {
  62. return nil
  63. }
  64. glog.V(4).Infof("Create Entry key: %s", key)
  65. totalSize := filer.FileSize(entry)
  66. chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
  67. dir := filepath.Dir(key)
  68. if _, err := os.Stat(dir); os.IsNotExist(err) {
  69. glog.V(4).Infof("Create Direcotry key: %s", dir)
  70. if err = os.MkdirAll(dir, 0); err != nil {
  71. return err
  72. }
  73. }
  74. writeFunc := func(data []byte) error {
  75. writeErr := ioutil.WriteFile(key, data, 0)
  76. return writeErr
  77. }
  78. if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
  79. return err
  80. }
  81. return nil
  82. }
  83. func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
  84. if localsink.isMultiPartEntry(key) {
  85. return true, nil
  86. }
  87. glog.V(4).Infof("Update Entry key: %s", key)
  88. // do delete and create
  89. return false, nil
  90. }