Browse Source

Merge pull request #1759 from kmlebedev/sink.local

replication to local disk storage
pull/1761/head
Chris Lu 4 years ago
committed by GitHub
parent
commit
5e07afb0f0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      docker/Makefile
  2. 59
      docker/local-replicate-compose.yml
  3. 16
      docker/notification.toml
  4. 11
      docker/replication.toml
  5. 1
      weed/command/filer_replication.go
  6. 4
      weed/command/scaffold.go
  7. 101
      weed/replication/sink/localsink/local_sink.go

3
docker/Makefile

@ -21,6 +21,9 @@ k8s: build
dev_registry: build dev_registry: build
docker-compose -f local-registry-compose.yml -p seaweedfs up docker-compose -f local-registry-compose.yml -p seaweedfs up
dev_replicate: build
docker-compose -f local-replicate-compose.yml -p seaweedfs up
cluster: build cluster: build
docker-compose -f local-cluster-compose.yml -p seaweedfs up docker-compose -f local-cluster-compose.yml -p seaweedfs up

59
docker/local-replicate-compose.yml

@ -0,0 +1,59 @@
version: '2'
services:
master:
image: chrislusf/seaweedfs:local
ports:
- 9333:9333
- 19333:19333
command: "master -ip=master"
volume:
image: chrislusf/seaweedfs:local
ports:
- 8080:8080
- 18080:18080
command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1"
depends_on:
- master
filer:
image: chrislusf/seaweedfs:local
ports:
- 8888:8888
- 18888:18888
command: '-v=9 filer -master="master:9333"'
restart: on-failure
volumes:
- ./notification.toml:/etc/seaweedfs/notification.toml
depends_on:
- master
- volume
- rabbitmq
- replicate
environment:
RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/"
replicate:
image: chrislusf/seaweedfs:local
command: '-v=9 filer.replicate'
restart: on-failure
volumes:
- ./notification.toml:/etc/seaweedfs/notification.toml
- ./replication.toml:/etc/seaweedfs/replication.toml
depends_on:
- rabbitmq
environment:
RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/"
s3:
image: chrislusf/seaweedfs:local
ports:
- 8333:8333
command: 's3 -filer="filer:8888"'
depends_on:
- master
- volume
- filer
rabbitmq:
image: rabbitmq:3.8.10-management-alpine
ports:
- 5672:5672
- 15671:15671
- 15672:15672

16
docker/notification.toml

@ -0,0 +1,16 @@
[notification.log]
# this is only for debugging perpose and does not work with "weed filer.replicate"
enabled = false
[notification.gocdk_pub_sub]
# The Go Cloud Development Kit (https://gocloud.dev).
# PubSub API (https://godoc.org/gocloud.dev/pubsub).
# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
enabled = true
# This URL will Dial the RabbitMQ server at the URL in the environment
# variable RABBIT_SERVER_URL and open the exchange "myexchange".
# The exchange must have already been created by some other means, like
# the RabbitMQ management plugin.
topic_url = "rabbit://swexchange"
sub_url = "rabbit://swqueue"

11
docker/replication.toml

@ -0,0 +1,11 @@
[source.filer]
enabled = true
grpcAddress = "filer:18888"
# all files under this directory tree are replicated.
# this is not a directory on your hard drive, but on your filer.
# i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets"
[sink.local]
enabled = true
directory = "/data"

1
weed/command/filer_replication.go

@ -11,6 +11,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub" "github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"

4
weed/command/scaffold.go

@ -350,6 +350,10 @@ grpcAddress = "localhost:18888"
# i.e., all files with this "prefix" are sent to notification message queue. # i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets" directory = "/buckets"
[sink.local]
enabled = false
directory = "/backup"
[sink.filer] [sink.filer]
enabled = false enabled = false
grpcAddress = "localhost:18888" grpcAddress = "localhost:18888"

101
weed/replication/sink/localsink/local_sink.go

@ -0,0 +1,101 @@
package localsink
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
"io/ioutil"
"os"
"path/filepath"
"strings"
)
type LocalSink struct {
dir string
filerSource *source.FilerSource
}
func init() {
sink.Sinks = append(sink.Sinks, &LocalSink{})
}
func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
localsink.filerSource = s
}
func (localsink *LocalSink) GetName() string {
return "local"
}
func (localsink *LocalSink) isMultiPartEntry(key string) bool {
return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
}
func (localsink *LocalSink) initialize(dir string) error {
localsink.dir = dir
return nil
}
func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
dir := configuration.GetString(prefix + "directory")
glog.V(4).Infof("sink.local.directory: %v", dir)
return localsink.initialize(dir)
}
func (localsink *LocalSink) GetSinkToDirectory() string {
return localsink.dir
}
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
if localsink.isMultiPartEntry(key) {
return nil
}
glog.V(4).Infof("Delete Entry key: %s", key)
if err := os.Remove(key); err != nil {
return err
}
return nil
}
func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
if entry.IsDirectory || localsink.isMultiPartEntry(key) {
return nil
}
glog.V(4).Infof("Create Entry key: %s", key)
totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
dir := filepath.Dir(key)
if _, err := os.Stat(dir); os.IsNotExist(err) {
glog.V(4).Infof("Create Direcotry key: %s", dir)
if err = os.MkdirAll(dir, 0); err != nil {
return err
}
}
writeFunc := func(data []byte) error {
writeErr := ioutil.WriteFile(key, data, 0)
return writeErr
}
if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
return err
}
return nil
}
func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
if localsink.isMultiPartEntry(key) {
return true, nil
}
glog.V(4).Infof("Update Entry key: %s", key)
// do delete and create
return false, nil
}
Loading…
Cancel
Save