Browse Source

Merge remote-tracking branch 'upstream/master' into iamapi

pull/1658/head
Konstantin Lebedev 5 years ago
parent
commit
30e45b58b8
  1. 2
      weed/filer/entry.go
  2. 39
      weed/filer/read_write.go
  3. 25
      weed/pb/filer_pb/filer_client.go
  4. 32
      weed/s3api/auth_credentials.go
  5. 3
      weed/s3api/auth_credentials_subscribe.go
  6. 3
      weed/s3api/s3api_objects_list_handlers.go
  7. 2
      weed/server/filer_grpc_server.go
  8. 10
      weed/server/filer_server_handlers_read.go
  9. 5
      weed/shell/command_s3_configure.go

2
weed/filer/entry.go

@ -44,7 +44,7 @@ type Entry struct {
}
func (entry *Entry) Size() uint64 {
return maxUint64(TotalSize(entry.Chunks), entry.FileSize)
return maxUint64(maxUint64(TotalSize(entry.Chunks), entry.FileSize), uint64(len(entry.Content)))
}
func (entry *Entry) Timestamp() time.Time {

39
weed/filer/read_write.go

@ -9,6 +9,7 @@ import (
"io/ioutil"
"math"
"net/http"
"time"
)
func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.SeaweedFilerClient, dir, name string, byteBuffer *bytes.Buffer) error {
@ -75,3 +76,41 @@ func SaveAs(host string, port int, dir, name string, contentType string, byteBuf
return nil
}
func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error {
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
})
if err == filer_pb.ErrNotFound {
err = filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0644),
Collection: "",
Replication: "",
FileSize: uint64(len(content)),
},
Content: content,
},
})
} else if err == nil {
entry := resp.Entry
entry.Content = content
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileSize = uint64(len(content))
err = filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
})
}
return err
}

25
weed/pb/filer_pb/filer_client.go

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"strings"
"time"
@ -61,14 +60,32 @@ type EachEntryFunciton func(entry *Entry, isLast bool) error
func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
var counter uint32
var startFrom string
var counterFunc = func(entry *Entry, isLast bool) error {
counter++
startFrom = entry.Name
return fn(entry, isLast)
}
var paginationLimit uint32 = 10000
if err = doList(filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil {
return err
}
for counter == paginationLimit {
counter = 0
if err = doList(filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil {
return err
}
}
return nil
}
func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
}
func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {

32
weed/s3api/auth_credentials.go

@ -38,45 +38,40 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag
iam := &IdentityAccessManagement{
domain: option.DomainName,
}
if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil {
glog.Warningf("fail to load config: %v", err)
}
if len(iam.identities) == 0 && option.Config != "" {
if option.Config != "" {
if err := iam.loadS3ApiConfigurationFromFile(option.Config); err != nil {
glog.Fatalf("fail to load config file %s: %v", option.Config, err)
}
} else {
if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil {
glog.Warningf("fail to load config: %v", err)
}
}
return iam
}
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) error {
s3ApiConfiguration := &iam_pb.S3ApiConfiguration{}
content, err := filer.ReadContent(option.Filer, filer.IamConfigDirecotry, filer.IamIdentityFile)
if err != nil {
return fmt.Errorf("read S3 config: %v", err)
}
if err = filer.ParseS3ConfigurationFromBytes(content, s3ApiConfiguration); err != nil {
return fmt.Errorf("parse S3 config: %v", err)
}
if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil {
return fmt.Errorf("laod S3 config: %v", err)
}
glog.V(0).Infof("loaded %d s3 identities", len(iam.identities))
return nil
return iam.loadS3ApiConfigurationFromBytes(content)
}
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName string) error {
s3ApiConfiguration := &iam_pb.S3ApiConfiguration{}
rawData, readErr := ioutil.ReadFile(fileName)
content, readErr := ioutil.ReadFile(fileName)
if readErr != nil {
glog.Warningf("fail to read %s : %v", fileName, readErr)
return fmt.Errorf("fail to read %s : %v", fileName, readErr)
}
return iam.loadS3ApiConfigurationFromBytes(content)
}
glog.V(1).Infof("load s3 config: %v", fileName)
if err := filer.ParseS3ConfigurationFromBytes(rawData, s3ApiConfiguration); err != nil {
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromBytes(content []byte) error {
s3ApiConfiguration := &iam_pb.S3ApiConfiguration{}
if err := filer.ParseS3ConfigurationFromBytes(content, s3ApiConfiguration); err != nil {
glog.Warningf("unmarshal error: %v", err)
return fmt.Errorf("unmarshal %s error: %v", fileName, err)
return fmt.Errorf("unmarshal error: %v", err)
}
if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil {
return err
@ -84,6 +79,7 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName str
return nil
}
func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error {
var identities []*Identity
for _, ident := range config.Identities {

3
weed/s3api/auth_credentials_subscribe.go

@ -25,9 +25,10 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
dir = message.NewParentPath
}
if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile {
if err := s3a.iam.loadS3ApiConfigurationFromFiler(s3a.option); err != nil {
if err := s3a.iam.loadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil {
return err
}
glog.V(0).Infof("updated %s/%s", filer.IamConfigDirecotry, filer.IamIdentityFile)
}
return nil

3
weed/s3api/s3api_objects_list_handlers.go

@ -197,11 +197,12 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
sepIndex := strings.Index(marker, "/")
subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
// println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
if subErr != nil {
err = subErr
return
}
isTruncated = isTruncated || subIsTruncated
maxKeys -= subCounter
nextMarker = subDir + "/" + subNextMarker
counter += subCounter

2
weed/server/filer_grpc_server.go

@ -170,6 +170,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
Extended: req.Entry.Extended,
HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
HardLinkCounter: req.Entry.HardLinkCounter,
Content: req.Entry.Content,
}, req.OExcl, req.IsFromOtherCluster, req.Signatures)
if createErr == nil {
@ -204,6 +205,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
Chunks: chunks,
HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
HardLinkCounter: req.Entry.HardLinkCounter,
Content: req.Entry.Content,
}
glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",

10
weed/server/filer_server_handlers_read.go

@ -60,7 +60,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
if len(entry.Chunks) == 0 {
if len(entry.Chunks) == 0 && len(entry.Content) == 0 {
glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
w.WriteHeader(http.StatusNoContent)
@ -123,13 +123,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
filename := entry.Name()
adjustHeaderContentDisposition(w, r, filename)
totalSize := int64(entry.Size())
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(int64(entry.Size()), 10))
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return
}
totalSize := int64(entry.Size())
if rangeReq := r.Header.Get("Range"); rangeReq == "" {
ext := filepath.Ext(filename)
width, height, mode, shouldResize := shouldResizeImages(ext, r)
@ -148,7 +148,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset:offset+size])
_, err := writer.Write(entry.Content[offset : offset+size])
return err
}
return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)

5
weed/shell/command_s3_configure.go

@ -169,10 +169,11 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
fmt.Fprintf(writer, string(buf.Bytes()))
fmt.Fprintln(writer)
if *apply {
if err := filer.SaveAs(commandEnv.option.FilerHost, int(commandEnv.option.FilerPort), filer.IamConfigDirecotry, filer.IamIdentityFile, "text/plain; charset=utf-8", &buf); err != nil {
if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes())
}); err != nil {
return err
}

Loading…
Cancel
Save