Browse Source

generate write load

worker-execute-ec-tasks
chrislu 5 months ago
parent
commit
688a91039d
  1. 68
      docker/admin_integration/admin-entrypoint.sh
  2. 27
      docker/admin_integration/load-generator.go

68
docker/admin_integration/admin-entrypoint.sh

@ -26,7 +26,6 @@ import (
"log" "log"
"net/http" "net/http"
"os" "os"
"strconv"
"time" "time"
) )
@ -76,6 +75,71 @@ func (s *AdminServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}) })
} }
func (s *AdminServer) registerWorkerHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var worker Worker
if err := json.NewDecoder(r.Body).Decode(&worker); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
worker.LastSeen = time.Now()
worker.Status = "active"
// Check if worker already exists, update if so
found := false
for i, w := range s.workers {
if w.ID == worker.ID {
s.workers[i] = worker
found = true
break
}
}
if !found {
s.workers = append(s.workers, worker)
log.Printf("Registered new worker: %s with capabilities: %v", worker.ID, worker.Capabilities)
} else {
log.Printf("Updated worker: %s", worker.ID)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "registered"})
}
func (s *AdminServer) heartbeatHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var heartbeat struct {
WorkerID string `json:"worker_id"`
Status string `json:"status"`
}
if err := json.NewDecoder(r.Body).Decode(&heartbeat); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
// Update worker last seen time
for i, w := range s.workers {
if w.ID == heartbeat.WorkerID {
s.workers[i].LastSeen = time.Now()
s.workers[i].Status = heartbeat.Status
break
}
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}
func (s *AdminServer) detectVolumesForEC() { func (s *AdminServer) detectVolumesForEC() {
// Simulate volume detection logic // Simulate volume detection logic
// In real implementation, this would query the master for volume status // In real implementation, this would query the master for volume status
@ -134,6 +198,8 @@ func main() {
http.HandleFunc("/health", server.healthHandler) http.HandleFunc("/health", server.healthHandler)
http.HandleFunc("/status", server.statusHandler) http.HandleFunc("/status", server.statusHandler)
http.HandleFunc("/register", server.registerWorkerHandler)
http.HandleFunc("/heartbeat", server.heartbeatHandler)
// Start volume detection // Start volume detection
server.detectVolumesForEC() server.detectVolumesForEC()

27
docker/admin_integration/load-generator.go

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"mime/multipart"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
@ -83,12 +84,34 @@ func (lg *LoadGenerator) uploadFile(filename string, data []byte) error {
url = fmt.Sprintf("http://%s/%s/%s", lg.filerAddr, lg.collection, filename) url = fmt.Sprintf("http://%s/%s/%s", lg.filerAddr, lg.collection, filename)
} }
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
// Create multipart form data
var b bytes.Buffer
writer := multipart.NewWriter(&b)
// Create form file field
part, err := writer.CreateFormFile("file", filename)
if err != nil {
return err
}
// Write file data
_, err = part.Write(data)
if err != nil {
return err
}
// Close the multipart writer
err = writer.Close()
if err != nil {
return err
}
req, err := http.NewRequest("POST", url, &b)
if err != nil { if err != nil {
return err return err
} }
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Type", writer.FormDataContentType())
client := &http.Client{Timeout: 30 * time.Second} client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req) resp, err := client.Do(req)

Loading…
Cancel
Save