Browse Source
Merge branch 'refs/heads/master' into list_recursive_prefixed_entries
Merge branch 'refs/heads/master' into list_recursive_prefixed_entries
# Conflicts: # .github/workflows/s3tests.yml # weed/s3api/s3api_object_handlers.go # weed/server/filer_server_handlers_write_autochunk.gopull/5580/head
Konstantin Lebedev
6 months ago
69 changed files with 2129 additions and 1166 deletions
-
6.github/workflows/container_dev.yml
-
6.github/workflows/container_latest.yml
-
6.github/workflows/container_release1.yml
-
6.github/workflows/container_release2.yml
-
6.github/workflows/container_release3.yml
-
6.github/workflows/container_release4.yml
-
6.github/workflows/container_release5.yml
-
4.github/workflows/s3tests.yml
-
3README.md
-
60go.mod
-
128go.sum
-
2k8s/charts/seaweedfs/Chart.yaml
-
45k8s/charts/seaweedfs/templates/_helpers.tpl
-
35k8s/charts/seaweedfs/templates/cluster-role.yaml
-
2k8s/charts/seaweedfs/templates/filer-statefulset.yaml
-
37k8s/charts/seaweedfs/templates/service-account.yaml
-
14k8s/charts/seaweedfs/values.yaml
-
BINnote/keepsec.png
-
BINnote/piknik.png
-
12other/java/client/src/main/proto/filer.proto
-
2unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
-
2unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
-
3weed/command/filer.go
-
4weed/command/filer_meta_backup.go
-
6weed/command/master.go
-
1weed/command/server.go
-
24weed/filer/filer.go
-
4weed/filer/filerstore_wrapper.go
-
38weed/filer/meta_replay.go
-
2weed/iamapi/iamapi_server.go
-
2weed/iamapi/iamapi_test.go
-
2weed/operation/chunked_file.go
-
2weed/operation/upload_content.go
-
12weed/pb/filer.proto
-
1052weed/pb/filer_pb/filer.pb.go
-
36weed/pb/filer_pb/filer_client_bfs.go
-
68weed/pb/filer_pb/filer_grpc.pb.go
-
62weed/remote_storage/traverse_bfs.go
-
1weed/s3api/AmazonS3.xsd
-
8weed/s3api/README.txt
-
18weed/s3api/auto_signature_v4_test.go
-
24weed/s3api/s3api_bucket_handlers.go
-
29weed/s3api/s3api_bucket_handlers_test.go
-
5weed/s3api/s3api_object_handlers.go
-
12weed/s3api/s3api_object_handlers_delete.go
-
5weed/s3api/s3api_object_handlers_list.go
-
2weed/s3api/s3api_object_handlers_list_test.go
-
2weed/s3api/s3api_object_handlers_put.go
-
96weed/s3api/s3api_server.go
-
1066weed/s3api/s3api_xsd_generated.go
-
10weed/s3api/s3api_xsd_generated_helper.go
-
2weed/server/common.go
-
84weed/server/filer_grpc_server_traverse_meta.go
-
31weed/server/filer_grpc_server_traverse_meta_test.go
-
6weed/server/filer_server.go
-
2weed/server/filer_server_handlers_read.go
-
8weed/server/filer_server_handlers_write_autochunk.go
-
3weed/server/filer_ui/breadcrumb.go
-
86weed/server/filer_ui/breadcrumb_test.go
-
2weed/server/filer_ui/filer.html
-
8weed/server/volume_server_handlers_read.go
-
2weed/shell/command_fs_merge_volumes.go
-
5weed/shell/command_volume_tier_upload.go
-
2weed/storage/disk_location.go
-
2weed/storage/needle/needle_parse_upload.go
-
2weed/util/constants.go
-
14weed/util/http_util.go
-
30weed/util/queue.go
-
22weed/util/queue_test.go
@ -1,6 +1,6 @@ |
|||
apiVersion: v1 |
|||
description: SeaweedFS |
|||
name: seaweedfs |
|||
appVersion: "3.68" |
|||
appVersion: "3.69" |
|||
# Dev note: Trigger a helm chart release by `git tag -a helm-<version>` |
|||
version: 4.0.0 |
@ -0,0 +1,35 @@ |
|||
{{- if .Values.global.createClusterRole }} |
|||
#hack for delete pod master after migration |
|||
--- |
|||
kind: ClusterRole |
|||
apiVersion: rbac.authorization.k8s.io/v1 |
|||
metadata: |
|||
name: {{ .Values.global.serviceAccountName }}-rw-cr |
|||
labels: |
|||
app.kubernetes.io/name: {{ template "seaweedfs.name" . }} |
|||
helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }} |
|||
app.kubernetes.io/managed-by: {{ .Release.Service }} |
|||
app.kubernetes.io/instance: {{ .Release.Name }} |
|||
rules: |
|||
- apiGroups: [""] |
|||
resources: ["pods"] |
|||
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] |
|||
--- |
|||
kind: ClusterRoleBinding |
|||
apiVersion: rbac.authorization.k8s.io/v1 |
|||
metadata: |
|||
name: system:serviceaccount:{{ .Values.global.serviceAccountName }}:default |
|||
labels: |
|||
app.kubernetes.io/name: {{ template "seaweedfs.name" . }} |
|||
helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }} |
|||
app.kubernetes.io/managed-by: {{ .Release.Service }} |
|||
app.kubernetes.io/instance: {{ .Release.Name }} |
|||
subjects: |
|||
- kind: ServiceAccount |
|||
name: {{ .Values.global.serviceAccountName }} |
|||
namespace: {{ .Release.Namespace }} |
|||
roleRef: |
|||
apiGroup: rbac.authorization.k8s.io |
|||
kind: ClusterRole |
|||
name: {{ .Values.global.serviceAccountName }}-rw-cr |
|||
{{- end }} |
After Width: 100 | Height: 36 | Size: 7.8 KiB |
Before Width: 165 | Height: 50 | Size: 7.2 KiB After Width: 156 | Height: 35 | Size: 7.4 KiB |
1052
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -1,62 +0,0 @@ |
|||
package remote_storage |
|||
|
|||
import ( |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
type ListDirectoryFunc func(parentDir util.FullPath, visitFn VisitFunc) error |
|||
|
|||
func TraverseBfs(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn VisitFunc) (err error) { |
|||
K := 5 |
|||
|
|||
var dirQueueWg sync.WaitGroup |
|||
dirQueue := util.NewQueue() |
|||
dirQueueWg.Add(1) |
|||
dirQueue.Enqueue(parentPath) |
|||
var isTerminating bool |
|||
|
|||
for i := 0; i < K; i++ { |
|||
go func() { |
|||
for { |
|||
if isTerminating { |
|||
break |
|||
} |
|||
t := dirQueue.Dequeue() |
|||
if t == nil { |
|||
time.Sleep(329 * time.Millisecond) |
|||
continue |
|||
} |
|||
dir := t.(util.FullPath) |
|||
processErr := processOneDirectory(listDirFn, dir, visitFn, dirQueue, &dirQueueWg) |
|||
if processErr != nil { |
|||
err = processErr |
|||
} |
|||
dirQueueWg.Done() |
|||
} |
|||
}() |
|||
} |
|||
|
|||
dirQueueWg.Wait() |
|||
isTerminating = true |
|||
return |
|||
|
|||
} |
|||
|
|||
func processOneDirectory(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn VisitFunc, dirQueue *util.Queue, dirQueueWg *sync.WaitGroup) error { |
|||
|
|||
return listDirFn(parentPath, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { |
|||
if err := visitFn(dir, name, isDirectory, remoteEntry); err != nil { |
|||
return err |
|||
} |
|||
if !isDirectory { |
|||
return nil |
|||
} |
|||
dirQueueWg.Add(1) |
|||
dirQueue.Enqueue(parentPath.Child(name)) |
|||
return nil |
|||
}) |
|||
|
|||
} |
@ -1,7 +1,7 @@ |
|||
see https://blog.aqwari.net/xml-schema-go/ |
|||
|
|||
1. go get aqwari.net/xml/cmd/xsdgen |
|||
2. xsdgen -o s3api_xsd_generated.go -pkg s3api AmazonS3.xsd |
|||
|
|||
|
|||
|
|||
2. Add EncodingType element for ListBucketResult in AmazonS3.xsd |
|||
3. xsdgen -o s3api_xsd_generated.go -pkg s3api AmazonS3.xsd |
|||
4. Remove empty Grantee struct in s3api_xsd_generated.go |
|||
5. Remove xmlns: sed s'/http:\/\/s3.amazonaws.com\/doc\/2006-03-01\/\ //' s3api_xsd_generated.go |
1066
weed/s3api/s3api_xsd_generated.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,10 @@ |
|||
package s3api |
|||
|
|||
type Grantee struct { |
|||
XMLNS string `xml:"xmlns:xsi,attr"` |
|||
XMLXSI string `xml:"xsi:type,attr"` |
|||
Type string `xml:"Type"` |
|||
ID string `xml:"ID,omitempty"` |
|||
DisplayName string `xml:"DisplayName,omitempty"` |
|||
URI string `xml:"URI,omitempty"` |
|||
} |
@ -0,0 +1,84 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/filer" |
|||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
"github.com/viant/ptrie" |
|||
) |
|||
|
|||
func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error { |
|||
|
|||
glog.V(0).Infof("TraverseBfsMetadata %v", req) |
|||
|
|||
excludedTrie := ptrie.New[bool]() |
|||
for _, excluded := range req.ExcludedPrefixes { |
|||
excludedTrie.Put([]byte(excluded), true) |
|||
} |
|||
|
|||
ctx := stream.Context() |
|||
|
|||
queue := util.NewQueue[*filer.Entry]() |
|||
dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory)) |
|||
if err != nil { |
|||
return fmt.Errorf("find dir %s: %v", req.Directory, err) |
|||
} |
|||
queue.Enqueue(dirEntry) |
|||
|
|||
for item := queue.Dequeue(); item != nil; item = queue.Dequeue() { |
|||
if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool { |
|||
return true |
|||
}) { |
|||
// println("excluded", item.FullPath)
|
|||
continue |
|||
} |
|||
parent, _ := item.FullPath.DirAndName() |
|||
if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{ |
|||
Directory: parent, |
|||
Entry: item.ToProtoEntry(), |
|||
}); err != nil { |
|||
return fmt.Errorf("send traverse bfs metadata response: %v", err) |
|||
} |
|||
|
|||
if !item.IsDirectory() { |
|||
continue |
|||
} |
|||
|
|||
if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error { |
|||
queue.Enqueue(entry) |
|||
return nil |
|||
}); err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) { |
|||
var lastFileName string |
|||
var listErr error |
|||
for { |
|||
var hasEntries bool |
|||
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool { |
|||
hasEntries = true |
|||
if fnErr := fn(entry); fnErr != nil { |
|||
err = fnErr |
|||
return false |
|||
} |
|||
return true |
|||
}) |
|||
if listErr != nil { |
|||
return listErr |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if !hasEntries { |
|||
return nil |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"github.com/stretchr/testify/assert" |
|||
"github.com/viant/ptrie" |
|||
"testing" |
|||
) |
|||
|
|||
func TestPtrie(t *testing.T) { |
|||
b := []byte("/topics/abc/dev") |
|||
excludedTrie := ptrie.New[bool]() |
|||
excludedTrie.Put([]byte("/topics/abc/d"), true) |
|||
excludedTrie.Put([]byte("/topics/abc"), true) |
|||
|
|||
assert.True(t, excludedTrie.MatchPrefix(b, func(key []byte, value bool) bool { |
|||
println("matched1", string(key)) |
|||
return true |
|||
})) |
|||
|
|||
assert.True(t, excludedTrie.MatchAll(b, func(key []byte, value bool) bool { |
|||
println("matched2", string(key)) |
|||
return true |
|||
})) |
|||
|
|||
assert.False(t, excludedTrie.MatchAll([]byte("/topics/ab"), func(key []byte, value bool) bool { |
|||
println("matched3", string(key)) |
|||
return true |
|||
})) |
|||
|
|||
assert.False(t, excludedTrie.Has(b)) |
|||
} |
@ -0,0 +1,86 @@ |
|||
package filer_ui |
|||
|
|||
import ( |
|||
"reflect" |
|||
"testing" |
|||
) |
|||
|
|||
func TestToBreadcrumb(t *testing.T) { |
|||
type args struct { |
|||
fullpath string |
|||
} |
|||
tests := []struct { |
|||
name string |
|||
args args |
|||
wantCrumbs []Breadcrumb |
|||
}{ |
|||
{ |
|||
name: "empty", |
|||
args: args{ |
|||
fullpath: "", |
|||
}, |
|||
wantCrumbs: []Breadcrumb{ |
|||
{ |
|||
Name: "/", |
|||
Link: "/", |
|||
}, |
|||
}, |
|||
}, |
|||
{ |
|||
name: "test1", |
|||
args: args{ |
|||
fullpath: "/", |
|||
}, |
|||
wantCrumbs: []Breadcrumb{ |
|||
{ |
|||
Name: "/", |
|||
Link: "/", |
|||
}, |
|||
}, |
|||
}, |
|||
{ |
|||
name: "test2", |
|||
args: args{ |
|||
fullpath: "/abc", |
|||
}, |
|||
wantCrumbs: []Breadcrumb{ |
|||
{ |
|||
Name: "/", |
|||
Link: "/", |
|||
}, |
|||
{ |
|||
Name: "abc", |
|||
Link: "/abc/", |
|||
}, |
|||
}, |
|||
}, |
|||
{ |
|||
name: "test3", |
|||
args: args{ |
|||
fullpath: "/abc/def", |
|||
}, |
|||
wantCrumbs: []Breadcrumb{ |
|||
{ |
|||
Name: "/", |
|||
Link: "/", |
|||
}, |
|||
{ |
|||
Name: "abc", |
|||
Link: "/abc/", |
|||
}, |
|||
{ |
|||
Name: "def", |
|||
Link: "/abc/def/", |
|||
}, |
|||
}, |
|||
}, |
|||
} |
|||
|
|||
for _, tt := range tests { |
|||
t.Run(tt.name, func(t *testing.T) { |
|||
if gotCrumbs := ToBreadcrumb(tt.args.fullpath); !reflect.DeepEqual(gotCrumbs, tt.wantCrumbs) { |
|||
t.Errorf("ToBreadcrumb() = %v, want %v", gotCrumbs, tt.wantCrumbs) |
|||
} |
|||
}) |
|||
} |
|||
} |
@ -0,0 +1,22 @@ |
|||
package util |
|||
|
|||
import ( |
|||
"github.com/stretchr/testify/assert" |
|||
"testing" |
|||
) |
|||
|
|||
func TestNewQueue(t *testing.T) { |
|||
|
|||
q := NewQueue[int]() |
|||
|
|||
for i := 0; i < 10; i++ { |
|||
q.Enqueue(i) |
|||
} |
|||
|
|||
assert.Equal(t, q.Len(), 10) |
|||
|
|||
for i := 0; i < 10; i++ { |
|||
assert.Equal(t, q.Dequeue(), i) |
|||
} |
|||
|
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue