Browse Source

Merge branch 'master' of https://github.com/seaweedfs/seaweedfs

pull/3624/merge
chrislu 2 years ago
parent
commit
f9e3e7d1c4
  1. 2
      docker/README.md
  2. 2
      docker/compose/notification.toml
  3. 2
      weed/command/scaffold.go
  4. 2
      weed/command/scaffold/filer.toml
  5. 2
      weed/command/scaffold/notification.toml
  6. 10
      weed/filer/filer_deletion.go
  7. 2
      weed/mount/meta_cache/cache_config.go
  8. 2
      weed/mount/meta_cache/meta_cache_subscribe.go
  9. 4
      weed/s3api/filer_multipart.go
  10. 6
      weed/server/volume_grpc_client_to_master.go
  11. 2
      weed/topology/store_replicate.go
  12. 8
      weed/topology/volume_layout_test.go
  13. 4
      weed/util/chunk_cache/chunk_cache_on_disk.go
  14. 2
      weed/wdclient/resource_pool/semaphore.go

2
docker/README.md

@ -44,7 +44,7 @@ docker buildx build --pull --push --platform linux/386,linux/amd64,linux/arm64,l
docker buildx stop $BUILDER docker buildx stop $BUILDER
``` ```
## Minio debuging
## Minio debugging
``` ```
mc config host add local http://127.0.0.1:9000 some_access_key1 some_secret_key1 mc config host add local http://127.0.0.1:9000 some_access_key1 some_secret_key1
mc admin trace --all --verbose local mc admin trace --all --verbose local

2
docker/compose/notification.toml

@ -1,5 +1,5 @@
[notification.log] [notification.log]
# this is only for debugging perpose and does not work with "weed filer.replicate"
# this is only for debugging purpose and does not work with "weed filer.replicate"
enabled = false enabled = false

2
weed/command/scaffold.go

@ -22,7 +22,7 @@ var cmdScaffold = &Command{
export WEED_MYSQL_PASSWORD=some_password export WEED_MYSQL_PASSWORD=some_password
Environment variable rules: Environment variable rules:
* Prefix the variable name with "WEED_" * Prefix the variable name with "WEED_"
* Upppercase the reset of variable name.
* Uppercase the reset of variable name.
* Replace '.' with '_' * Replace '.' with '_'
`, `,

2
weed/command/scaffold/filer.toml

@ -314,7 +314,7 @@ dialTimeOut = 10
# To add path-specific filer store: # To add path-specific filer store:
# #
# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp # 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp
# 2. Add a location configuraiton. E.g., location = "/tmp/"
# 2. Add a location configuration. E.g., location = "/tmp/"
# 3. Copy and customize all other configurations. # 3. Copy and customize all other configurations.
# Make sure they are not the same if using the same store type! # Make sure they are not the same if using the same store type!
# 4. Set enabled to true # 4. Set enabled to true

2
weed/command/scaffold/notification.toml

@ -10,7 +10,7 @@
# send and receive filer updates for each file to an external message queue # send and receive filer updates for each file to an external message queue
#################################################### ####################################################
[notification.log] [notification.log]
# this is only for debugging perpose and does not work with "weed filer.replicate"
# this is only for debugging purpose and does not work with "weed filer.replicate"
enabled = false enabled = false

10
weed/filer/filer_deletion.go

@ -94,10 +94,10 @@ func (f *Filer) doDeleteFileIds(fileIds []string) {
} }
func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk) { func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk) {
var fildIdsToDelete []string
var fileIdsToDelete []string
for _, chunk := range chunks { for _, chunk := range chunks {
if !chunk.IsChunkManifest { if !chunk.IsChunkManifest {
fildIdsToDelete = append(fildIdsToDelete, chunk.GetFileIdString())
fileIdsToDelete = append(fileIdsToDelete, chunk.GetFileIdString())
continue continue
} }
dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk) dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
@ -105,12 +105,12 @@ func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk) {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
} }
for _, dChunk := range dataChunks { for _, dChunk := range dataChunks {
fildIdsToDelete = append(fildIdsToDelete, dChunk.GetFileIdString())
fileIdsToDelete = append(fileIdsToDelete, dChunk.GetFileIdString())
} }
fildIdsToDelete = append(fildIdsToDelete, chunk.GetFileIdString())
fileIdsToDelete = append(fileIdsToDelete, chunk.GetFileIdString())
} }
f.doDeleteFileIds(fildIdsToDelete)
f.doDeleteFileIds(fileIdsToDelete)
} }
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {

2
weed/mount/meta_cache/cache_config.go

@ -6,7 +6,7 @@ var (
_ = util.Configuration(&cacheConfig{}) _ = util.Configuration(&cacheConfig{})
) )
// implementing util.Configuraion
// implementing util.Configuration
type cacheConfig struct { type cacheConfig struct {
dir string dir string
} }

2
weed/mount/meta_cache/meta_cache_subscribe.go

@ -46,7 +46,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
mc.invalidateFunc(newKey, message.NewEntry) mc.invalidateFunc(newKey, message.NewEntry)
} }
} else if filer_pb.IsCreate(resp) { } else if filer_pb.IsCreate(resp) {
// no need to invaalidate
// no need to invalidate
} else if filer_pb.IsDelete(resp) { } else if filer_pb.IsDelete(resp) {
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
mc.invalidateFunc(oldKey, message.OldEntry) mc.invalidateFunc(oldKey, message.OldEntry)

4
weed/s3api/filer_multipart.go

@ -117,7 +117,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
} }
} }
} }
entryName := filepath.Base(*input.Key) entryName := filepath.Base(*input.Key)
dirName := filepath.Dir(*input.Key) dirName := filepath.Dir(*input.Key)
if dirName == "." { if dirName == "." {
@ -147,6 +147,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
} else if mime != "" { } else if mime != "" {
entry.Attributes.Mime = mime entry.Attributes.Mime = mime
} }
entry.Attributes.FileSize = uint64(offset)
}) })
if err != nil { if err != nil {
@ -244,6 +245,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
KeyMarker: input.KeyMarker, KeyMarker: input.KeyMarker,
MaxUploads: input.MaxUploads, MaxUploads: input.MaxUploads,
Prefix: input.Prefix, Prefix: input.Prefix,
IsTruncated: aws.Bool(false),
} }
entries, _, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, math.MaxInt32) entries, _, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, math.MaxInt32)

6
weed/server/volume_grpc_client_to_master.go

@ -94,13 +94,13 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption)
grpcConnection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption)
if err != nil { if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err) return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err)
} }
defer grpcConection.Close()
defer grpcConnection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
client := master_pb.NewSeaweedClient(grpcConnection)
stream, err := client.SendHeartbeat(ctx) stream, err := client.SendHeartbeat(ctx)
if err != nil { if err != nil {
glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err) glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)

2
weed/topology/store_replicate.go

@ -205,7 +205,7 @@ func GetWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOpt
// has one local and has remote replications // has one local and has remote replications
copyCount := v.ReplicaPlacement.GetCopyCount() copyCount := v.ReplicaPlacement.GetCopyCount()
if len(lookupResult.Locations) < copyCount { if len(lookupResult.Locations) < copyCount {
err = fmt.Errorf("replicating opetations [%d] is less than volume %d replication copy count [%d]",
err = fmt.Errorf("replicating operations [%d] is less than volume %d replication copy count [%d]",
len(lookupResult.Locations), volumeId, copyCount) len(lookupResult.Locations), volumeId, copyCount)
} }
} }

8
weed/topology/volume_layout_test.go

@ -54,7 +54,7 @@ func TestVolumesBinaryState(t *testing.T) {
expectResultAfterUpdate []bool expectResultAfterUpdate []bool
}{ }{
{ {
name: "mark true when exist copies",
name: "mark true when copies exist",
state: state_exist, state: state_exist,
expectResult: []bool{true, true, true, false, true}, expectResult: []bool{true, true, true, false, true},
update: func() { update: func() {
@ -67,7 +67,7 @@ func TestVolumesBinaryState(t *testing.T) {
expectResultAfterUpdate: []bool{true, false, true, false, false}, expectResultAfterUpdate: []bool{true, false, true, false, false},
}, },
{ {
name: "mark true when inexist copies",
name: "mark true when no copies exist",
state: state_no, state: state_no,
expectResult: []bool{false, true, true, false, true}, expectResult: []bool{false, true, true, false, true},
update: func() { update: func() {
@ -92,7 +92,7 @@ func TestVolumesBinaryState(t *testing.T) {
} }
for index, val := range result { for index, val := range result {
if val != test.expectResult[index] { if val != test.expectResult[index] {
t.Fatalf("result not matched, index %d, got %v, expect %v\n",
t.Fatalf("result not matched, index %d, got %v, expected %v\n",
index, val, test.expectResult[index]) index, val, test.expectResult[index])
} }
} }
@ -107,7 +107,7 @@ func TestVolumesBinaryState(t *testing.T) {
} }
for index, val := range updateResult { for index, val := range updateResult {
if val != test.expectResultAfterUpdate[index] { if val != test.expectResultAfterUpdate[index] {
t.Fatalf("update result not matched, index %d, got %v, expect %v\n",
t.Fatalf("update result not matched, index %d, got %v, expected %v\n",
index, val, test.expectResultAfterUpdate[index]) index, val, test.expectResultAfterUpdate[index])
} }
} }

4
weed/util/chunk_cache/chunk_cache_on_disk.go

@ -128,7 +128,7 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin
} }
wanted := min(int(length), int(nv.Size)-int(offset)) wanted := min(int(length), int(nv.Size)-int(offset))
if wanted < 0 { if wanted < 0 {
// should never happen, but better than panicing
// should never happen, but better than panicking
return nil, ErrorOutOfBounds return nil, ErrorOutOfBounds
} }
data := make([]byte, wanted) data := make([]byte, wanted)
@ -151,7 +151,7 @@ func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, of
} }
wanted := min(len(data), int(nv.Size)-int(offset)) wanted := min(len(data), int(nv.Size)-int(offset))
if wanted < 0 { if wanted < 0 {
// should never happen, but better than panicing
// should never happen, but better than panicking
return 0, ErrorOutOfBounds return 0, ErrorOutOfBounds
} }
if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil { if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil {

2
weed/wdclient/resource_pool/semaphore.go

@ -105,7 +105,7 @@ func (s *unboundedSemaphore) Release() {
s.lock.Lock() s.lock.Lock()
s.counter += 1 s.counter += 1
if s.counter > 0 { if s.counter > 0 {
// Not broadcasting here since it's unlike we can satify all waiting
// Not broadcasting here since it's unlike we can satisfy all waiting
// goroutines. Instead, we will Signal again if there are left over // goroutines. Instead, we will Signal again if there are left over
// quota after Acquire, in case of lost wakeups. // quota after Acquire, in case of lost wakeups.
s.cond.Signal() s.cond.Signal()

Loading…
Cancel
Save