Browse Source

Merge branch 'master' into support_ssd_volume

pull/1794/head
Chris Lu 4 years ago
parent
commit
1c7e1295dc
  1. 10
      .github/workflows/release.yml
  2. 1
      docker/Dockerfile
  3. 3
      docker/Makefile
  4. 65
      docker/local-k8s-compose.yml
  5. 12
      docker/seaweedfs.sql
  6. 2
      k8s/seaweedfs/values.yaml
  7. 2
      weed/Makefile
  8. 9
      weed/command/filer.go
  9. 28
      weed/command/scaffold.go
  10. 44
      weed/filer/cassandra/cassandra_store.go
  11. 63
      weed/filer/configuration.go
  12. 253
      weed/filer/filerstore.go
  13. 3
      weed/filer/filerstore_hardlink.go
  14. 161
      weed/filer/filerstore_translate_path.go
  15. 299
      weed/filer/filerstore_wrapper.go
  16. 4
      weed/filer/redis2/redis_cluster_store.go
  17. 4
      weed/filer/redis2/redis_store.go
  18. 27
      weed/filer/redis2/universal_redis_store.go
  19. 2
      weed/server/filer_server.go
  20. 2
      weed/shell/command_fs_configure.go
  21. 16
      weed/shell/command_s3_bucket_create.go
  22. 14
      weed/shell/command_s3_bucket_delete.go
  23. 12
      weed/shell/command_s3_bucket_list.go
  24. 2
      weed/util/constants.go

10
.github/workflows/release.yml

@ -12,8 +12,13 @@ jobs:
strategy: strategy:
matrix: matrix:
goos: [linux, windows, darwin, freebsd, netbsd, openbsd ] goos: [linux, windows, darwin, freebsd, netbsd, openbsd ]
goarch: ["386", amd64]
# goarch: ["386", amd64, arm]
goarch: ["386", amd64, arm]
exclude:
- goarch: arm
goos: darwin
- goarch: 386
goos: freebsd
steps: steps:
- name: Check out code into the Go module directory - name: Check out code into the Go module directory
@ -26,7 +31,6 @@ jobs:
tag: dev tag: dev
fail-if-no-assets: false fail-if-no-assets: false
assets: | assets: |
weed-large-disk-*
weed-* weed-*
- name: Set BUILD_TIME env - name: Set BUILD_TIME env

1
docker/Dockerfile

@ -12,6 +12,7 @@ RUN \
SUPERCRONIC=supercronic-linux-$ARCH && \ SUPERCRONIC=supercronic-linux-$ARCH && \
# Install SeaweedFS and Supercronic ( for cron job mode ) # Install SeaweedFS and Supercronic ( for cron job mode )
apk add --no-cache --virtual build-dependencies --update wget curl ca-certificates && \ apk add --no-cache --virtual build-dependencies --update wget curl ca-certificates && \
apk add fuse && \
wget -P /tmp https://github.com/$(curl -s -L https://github.com/chrislusf/seaweedfs/releases/latest | egrep -o "chrislusf/seaweedfs/releases/download/.*/linux_$ARCH.tar.gz") && \ wget -P /tmp https://github.com/$(curl -s -L https://github.com/chrislusf/seaweedfs/releases/latest | egrep -o "chrislusf/seaweedfs/releases/download/.*/linux_$ARCH.tar.gz") && \
tar -C /usr/bin/ -xzvf /tmp/linux_$ARCH.tar.gz && \ tar -C /usr/bin/ -xzvf /tmp/linux_$ARCH.tar.gz && \
curl -fsSLO "$SUPERCRONIC_URL" && \ curl -fsSLO "$SUPERCRONIC_URL" && \

3
docker/Makefile

@ -12,6 +12,9 @@ build:
dev: build dev: build
docker-compose -f local-dev-compose.yml -p seaweedfs up docker-compose -f local-dev-compose.yml -p seaweedfs up
k8s: build
docker-compose -f local-k8s-compose.yml -p seaweedfs up
dev_registry: build dev_registry: build
docker-compose -f local-registry-compose.yml -p seaweedfs up docker-compose -f local-registry-compose.yml -p seaweedfs up

65
docker/local-k8s-compose.yml

@ -0,0 +1,65 @@
version: '2'
services:
master:
image: chrislusf/seaweedfs:local
ports:
- 9333:9333
- 19333:19333
command: "master -ip=master"
volume:
image: chrislusf/seaweedfs:local
ports:
- 8080:8080
- 18080:18080
command: "volume -mserver=master:9333 -port=8080 -ip=volume"
depends_on:
- master
mysql:
image: percona/percona-server:5.7
ports:
- 3306:3306
volumes:
- ./seaweedfs.sql:/docker-entrypoint-initdb.d/seaweedfs.sql
environment:
- MYSQL_ROOT_PASSWORD=secret
- MYSQL_DATABASE=seaweedfs
- MYSQL_PASSWORD=secret
- MYSQL_USER=seaweedfs
filer:
image: chrislusf/seaweedfs:local
ports:
- 8888:8888
- 18888:18888
environment:
- WEED_MYSQL_HOSTNAME=mysql
- WEED_MYSQL_PORT=3306
- WEED_MYSQL_DATABASE=seaweedfs
- WEED_MYSQL_USERNAME=seaweedfs
- WEED_MYSQL_PASSWORD=secret
- WEED_MYSQL_ENABLED=true
- WEED_LEVELDB2_ENABLED=false
command: 'filer -master="master:9333"'
depends_on:
- master
- volume
- mysql
ingress:
image: jwilder/nginx-proxy
ports:
- "80:80"
volumes:
- /var/run/docker.sock:/tmp/docker.sock:ro
- /tmp/nginx:/etc/nginx/conf.d
s3:
image: chrislusf/seaweedfs:local
ports:
- 8333:8333
command: 's3 -filer="filer:8888"'
depends_on:
- master
- volume
- filer
environment:
- VIRTUAL_HOST=s3
- VIRTUAL_PORT=8333

12
docker/seaweedfs.sql

@ -0,0 +1,12 @@
CREATE DATABASE IF NOT EXISTS seaweedfs;
CREATE USER IF NOT EXISTS 'seaweedfs'@'%' IDENTIFIED BY 'secret';
GRANT ALL PRIVILEGES ON seaweedfs_fast.* TO 'seaweedfs'@'%';
FLUSH PRIVILEGES;
USE seaweedfs;
CREATE TABLE IF NOT EXISTS filemeta (
dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
name VARCHAR(1000) COMMENT 'directory or file name',
directory TEXT COMMENT 'full path to parent directory',
meta LONGBLOB,
PRIMARY KEY (dirhash, name)
) DEFAULT CHARSET=utf8;

2
k8s/seaweedfs/values.yaml

@ -4,7 +4,7 @@ global:
registry: "" registry: ""
repository: "" repository: ""
imageName: chrislusf/seaweedfs imageName: chrislusf/seaweedfs
imageTag: "2.15"
imageTag: "2.16"
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret imagePullSecrets: imagepullsecret
restartPolicy: Always restartPolicy: Always

2
weed/Makefile

@ -20,7 +20,7 @@ debug_mount:
debug_server: debug_server:
go build -gcflags="all=-N -l" go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=/Volumes/mobile_disk/99 -filer -volume.port=8343 -s3 -volume.max=0
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=/Volumes/mobile_disk/99 -filer -volume.port=8343 -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1
debug_volume: debug_volume:
go build -gcflags="all=-N -l" go build -gcflags="all=-N -l"

9
weed/command/filer.go

@ -43,8 +43,6 @@ type FilerOptions struct {
peers *string peers *string
metricsHttpPort *int metricsHttpPort *int
cacheToFilerLimit *int cacheToFilerLimit *int
// default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string defaultLevelDbDirectory *string
} }
@ -67,6 +65,7 @@ func init() {
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list") f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
// start s3 on filer // start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@ -92,6 +91,7 @@ var cmdFiler = &Command{
GET /path/to/ GET /path/to/
The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order. The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order.
If the "filer.toml" is not found, an embedded filer store will be craeted under "-defaultStoreDir".
The example filer.toml configuration file can be generated by "weed scaffold -config=filer" The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
@ -127,10 +127,7 @@ func (fo *FilerOptions) startFiler() {
publicVolumeMux = http.NewServeMux() publicVolumeMux = http.NewServeMux()
} }
defaultLevelDbDirectory := "./filerldb2"
if fo.defaultLevelDbDirectory != nil {
defaultLevelDbDirectory = util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
}
defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
var peers []string var peers []string
if *fo.peers != "" { if *fo.peers != "" {

28
weed/command/scaffold.go

@ -138,12 +138,16 @@ hosts=[
] ]
username="" username=""
password="" password=""
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
[redis2] [redis2]
enabled = false enabled = false
address = "localhost:6379" address = "localhost:6379"
password = "" password = ""
database = 0 database = 0
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
[redis_cluster2] [redis_cluster2]
enabled = false enabled = false
@ -160,6 +164,8 @@ password = ""
readOnly = true readOnly = true
# automatically use the closest Redis server for reads # automatically use the closest Redis server for reads
routeByLatency = true routeByLatency = true
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
[etcd] [etcd]
enabled = false enabled = false
@ -185,6 +191,28 @@ sniff_enabled = false
healthcheck_enabled = false healthcheck_enabled = false
# increase the value is recommend, be sure the value in Elastic is greater or equal here # increase the value is recommend, be sure the value in Elastic is greater or equal here
index.max_result_window = 10000 index.max_result_window = 10000
##########################
##########################
# To add path-specific filer store:
#
# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp
# 2. Add a location configuraiton. E.g., location = "/tmp/"
# 3. Copy and customize all other configurations.
# Make sure they are not the same if using the same store type!
# 4. Set enabled to true
#
# The following is just using cassandra as an example
##########################
[redis2.tmp]
enabled = false
location = "/tmp/"
address = "localhost:6379"
password = ""
database = 1
` `
NOTIFICATION_TOML_EXAMPLE = ` NOTIFICATION_TOML_EXAMPLE = `

44
weed/filer/cassandra/cassandra_store.go

@ -16,8 +16,9 @@ func init() {
} }
type CassandraStore struct { type CassandraStore struct {
cluster *gocql.ClusterConfig
session *gocql.Session
cluster *gocql.ClusterConfig
session *gocql.Session
superLargeDirectoryHash map[string]string
} }
func (store *CassandraStore) GetName() string { func (store *CassandraStore) GetName() string {
@ -30,10 +31,16 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix
configuration.GetStringSlice(prefix+"hosts"), configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"username"), configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"), configuration.GetString(prefix+"password"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
) )
} }
func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) {
func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
return
}
func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) {
store.cluster = gocql.NewCluster(hosts...) store.cluster = gocql.NewCluster(hosts...)
if username != "" && password != "" { if username != "" && password != "" {
store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password} store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
@ -44,6 +51,19 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string, usernam
if err != nil { if err != nil {
glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
} }
// set directory hash
store.superLargeDirectoryHash = make(map[string]string)
existingHash := make(map[string]string)
for _, dir := range superLargeDirectories {
// adding dir hash to avoid duplicated names
dirHash := util.Md5String([]byte(dir))[:4]
store.superLargeDirectoryHash[dir] = dirHash
if existingDir, found := existingHash[dirHash]; found {
glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
}
existingHash[dirHash] = dir
}
return return
} }
@ -60,6 +80,10 @@ func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
dir, name := entry.FullPath.DirAndName() dir, name := entry.FullPath.DirAndName()
if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
dir, name = dirHash+name, ""
}
meta, err := entry.EncodeAttributesAndChunks() meta, err := entry.EncodeAttributesAndChunks()
if err != nil { if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err) return fmt.Errorf("encode %s: %s", entry.FullPath, err)
@ -86,6 +110,10 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry
func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName() dir, name := fullpath.DirAndName()
if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
dir, name = dirHash+name, ""
}
var data []byte var data []byte
if err := store.session.Query( if err := store.session.Query(
"SELECT meta FROM filemeta WHERE directory=? AND name=?", "SELECT meta FROM filemeta WHERE directory=? AND name=?",
@ -113,6 +141,9 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName() dir, name := fullpath.DirAndName()
if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
dir, name = dirHash+name, ""
}
if err := store.session.Query( if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=? AND name=?", "DELETE FROM filemeta WHERE directory=? AND name=?",
@ -124,6 +155,9 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full
} }
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
}
if err := store.session.Query( if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=?", "DELETE FROM filemeta WHERE directory=?",
@ -141,6 +175,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, f
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) { limit int) (entries []*filer.Entry, err error) {
if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
}
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
if inclusive { if inclusive {
cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?" cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"

63
weed/filer/configuration.go

@ -1,10 +1,11 @@
package filer package filer
import ( import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper" "github.com/spf13/viper"
"os"
"reflect"
"strings"
) )
var ( var (
@ -15,25 +16,67 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
validateOneEnabledStore(config) validateOneEnabledStore(config)
// load configuration for default filer store
hasDefaultStoreConfigured := false
for _, store := range Stores { for _, store := range Stores {
if config.GetBool(store.GetName() + ".enabled") { if config.GetBool(store.GetName() + ".enabled") {
store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore)
if err := store.Initialize(config, store.GetName()+"."); err != nil { if err := store.Initialize(config, store.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v",
store.GetName(), err)
glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
} }
f.SetStore(store) f.SetStore(store)
glog.V(0).Infof("Configure filer for %s", store.GetName())
return
glog.V(0).Infof("configured filer store to %s", store.GetName())
hasDefaultStoreConfigured = true
break
}
}
if !hasDefaultStoreConfigured {
println()
println("Supported filer stores are:")
for _, store := range Stores {
println(" " + store.GetName())
} }
os.Exit(-1)
} }
println()
println("Supported filer stores are:")
// load path-specific filer store here
// f.Store.AddPathSpecificStore(path, store)
storeNames := make(map[string]FilerStore)
for _, store := range Stores { for _, store := range Stores {
println(" " + store.GetName())
storeNames[store.GetName()] = store
}
allKeys := config.AllKeys()
for _, key := range allKeys {
if !strings.HasSuffix(key, ".enabled") {
continue
}
key = key[:len(key)-len(".enabled")]
if !strings.Contains(key, ".") {
continue
}
parts := strings.Split(key, ".")
storeName, storeId := parts[0], parts[1]
store, found := storeNames[storeName]
if !found {
continue
}
store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore)
if err := store.Initialize(config, key+"."); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v", key, err)
}
location := config.GetString(key + ".location")
if location == "" {
glog.Errorf("path-specific filer store needs %s", key+".location")
os.Exit(-1)
}
f.Store.AddPathSpecificStore(location, storeId, store)
glog.V(0).Infof("configure filer %s for %s", store.GetName(), location)
} }
os.Exit(-1)
} }
func validateOneEnabledStore(config *viper.Viper) { func validateOneEnabledStore(config *viper.Viper) {

253
weed/filer/filerstore.go

@ -3,19 +3,14 @@ package filer
import ( import (
"context" "context"
"errors" "errors"
"github.com/chrislusf/seaweedfs/weed/glog"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
var ( var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrKvNotImplemented = errors.New("kv not implemented yet")
ErrKvNotFound = errors.New("kv: not found")
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
ErrKvNotImplemented = errors.New("kv not implemented yet")
ErrKvNotFound = errors.New("kv: not found")
) )
type FilerStore interface { type FilerStore interface {
@ -42,243 +37,3 @@ type FilerStore interface {
Shutdown() Shutdown()
} }
type VirtualFilerStore interface {
FilerStore
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
DeleteOneEntry(ctx context.Context, entry *Entry) error
}
type FilerStoreWrapper struct {
ActualStore FilerStore
}
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
if innerStore, ok := store.(*FilerStoreWrapper); ok {
return innerStore
}
return &FilerStoreWrapper{
ActualStore: store,
}
}
func (fsw *FilerStoreWrapper) GetName() string {
return fsw.ActualStore.GetName()
}
func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
return fsw.ActualStore.Initialize(configuration, prefix)
}
func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
}()
filer_pb.BeforeEntrySerialization(entry.Chunks)
if entry.Mime == "application/octet-stream" {
entry.Mime = ""
}
if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
return err
}
glog.V(4).Infof("InsertEntry %s", entry.FullPath)
return fsw.ActualStore.InsertEntry(ctx, entry)
}
func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
}()
filer_pb.BeforeEntrySerialization(entry.Chunks)
if entry.Mime == "application/octet-stream" {
entry.Mime = ""
}
if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
return err
}
glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
return fsw.ActualStore.UpdateEntry(ctx, entry)
}
func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("FindEntry %s", fp)
entry, err = fsw.ActualStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
}
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
return
}
func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
}()
existingEntry, findErr := fsw.FindEntry(ctx, fp)
if findErr == filer_pb.ErrNotFound {
return nil
}
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
glog.V(4).Infof("DeleteEntry %s", fp)
return fsw.ActualStore.DeleteEntry(ctx, fp)
}
func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
}()
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("DeleteFolderChildren %s", fp)
return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
}
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
for _, entry := range entries {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
}
return entries, err
}
func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
if err == ErrUnsupportedListDirectoryPrefixed {
entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
}
if err != nil {
return nil, err
}
for _, entry := range entries {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
}
return entries, nil
}
func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return
}
count := 0
var lastFileName string
notPrefixed := entries
entries = nil
for count < limit && len(notPrefixed) > 0 {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
if count >= limit {
break
}
}
}
if count < limit {
notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
if err != nil {
return
}
}
}
return
}
func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
return fsw.ActualStore.BeginTransaction(ctx)
}
func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
return fsw.ActualStore.CommitTransaction(ctx)
}
func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
return fsw.ActualStore.RollbackTransaction(ctx)
}
func (fsw *FilerStoreWrapper) Shutdown() {
fsw.ActualStore.Shutdown()
}
func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return fsw.ActualStore.KvPut(ctx, key, value)
}
func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
return fsw.ActualStore.KvGet(ctx, key)
}
func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
return fsw.ActualStore.KvDelete(ctx, key)
}

3
weed/filer/filerstore_hardlink.go

@ -19,7 +19,8 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry
// check what is existing entry // check what is existing entry
glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath) glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
actualStore := fsw.getActualStore(entry.FullPath)
existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath)
if err != nil && err != filer_pb.ErrNotFound { if err != nil && err != filer_pb.ErrNotFound {
return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err) return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
} }

161
weed/filer/filerstore_translate_path.go

@ -0,0 +1,161 @@
package filer
import (
"context"
"github.com/chrislusf/seaweedfs/weed/util"
"strings"
)
var (
_ = FilerStore(&FilerStorePathTranlator{})
)
type FilerStorePathTranlator struct {
actualStore FilerStore
storeRoot string
}
func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStorePathTranlator {
if innerStore, ok := store.(*FilerStorePathTranlator); ok {
return innerStore
}
if !strings.HasSuffix(storeRoot, "/") {
storeRoot += "/"
}
return &FilerStorePathTranlator{
actualStore: store,
storeRoot: storeRoot,
}
}
func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util.FullPath) {
newPath = fp
if t.storeRoot == "/" {
return
}
newPath = fp[len(t.storeRoot)-1:]
if newPath == "" {
newPath = "/"
}
return
}
func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath util.FullPath) {
previousPath = entry.FullPath
if t.storeRoot == "/" {
return
}
entry.FullPath = t.translatePath(previousPath)
return
}
func (t *FilerStorePathTranlator) recoverEntryPath(entry *Entry, previousPath util.FullPath) {
entry.FullPath = previousPath
}
func (t *FilerStorePathTranlator) GetName() string {
return t.actualStore.GetName()
}
func (t *FilerStorePathTranlator) Initialize(configuration util.Configuration, prefix string) error {
return t.actualStore.Initialize(configuration, prefix)
}
func (t *FilerStorePathTranlator) InsertEntry(ctx context.Context, entry *Entry) error {
previousPath := t.changeEntryPath(entry)
defer t.recoverEntryPath(entry, previousPath)
return t.actualStore.InsertEntry(ctx, entry)
}
func (t *FilerStorePathTranlator) UpdateEntry(ctx context.Context, entry *Entry) error {
previousPath := t.changeEntryPath(entry)
defer t.recoverEntryPath(entry, previousPath)
return t.actualStore.UpdateEntry(ctx, entry)
}
func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
if t.storeRoot == "/" {
return t.actualStore.FindEntry(ctx, fp)
}
newFullPath := t.translatePath(fp)
entry, err = t.actualStore.FindEntry(ctx, newFullPath)
if err == nil {
entry.FullPath = fp[:len(t.storeRoot)-1] + entry.FullPath
}
return
}
func (t *FilerStorePathTranlator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
newFullPath := t.translatePath(fp)
return t.actualStore.DeleteEntry(ctx, newFullPath)
}
func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
previousPath := t.changeEntryPath(existingEntry)
defer t.recoverEntryPath(existingEntry, previousPath)
return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
newFullPath := t.translatePath(fp)
return t.actualStore.DeleteFolderChildren(ctx, newFullPath)
}
func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
newFullPath := t.translatePath(dirPath)
entries, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
for _, entry := range entries {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
}
return entries, err
}
func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
newFullPath := t.translatePath(dirPath)
entries, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix)
if err != nil {
return nil, err
}
for _, entry := range entries {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
}
return entries, nil
}
func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) {
return t.actualStore.BeginTransaction(ctx)
}
func (t *FilerStorePathTranlator) CommitTransaction(ctx context.Context) error {
return t.actualStore.CommitTransaction(ctx)
}
func (t *FilerStorePathTranlator) RollbackTransaction(ctx context.Context) error {
return t.actualStore.RollbackTransaction(ctx)
}
func (t *FilerStorePathTranlator) Shutdown() {
t.actualStore.Shutdown()
}
func (t *FilerStorePathTranlator) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return t.actualStore.KvPut(ctx, key, value)
}
func (t *FilerStorePathTranlator) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
return t.actualStore.KvGet(ctx, key)
}
func (t *FilerStorePathTranlator) KvDelete(ctx context.Context, key []byte) (err error) {
return t.actualStore.KvDelete(ctx, key)
}

299
weed/filer/filerstore_wrapper.go

@ -0,0 +1,299 @@
package filer
import (
"context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/viant/ptrie"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
_ = VirtualFilerStore(&FilerStoreWrapper{})
)
type VirtualFilerStore interface {
FilerStore
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
DeleteOneEntry(ctx context.Context, entry *Entry) error
AddPathSpecificStore(path string, storeId string, store FilerStore)
}
type FilerStoreWrapper struct {
defaultStore FilerStore
pathToStore ptrie.Trie
storeIdToStore map[string]FilerStore
}
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
if innerStore, ok := store.(*FilerStoreWrapper); ok {
return innerStore
}
return &FilerStoreWrapper{
defaultStore: store,
pathToStore: ptrie.New(),
storeIdToStore: make(map[string]FilerStore),
}
}
func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
err := fsw.pathToStore.Put([]byte(path), storeId)
if err != nil {
glog.Fatalf("put path specific store: %v", err)
}
}
func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
store = fsw.defaultStore
if path == "/" {
return
}
var storeId string
fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
storeId = value.(string)
return false
})
if storeId != "" {
store = fsw.storeIdToStore[storeId]
}
return
}
func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) {
return fsw.defaultStore
}
func (fsw *FilerStoreWrapper) GetName() string {
return fsw.getDefaultStore().GetName()
}
func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
return fsw.getDefaultStore().Initialize(configuration, prefix)
}
func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
actualStore := fsw.getActualStore(entry.FullPath)
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
}()
filer_pb.BeforeEntrySerialization(entry.Chunks)
if entry.Mime == "application/octet-stream" {
entry.Mime = ""
}
if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
return err
}
glog.V(4).Infof("InsertEntry %s", entry.FullPath)
return actualStore.InsertEntry(ctx, entry)
}
func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
actualStore := fsw.getActualStore(entry.FullPath)
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
}()
filer_pb.BeforeEntrySerialization(entry.Chunks)
if entry.Mime == "application/octet-stream" {
entry.Mime = ""
}
if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
return err
}
glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
return actualStore.UpdateEntry(ctx, entry)
}
func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
actualStore := fsw.getActualStore(fp)
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("FindEntry %s", fp)
entry, err = actualStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
}
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
return
}
func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
actualStore := fsw.getActualStore(fp)
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
}()
existingEntry, findErr := fsw.FindEntry(ctx, fp)
if findErr == filer_pb.ErrNotFound {
return nil
}
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
glog.V(4).Infof("DeleteEntry %s", fp)
return actualStore.DeleteEntry(ctx, fp)
}
func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
actualStore := fsw.getActualStore(existingEntry.FullPath)
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
}()
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
actualStore := fsw.getActualStore(fp + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("DeleteFolderChildren %s", fp)
return actualStore.DeleteFolderChildren(ctx, fp)
}
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
for _, entry := range entries {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
}
return entries, err
}
func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
if err == ErrUnsupportedListDirectoryPrefixed {
entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
}
if err != nil {
return nil, err
}
for _, entry := range entries {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
}
return entries, nil
}
func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return
}
count := 0
var lastFileName string
notPrefixed := entries
entries = nil
for count < limit && len(notPrefixed) > 0 {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
if count >= limit {
break
}
}
}
if count < limit {
notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
if err != nil {
return
}
}
}
return
}
func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
return fsw.getDefaultStore().BeginTransaction(ctx)
}
func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
return fsw.getDefaultStore().CommitTransaction(ctx)
}
func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
return fsw.getDefaultStore().RollbackTransaction(ctx)
}
func (fsw *FilerStoreWrapper) Shutdown() {
fsw.getDefaultStore().Shutdown()
}
func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return fsw.getDefaultStore().KvPut(ctx, key, value)
}
func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
return fsw.getDefaultStore().KvGet(ctx, key)
}
func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
return fsw.getDefaultStore().KvDelete(ctx, key)
}

4
weed/filer/redis2/redis_cluster_store.go

@ -28,15 +28,17 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr
configuration.GetString(prefix+"password"), configuration.GetString(prefix+"password"),
configuration.GetBool(prefix+"useReadOnly"), configuration.GetBool(prefix+"useReadOnly"),
configuration.GetBool(prefix+"routeByLatency"), configuration.GetBool(prefix+"routeByLatency"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
) )
} }
func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addresses, Addrs: addresses,
Password: password, Password: password,
ReadOnly: readOnly, ReadOnly: readOnly,
RouteByLatency: routeByLatency, RouteByLatency: routeByLatency,
}) })
store.loadSuperLargeDirectories(superLargeDirectories)
return return
} }

4
weed/filer/redis2/redis_store.go

@ -23,14 +23,16 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st
configuration.GetString(prefix+"address"), configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"password"), configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"), configuration.GetInt(prefix+"database"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
) )
} }
func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) {
store.Client = redis.NewClient(&redis.Options{ store.Client = redis.NewClient(&redis.Options{
Addr: hostPort, Addr: hostPort,
Password: password, Password: password,
DB: database, DB: database,
}) })
store.loadSuperLargeDirectories(superLargeDirectories)
return return
} }

27
weed/filer/redis2/universal_redis_store.go

@ -18,7 +18,21 @@ const (
) )
type UniversalRedis2Store struct { type UniversalRedis2Store struct {
Client redis.UniversalClient
Client redis.UniversalClient
superLargeDirectoryHash map[string]bool
}
func (store *UniversalRedis2Store) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
_, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
return
}
func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectories []string) {
// set directory hash
store.superLargeDirectoryHash = make(map[string]bool)
for _, dir := range superLargeDirectories {
store.superLargeDirectoryHash[dir] = true
}
} }
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
@ -47,6 +61,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
} }
dir, name := entry.FullPath.DirAndName() dir, name := entry.FullPath.DirAndName()
if store.isSuperLargeDirectory(dir) {
return nil
}
if name != "" { if name != "" {
if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
@ -96,6 +114,9 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
} }
dir, name := fullpath.DirAndName() dir, name := fullpath.DirAndName()
if store.isSuperLargeDirectory(dir) {
return nil
}
if name != "" { if name != "" {
_, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
if err != nil { if err != nil {
@ -108,6 +129,10 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
if store.isSuperLargeDirectory(string(fullpath)) {
return nil
}
members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil { if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)

2
weed/server/filer_server.go

@ -109,6 +109,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
os.MkdirAll(option.DefaultLevelDbDir, 0755) os.MkdirAll(option.DefaultLevelDbDir, 0755)
} }
glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir) glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
} else {
glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
} }
util.LoadConfiguration("notification", false) util.LoadConfiguration("notification", false)

2
weed/shell/command_fs_configure.go

@ -88,7 +88,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
// check collection // check collection
if *collection != "" && strings.HasPrefix(*locationPrefix, "/buckets/") { if *collection != "" && strings.HasPrefix(*locationPrefix, "/buckets/") {
return fmt.Errorf("one s3 bucket goes to one collection and not customizable.")
return fmt.Errorf("one s3 bucket goes to one collection and not customizable")
} }
// check replication // check replication

16
weed/shell/command_bucket_create.go → weed/shell/command_s3_bucket_create.go

@ -12,29 +12,29 @@ import (
) )
func init() { func init() {
Commands = append(Commands, &commandBucketCreate{})
Commands = append(Commands, &commandS3BucketCreate{})
} }
type commandBucketCreate struct {
type commandS3BucketCreate struct {
} }
func (c *commandBucketCreate) Name() string {
return "bucket.create"
func (c *commandS3BucketCreate) Name() string {
return "s3.bucket.create"
} }
func (c *commandBucketCreate) Help() string {
func (c *commandS3BucketCreate) Help() string {
return `create a bucket with a given name return `create a bucket with a given name
Example: Example:
bucket.create -name <bucket_name> -replication 001
s3.bucket.create -name <bucket_name> -replication 001
` `
} }
func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
bucketName := bucketCommand.String("name", "", "bucket name") bucketName := bucketCommand.String("name", "", "bucket name")
replication := bucketCommand.String("replication", "", "replication setting for the bucket")
replication := bucketCommand.String("replication", "", "replication setting for the bucket, if not set it will honor the setting defined by the filer or master")
if err = bucketCommand.Parse(args); err != nil { if err = bucketCommand.Parse(args); err != nil {
return nil return nil
} }

14
weed/shell/command_bucket_delete.go → weed/shell/command_s3_bucket_delete.go

@ -9,24 +9,24 @@ import (
) )
func init() { func init() {
Commands = append(Commands, &commandBucketDelete{})
Commands = append(Commands, &commandS3BucketDelete{})
} }
type commandBucketDelete struct {
type commandS3BucketDelete struct {
} }
func (c *commandBucketDelete) Name() string {
return "bucket.delete"
func (c *commandS3BucketDelete) Name() string {
return "s3.bucket.delete"
} }
func (c *commandBucketDelete) Help() string {
func (c *commandS3BucketDelete) Help() string {
return `delete a bucket by a given name return `delete a bucket by a given name
bucket.delete -name <bucket_name>
s3.bucket.delete -name <bucket_name>
` `
} }
func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
bucketName := bucketCommand.String("name", "", "bucket name") bucketName := bucketCommand.String("name", "", "bucket name")

12
weed/shell/command_bucket_list.go → weed/shell/command_s3_bucket_list.go

@ -11,23 +11,23 @@ import (
) )
func init() { func init() {
Commands = append(Commands, &commandBucketList{})
Commands = append(Commands, &commandS3BucketList{})
} }
type commandBucketList struct {
type commandS3BucketList struct {
} }
func (c *commandBucketList) Name() string {
return "bucket.list"
func (c *commandS3BucketList) Name() string {
return "s3.bucket.list"
} }
func (c *commandBucketList) Help() string {
func (c *commandS3BucketList) Help() string {
return `list all buckets return `list all buckets
` `
} }
func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
if err = bucketCommand.Parse(args); err != nil { if err = bucketCommand.Parse(args); err != nil {

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
) )
var ( var (
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 15)
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 16)
COMMIT = "" COMMIT = ""
) )

Loading…
Cancel
Save