Browse Source

plugin worker: add admin script handler

pull/8491/head
Chris Lu 2 days ago
parent
commit
5b6ddd6c76
  1. 12
      weed/command/plugin_worker_test.go
  2. 5
      weed/command/worker.go
  3. 6
      weed/command/worker_runtime.go
  4. 4
      weed/command/worker_test.go
  5. 592
      weed/plugin/worker/admin_script_handler.go

12
weed/command/plugin_worker_test.go

@ -123,6 +123,14 @@ func TestParsePluginWorkerJobTypes(t *testing.T) {
if _, err = parsePluginWorkerJobTypes(" , "); err != nil {
t.Fatalf("expected empty list to resolve to default vacuum: %v", err)
}
jobTypes, err = parsePluginWorkerJobTypes("admin-script,script,admin_script")
if err != nil {
t.Fatalf("parsePluginWorkerJobTypes(admin script aliases) err = %v", err)
}
if len(jobTypes) != 1 || jobTypes[0] != "admin_script" {
t.Fatalf("expected admin_script alias to resolve, got %v", jobTypes)
}
}
func TestPluginWorkerDefaultJobTypes(t *testing.T) {
@ -130,8 +138,8 @@ func TestPluginWorkerDefaultJobTypes(t *testing.T) {
if err != nil {
t.Fatalf("parsePluginWorkerJobTypes(default setting) err = %v", err)
}
if len(jobTypes) != 3 {
t.Fatalf("expected default job types to include 3 handlers, got %v", jobTypes)
if len(jobTypes) != 4 {
t.Fatalf("expected default job types to include 4 handlers, got %v", jobTypes)
}
}

5
weed/command/worker.go

@ -7,11 +7,11 @@ import (
)
var cmdWorker = &Command{
UsageLine: "worker -admin=<admin_server> [-id=<worker_id>] [-jobType=vacuum,volume_balance,erasure_coding] [-workingDir=<path>] [-heartbeat=15s] [-reconnect=5s] [-maxDetect=1] [-maxExecute=4] [-metricsPort=<port>] [-metricsIp=<ip>] [-debug]",
UsageLine: "worker -admin=<admin_server> [-id=<worker_id>] [-jobType=vacuum,volume_balance,erasure_coding,admin_script] [-workingDir=<path>] [-heartbeat=15s] [-reconnect=5s] [-maxDetect=1] [-maxExecute=4] [-metricsPort=<port>] [-metricsIp=<ip>] [-debug]",
Short: "start a plugin.proto worker process",
Long: `Start an external plugin worker using weed/pb/plugin.proto over gRPC.
This command provides vacuum, volume_balance, and erasure_coding job type
This command provides vacuum, volume_balance, erasure_coding, and admin_script job type
contracts with the plugin stream runtime, including descriptor delivery,
heartbeat/load reporting, detection, and execution.
@ -25,6 +25,7 @@ Examples:
weed worker -admin=localhost:23646 -jobType=volume_balance
weed worker -admin=localhost:23646 -jobType=vacuum,volume_balance
weed worker -admin=localhost:23646 -jobType=erasure_coding
weed worker -admin=localhost:23646 -jobType=admin_script
weed worker -admin=admin.example.com:23646 -id=plugin-vacuum-a -heartbeat=10s
weed worker -admin=localhost:23646 -workingDir=/var/lib/seaweedfs-plugin
weed worker -admin=localhost:23646 -metricsPort=9327 -metricsIp=0.0.0.0

6
weed/command/worker_runtime.go

@ -23,7 +23,7 @@ import (
"google.golang.org/grpc"
)
const defaultPluginWorkerJobTypes = "vacuum,volume_balance,erasure_coding"
const defaultPluginWorkerJobTypes = "vacuum,volume_balance,erasure_coding,admin_script"
type pluginWorkerRunOptions struct {
AdminServer string
@ -156,6 +156,8 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExe
return pluginworker.NewVolumeBalanceHandler(dialOption), nil
case "erasure_coding":
return pluginworker.NewErasureCodingHandler(dialOption, workingDir), nil
case "admin_script":
return pluginworker.NewAdminScriptHandler(dialOption), nil
default:
return nil, fmt.Errorf("unsupported plugin job type %q", canonicalJobType)
}
@ -220,6 +222,8 @@ func canonicalPluginWorkerJobType(jobType string) (string, error) {
return "volume_balance", nil
case "erasure_coding", "erasure-coding", "erasure.coding", "ec":
return "erasure_coding", nil
case "admin_script", "admin-script", "admin.script", "script", "admin":
return "admin_script", nil
default:
return "", fmt.Errorf("unsupported plugin job type %q", jobType)
}

4
weed/command/worker_test.go

@ -7,7 +7,7 @@ func TestWorkerDefaultJobTypes(t *testing.T) {
if err != nil {
t.Fatalf("parsePluginWorkerJobTypes(default worker flag) err = %v", err)
}
if len(jobTypes) != 3 {
t.Fatalf("expected default worker job types to include 3 handlers, got %v", jobTypes)
if len(jobTypes) != 4 {
t.Fatalf("expected default worker job types to include 4 handlers, got %v", jobTypes)
}
}

592
weed/plugin/worker/admin_script_handler.go

@ -0,0 +1,592 @@
package pluginworker
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"regexp"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/shell"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
adminScriptJobType = "admin_script"
maxAdminScriptOutputBytes = 16 * 1024
)
var adminScriptTokenRegex = regexp.MustCompile(`'.*?'|".*?"|\S+`)
type AdminScriptHandler struct {
grpcDialOption grpc.DialOption
}
func NewAdminScriptHandler(grpcDialOption grpc.DialOption) *AdminScriptHandler {
return &AdminScriptHandler{grpcDialOption: grpcDialOption}
}
func (h *AdminScriptHandler) Capability() *plugin_pb.JobTypeCapability {
return &plugin_pb.JobTypeCapability{
JobType: adminScriptJobType,
CanDetect: true,
CanExecute: true,
MaxDetectionConcurrency: 1,
MaxExecutionConcurrency: 1,
DisplayName: "Admin Script",
Description: "Execute custom admin shell scripts",
Weight: 20,
}
}
func (h *AdminScriptHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
return &plugin_pb.JobTypeDescriptor{
JobType: adminScriptJobType,
DisplayName: "Admin Script",
Description: "Run custom admin shell scripts not covered by built-in job types",
Icon: "fas fa-terminal",
DescriptorVersion: 1,
AdminConfigForm: &plugin_pb.ConfigForm{
FormId: "admin-script-admin",
Title: "Admin Script Configuration",
Description: "Define the admin shell script to execute.",
Sections: []*plugin_pb.ConfigSection{
{
SectionId: "script",
Title: "Script",
Description: "Commands run sequentially by the admin script worker.",
Fields: []*plugin_pb.ConfigField{
{
Name: "script_name",
Label: "Script Name",
Description: "Optional label used in job summaries.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
{
Name: "script",
Label: "Script",
Description: "Admin shell commands to execute (one per line).",
HelpText: "Lock/unlock are handled by the admin server; omit lock/unlock lines.",
Placeholder: "volume.balance -apply\nvolume.fix.replication -apply",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXTAREA,
Required: true,
},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"script_name": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
"script": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
Enabled: false,
DetectionIntervalSeconds: 24 * 60 * 60,
DetectionTimeoutSeconds: 300,
MaxJobsPerDetection: 1,
GlobalExecutionConcurrency: 1,
PerWorkerExecutionConcurrency: 1,
RetryLimit: 0,
RetryBackoffSeconds: 30,
},
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{},
}
}
func (h *AdminScriptHandler) Detect(ctx context.Context, request *plugin_pb.RunDetectionRequest, sender DetectionSender) error {
if request == nil {
return fmt.Errorf("run detection request is nil")
}
if sender == nil {
return fmt.Errorf("detection sender is nil")
}
if request.JobType != "" && request.JobType != adminScriptJobType {
return fmt.Errorf("job type %q is not handled by admin_script worker", request.JobType)
}
script := normalizeAdminScript(readStringConfig(request.GetAdminConfigValues(), "script", ""))
scriptName := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "script_name", ""))
commands := parseAdminScriptCommands(script)
execCount := countExecutableCommands(commands)
if execCount == 0 {
_ = sender.SendActivity(buildDetectorActivity(
"no_script",
"ADMIN SCRIPT: No executable commands configured",
map[string]*plugin_pb.ConfigValue{
"command_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(execCount)},
},
},
))
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: adminScriptJobType,
Proposals: []*plugin_pb.JobProposal{},
HasMore: false,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: adminScriptJobType,
Success: true,
TotalProposals: 0,
})
}
proposal := buildAdminScriptProposal(script, scriptName, execCount)
proposals := []*plugin_pb.JobProposal{proposal}
hasMore := false
maxResults := int(request.MaxResults)
if maxResults > 0 && len(proposals) > maxResults {
proposals = proposals[:maxResults]
hasMore = true
}
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: adminScriptJobType,
Proposals: proposals,
HasMore: hasMore,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: adminScriptJobType,
Success: true,
TotalProposals: 1,
})
}
func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequest, sender ExecutionSender) error {
if request == nil || request.Job == nil {
return fmt.Errorf("execute job request is nil")
}
if sender == nil {
return fmt.Errorf("execution sender is nil")
}
if request.Job.JobType != "" && request.Job.JobType != adminScriptJobType {
return fmt.Errorf("job type %q is not handled by admin_script worker", request.Job.JobType)
}
script := normalizeAdminScript(readStringConfig(request.Job.Parameters, "script", ""))
scriptName := strings.TrimSpace(readStringConfig(request.Job.Parameters, "script_name", ""))
if script == "" {
script = normalizeAdminScript(readStringConfig(request.GetAdminConfigValues(), "script", ""))
}
if scriptName == "" {
scriptName = strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "script_name", ""))
}
commands := parseAdminScriptCommands(script)
execCommands := filterExecutableCommands(commands)
if len(execCommands) == 0 {
return sender.SendCompleted(&plugin_pb.JobCompleted{
Success: false,
ErrorMessage: "no executable admin script commands configured",
})
}
commandEnv, cancel, err := h.buildAdminScriptCommandEnv(ctx, request.ClusterContext)
if err != nil {
return sender.SendCompleted(&plugin_pb.JobCompleted{
Success: false,
ErrorMessage: err.Error(),
})
}
defer cancel()
if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
State: plugin_pb.JobState_JOB_STATE_ASSIGNED,
ProgressPercent: 0,
Stage: "assigned",
Message: "admin script job accepted",
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity("assigned", "admin script job accepted"),
},
}); err != nil {
return err
}
output := &limitedBuffer{maxBytes: maxAdminScriptOutputBytes}
executed := 0
errorMessages := make([]string, 0)
executedCommands := make([]string, 0, len(execCommands))
for _, cmd := range execCommands {
if ctx.Err() != nil {
errorMessages = append(errorMessages, ctx.Err().Error())
break
}
commandLine := formatAdminScriptCommand(cmd)
executedCommands = append(executedCommands, commandLine)
_, _ = fmt.Fprintf(output, "$ %s\n", commandLine)
found := false
for _, command := range shell.Commands {
if command.Name() != cmd.Name {
continue
}
found = true
if err := command.Do(cmd.Args, commandEnv, output); err != nil {
msg := fmt.Sprintf("%s: %v", cmd.Name, err)
errorMessages = append(errorMessages, msg)
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
State: plugin_pb.JobState_JOB_STATE_RUNNING,
ProgressPercent: percentProgress(executed+1, len(execCommands)),
Stage: "error",
Message: msg,
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity("error", msg),
},
})
}
break
}
if !found {
msg := fmt.Sprintf("unknown admin command: %s", cmd.Name)
errorMessages = append(errorMessages, msg)
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
State: plugin_pb.JobState_JOB_STATE_RUNNING,
ProgressPercent: percentProgress(executed+1, len(execCommands)),
Stage: "error",
Message: msg,
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity("error", msg),
},
})
}
executed++
progress := percentProgress(executed, len(execCommands))
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
State: plugin_pb.JobState_JOB_STATE_RUNNING,
ProgressPercent: progress,
Stage: "running",
Message: fmt.Sprintf("executed %d/%d command(s)", executed, len(execCommands)),
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity("running", commandLine),
},
})
}
scriptHash := hashAdminScript(script)
resultSummary := fmt.Sprintf("admin script executed (%d command(s))", executed)
if scriptName != "" {
resultSummary = fmt.Sprintf("admin script %q executed (%d command(s))", scriptName, executed)
}
outputValues := map[string]*plugin_pb.ConfigValue{
"command_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(executed)},
},
"error_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(errorMessages))},
},
"script_hash": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptHash},
},
}
if scriptName != "" {
outputValues["script_name"] = &plugin_pb.ConfigValue{
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptName},
}
}
if len(executedCommands) > 0 {
outputValues["commands"] = &plugin_pb.ConfigValue{
Kind: &plugin_pb.ConfigValue_StringList{
StringList: &plugin_pb.StringList{Values: executedCommands},
},
}
}
if out := strings.TrimSpace(output.String()); out != "" {
outputValues["output"] = &plugin_pb.ConfigValue{
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: out},
}
}
if output.truncated {
outputValues["output_truncated"] = &plugin_pb.ConfigValue{
Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true},
}
}
success := len(errorMessages) == 0 && ctx.Err() == nil
errorMessage := ""
if !success {
errorMessage = strings.Join(errorMessages, "; ")
if ctx.Err() != nil {
if errorMessage == "" {
errorMessage = ctx.Err().Error()
} else {
errorMessage = fmt.Sprintf("%s; %s", errorMessage, ctx.Err().Error())
}
}
}
return sender.SendCompleted(&plugin_pb.JobCompleted{
Success: success,
ErrorMessage: errorMessage,
Result: &plugin_pb.JobResult{
Summary: resultSummary,
OutputValues: outputValues,
},
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity("completed", resultSummary),
},
CompletedAt: timestamppb.Now(),
})
}
type adminScriptCommand struct {
Name string
Args []string
Raw string
}
func normalizeAdminScript(script string) string {
script = strings.ReplaceAll(script, "\r\n", "\n")
return strings.TrimSpace(script)
}
func parseAdminScriptCommands(script string) []adminScriptCommand {
script = normalizeAdminScript(script)
if script == "" {
return nil
}
lines := strings.Split(script, "\n")
commands := make([]adminScriptCommand, 0)
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") {
continue
}
for _, chunk := range strings.Split(line, ";") {
chunk = strings.TrimSpace(chunk)
if chunk == "" {
continue
}
parts := adminScriptTokenRegex.FindAllString(chunk, -1)
if len(parts) == 0 {
continue
}
args := make([]string, 0, len(parts)-1)
for _, arg := range parts[1:] {
args = append(args, strings.Trim(arg, "\"'"))
}
commands = append(commands, adminScriptCommand{
Name: strings.TrimSpace(parts[0]),
Args: args,
Raw: chunk,
})
}
}
return commands
}
func filterExecutableCommands(commands []adminScriptCommand) []adminScriptCommand {
exec := make([]adminScriptCommand, 0, len(commands))
for _, cmd := range commands {
if cmd.Name == "" {
continue
}
if isAdminScriptLockCommand(cmd.Name) {
continue
}
exec = append(exec, cmd)
}
return exec
}
func countExecutableCommands(commands []adminScriptCommand) int {
count := 0
for _, cmd := range commands {
if cmd.Name == "" {
continue
}
if isAdminScriptLockCommand(cmd.Name) {
continue
}
count++
}
return count
}
func isAdminScriptLockCommand(name string) bool {
switch strings.ToLower(strings.TrimSpace(name)) {
case "lock", "unlock":
return true
default:
return false
}
}
func buildAdminScriptProposal(script, scriptName string, commandCount int) *plugin_pb.JobProposal {
scriptHash := hashAdminScript(script)
summary := "Run admin script"
if scriptName != "" {
summary = fmt.Sprintf("Run admin script: %s", scriptName)
}
detail := fmt.Sprintf("Admin script with %d command(s)", commandCount)
proposalID := fmt.Sprintf("admin-script-%s-%d", scriptHash[:8], time.Now().UnixNano())
labels := map[string]string{
"script_hash": scriptHash,
}
if scriptName != "" {
labels["script_name"] = scriptName
}
return &plugin_pb.JobProposal{
ProposalId: proposalID,
DedupeKey: "admin-script:" + scriptHash,
JobType: adminScriptJobType,
Priority: plugin_pb.JobPriority_JOB_PRIORITY_NORMAL,
Summary: summary,
Detail: detail,
Parameters: map[string]*plugin_pb.ConfigValue{
"script": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: script},
},
"script_name": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptName},
},
"script_hash": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptHash},
},
"command_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(commandCount)},
},
},
Labels: labels,
}
}
func (h *AdminScriptHandler) buildAdminScriptCommandEnv(
ctx context.Context,
clusterContext *plugin_pb.ClusterContext,
) (*shell.CommandEnv, context.CancelFunc, error) {
if clusterContext == nil {
return nil, nil, fmt.Errorf("cluster context is required")
}
masters := normalizeAddressList(clusterContext.MasterGrpcAddresses)
if len(masters) == 0 {
return nil, nil, fmt.Errorf("missing master addresses for admin script")
}
filerGroup := ""
mastersValue := strings.Join(masters, ",")
options := shell.ShellOptions{
Masters: &mastersValue,
GrpcDialOption: h.grpcDialOption,
FilerGroup: &filerGroup,
Directory: "/",
}
filers := normalizeAddressList(clusterContext.FilerGrpcAddresses)
if len(filers) > 0 {
options.FilerAddress = pb.ServerAddress(filers[0])
} else {
glog.V(1).Infof("admin script worker missing filer address; filer-dependent commands may fail")
}
commandEnv := shell.NewCommandEnv(&options)
commandEnv.SetNoLock(true)
ctx, cancel := context.WithCancel(ctx)
go commandEnv.MasterClient.KeepConnectedToMaster(ctx)
return commandEnv, cancel, nil
}
func normalizeAddressList(addresses []string) []string {
normalized := make([]string, 0, len(addresses))
seen := make(map[string]struct{}, len(addresses))
for _, address := range addresses {
address = strings.TrimSpace(address)
if address == "" {
continue
}
if _, exists := seen[address]; exists {
continue
}
seen[address] = struct{}{}
normalized = append(normalized, address)
}
return normalized
}
func hashAdminScript(script string) string {
sum := sha256.Sum256([]byte(script))
return hex.EncodeToString(sum[:])
}
func formatAdminScriptCommand(cmd adminScriptCommand) string {
if len(cmd.Args) == 0 {
return cmd.Name
}
return fmt.Sprintf("%s %s", cmd.Name, strings.Join(cmd.Args, " "))
}
func percentProgress(done, total int) float64 {
if total <= 0 {
return 0
}
if done < 0 {
done = 0
}
if done > total {
done = total
}
return float64(done) / float64(total) * 100
}
type limitedBuffer struct {
buf bytes.Buffer
maxBytes int
truncated bool
}
func (b *limitedBuffer) Write(p []byte) (int, error) {
if b == nil {
return len(p), nil
}
if b.maxBytes <= 0 {
b.truncated = true
return len(p), nil
}
remaining := b.maxBytes - b.buf.Len()
if remaining <= 0 {
b.truncated = true
return len(p), nil
}
if len(p) > remaining {
_, _ = b.buf.Write(p[:remaining])
b.truncated = true
return len(p), nil
}
_, _ = b.buf.Write(p)
return len(p), nil
}
func (b *limitedBuffer) String() string {
if b == nil {
return ""
}
return b.buf.String()
}
Loading…
Cancel
Save