Browse Source

Fix volume.fsck crashing on EC volumes and add multi-volume vacuum support (#8406)

* helm: refine openshift-values.yaml to remove hardcoded UIDs

Remove hardcoded runAsUser, runAsGroup, and fsGroup from the
openshift-values.yaml example. This allows OpenShift's admission
controller to automatically assign a valid UID from the namespace's
allocated range, avoiding "forbidden" errors when UID 1000 is
outside the permissible range.

Updates #8381, #8390.

* helm: fix volume.logs and add consistent security context comments

* Update README.md

* fix volume.fsck crashing on EC volumes and add multi-volume vacuum support

* address comments
pull/8407/head
Chris Lu 2 days ago
committed by GitHub
parent
commit
cd6832249b
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 16
      weed/shell/command_volume_fsck.go
  2. 39
      weed/shell/command_volume_vacuum.go

16
weed/shell/command_volume_fsck.go

@ -53,6 +53,7 @@ type commandVolumeFsck struct {
tempFolder string
verbose *bool
forcePurging *bool
skipEcVolumes *bool
findMissingChunksInFiler *bool
verifyNeedle *bool
filerSigningKey string
@ -96,6 +97,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
c.verbose = fsckCommand.Bool("v", false, "verbose mode")
c.skipEcVolumes = fsckCommand.Bool("skipEcVolumes", false, "skip erasure coded volumes")
c.findMissingChunksInFiler = fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
c.collection = fsckCommand.String("collection", "", "the collection name")
volumeIds := fsckCommand.String("volumeId", "", "comma separated the volume id")
@ -117,6 +119,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
c.volumeIds = make(map[uint32]bool)
if *volumeIds != "" {
for _, volumeIdStr := range strings.Split(*volumeIds, ",") {
volumeIdStr = strings.TrimSpace(volumeIdStr)
if volumeIdInt, err := strconv.ParseUint(volumeIdStr, 10, 32); err == nil {
c.volumeIds[uint32(volumeIdInt)] = true
} else {
@ -168,6 +171,10 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo
eg.Go(func() error {
for volumeId, vinfo := range volumeIdToVInfo {
if *c.skipEcVolumes && vinfo.isEcVolume {
delete(volumeIdToVInfo, volumeId)
continue
}
if len(c.volumeIds) > 0 {
if _, ok := c.volumeIds[volumeId]; !ok {
delete(volumeIdToVInfo, volumeId)
@ -601,7 +608,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
if err = c.readFilerFileIdFile(volumeId, func(filerNeedleId types.NeedleId, itemPath util.FullPath) {
inUseCount++
if *c.verifyNeedle {
if *c.verifyNeedle && !vinfo.isEcVolume {
if needleValue, ok := volumeFileIdDb.Get(filerNeedleId); ok && !needleValue.Size.IsDeleted() {
if _, err := readNeedleStatus(c.env.option.GrpcDialOption, vinfo.server, volumeId, *needleValue); err != nil {
// files may be deleted during copying filesIds
@ -629,7 +636,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
if n.Size.IsDeleted() {
return nil
}
if cutoffFrom > 0 || modifyFrom > 0 {
if !vinfo.isEcVolume && (cutoffFrom > 0 || modifyFrom > 0) {
return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption,
func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
@ -649,6 +656,11 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
return nil
})
} else {
if vinfo.isEcVolume && (cutoffFrom > 0 || modifyFrom > 0) {
if *c.verbose {
fmt.Fprintf(c.writer, "skipping time-based filtering for EC volume %d (cutoffFrom=%d, modifyFrom=%d)\n", volumeId, cutoffFrom, modifyFrom)
}
}
orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
orphanFileCount++
orphanDataSize += uint64(n.Size)

39
weed/shell/command_volume_vacuum.go

@ -3,7 +3,10 @@ package shell
import (
"context"
"flag"
"fmt"
"io"
"strconv"
"strings"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
@ -36,7 +39,7 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ
volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
garbageThreshold := volumeVacuumCommand.Float64("garbageThreshold", 0.3, "vacuum when garbage is more than this limit")
collection := volumeVacuumCommand.String("collection", "", "vacuum this collection")
volumeId := volumeVacuumCommand.Uint("volumeId", 0, "the volume id")
volumeIds := volumeVacuumCommand.String("volumeId", "", "comma-separated list of volume IDs")
if err = volumeVacuumCommand.Parse(args); err != nil {
return nil
}
@ -45,16 +48,32 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ
return
}
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
GarbageThreshold: float32(*garbageThreshold),
VolumeId: uint32(*volumeId),
Collection: *collection,
var volumeIdInts []uint32
if *volumeIds != "" {
for _, volumeIdStr := range strings.Split(*volumeIds, ",") {
volumeIdStr = strings.TrimSpace(volumeIdStr)
if volumeIdInt, err := strconv.ParseUint(volumeIdStr, 10, 32); err == nil {
volumeIdInts = append(volumeIdInts, uint32(volumeIdInt))
} else {
return fmt.Errorf("parse volumeId string %s to int: %v", volumeIdStr, err)
}
}
} else {
volumeIdInts = append(volumeIdInts, 0)
}
for _, volumeId := range volumeIdInts {
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
GarbageThreshold: float32(*garbageThreshold),
VolumeId: volumeId,
Collection: *collection,
})
return err
})
return err
})
if err != nil {
return
if err != nil {
return err
}
}
return nil

Loading…
Cancel
Save