From 688a91039de5bbd61932673145566ea8bcc521d0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 24 Jul 2025 12:08:19 -0700 Subject: [PATCH] generate write load --- docker/admin_integration/admin-entrypoint.sh | 68 +++++++++++++++++++- docker/admin_integration/load-generator.go | 27 +++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/docker/admin_integration/admin-entrypoint.sh b/docker/admin_integration/admin-entrypoint.sh index bafda7d47..9deb0076d 100755 --- a/docker/admin_integration/admin-entrypoint.sh +++ b/docker/admin_integration/admin-entrypoint.sh @@ -26,7 +26,6 @@ import ( "log" "net/http" "os" - "strconv" "time" ) @@ -76,6 +75,71 @@ func (s *AdminServer) statusHandler(w http.ResponseWriter, r *http.Request) { }) } +func (s *AdminServer) registerWorkerHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var worker Worker + if err := json.NewDecoder(r.Body).Decode(&worker); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + worker.LastSeen = time.Now() + worker.Status = "active" + + // Check if worker already exists, update if so + found := false + for i, w := range s.workers { + if w.ID == worker.ID { + s.workers[i] = worker + found = true + break + } + } + + if !found { + s.workers = append(s.workers, worker) + log.Printf("Registered new worker: %s with capabilities: %v", worker.ID, worker.Capabilities) + } else { + log.Printf("Updated worker: %s", worker.ID) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "registered"}) +} + +func (s *AdminServer) heartbeatHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var heartbeat struct { + WorkerID string `json:"worker_id"` + Status string `json:"status"` + } + + if err := json.NewDecoder(r.Body).Decode(&heartbeat); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Update worker last seen time + for i, w := range s.workers { + if w.ID == heartbeat.WorkerID { + s.workers[i].LastSeen = time.Now() + s.workers[i].Status = heartbeat.Status + break + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) +} + func (s *AdminServer) detectVolumesForEC() { // Simulate volume detection logic // In real implementation, this would query the master for volume status @@ -134,6 +198,8 @@ func main() { http.HandleFunc("/health", server.healthHandler) http.HandleFunc("/status", server.statusHandler) + http.HandleFunc("/register", server.registerWorkerHandler) + http.HandleFunc("/heartbeat", server.heartbeatHandler) // Start volume detection server.detectVolumesForEC() diff --git a/docker/admin_integration/load-generator.go b/docker/admin_integration/load-generator.go index c52939459..6fca233d4 100644 --- a/docker/admin_integration/load-generator.go +++ b/docker/admin_integration/load-generator.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "mime/multipart" "net/http" "os" "strconv" @@ -83,12 +84,34 @@ func (lg *LoadGenerator) uploadFile(filename string, data []byte) error { url = fmt.Sprintf("http://%s/%s/%s", lg.filerAddr, lg.collection, filename) } - req, err := http.NewRequest("POST", url, bytes.NewReader(data)) + // Create multipart form data + var b bytes.Buffer + writer := multipart.NewWriter(&b) + + // Create form file field + part, err := writer.CreateFormFile("file", filename) + if err != nil { + return err + } + + // Write file data + _, err = part.Write(data) + if err != nil { + return err + } + + // Close the multipart writer + err = writer.Close() + if err != nil { + return err + } + + req, err := http.NewRequest("POST", url, &b) if err != nil { return err } - req.Header.Set("Content-Type", "application/octet-stream") + req.Header.Set("Content-Type", writer.FormDataContentType()) client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Do(req)