Browse Source

Merge branch 'master' of https://github.com/seaweedfs/seaweedfs

pull/3834/head
chrislu 2 years ago
parent
commit
8aec786a6d
  1. 5
      other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
  2. 43
      weed/s3api/bucket_metadata.go
  3. 42
      weed/s3api/bucket_metadata_test.go
  4. 3
      weed/s3api/s3_constants/extend_key.go
  5. 14
      weed/server/filer_server_handlers_write_autochunk.go
  6. 1
      weed/storage/needle_map.go
  7. 46
      weed/storage/needle_map_leveldb.go
  8. 4
      weed/storage/needle_map_memory.go
  9. 8
      weed/storage/volume_vacuum.go

5
other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java

@ -119,9 +119,8 @@ public class SeaweedInputStream extends InputStream {
long bytesRead = 0; long bytesRead = 0;
int len = buf.remaining(); int len = buf.remaining();
int start = (int) this.position;
if (start + len <= entry.getContent().size()) {
entry.getContent().substring(start, start + len).copyTo(buf);
if (this.position< Integer.MAX_VALUE && (this.position + len )<= entry.getContent().size()) {
entry.getContent().substring((int)this.position, (int)(this.position + len)).copyTo(buf);
} else { } else {
bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
} }

43
weed/s3api/bucket_metadata.go

@ -1,16 +1,12 @@
package s3api package s3api
import ( import (
"bytes"
"encoding/json" "encoding/json"
"github.com/aws/aws-sdk-go/private/protocol/json/jsonutil"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3account" "github.com/seaweedfs/seaweedfs/weed/s3api/s3account"
//"github.com/seaweedfs/seaweedfs/weed/s3api"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"math" "math"
@ -23,7 +19,7 @@ var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*B
return nil, err return nil, err
} }
return buildBucketMetadata(entry), nil
return buildBucketMetadata(r.s3a.accountManager, entry), nil
} }
type BucketMetaData struct { type BucketMetaData struct {
@ -77,13 +73,13 @@ func (r *BucketRegistry) init() error {
} }
func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) { func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) {
bucketMetadata := buildBucketMetadata(entry)
bucketMetadata := buildBucketMetadata(r.s3a.accountManager, entry)
r.metadataCacheLock.Lock() r.metadataCacheLock.Lock()
defer r.metadataCacheLock.Unlock() defer r.metadataCacheLock.Unlock()
r.metadataCache[entry.Name] = bucketMetadata r.metadataCache[entry.Name] = bucketMetadata
} }
func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData {
func buildBucketMetadata(accountManager *s3account.AccountManager, entry *filer_pb.Entry) *BucketMetaData {
entryJson, _ := json.Marshal(entry) entryJson, _ := json.Marshal(entry)
glog.V(3).Infof("build bucket metadata,entry=%s", entryJson) glog.V(3).Infof("build bucket metadata,entry=%s", entryJson)
bucketMetadata := &BucketMetaData{ bucketMetadata := &BucketMetaData{
@ -112,22 +108,29 @@ func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData {
} }
//access control policy //access control policy
acpBytes, ok := entry.Extended[s3_constants.ExtAcpKey]
if ok {
var acp s3.AccessControlPolicy
err := jsonutil.UnmarshalJSON(&acp, bytes.NewReader(acpBytes))
if err == nil {
//validate owner
if acp.Owner != nil && acp.Owner.ID != nil {
bucketMetadata.Owner = acp.Owner
//owner
acpOwnerBytes, ok := entry.Extended[s3_constants.ExtAmzOwnerKey]
if ok && len(acpOwnerBytes) > 0 {
ownerAccountId := string(acpOwnerBytes)
ownerAccountName, exists := accountManager.IdNameMapping[ownerAccountId]
if !exists {
glog.Warningf("owner[id=%s] is invalid, bucket: %s", ownerAccountId, bucketMetadata.Name)
} else { } else {
glog.Warningf("bucket ownerId is empty! bucket: %s", bucketMetadata.Name)
bucketMetadata.Owner = &s3.Owner{
ID: &ownerAccountId,
DisplayName: &ownerAccountName,
} }
//acl
bucketMetadata.Acl = acp.Grants
}
}
//grants
acpGrantsBytes, ok := entry.Extended[s3_constants.ExtAmzAclKey]
if ok && len(acpGrantsBytes) > 0 {
var grants []*s3.Grant
err := json.Unmarshal(acpGrantsBytes, &grants)
if err == nil {
bucketMetadata.Acl = grants
} else { } else {
glog.Warningf("Unmarshal ACP: %s(%v), bucket: %s", string(acpBytes), err, bucketMetadata.Name)
glog.Warningf("Unmarshal ACP grants: %s(%v), bucket: %s", string(acpGrantsBytes), err, bucketMetadata.Name)
} }
} }
} }

42
weed/s3api/bucket_metadata_test.go

@ -1,8 +1,8 @@
package s3api package s3api
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/aws/aws-sdk-go/private/protocol/json/jsonutil"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
@ -26,18 +26,13 @@ var (
} }
//good entry //good entry
goodEntryAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{
Owner: &s3.Owner{
DisplayName: &s3account.AccountAdmin.Name,
ID: &s3account.AccountAdmin.Id,
},
Grants: s3_constants.PublicRead,
})
goodEntryAcl, _ = json.Marshal(s3_constants.PublicRead)
goodEntry = &filer_pb.Entry{ goodEntry = &filer_pb.Entry{
Name: "entryWithValidAcp", Name: "entryWithValidAcp",
Extended: map[string][]byte{ Extended: map[string][]byte{
s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced), s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced),
s3_constants.ExtAcpKey: goodEntryAcp,
s3_constants.ExtAmzOwnerKey: []byte(s3account.AccountAdmin.Name),
s3_constants.ExtAmzAclKey: goodEntryAcl,
}, },
} }
@ -57,35 +52,28 @@ var (
}, },
} }
//acp is ""
//owner is ""
acpEmptyStr = &filer_pb.Entry{ acpEmptyStr = &filer_pb.Entry{
Name: "acpEmptyStr", Name: "acpEmptyStr",
Extended: map[string][]byte{ Extended: map[string][]byte{
s3_constants.ExtAcpKey: []byte(""),
s3_constants.ExtAmzOwnerKey: []byte(""),
}, },
} }
//acp is empty object
acpEmptyObjectAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{
Owner: nil,
Grants: nil,
})
//owner not exists
acpEmptyObject = &filer_pb.Entry{ acpEmptyObject = &filer_pb.Entry{
Name: "acpEmptyObject", Name: "acpEmptyObject",
Extended: map[string][]byte{ Extended: map[string][]byte{
s3_constants.ExtAcpKey: acpEmptyObjectAcp,
s3_constants.ExtAmzOwnerKey: []byte("xxxxx"),
}, },
} }
//acp owner is nil
acpOwnerNilAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{
Owner: nil,
Grants: make([]*s3.Grant, 1),
})
//grants is nil
acpOwnerNilAcp, _ = json.Marshal(make([]*s3.Grant, 0))
acpOwnerNil = &filer_pb.Entry{ acpOwnerNil = &filer_pb.Entry{
Name: "acpOwnerNil", Name: "acpOwnerNil",
Extended: map[string][]byte{ Extended: map[string][]byte{
s3_constants.ExtAcpKey: acpOwnerNilAcp,
s3_constants.ExtAmzAclKey: acpOwnerNilAcp,
}, },
} }
@ -175,8 +163,14 @@ var tcs = []*BucketMetadataTestCase{
} }
func TestBuildBucketMetadata(t *testing.T) { func TestBuildBucketMetadata(t *testing.T) {
accountManager := &s3account.AccountManager{
IdNameMapping: map[string]string{
s3account.AccountAdmin.Id: s3account.AccountAdmin.Name,
s3account.AccountAnonymous.Id: s3account.AccountAnonymous.Name,
},
}
for _, tc := range tcs { for _, tc := range tcs {
resultBucketMetadata := buildBucketMetadata(tc.filerEntry)
resultBucketMetadata := buildBucketMetadata(accountManager, tc.filerEntry)
if !reflect.DeepEqual(resultBucketMetadata, tc.expectBucketMetadata) { if !reflect.DeepEqual(resultBucketMetadata, tc.expectBucketMetadata) {
t.Fatalf("result is unexpect: \nresult: %v, \nexpect: %v", resultBucketMetadata, tc.expectBucketMetadata) t.Fatalf("result is unexpect: \nresult: %v, \nexpect: %v", resultBucketMetadata, tc.expectBucketMetadata)
} }

3
weed/s3api/s3_constants/extend_key.go

@ -1,6 +1,7 @@
package s3_constants package s3_constants
const ( const (
ExtAcpKey = "Seaweed-X-Amz-Acp"
ExtAmzOwnerKey = "Seaweed-X-Amz-Owner"
ExtAmzAclKey = "Seaweed-X-Amz-Acl"
ExtOwnershipKey = "Seaweed-X-Amz-Ownership" ExtOwnershipKey = "Seaweed-X-Amz-Ownership"
) )

14
weed/server/filer_server_handlers_write_autochunk.go

@ -375,10 +375,16 @@ func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool
} }
} }
//acp
acp := r.Header.Get(s3_constants.ExtAcpKey)
if len(acp) > 0 {
metadata[s3_constants.ExtAcpKey] = []byte(acp)
//acp-owner
acpOwner := r.Header.Get(s3_constants.ExtAmzOwnerKey)
if len(acpOwner) > 0 {
metadata[s3_constants.ExtAmzOwnerKey] = []byte(acpOwner)
}
//acp-grants
acpGrants := r.Header.Get(s3_constants.ExtAmzAclKey)
if len(acpOwner) > 0 {
metadata[s3_constants.ExtAmzAclKey] = []byte(acpGrants)
} }
return return

1
weed/storage/needle_map.go

@ -48,7 +48,6 @@ type TempNeedleMapper interface {
NeedleMapper NeedleMapper
DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error
UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error
UpdateNeedleMapMetric(indexFile *os.File) error
} }
func (nm *baseNeedleMapper) IndexFileSize() uint64 { func (nm *baseNeedleMapper) IndexFileSize() uint64 {

46
weed/storage/needle_map_leveldb.go

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
@ -179,6 +180,7 @@ func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, update
} }
return nil return nil
} }
func levelDbDelete(db *leveldb.DB, key NeedleId) error { func levelDbDelete(db *leveldb.DB, key NeedleId) error {
bytes := make([]byte, NeedleIdSize) bytes := make([]byte, NeedleIdSize)
NeedleIdToBytes(bytes, key) NeedleIdToBytes(bytes, key)
@ -305,23 +307,45 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF
} }
err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) { err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
m.mapMetric.FileCounter++
bytes := make([]byte, NeedleIdSize)
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
// fresh loading
if startFrom == 0 {
m.mapMetric.FileByteCounter += uint64(size)
e = levelDbWrite(db, key, offset, size, false, 0)
return e
}
// increment loading
data, err := db.Get(bytes, nil)
if err != nil {
if !strings.Contains(strings.ToLower(err.Error()), "not found") {
// unexpected error
return err
}
// new needle, unlikely happen
m.mapMetric.FileByteCounter += uint64(size)
e = levelDbWrite(db, key, offset, size, false, 0)
} else {
// needle is found
oldSize := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
oldOffset := BytesToOffset(data[0:OffsetSize])
if !offset.IsZero() && size.IsValid() { if !offset.IsZero() && size.IsValid() {
// updated needle
m.mapMetric.FileByteCounter += uint64(size)
if !oldOffset.IsZero() && oldSize.IsValid() {
m.mapMetric.DeletionCounter++
m.mapMetric.DeletionByteCounter += uint64(oldSize)
}
e = levelDbWrite(db, key, offset, size, false, 0) e = levelDbWrite(db, key, offset, size, false, 0)
} else { } else {
// deleted needle
m.mapMetric.DeletionCounter++
m.mapMetric.DeletionByteCounter += uint64(oldSize)
e = levelDbDelete(db, key) e = levelDbDelete(db, key)
} }
}
return e return e
}) })
if err != nil {
return err return err
} }
if startFrom != 0 {
return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
}
return nil
}
func (m *LevelDbNeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
}

4
weed/storage/needle_map_memory.go

@ -129,7 +129,3 @@ func (nm *NeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom ui
return e return e
} }
func (m *NeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
return nil
}

8
weed/storage/volume_vacuum.go

@ -219,16 +219,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
} }
if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset { if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset {
if v.needleMapKind == NeedleMapInMemory {
return nil return nil
} }
newIdx, err := os.OpenFile(newIdxFileName, os.O_RDWR, 0644)
if err != nil {
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
}
defer newIdx.Close()
return v.tmpNm.UpdateNeedleMapMetric(newIdx)
}
// fail if the old .dat file has changed to a new revision // fail if the old .dat file has changed to a new revision
oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend) oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend)

Loading…
Cancel
Save