Browse Source

initial commit

pull/5580/head
Konstantin Lebedev 9 months ago
parent
commit
f7cea8a4f9
  1. 23
      other/java/client/src/main/proto/filer.proto
  2. 1
      weed/command/filer.go
  3. 3
      weed/command/s3.go
  4. 1
      weed/command/server.go
  5. 5
      weed/filer/abstract_sql/abstract_sql_store.go
  6. 4
      weed/filer/arangodb/arangodb_store.go
  7. 4
      weed/filer/cassandra/cassandra_store.go
  8. 4
      weed/filer/elastic/v7/elastic_store.go
  9. 4
      weed/filer/etcd/etcd_store.go
  10. 4
      weed/filer/filer.go
  11. 2
      weed/filer/filerstore.go
  12. 10
      weed/filer/filerstore_translate_path.go
  13. 6
      weed/filer/filerstore_wrapper.go
  14. 4
      weed/filer/hbase/hbase_store.go
  15. 4
      weed/filer/leveldb/leveldb_store.go
  16. 4
      weed/filer/leveldb2/leveldb2_store.go
  17. 4
      weed/filer/leveldb3/leveldb3_store.go
  18. 4
      weed/filer/mongodb/mongodb_store.go
  19. 4
      weed/filer/mysql/mysql_sql_gen.go
  20. 4
      weed/filer/postgres/postgres_sql_gen.go
  21. 4
      weed/filer/redis/universal_redis_store.go
  22. 4
      weed/filer/redis2/universal_redis_store.go
  23. 4
      weed/filer/redis3/universal_redis_store.go
  24. 4
      weed/filer/redis_lua/universal_redis_store.go
  25. 4
      weed/filer/rocksdb/rocksdb_store.go
  26. 4
      weed/filer/tikv/tikv_store.go
  27. 4
      weed/filer/ydb/ydb_store.go
  28. 27
      weed/pb/filer.proto
  29. 1290
      weed/pb/filer_pb/filer.pb.go
  30. 2
      weed/pb/filer_pb/filer_grpc.pb.go
  31. 41
      weed/s3api/s3api_object_handlers_list.go
  32. 1
      weed/s3api/s3api_server.go

23
other/java/client/src/main/proto/filer.proto

@ -97,6 +97,7 @@ message ListEntriesRequest {
string startFromFileName = 3;
bool inclusiveStartFrom = 4;
uint32 limit = 5;
bool recursive = 6;
}
message ListEntriesResponse {
@ -110,6 +111,7 @@ message RemoteEntry {
int64 remote_mtime = 4;
int64 remote_size = 5;
}
message Entry {
string name = 1;
bool is_directory = 2;
@ -283,6 +285,7 @@ message Location {
uint32 grpc_port = 3;
string data_center = 4;
}
message LookupVolumeResponse {
map<string, Locations> locations_map = 1;
}
@ -290,13 +293,16 @@ message LookupVolumeResponse {
message Collection {
string name = 1;
}
message CollectionListRequest {
bool include_normal_volumes = 1;
bool include_ec_volumes = 2;
}
message CollectionListResponse {
repeated Collection collections = 1;
}
message DeleteCollectionRequest {
string collection = 1;
}
@ -310,6 +316,7 @@ message StatisticsRequest {
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
uint64 total_size = 4;
uint64 used_size = 5;
@ -320,6 +327,7 @@ message PingRequest {
string target = 1; // default to ping itself
string target_type = 2;
}
message PingResponse {
int64 start_time_ns = 1;
int64 remote_time_ns = 2;
@ -328,6 +336,7 @@ message PingResponse {
message GetFilerConfigurationRequest {
}
message GetFilerConfigurationResponse {
repeated string masters = 1;
string replication = 2;
@ -354,6 +363,7 @@ message SubscribeMetadataRequest {
int32 client_epoch = 9;
repeated string directories = 10; // exact directory to watch
}
message SubscribeMetadataResponse {
string directory = 1;
EventNotification event_notification = 2;
@ -372,6 +382,7 @@ message KeepConnectedRequest {
uint32 grpc_port = 2;
repeated string resources = 3;
}
message KeepConnectedResponse {
}
@ -396,14 +407,17 @@ message LocateBrokerResponse {
message KvGetRequest {
bytes key = 1;
}
message KvGetResponse {
bytes value = 1;
string error = 2;
}
message KvPutRequest {
bytes key = 1;
bytes value = 2;
}
message KvPutResponse {
string error = 1;
}
@ -437,6 +451,7 @@ message CacheRemoteObjectToLocalClusterRequest {
string directory = 1;
string name = 2;
}
message CacheRemoteObjectToLocalClusterResponse {
Entry entry = 1;
}
@ -451,36 +466,44 @@ message LockRequest {
bool is_moved = 4;
string owner = 5;
}
message LockResponse {
string renew_token = 1;
string lock_owner = 2;
string lock_host_moved_to = 3;
string error = 4;
}
message UnlockRequest {
string name = 1;
string renew_token = 2;
bool is_moved = 3;
}
message UnlockResponse {
string error = 1;
string moved_to = 2;
}
message FindLockOwnerRequest {
string name = 1;
bool is_moved = 2;
}
message FindLockOwnerResponse {
string owner = 1;
}
message Lock {
string name = 1;
string renew_token = 2;
int64 expired_at_ns = 3;
string owner = 4;
}
message TransferLocksRequest {
repeated Lock locks = 1;
}
message TransferLocksResponse {
}

1
weed/command/filer.go

@ -110,6 +110,7 @@ func init() {
filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
filerS3Options.allowListRecursive = cmdFiler.Flag.Bool("s3.allowListRecursive", false, "allows recursive listing of directories by prefix on the side of the filer store with SQL")
filerS3Options.localSocket = cmdFiler.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
// start webdav on filer

3
weed/command/s3.go

@ -51,6 +51,7 @@ type S3Options struct {
metricsHttpPort *int
allowEmptyFolder *bool
allowDeleteBucketNotEmpty *bool
allowListRecursive *bool
auditLogConfig *string
localFilerSocket *string
dataCenter *string
@ -77,6 +78,7 @@ func init() {
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
s3StandaloneOptions.allowListRecursive = cmdS3.Flag.Bool("allowListRecursive", false, "allows recursive listing of directories by prefix on the side of the filer store with SQL")
s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path")
s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
}
@ -228,6 +230,7 @@ func (s3opt *S3Options) startS3Server() bool {
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
AllowListRecursive: *s3opt.allowListRecursive,
LocalFilerSocket: localFilerSocket,
DataCenter: *s3opt.dataCenter,
FilerGroup: filerGroup,

1
weed/command/server.go

@ -153,6 +153,7 @@ func init() {
s3Options.auditLogConfig = cmdServer.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
s3Options.allowListRecursive = cmdServer.Flag.Bool("s3.allowListRecursive", false, "allows recursive listing of directories by prefix on the side of the filer store with SQL")
s3Options.localSocket = cmdServer.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
iamOptions.port = cmdServer.Flag.Int("iam.port", 8111, "iam server http listen port")

5
weed/filer/abstract_sql/abstract_sql_store.go

@ -20,6 +20,7 @@ type SqlGenerator interface {
GetSqlDeleteFolderChildren(tableName string) string
GetSqlListExclusive(tableName string) string
GetSqlListInclusive(tableName string) string
GetSqlListRecursive(tableName string) string
GetSqlCreateTable(tableName string) string
GetSqlDropTable(tableName string) string
}
@ -331,6 +332,10 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
return lastFileName, nil
}
func (store *AbstractSqlStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
}
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
}

4
weed/filer/arangodb/arangodb_store.go

@ -291,6 +291,10 @@ func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath ut
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *ArangodbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
targetCollection, err := store.extractBucketCollection(ctx, dirPath+"/")
if err != nil {

4
weed/filer/cassandra/cassandra_store.go

@ -183,6 +183,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, d
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *CassandraStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok {

4
weed/filer/elastic/v7/elastic_store.go

@ -103,6 +103,10 @@ func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *ElasticStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
index := getIndex(entry.FullPath, false)
dir, _ := entry.FullPath.DirAndName()

4
weed/filer/etcd/etcd_store.go

@ -177,6 +177,10 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
return nil
}
func (store *EtcdStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
lastFileStart := directoryPrefix

4
weed/filer/filer.go

@ -303,8 +303,8 @@ func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, di
}
} else if !dirEntry.IsDirectory() {
glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
return fmt.Errorf("%s is a file", dirPath)
glog.Errorf("CreateEntry %s: dir entry %+v should be a directory", entry.FullPath, dirEntry)
//return fmt.Errorf("%s is a file", dirPath)
}
return nil

2
weed/filer/filerstore.go

@ -11,6 +11,7 @@ const CountEntryChunksForGzip = 50
var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrUnsupportedRecursivePrefixed = errors.New("unsupported recursive prefix listing")
ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
ErrKvNotImplemented = errors.New("kv not implemented yet")
ErrKvNotFound = errors.New("kv: not found")
@ -31,6 +32,7 @@ type FilerStore interface {
DeleteFolderChildren(context.Context, util.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error

10
weed/filer/filerstore_translate_path.go

@ -117,6 +117,16 @@ func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dir
})
}
func (t *FilerStorePathTranslator) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
return t.actualStore.ListRecursivePrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
return eachEntryFunc(entry)
})
}
func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)

6
weed/filer/filerstore_wrapper.go

@ -261,7 +261,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
if limit > math.MaxInt32-1 {
limit = math.MaxInt32 - 1
}
// glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
glog.V(5).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
adjustedEntryFunc := func(entry *Entry) bool {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.GetChunks())
@ -274,6 +274,10 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
return lastFileName, err
}
func (fsw *FilerStoreWrapper) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, err
}
func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")

4
weed/filer/hbase/hbase_store.go

@ -152,6 +152,10 @@ func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *HbaseStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
expectedPrefix := []byte(dirPath.Child(prefix))

4
weed/filer/leveldb/leveldb_store.go

@ -174,6 +174,10 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath wee
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDBStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)

4
weed/filer/leveldb2/leveldb2_store.go

@ -178,6 +178,10 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath we
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDB2Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(dirPath, prefix, store.dbCount)

4
weed/filer/leveldb3/leveldb3_store.go

@ -301,6 +301,10 @@ func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath we
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDB3Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, _, shortPath, err := store.findDB(dirPath, true)

4
weed/filer/mongodb/mongodb_store.go

@ -181,6 +181,10 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *MongodbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}}

4
weed/filer/mysql/mysql_sql_gen.go

@ -49,6 +49,10 @@ func (gen *SqlGenMysql) GetSqlListInclusive(tableName string) string {
return fmt.Sprintf("SELECT `name`, `meta` FROM `%s` WHERE `dirhash` = ? AND `name` >= ? AND `directory` = ? AND `name` LIKE ? ORDER BY `name` ASC LIMIT ?", tableName)
}
func (gen *SqlGenMysql) GetSqlListRecursive(tableName string) string {
return fmt.Sprintf("SELECT `name`, `meta` FROM `%s` WHERE `dirhash` > ? AND `name` > ? AND `directory` LIKE ? AND `name` LIKE ? ORDER BY `directory,name` ASC LIMIT ?", tableName)
}
func (gen *SqlGenMysql) GetSqlCreateTable(tableName string) string {
return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName)
}

4
weed/filer/postgres/postgres_sql_gen.go

@ -49,6 +49,10 @@ func (gen *SqlGenPostgres) GetSqlListInclusive(tableName string) string {
return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, tableName)
}
func (gen *SqlGenPostgres) GetSqlListRecursive(tableName string) string {
return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash>$1 AND name>$2 AND directory like $3 AND name like $4 ORDER BY DIRECTORY,NAME ASC LIMIT $5`, tableName)
}
func (gen *SqlGenPostgres) GetSqlCreateTable(tableName string) string {
return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName)
}

4
weed/filer/redis/universal_redis_store.go

@ -138,6 +138,10 @@ func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Conte
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedisStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))

4
weed/filer/redis2/universal_redis_store.go

@ -165,6 +165,10 @@ func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Cont
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedis2Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))

4
weed/filer/redis3/universal_redis_store.go

@ -135,6 +135,10 @@ func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Cont
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedis3Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))

4
weed/filer/redis_lua/universal_redis_store.go

@ -133,6 +133,10 @@ func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Co
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedisLuaStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))

4
weed/filer/rocksdb/rocksdb_store.go

@ -235,6 +235,10 @@ func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath wee
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *RocksDBStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)

4
weed/filer/tikv/tikv_store.go

@ -210,6 +210,10 @@ func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.F
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *TikvStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
lastFileName := ""
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)

4
weed/filer/ydb/ydb_store.go

@ -289,6 +289,10 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
return lastFileName, nil
}
func (store *YdbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
session, err := store.DB.Table().CreateSession(ctx)
if err != nil {

27
weed/pb/filer.proto

@ -97,6 +97,7 @@ message ListEntriesRequest {
string startFromFileName = 3;
bool inclusiveStartFrom = 4;
uint32 limit = 5;
bool recursive = 6;
}
message ListEntriesResponse {
@ -110,6 +111,7 @@ message RemoteEntry {
int64 remote_mtime = 4;
int64 remote_size = 5;
}
message Entry {
string name = 1;
bool is_directory = 2;
@ -198,6 +200,7 @@ message UpdateEntryRequest {
bool is_from_other_cluster = 3;
repeated int32 signatures = 4;
}
message UpdateEntryResponse {
}
@ -206,6 +209,7 @@ message AppendToEntryRequest {
string entry_name = 2;
repeated FileChunk chunks = 3;
}
message AppendToEntryResponse {
}
@ -242,11 +246,13 @@ message StreamRenameEntryRequest {
string new_name = 4;
repeated int32 signatures = 5;
}
message StreamRenameEntryResponse {
string directory = 1;
EventNotification event_notification = 2;
int64 ts_ns = 3;
}
message AssignVolumeRequest {
int32 count = 1;
string collection = 2;
@ -283,6 +289,7 @@ message Location {
uint32 grpc_port = 3;
string data_center = 4;
}
message LookupVolumeResponse {
map<string, Locations> locations_map = 1;
}
@ -290,13 +297,16 @@ message LookupVolumeResponse {
message Collection {
string name = 1;
}
message CollectionListRequest {
bool include_normal_volumes = 1;
bool include_ec_volumes = 2;
}
message CollectionListResponse {
repeated Collection collections = 1;
}
message DeleteCollectionRequest {
string collection = 1;
}
@ -310,6 +320,7 @@ message StatisticsRequest {
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
uint64 total_size = 4;
uint64 used_size = 5;
@ -320,6 +331,7 @@ message PingRequest {
string target = 1; // default to ping itself
string target_type = 2;
}
message PingResponse {
int64 start_time_ns = 1;
int64 remote_time_ns = 2;
@ -328,6 +340,7 @@ message PingResponse {
message GetFilerConfigurationRequest {
}
message GetFilerConfigurationResponse {
repeated string masters = 1;
string replication = 2;
@ -354,6 +367,7 @@ message SubscribeMetadataRequest {
int32 client_epoch = 9;
repeated string directories = 10; // exact directory to watch
}
message SubscribeMetadataResponse {
string directory = 1;
EventNotification event_notification = 2;
@ -372,6 +386,7 @@ message KeepConnectedRequest {
uint32 grpc_port = 2;
repeated string resources = 3;
}
message KeepConnectedResponse {
}
@ -396,14 +411,17 @@ message LocateBrokerResponse {
message KvGetRequest {
bytes key = 1;
}
message KvGetResponse {
bytes value = 1;
string error = 2;
}
message KvPutRequest {
bytes key = 1;
bytes value = 2;
}
message KvPutResponse {
string error = 1;
}
@ -437,6 +455,7 @@ message CacheRemoteObjectToLocalClusterRequest {
string directory = 1;
string name = 2;
}
message CacheRemoteObjectToLocalClusterResponse {
Entry entry = 1;
}
@ -451,36 +470,44 @@ message LockRequest {
bool is_moved = 4;
string owner = 5;
}
message LockResponse {
string renew_token = 1;
string lock_owner = 2;
string lock_host_moved_to = 3;
string error = 4;
}
message UnlockRequest {
string name = 1;
string renew_token = 2;
bool is_moved = 3;
}
message UnlockResponse {
string error = 1;
string moved_to = 2;
}
message FindLockOwnerRequest {
string name = 1;
bool is_moved = 2;
}
message FindLockOwnerResponse {
string owner = 1;
}
message Lock {
string name = 1;
string renew_token = 2;
int64 expired_at_ns = 3;
string owner = 4;
}
message TransferLocksRequest {
repeated Lock locks = 1;
}
message TransferLocksResponse {
}

1290
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large
View File

2
weed/pb/filer_pb/filer_grpc.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.3
// - protoc v5.26.1
// source: filer.proto
package filer_pb

41
weed/s3api/s3api_object_handlers_list.go

@ -135,6 +135,15 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
prefixEndsOnDelimiter: strings.HasSuffix(originalPrefix, "/") && len(originalMarker) == 0,
}
if s3a.option.AllowListRecursive {
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return nil
})
return
}
// check filer
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
for {
@ -314,6 +323,38 @@ func toParentAndDescendants(dirAndName string) (dir, name string) {
return
}
func (s3a *S3ApiServer) doListFilerRecursiveEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) {
if prefix == "/" && delimiter == "/" {
return
}
request := &filer_pb.ListEntriesRequest{
Directory: dir,
Prefix: prefix,
Limit: uint32(cursor.maxKeys + 2),
StartFromFileName: marker,
InclusiveStartFrom: inclusiveStartFrom,
Recursive: true,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, listErr := client.ListEntries(ctx, request)
if listErr != nil {
return "", fmt.Errorf("list entires %+v: %v", request, listErr)
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return "", fmt.Errorf("iterating entires %+v: %v", request, recvErr)
}
}
eachEntryFn(dir, resp.Entry)
}
return
}
func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) {
// invariants
// prefix and marker should be under dir, marker may contain "/"

1
weed/s3api/s3api_server.go

@ -32,6 +32,7 @@ type S3ApiServerOption struct {
GrpcDialOption grpc.DialOption
AllowEmptyFolder bool
AllowDeleteBucketNotEmpty bool
AllowListRecursive bool
LocalFilerSocket string
DataCenter string
FilerGroup string

Loading…
Cancel
Save