hilimd
4 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 788 additions and 306 deletions
-
4.github/workflows/release.yml
-
7go.mod
-
14go.sum
-
2k8s/seaweedfs/Chart.yaml
-
2k8s/seaweedfs/values.yaml
-
2other/java/client/pom.xml
-
2other/java/client/pom.xml.deploy
-
2other/java/client/pom_debug.xml
-
15other/java/client/src/main/java/seaweedfs/client/FilerClient.java
-
14other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
-
2other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
-
1other/java/client/src/main/proto/filer.proto
-
4other/java/examples/pom.xml
-
22other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
-
46other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java
-
2other/java/hdfs2/dependency-reduced-pom.xml
-
2other/java/hdfs2/pom.xml
-
2other/java/hdfs3/dependency-reduced-pom.xml
-
2other/java/hdfs3/pom.xml
-
2weed/filer/filer.go
-
139weed/filer/filer_conf.go
-
29weed/filer/filer_conf_test.go
-
10weed/filer/filer_notify_append.go
-
61weed/filer/filer_on_meta_event.go
-
3weed/filer/meta_aggregator.go
-
2weed/filesys/file.go
-
36weed/operation/assign_file_id.go
-
1weed/pb/filer.proto
-
235weed/pb/filer_pb/filer.pb.go
-
26weed/pb/filer_pb/filer_pb_helper.go
-
2weed/s3api/http/header.go
-
30weed/s3api/s3api_bucket_handlers.go
-
1weed/server/common.go
-
60weed/server/filer_grpc_server.go
-
2weed/server/filer_server.go
-
4weed/server/filer_server_handlers.go
-
95weed/server/filer_server_handlers_write.go
-
40weed/server/filer_server_handlers_write_autochunk.go
-
12weed/server/filer_server_handlers_write_cipher.go
-
126weed/shell/command_fs_configure.go
-
2weed/util/constants.go
-
17weed/util/http_util.go
-
10weed/util/retry.go
@ -1,4 +1,4 @@ |
|||
apiVersion: v1 |
|||
description: SeaweedFS |
|||
name: seaweedfs |
|||
version: 2.10 |
|||
version: 2.11 |
@ -0,0 +1,46 @@ |
|||
package com.seaweedfs.examples; |
|||
|
|||
import seaweedfs.client.FilerClient; |
|||
import seaweedfs.client.FilerProto; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.Date; |
|||
import java.util.Iterator; |
|||
|
|||
public class WatchFiles { |
|||
|
|||
public static void main(String[] args) throws IOException { |
|||
|
|||
FilerClient filerClient = new FilerClient("localhost", 18888); |
|||
|
|||
long sinceNs = (System.currentTimeMillis() - 3600 * 1000) * 1000000L; |
|||
|
|||
Iterator<FilerProto.SubscribeMetadataResponse> watch = filerClient.watch( |
|||
"/buckets", |
|||
"exampleClientName", |
|||
sinceNs |
|||
); |
|||
|
|||
System.out.println("Connected to filer, subscribing from " + new Date()); |
|||
|
|||
while (watch.hasNext()) { |
|||
FilerProto.SubscribeMetadataResponse event = watch.next(); |
|||
FilerProto.EventNotification notification = event.getEventNotification(); |
|||
if (!event.getDirectory().equals(notification.getNewParentPath())) { |
|||
// move an entry to a new directory, possibly with a new name |
|||
if (notification.hasOldEntry() && notification.hasNewEntry()) { |
|||
System.out.println("moved " + event.getDirectory() + "/" + notification.getOldEntry().getName() + " to " + notification.getNewParentPath() + "/" + notification.getNewEntry().getName()); |
|||
} else { |
|||
System.out.println("this should not happen."); |
|||
} |
|||
} else if (notification.hasNewEntry() && !notification.hasOldEntry()) { |
|||
System.out.println("created entry " + event.getDirectory() + "/" + notification.getNewEntry().getName()); |
|||
} else if (!notification.hasNewEntry() && notification.hasOldEntry()) { |
|||
System.out.println("deleted entry " + event.getDirectory() + "/" + notification.getOldEntry().getName()); |
|||
} else if (notification.hasNewEntry() && notification.hasOldEntry()) { |
|||
System.out.println("updated entry " + event.getDirectory() + "/" + notification.getNewEntry().getName()); |
|||
} |
|||
} |
|||
|
|||
} |
|||
} |
@ -0,0 +1,139 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"io" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/golang/protobuf/jsonpb" |
|||
"github.com/golang/protobuf/proto" |
|||
"github.com/viant/ptrie" |
|||
) |
|||
|
|||
const ( |
|||
DirectoryEtc = "/etc" |
|||
FilerConfName = "filer.conf" |
|||
) |
|||
|
|||
type FilerConf struct { |
|||
rules ptrie.Trie |
|||
} |
|||
|
|||
func NewFilerConf() (fc *FilerConf) { |
|||
fc = &FilerConf{ |
|||
rules: ptrie.New(), |
|||
} |
|||
return fc |
|||
} |
|||
|
|||
func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) { |
|||
filerConfPath := util.NewFullPath(DirectoryEtc, FilerConfName) |
|||
entry, err := filer.FindEntry(context.Background(), filerConfPath) |
|||
if err != nil { |
|||
if err == filer_pb.ErrNotFound { |
|||
return nil |
|||
} |
|||
glog.Errorf("read filer conf entry %s: %v", filerConfPath, err) |
|||
return |
|||
} |
|||
|
|||
return fc.loadFromChunks(filer, entry.Chunks) |
|||
} |
|||
|
|||
func (fc *FilerConf) loadFromChunks(filer *Filer, chunks []*filer_pb.FileChunk) (err error) { |
|||
data, err := filer.readEntry(chunks) |
|||
if err != nil { |
|||
glog.Errorf("read filer conf content: %v", err) |
|||
return |
|||
} |
|||
|
|||
return fc.LoadFromBytes(data) |
|||
} |
|||
|
|||
func (fc *FilerConf) LoadFromBytes(data []byte) (err error) { |
|||
conf := &filer_pb.FilerConf{} |
|||
|
|||
if err := jsonpb.Unmarshal(bytes.NewReader(data), conf); err != nil { |
|||
|
|||
err = proto.UnmarshalText(string(data), conf) |
|||
if err != nil { |
|||
glog.Errorf("unable to parse filer conf: %v", err) |
|||
// this is not recoverable
|
|||
return nil |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
return fc.doLoadConf(conf) |
|||
} |
|||
|
|||
func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) { |
|||
for _, location := range conf.Locations { |
|||
err = fc.AddLocationConf(location) |
|||
if err != nil { |
|||
// this is not recoverable
|
|||
return nil |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (fc *FilerConf) AddLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error) { |
|||
err = fc.rules.Put([]byte(locConf.LocationPrefix), locConf) |
|||
if err != nil { |
|||
glog.Errorf("put location prefix: %v", err) |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (fc *FilerConf) DeleteLocationConf(locationPrefix string) { |
|||
rules := ptrie.New() |
|||
fc.rules.Walk(func(key []byte, value interface{}) bool { |
|||
if string(key) == locationPrefix { |
|||
return true |
|||
} |
|||
rules.Put(key, value) |
|||
return true |
|||
}) |
|||
fc.rules = rules |
|||
return |
|||
} |
|||
|
|||
var ( |
|||
EmptyFilerConfPathConf = &filer_pb.FilerConf_PathConf{} |
|||
) |
|||
|
|||
func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf) { |
|||
fc.rules.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool { |
|||
pathConf = value.(*filer_pb.FilerConf_PathConf) |
|||
return true |
|||
}) |
|||
if pathConf == nil { |
|||
return EmptyFilerConfPathConf |
|||
} |
|||
return pathConf |
|||
} |
|||
|
|||
func (fc *FilerConf) ToProto() *filer_pb.FilerConf { |
|||
m := &filer_pb.FilerConf{} |
|||
fc.rules.Walk(func(key []byte, value interface{}) bool { |
|||
pathConf := value.(*filer_pb.FilerConf_PathConf) |
|||
m.Locations = append(m.Locations, pathConf) |
|||
return true |
|||
}) |
|||
return m |
|||
} |
|||
|
|||
func (fc *FilerConf) ToText(writer io.Writer) error { |
|||
|
|||
m := jsonpb.Marshaler{ |
|||
EmitDefaults: false, |
|||
Indent: " ", |
|||
} |
|||
|
|||
return m.Marshal(writer, fc.ToProto()) |
|||
} |
@ -0,0 +1,29 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"testing" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestFilerConf(t *testing.T) { |
|||
|
|||
fc := NewFilerConf() |
|||
|
|||
conf := &filer_pb.FilerConf{Locations: []*filer_pb.FilerConf_PathConf{ |
|||
{ |
|||
LocationPrefix: "/buckets/abc", |
|||
Collection: "abc", |
|||
}, |
|||
{ |
|||
LocationPrefix: "/buckets/abcd", |
|||
Collection: "abcd", |
|||
}, |
|||
}} |
|||
fc.doLoadConf(conf) |
|||
|
|||
assert.Equal(t, "abc", fc.MatchStorageRule("/buckets/abc/jasdf").Collection) |
|||
assert.Equal(t, "abcd", fc.MatchStorageRule("/buckets/abcd/jasdf").Collection) |
|||
|
|||
} |
@ -0,0 +1,61 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"bytes" |
|||
"math" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
|
|||
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { |
|||
if DirectoryEtc != event.Directory { |
|||
if DirectoryEtc != event.EventNotification.NewParentPath { |
|||
return |
|||
} |
|||
} |
|||
|
|||
entry := event.EventNotification.NewEntry |
|||
if entry == nil { |
|||
return |
|||
} |
|||
|
|||
glog.V(0).Infof("procesing %v", event) |
|||
if entry.Name == FilerConfName { |
|||
f.reloadFilerConfiguration(entry) |
|||
} |
|||
|
|||
} |
|||
|
|||
func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) { |
|||
var buf bytes.Buffer |
|||
err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return buf.Bytes(), nil |
|||
} |
|||
|
|||
func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) { |
|||
fc := NewFilerConf() |
|||
err := fc.loadFromChunks(f, entry.Chunks) |
|||
if err != nil { |
|||
glog.Errorf("read filer conf chunks: %v", err) |
|||
return |
|||
} |
|||
f.FilerConf = fc |
|||
} |
|||
|
|||
func (f *Filer) LoadFilerConf() { |
|||
fc := NewFilerConf() |
|||
err := util.Retry("loadFilerConf", func() error { |
|||
return fc.loadFromFiler(f) |
|||
}) |
|||
if err != nil { |
|||
glog.Errorf("read filer conf: %v", err) |
|||
return |
|||
} |
|||
f.FilerConf = fc |
|||
} |
@ -0,0 +1,126 @@ |
|||
package shell |
|||
|
|||
import ( |
|||
"bytes" |
|||
"flag" |
|||
"fmt" |
|||
"io" |
|||
"math" |
|||
"net/http" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
func init() { |
|||
Commands = append(Commands, &commandFsConfigure{}) |
|||
} |
|||
|
|||
type commandFsConfigure struct { |
|||
} |
|||
|
|||
func (c *commandFsConfigure) Name() string { |
|||
return "fs.configure" |
|||
} |
|||
|
|||
func (c *commandFsConfigure) Help() string { |
|||
return `configure and apply storage options for each location |
|||
|
|||
# see the possible configuration file content |
|||
fs.configure |
|||
|
|||
# trying the changes and see the possible configuration file content |
|||
fs.configure -locationPrfix=/my/folder -collection=abc |
|||
fs.configure -locationPrfix=/my/folder -collection=abc -ttl=7d |
|||
|
|||
# apply the changes |
|||
fs.configure -locationPrfix=/my/folder -collection=abc -apply |
|||
|
|||
# delete the changes |
|||
fs.configure -locationPrfix=/my/folder -delete -apply |
|||
|
|||
` |
|||
} |
|||
|
|||
func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { |
|||
|
|||
fsConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) |
|||
locationPrefix := fsConfigureCommand.String("locationPrefix", "", "path prefix, required to update the path-specific configuration") |
|||
collection := fsConfigureCommand.String("collection", "", "assign writes to this collection") |
|||
replication := fsConfigureCommand.String("replication", "", "assign writes with this replication") |
|||
ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl") |
|||
fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes") |
|||
isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix") |
|||
apply := fsConfigureCommand.Bool("apply", false, "update and apply filer configuration") |
|||
if err = fsConfigureCommand.Parse(args); err != nil { |
|||
return nil |
|||
} |
|||
|
|||
var buf bytes.Buffer |
|||
if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
request := &filer_pb.LookupDirectoryEntryRequest{ |
|||
Directory: filer.DirectoryEtc, |
|||
Name: filer.FilerConfName, |
|||
} |
|||
respLookupEntry, err := filer_pb.LookupEntry(client, request) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
return filer.StreamContent(commandEnv.MasterClient, &buf, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) |
|||
|
|||
}); err != nil { |
|||
return err |
|||
} |
|||
|
|||
fc := filer.NewFilerConf() |
|||
if err = fc.LoadFromBytes(buf.Bytes()); err != nil { |
|||
return err |
|||
} |
|||
|
|||
if *locationPrefix != "" { |
|||
locConf := &filer_pb.FilerConf_PathConf{ |
|||
LocationPrefix: *locationPrefix, |
|||
Collection: *collection, |
|||
Replication: *replication, |
|||
Ttl: *ttl, |
|||
Fsync: *fsync, |
|||
} |
|||
if *isDelete { |
|||
fc.DeleteLocationConf(*locationPrefix) |
|||
} else { |
|||
fc.AddLocationConf(locConf) |
|||
} |
|||
} |
|||
|
|||
buf.Reset() |
|||
fc.ToText(&buf) |
|||
|
|||
fmt.Fprintf(writer, string(buf.Bytes())) |
|||
fmt.Fprintln(writer) |
|||
|
|||
if *apply { |
|||
|
|||
target := fmt.Sprintf("http://%s:%d%s/%s", commandEnv.option.FilerHost, commandEnv.option.FilerPort, filer.DirectoryEtc, filer.FilerConfName) |
|||
|
|||
// set the HTTP method, url, and request body
|
|||
req, err := http.NewRequest(http.MethodPut, target, &buf) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
// set the request header Content-Type for json
|
|||
req.Header.Set("Content-Type", "text/plain; charset=utf-8") |
|||
resp, err := http.DefaultClient.Do(req) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
util.CloseResponse(resp) |
|||
|
|||
} |
|||
|
|||
return nil |
|||
|
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue