diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 70c14487c..2e1e9c162 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -12,8 +12,13 @@ jobs: strategy: matrix: goos: [linux, windows, darwin, freebsd, netbsd, openbsd ] - goarch: ["386", amd64] - # goarch: ["386", amd64, arm] + goarch: ["386", amd64, arm] + exclude: + - goarch: arm + goos: darwin + - goarch: 386 + goos: freebsd + steps: - name: Check out code into the Go module directory @@ -26,7 +31,6 @@ jobs: tag: dev fail-if-no-assets: false assets: | - weed-large-disk-* weed-* - name: Set BUILD_TIME env diff --git a/docker/Dockerfile b/docker/Dockerfile index 7146b91c7..be7414d0b 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -12,6 +12,7 @@ RUN \ SUPERCRONIC=supercronic-linux-$ARCH && \ # Install SeaweedFS and Supercronic ( for cron job mode ) apk add --no-cache --virtual build-dependencies --update wget curl ca-certificates && \ + apk add fuse && \ wget -P /tmp https://github.com/$(curl -s -L https://github.com/chrislusf/seaweedfs/releases/latest | egrep -o "chrislusf/seaweedfs/releases/download/.*/linux_$ARCH.tar.gz") && \ tar -C /usr/bin/ -xzvf /tmp/linux_$ARCH.tar.gz && \ curl -fsSLO "$SUPERCRONIC_URL" && \ diff --git a/docker/Makefile b/docker/Makefile index 8ab83ca18..c2e9a12e7 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -12,6 +12,9 @@ build: dev: build docker-compose -f local-dev-compose.yml -p seaweedfs up +k8s: build + docker-compose -f local-k8s-compose.yml -p seaweedfs up + dev_registry: build docker-compose -f local-registry-compose.yml -p seaweedfs up diff --git a/docker/local-k8s-compose.yml b/docker/local-k8s-compose.yml new file mode 100644 index 000000000..0dda89ca4 --- /dev/null +++ b/docker/local-k8s-compose.yml @@ -0,0 +1,65 @@ +version: '2' + +services: + master: + image: chrislusf/seaweedfs:local + ports: + - 9333:9333 + - 19333:19333 + command: "master -ip=master" + volume: + image: chrislusf/seaweedfs:local + ports: + - 8080:8080 + - 18080:18080 + command: "volume -mserver=master:9333 -port=8080 -ip=volume" + depends_on: + - master + mysql: + image: percona/percona-server:5.7 + ports: + - 3306:3306 + volumes: + - ./seaweedfs.sql:/docker-entrypoint-initdb.d/seaweedfs.sql + environment: + - MYSQL_ROOT_PASSWORD=secret + - MYSQL_DATABASE=seaweedfs + - MYSQL_PASSWORD=secret + - MYSQL_USER=seaweedfs + filer: + image: chrislusf/seaweedfs:local + ports: + - 8888:8888 + - 18888:18888 + environment: + - WEED_MYSQL_HOSTNAME=mysql + - WEED_MYSQL_PORT=3306 + - WEED_MYSQL_DATABASE=seaweedfs + - WEED_MYSQL_USERNAME=seaweedfs + - WEED_MYSQL_PASSWORD=secret + - WEED_MYSQL_ENABLED=true + - WEED_LEVELDB2_ENABLED=false + command: 'filer -master="master:9333"' + depends_on: + - master + - volume + - mysql + ingress: + image: jwilder/nginx-proxy + ports: + - "80:80" + volumes: + - /var/run/docker.sock:/tmp/docker.sock:ro + - /tmp/nginx:/etc/nginx/conf.d + s3: + image: chrislusf/seaweedfs:local + ports: + - 8333:8333 + command: 's3 -filer="filer:8888"' + depends_on: + - master + - volume + - filer + environment: + - VIRTUAL_HOST=s3 + - VIRTUAL_PORT=8333 \ No newline at end of file diff --git a/docker/seaweedfs.sql b/docker/seaweedfs.sql new file mode 100644 index 000000000..38ebc575c --- /dev/null +++ b/docker/seaweedfs.sql @@ -0,0 +1,12 @@ +CREATE DATABASE IF NOT EXISTS seaweedfs; +CREATE USER IF NOT EXISTS 'seaweedfs'@'%' IDENTIFIED BY 'secret'; +GRANT ALL PRIVILEGES ON seaweedfs_fast.* TO 'seaweedfs'@'%'; +FLUSH PRIVILEGES; +USE seaweedfs; +CREATE TABLE IF NOT EXISTS filemeta ( + dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', + name VARCHAR(1000) COMMENT 'directory or file name', + directory TEXT COMMENT 'full path to parent directory', + meta LONGBLOB, + PRIMARY KEY (dirhash, name) +) DEFAULT CHARSET=utf8; \ No newline at end of file diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 72bdb684a..7667d2815 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "2.15" + imageTag: "2.16" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/Makefile b/weed/Makefile index 12b8e8173..fd0843c22 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -20,7 +20,7 @@ debug_mount: debug_server: go build -gcflags="all=-N -l" - dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=/Volumes/mobile_disk/99 -filer -volume.port=8343 -s3 -volume.max=0 + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=/Volumes/mobile_disk/99 -filer -volume.port=8343 -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 debug_volume: go build -gcflags="all=-N -l" diff --git a/weed/command/filer.go b/weed/command/filer.go index e72056893..a3008eb29 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -43,8 +43,6 @@ type FilerOptions struct { peers *string metricsHttpPort *int cacheToFilerLimit *int - - // default leveldb directory, used in "weed server" mode defaultLevelDbDirectory *string } @@ -67,6 +65,7 @@ func init() { f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list") f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") + f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -92,6 +91,7 @@ var cmdFiler = &Command{ GET /path/to/ The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order. + If the "filer.toml" is not found, an embedded filer store will be craeted under "-defaultStoreDir". The example filer.toml configuration file can be generated by "weed scaffold -config=filer" @@ -127,10 +127,7 @@ func (fo *FilerOptions) startFiler() { publicVolumeMux = http.NewServeMux() } - defaultLevelDbDirectory := "./filerldb2" - if fo.defaultLevelDbDirectory != nil { - defaultLevelDbDirectory = util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2") - } + defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2") var peers []string if *fo.peers != "" { diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 82410f6d9..04a988027 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -138,12 +138,16 @@ hosts=[ ] username="" password="" +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] [redis2] enabled = false address = "localhost:6379" password = "" database = 0 +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] [redis_cluster2] enabled = false @@ -160,6 +164,8 @@ password = "" readOnly = true # automatically use the closest Redis server for reads routeByLatency = true +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] [etcd] enabled = false @@ -185,6 +191,28 @@ sniff_enabled = false healthcheck_enabled = false # increase the value is recommend, be sure the value in Elastic is greater or equal here index.max_result_window = 10000 + + + +########################## +########################## +# To add path-specific filer store: +# +# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp +# 2. Add a location configuraiton. E.g., location = "/tmp/" +# 3. Copy and customize all other configurations. +# Make sure they are not the same if using the same store type! +# 4. Set enabled to true +# +# The following is just using cassandra as an example +########################## +[redis2.tmp] +enabled = false +location = "/tmp/" +address = "localhost:6379" +password = "" +database = 1 + ` NOTIFICATION_TOML_EXAMPLE = ` diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index ae8cb7a86..49f5625d9 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -16,8 +16,9 @@ func init() { } type CassandraStore struct { - cluster *gocql.ClusterConfig - session *gocql.Session + cluster *gocql.ClusterConfig + session *gocql.Session + superLargeDirectoryHash map[string]string } func (store *CassandraStore) GetName() string { @@ -30,10 +31,16 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix configuration.GetStringSlice(prefix+"hosts"), configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) { +func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) { + dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) { store.cluster = gocql.NewCluster(hosts...) if username != "" && password != "" { store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password} @@ -44,6 +51,19 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string, usernam if err != nil { glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) } + + // set directory hash + store.superLargeDirectoryHash = make(map[string]string) + existingHash := make(map[string]string) + for _, dir := range superLargeDirectories { + // adding dir hash to avoid duplicated names + dirHash := util.Md5String([]byte(dir))[:4] + store.superLargeDirectoryHash[dir] = dirHash + if existingDir, found := existingHash[dirHash]; found { + glog.Fatalf("directory %s has the same hash as %s", dir, existingDir) + } + existingHash[dirHash] = dir + } return } @@ -60,6 +80,10 @@ func (store *CassandraStore) RollbackTransaction(ctx context.Context) error { func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { dir, name := entry.FullPath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } + meta, err := entry.EncodeAttributesAndChunks() if err != nil { return fmt.Errorf("encode %s: %s", entry.FullPath, err) @@ -86,6 +110,10 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { dir, name := fullpath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } + var data []byte if err := store.session.Query( "SELECT meta FROM filemeta WHERE directory=? AND name=?", @@ -113,6 +141,9 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { dir, name := fullpath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } if err := store.session.Query( "DELETE FROM filemeta WHERE directory=? AND name=?", @@ -124,6 +155,9 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full } func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { + if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok { + return nil // filer.ErrUnsupportedSuperLargeDirectoryListing + } if err := store.session.Query( "DELETE FROM filemeta WHERE directory=?", @@ -141,6 +175,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, f func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { + if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok { + return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing + } + cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" if inclusive { cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?" diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go index 3dce67d6d..a6f18709e 100644 --- a/weed/filer/configuration.go +++ b/weed/filer/configuration.go @@ -1,10 +1,11 @@ package filer import ( - "os" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/spf13/viper" + "os" + "reflect" + "strings" ) var ( @@ -15,25 +16,67 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) { validateOneEnabledStore(config) + // load configuration for default filer store + hasDefaultStoreConfigured := false for _, store := range Stores { if config.GetBool(store.GetName() + ".enabled") { + store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore) if err := store.Initialize(config, store.GetName()+"."); err != nil { - glog.Fatalf("Failed to initialize store for %s: %+v", - store.GetName(), err) + glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err) } f.SetStore(store) - glog.V(0).Infof("Configure filer for %s", store.GetName()) - return + glog.V(0).Infof("configured filer store to %s", store.GetName()) + hasDefaultStoreConfigured = true + break + } + } + + if !hasDefaultStoreConfigured { + println() + println("Supported filer stores are:") + for _, store := range Stores { + println(" " + store.GetName()) } + os.Exit(-1) } - println() - println("Supported filer stores are:") + // load path-specific filer store here + // f.Store.AddPathSpecificStore(path, store) + storeNames := make(map[string]FilerStore) for _, store := range Stores { - println(" " + store.GetName()) + storeNames[store.GetName()] = store + } + allKeys := config.AllKeys() + for _, key := range allKeys { + if !strings.HasSuffix(key, ".enabled") { + continue + } + key = key[:len(key)-len(".enabled")] + if !strings.Contains(key, ".") { + continue + } + + parts := strings.Split(key, ".") + storeName, storeId := parts[0], parts[1] + + store, found := storeNames[storeName] + if !found { + continue + } + store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore) + if err := store.Initialize(config, key+"."); err != nil { + glog.Fatalf("Failed to initialize store for %s: %+v", key, err) + } + location := config.GetString(key + ".location") + if location == "" { + glog.Errorf("path-specific filer store needs %s", key+".location") + os.Exit(-1) + } + f.Store.AddPathSpecificStore(location, storeId, store) + + glog.V(0).Infof("configure filer %s for %s", store.GetName(), location) } - os.Exit(-1) } func validateOneEnabledStore(config *viper.Viper) { diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 4b28c3021..f1e6c6c35 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -3,19 +3,14 @@ package filer import ( "context" "errors" - "github.com/chrislusf/seaweedfs/weed/glog" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) var ( - ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") - ErrKvNotImplemented = errors.New("kv not implemented yet") - ErrKvNotFound = errors.New("kv: not found") + ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") + ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing") + ErrKvNotImplemented = errors.New("kv not implemented yet") + ErrKvNotFound = errors.New("kv: not found") ) type FilerStore interface { @@ -42,243 +37,3 @@ type FilerStore interface { Shutdown() } - -type VirtualFilerStore interface { - FilerStore - DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error - DeleteOneEntry(ctx context.Context, entry *Entry) error -} - -type FilerStoreWrapper struct { - ActualStore FilerStore -} - -func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { - if innerStore, ok := store.(*FilerStoreWrapper); ok { - return innerStore - } - return &FilerStoreWrapper{ - ActualStore: store, - } -} - -func (fsw *FilerStoreWrapper) GetName() string { - return fsw.ActualStore.GetName() -} - -func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error { - return fsw.ActualStore.Initialize(configuration, prefix) -} - -func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) - }() - - filer_pb.BeforeEntrySerialization(entry.Chunks) - if entry.Mime == "application/octet-stream" { - entry.Mime = "" - } - - if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { - return err - } - - glog.V(4).Infof("InsertEntry %s", entry.FullPath) - return fsw.ActualStore.InsertEntry(ctx, entry) -} - -func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds()) - }() - - filer_pb.BeforeEntrySerialization(entry.Chunks) - if entry.Mime == "application/octet-stream" { - entry.Mime = "" - } - - if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { - return err - } - - glog.V(4).Infof("UpdateEntry %s", entry.FullPath) - return fsw.ActualStore.UpdateEntry(ctx, entry) -} - -func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds()) - }() - - glog.V(4).Infof("FindEntry %s", fp) - entry, err = fsw.ActualStore.FindEntry(ctx, fp) - if err != nil { - return nil, err - } - - fsw.maybeReadHardLink(ctx, entry) - - filer_pb.AfterEntryDeserialization(entry.Chunks) - return -} - -func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) - }() - - existingEntry, findErr := fsw.FindEntry(ctx, fp) - if findErr == filer_pb.ErrNotFound { - return nil - } - if len(existingEntry.HardLinkId) != 0 { - // remove hard link - glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath) - if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { - return err - } - } - - glog.V(4).Infof("DeleteEntry %s", fp) - return fsw.ActualStore.DeleteEntry(ctx, fp) -} - -func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) - }() - - if len(existingEntry.HardLinkId) != 0 { - // remove hard link - glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath) - if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { - return err - } - } - - glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath) - return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath) -} - -func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) - }() - - glog.V(4).Infof("DeleteFolderChildren %s", fp) - return fsw.ActualStore.DeleteFolderChildren(ctx, fp) -} - -func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds()) - }() - - glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) - entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, err - } - for _, entry := range entries { - fsw.maybeReadHardLink(ctx, entry) - filer_pb.AfterEntryDeserialization(entry.Chunks) - } - return entries, err -} - -func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds()) - }() - glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) - entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) - if err == ErrUnsupportedListDirectoryPrefixed { - entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) - } - if err != nil { - return nil, err - } - for _, entry := range entries { - fsw.maybeReadHardLink(ctx, entry) - filer_pb.AfterEntryDeserialization(entry.Chunks) - } - return entries, nil -} - -func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) { - entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, err - } - - if prefix == "" { - return - } - - count := 0 - var lastFileName string - notPrefixed := entries - entries = nil - for count < limit && len(notPrefixed) > 0 { - for _, entry := range notPrefixed { - lastFileName = entry.Name() - if strings.HasPrefix(entry.Name(), prefix) { - count++ - entries = append(entries, entry) - if count >= limit { - break - } - } - } - if count < limit { - notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit) - if err != nil { - return - } - } - } - return -} - -func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { - return fsw.ActualStore.BeginTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { - return fsw.ActualStore.CommitTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { - return fsw.ActualStore.RollbackTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) Shutdown() { - fsw.ActualStore.Shutdown() -} - -func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - return fsw.ActualStore.KvPut(ctx, key, value) -} -func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - return fsw.ActualStore.KvGet(ctx, key) -} -func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { - return fsw.ActualStore.KvDelete(ctx, key) -} diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go index c6b3734b0..316c76a0c 100644 --- a/weed/filer/filerstore_hardlink.go +++ b/weed/filer/filerstore_hardlink.go @@ -19,7 +19,8 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry // check what is existing entry glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath) - existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) + actualStore := fsw.getActualStore(entry.FullPath) + existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath) if err != nil && err != filer_pb.ErrNotFound { return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err) } diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go new file mode 100644 index 000000000..ea0f9db77 --- /dev/null +++ b/weed/filer/filerstore_translate_path.go @@ -0,0 +1,161 @@ +package filer + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/util" + "strings" +) + +var ( + _ = FilerStore(&FilerStorePathTranlator{}) +) + +type FilerStorePathTranlator struct { + actualStore FilerStore + storeRoot string +} + +func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStorePathTranlator { + if innerStore, ok := store.(*FilerStorePathTranlator); ok { + return innerStore + } + + if !strings.HasSuffix(storeRoot, "/") { + storeRoot += "/" + } + + return &FilerStorePathTranlator{ + actualStore: store, + storeRoot: storeRoot, + } +} + +func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util.FullPath) { + newPath = fp + if t.storeRoot == "/" { + return + } + newPath = fp[len(t.storeRoot)-1:] + if newPath == "" { + newPath = "/" + } + return +} +func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath util.FullPath) { + previousPath = entry.FullPath + if t.storeRoot == "/" { + return + } + entry.FullPath = t.translatePath(previousPath) + return +} +func (t *FilerStorePathTranlator) recoverEntryPath(entry *Entry, previousPath util.FullPath) { + entry.FullPath = previousPath +} + +func (t *FilerStorePathTranlator) GetName() string { + return t.actualStore.GetName() +} + +func (t *FilerStorePathTranlator) Initialize(configuration util.Configuration, prefix string) error { + return t.actualStore.Initialize(configuration, prefix) +} + +func (t *FilerStorePathTranlator) InsertEntry(ctx context.Context, entry *Entry) error { + previousPath := t.changeEntryPath(entry) + defer t.recoverEntryPath(entry, previousPath) + + return t.actualStore.InsertEntry(ctx, entry) +} + +func (t *FilerStorePathTranlator) UpdateEntry(ctx context.Context, entry *Entry) error { + previousPath := t.changeEntryPath(entry) + defer t.recoverEntryPath(entry, previousPath) + + return t.actualStore.UpdateEntry(ctx, entry) +} + +func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { + if t.storeRoot == "/" { + return t.actualStore.FindEntry(ctx, fp) + } + newFullPath := t.translatePath(fp) + entry, err = t.actualStore.FindEntry(ctx, newFullPath) + if err == nil { + entry.FullPath = fp[:len(t.storeRoot)-1] + entry.FullPath + } + return +} + +func (t *FilerStorePathTranlator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + newFullPath := t.translatePath(fp) + return t.actualStore.DeleteEntry(ctx, newFullPath) +} + +func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { + + previousPath := t.changeEntryPath(existingEntry) + defer t.recoverEntryPath(existingEntry, previousPath) + + return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath) +} + +func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { + newFullPath := t.translatePath(fp) + + return t.actualStore.DeleteFolderChildren(ctx, newFullPath) +} + +func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { + + newFullPath := t.translatePath(dirPath) + + entries, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + for _, entry := range entries { + entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath + } + return entries, err +} + +func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) { + + newFullPath := t.translatePath(dirPath) + + entries, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix) + if err != nil { + return nil, err + } + for _, entry := range entries { + entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath + } + return entries, nil +} + +func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) { + return t.actualStore.BeginTransaction(ctx) +} + +func (t *FilerStorePathTranlator) CommitTransaction(ctx context.Context) error { + return t.actualStore.CommitTransaction(ctx) +} + +func (t *FilerStorePathTranlator) RollbackTransaction(ctx context.Context) error { + return t.actualStore.RollbackTransaction(ctx) +} + +func (t *FilerStorePathTranlator) Shutdown() { + t.actualStore.Shutdown() +} + +func (t *FilerStorePathTranlator) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + return t.actualStore.KvPut(ctx, key, value) +} +func (t *FilerStorePathTranlator) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + return t.actualStore.KvGet(ctx, key) +} +func (t *FilerStorePathTranlator) KvDelete(ctx context.Context, key []byte) (err error) { + return t.actualStore.KvDelete(ctx, key) +} diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go new file mode 100644 index 000000000..3206d5ba4 --- /dev/null +++ b/weed/filer/filerstore_wrapper.go @@ -0,0 +1,299 @@ +package filer + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/viant/ptrie" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + _ = VirtualFilerStore(&FilerStoreWrapper{}) +) + +type VirtualFilerStore interface { + FilerStore + DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error + DeleteOneEntry(ctx context.Context, entry *Entry) error + AddPathSpecificStore(path string, storeId string, store FilerStore) +} + +type FilerStoreWrapper struct { + defaultStore FilerStore + pathToStore ptrie.Trie + storeIdToStore map[string]FilerStore +} + +func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { + if innerStore, ok := store.(*FilerStoreWrapper); ok { + return innerStore + } + return &FilerStoreWrapper{ + defaultStore: store, + pathToStore: ptrie.New(), + storeIdToStore: make(map[string]FilerStore), + } +} + +func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) { + fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store) + err := fsw.pathToStore.Put([]byte(path), storeId) + if err != nil { + glog.Fatalf("put path specific store: %v", err) + } +} + +func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) { + store = fsw.defaultStore + if path == "/" { + return + } + var storeId string + fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool { + storeId = value.(string) + return false + }) + if storeId != "" { + store = fsw.storeIdToStore[storeId] + } + return +} + +func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) { + return fsw.defaultStore +} + +func (fsw *FilerStoreWrapper) GetName() string { + return fsw.getDefaultStore().GetName() +} + +func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error { + return fsw.getDefaultStore().Initialize(configuration, prefix) +} + +func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { + actualStore := fsw.getActualStore(entry.FullPath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.Chunks) + if entry.Mime == "application/octet-stream" { + entry.Mime = "" + } + + if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { + return err + } + + glog.V(4).Infof("InsertEntry %s", entry.FullPath) + return actualStore.InsertEntry(ctx, entry) +} + +func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { + actualStore := fsw.getActualStore(entry.FullPath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.Chunks) + if entry.Mime == "application/octet-stream" { + entry.Mime = "" + } + + if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { + return err + } + + glog.V(4).Infof("UpdateEntry %s", entry.FullPath) + return actualStore.UpdateEntry(ctx, entry) +} + +func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { + actualStore := fsw.getActualStore(fp) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds()) + }() + + glog.V(4).Infof("FindEntry %s", fp) + entry, err = actualStore.FindEntry(ctx, fp) + if err != nil { + return nil, err + } + + fsw.maybeReadHardLink(ctx, entry) + + filer_pb.AfterEntryDeserialization(entry.Chunks) + return +} + +func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + actualStore := fsw.getActualStore(fp) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) + }() + + existingEntry, findErr := fsw.FindEntry(ctx, fp) + if findErr == filer_pb.ErrNotFound { + return nil + } + if len(existingEntry.HardLinkId) != 0 { + // remove hard link + glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath) + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + + glog.V(4).Infof("DeleteEntry %s", fp) + return actualStore.DeleteEntry(ctx, fp) +} + +func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { + actualStore := fsw.getActualStore(existingEntry.FullPath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) + }() + + if len(existingEntry.HardLinkId) != 0 { + // remove hard link + glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath) + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + + glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath) + return actualStore.DeleteEntry(ctx, existingEntry.FullPath) +} + +func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { + actualStore := fsw.getActualStore(fp + "/") + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) + }() + + glog.V(4).Infof("DeleteFolderChildren %s", fp) + return actualStore.DeleteFolderChildren(ctx, fp) +} + +func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { + actualStore := fsw.getActualStore(dirPath + "/") + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds()) + }() + + glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) + entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) + } + return entries, err +} + +func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) { + actualStore := fsw.getActualStore(dirPath + "/") + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds()) + }() + glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) + entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) + if err == ErrUnsupportedListDirectoryPrefixed { + entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) + } + if err != nil { + return nil, err + } + for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) + } + return entries, nil +} + +func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) { + actualStore := fsw.getActualStore(dirPath + "/") + entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + + if prefix == "" { + return + } + + count := 0 + var lastFileName string + notPrefixed := entries + entries = nil + for count < limit && len(notPrefixed) > 0 { + for _, entry := range notPrefixed { + lastFileName = entry.Name() + if strings.HasPrefix(entry.Name(), prefix) { + count++ + entries = append(entries, entry) + if count >= limit { + break + } + } + } + if count < limit { + notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit) + if err != nil { + return + } + } + } + return +} + +func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { + return fsw.getDefaultStore().BeginTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { + return fsw.getDefaultStore().CommitTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { + return fsw.getDefaultStore().RollbackTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) Shutdown() { + fsw.getDefaultStore().Shutdown() +} + +func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + return fsw.getDefaultStore().KvPut(ctx, key, value) +} +func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + return fsw.getDefaultStore().KvGet(ctx, key) +} +func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { + return fsw.getDefaultStore().KvDelete(ctx, key) +} diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go index d155dbe88..c7742bb19 100644 --- a/weed/filer/redis2/redis_cluster_store.go +++ b/weed/filer/redis2/redis_cluster_store.go @@ -28,15 +28,17 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr configuration.GetString(prefix+"password"), configuration.GetBool(prefix+"useReadOnly"), configuration.GetBool(prefix+"routeByLatency"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { +func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { store.Client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: addresses, Password: password, ReadOnly: readOnly, RouteByLatency: routeByLatency, }) + store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go index ed04c817b..da404ed4c 100644 --- a/weed/filer/redis2/redis_store.go +++ b/weed/filer/redis2/redis_store.go @@ -23,14 +23,16 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st configuration.GetString(prefix+"address"), configuration.GetString(prefix+"password"), configuration.GetInt(prefix+"database"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) { +func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) { store.Client = redis.NewClient(&redis.Options{ Addr: hostPort, Password: password, DB: database, }) + store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 0374314c0..00d02ea14 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -18,7 +18,21 @@ const ( ) type UniversalRedis2Store struct { - Client redis.UniversalClient + Client redis.UniversalClient + superLargeDirectoryHash map[string]bool +} + +func (store *UniversalRedis2Store) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) { + _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectories []string) { + // set directory hash + store.superLargeDirectoryHash = make(map[string]bool) + for _, dir := range superLargeDirectories { + store.superLargeDirectoryHash[dir] = true + } } func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { @@ -47,6 +61,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer } dir, name := entry.FullPath.DirAndName() + if store.isSuperLargeDirectory(dir) { + return nil + } + if name != "" { if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) @@ -96,6 +114,9 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti } dir, name := fullpath.DirAndName() + if store.isSuperLargeDirectory(dir) { + return nil + } if name != "" { _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() if err != nil { @@ -108,6 +129,10 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + if store.isSuperLargeDirectory(string(fullpath)) { + return nil + } + members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() if err != nil { return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index d04053df5..2991d14ab 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -109,6 +109,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) os.MkdirAll(option.DefaultLevelDbDir, 0755) } glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir) + } else { + glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir) } util.LoadConfiguration("notification", false) diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index a1e8ca581..a891c1b47 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -88,7 +88,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io // check collection if *collection != "" && strings.HasPrefix(*locationPrefix, "/buckets/") { - return fmt.Errorf("one s3 bucket goes to one collection and not customizable.") + return fmt.Errorf("one s3 bucket goes to one collection and not customizable") } // check replication diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_s3_bucket_create.go similarity index 75% rename from weed/shell/command_bucket_create.go rename to weed/shell/command_s3_bucket_create.go index 52d96e4c3..28cf1d945 100644 --- a/weed/shell/command_bucket_create.go +++ b/weed/shell/command_s3_bucket_create.go @@ -12,29 +12,29 @@ import ( ) func init() { - Commands = append(Commands, &commandBucketCreate{}) + Commands = append(Commands, &commandS3BucketCreate{}) } -type commandBucketCreate struct { +type commandS3BucketCreate struct { } -func (c *commandBucketCreate) Name() string { - return "bucket.create" +func (c *commandS3BucketCreate) Name() string { + return "s3.bucket.create" } -func (c *commandBucketCreate) Help() string { +func (c *commandS3BucketCreate) Help() string { return `create a bucket with a given name Example: - bucket.create -name -replication 001 + s3.bucket.create -name -replication 001 ` } -func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { +func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) bucketName := bucketCommand.String("name", "", "bucket name") - replication := bucketCommand.String("replication", "", "replication setting for the bucket") + replication := bucketCommand.String("replication", "", "replication setting for the bucket, if not set it will honor the setting defined by the filer or master") if err = bucketCommand.Parse(args); err != nil { return nil } diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go similarity index 69% rename from weed/shell/command_bucket_delete.go rename to weed/shell/command_s3_bucket_delete.go index 02790b9e2..a8d8c5c29 100644 --- a/weed/shell/command_bucket_delete.go +++ b/weed/shell/command_s3_bucket_delete.go @@ -9,24 +9,24 @@ import ( ) func init() { - Commands = append(Commands, &commandBucketDelete{}) + Commands = append(Commands, &commandS3BucketDelete{}) } -type commandBucketDelete struct { +type commandS3BucketDelete struct { } -func (c *commandBucketDelete) Name() string { - return "bucket.delete" +func (c *commandS3BucketDelete) Name() string { + return "s3.bucket.delete" } -func (c *commandBucketDelete) Help() string { +func (c *commandS3BucketDelete) Help() string { return `delete a bucket by a given name - bucket.delete -name + s3.bucket.delete -name ` } -func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { +func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) bucketName := bucketCommand.String("name", "", "bucket name") diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_s3_bucket_list.go similarity index 83% rename from weed/shell/command_bucket_list.go rename to weed/shell/command_s3_bucket_list.go index 2e446b6b2..4acf9a866 100644 --- a/weed/shell/command_bucket_list.go +++ b/weed/shell/command_s3_bucket_list.go @@ -11,23 +11,23 @@ import ( ) func init() { - Commands = append(Commands, &commandBucketList{}) + Commands = append(Commands, &commandS3BucketList{}) } -type commandBucketList struct { +type commandS3BucketList struct { } -func (c *commandBucketList) Name() string { - return "bucket.list" +func (c *commandS3BucketList) Name() string { + return "s3.bucket.list" } -func (c *commandBucketList) Help() string { +func (c *commandS3BucketList) Help() string { return `list all buckets ` } -func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { +func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) if err = bucketCommand.Parse(args); err != nil { diff --git a/weed/util/constants.go b/weed/util/constants.go index 52ba08494..89155e9a2 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 15) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 16) COMMIT = "" )