diff --git a/docker/admin_integration/Dockerfile.admin b/docker/admin_integration/Dockerfile.admin
new file mode 100644
index 000000000..9aaf69960
--- /dev/null
+++ b/docker/admin_integration/Dockerfile.admin
@@ -0,0 +1,33 @@
+# Final stage
+FROM alpine:latest
+
+# Install dependencies including Go for the entrypoint script
+RUN apk --no-cache add curl ca-certificates go
+
+WORKDIR /root/
+
+# Copy the admin entrypoint script, which builds and runs a simple admin server at startup
+COPY ./docker/admin_integration/admin-entrypoint.sh /entrypoint.sh
+RUN chmod +x /entrypoint.sh
+
+# Create directories
+RUN mkdir -p /data /config /work
+
+# Expose admin port
+EXPOSE 9900
+
+# Set environment variables
+ENV MASTER_ADDRESS="master:9333"
+ENV ADMIN_PORT="9900"
+ENV SCAN_INTERVAL="30s"
+ENV WORKER_TIMEOUT="5m"
+ENV TASK_TIMEOUT="30m"
+ENV MAX_RETRIES="3"
+ENV MAX_CONCURRENT_TASKS="5"
+
+# Health check
+HEALTHCHECK --interval=15s --timeout=5s --start-period=30s --retries=3 \
+  CMD curl -f http://localhost:9900/health || exit 1
+
+# Start admin server
+ENTRYPOINT ["/entrypoint.sh"]
\ No newline at end of file
diff --git a/docker/admin_integration/Dockerfile.load b/docker/admin_integration/Dockerfile.load
new file mode 100644
index 000000000..897061fac
--- /dev/null
+++ b/docker/admin_integration/Dockerfile.load
@@ -0,0 +1,44 @@
+FROM golang:1.24-alpine AS builder
+
+# Install dependencies
+RUN apk add --no-cache git build-base
+
+# Set working directory
+WORKDIR /app
+
+# Copy load generator source and module files, then build
+COPY ./docker/admin_integration/load-generator.go .
+COPY go.mod go.sum ./
+RUN go mod download
+RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o load-generator load-generator.go
+
+# Final stage
+FROM alpine:latest
+
+# Install dependencies
+RUN apk --no-cache add curl ca-certificates openssl
+
+WORKDIR /root/
+
+# Copy the binary
+COPY --from=builder /app/load-generator .
+
+# Copy load generator script
+COPY ./docker/admin_integration/load-entrypoint.sh /entrypoint.sh
+RUN chmod +x /entrypoint.sh
+
+# Create directories for test data
+RUN mkdir -p /test-data /temp
+
+# Set environment variables
+ENV FILER_ADDRESS="filer:8888"
+ENV MASTER_ADDRESS="master:9333"
+ENV WRITE_RATE="10"
+ENV DELETE_RATE="2"
+ENV FILE_SIZE_MIN="1MB"
+ENV FILE_SIZE_MAX="5MB"
+ENV TEST_DURATION="3600"
+ENV COLLECTION=""
+
+# Start load generator
+ENTRYPOINT ["/entrypoint.sh"]
\ No newline at end of file
diff --git a/docker/admin_integration/Dockerfile.monitor b/docker/admin_integration/Dockerfile.monitor
new file mode 100644
index 000000000..668eeb543
--- /dev/null
+++ b/docker/admin_integration/Dockerfile.monitor
@@ -0,0 +1,48 @@
+FROM golang:1.24-alpine AS builder
+
+# Install dependencies
+RUN apk add --no-cache git build-base
+
+# Set working directory
+WORKDIR /app
+
+# Copy monitor source and module files, then build
+COPY ./docker/admin_integration/monitor.go .
+COPY go.mod go.sum ./
+RUN go mod download
+RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o monitor monitor.go
+
+# Final stage
+FROM alpine:latest
+
+# Install dependencies
+RUN apk --no-cache add curl ca-certificates jq
+
+WORKDIR /root/
+
+# Copy the binary
+COPY --from=builder /app/monitor .
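+# (CGO_ENABLED=0 in the builder stage produces a statically linked binary,
+# so it runs on musl-based Alpine without glibc.)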
+ +# Copy monitor scripts +COPY ./docker/admin_integration/monitor-entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +# Create monitoring directories +RUN mkdir -p /monitor-data /logs + +# Expose monitor port +EXPOSE 9999 + +# Set environment variables +ENV MASTER_ADDRESS="master:9333" +ENV ADMIN_ADDRESS="admin:9900" +ENV FILER_ADDRESS="filer:8888" +ENV MONITOR_INTERVAL="10s" +ENV LOG_LEVEL="info" + +# Health check +HEALTHCHECK --interval=30s --timeout=5s --start-period=30s --retries=3 \ + CMD curl -f http://localhost:9999/health || exit 1 + +# Start monitor +ENTRYPOINT ["/entrypoint.sh"] \ No newline at end of file diff --git a/docker/admin_integration/Dockerfile.worker b/docker/admin_integration/Dockerfile.worker new file mode 100644 index 000000000..1e8de5b2c --- /dev/null +++ b/docker/admin_integration/Dockerfile.worker @@ -0,0 +1,33 @@ +# Final stage +FROM alpine:latest + +# Install dependencies including Go for the entrypoint script +RUN apk --no-cache add curl ca-certificates go + +WORKDIR /root/ + +# Copy worker entrypoint script +COPY ./docker/admin_integration/worker-entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +# Create working directories +RUN mkdir -p /work /tmp/ec_work + +# Expose worker port +EXPOSE 9001 + +# Set environment variables +ENV ADMIN_ADDRESS="admin:9900" +ENV WORKER_ID="worker-1" +ENV WORKER_ADDRESS="worker:9001" +ENV CAPABILITIES="erasure_coding" +ENV MAX_CONCURRENT="2" +ENV WORK_DIR="/work" +ENV HEARTBEAT_INTERVAL="10s" + +# Health check +HEALTHCHECK --interval=15s --timeout=5s --start-period=30s --retries=3 \ + CMD curl -f http://localhost:9001/health || exit 1 + +# Start worker +ENTRYPOINT ["/entrypoint.sh"] \ No newline at end of file diff --git a/docker/admin_integration/EC-TESTING-README.md b/docker/admin_integration/EC-TESTING-README.md new file mode 100644 index 000000000..1a74d3a62 --- /dev/null +++ b/docker/admin_integration/EC-TESTING-README.md @@ -0,0 +1,433 @@ +# SeaweedFS EC Worker Testing Environment + +This Docker Compose setup provides a comprehensive testing environment for SeaweedFS Erasure Coding (EC) workers with real workload simulation. 
+ +## πŸ“‚ Directory Structure + +The testing environment is located in `docker/admin_integration/` and includes: + +``` +docker/admin_integration/ +β”œβ”€β”€ Makefile # Main management interface +β”œβ”€β”€ docker-compose-ec-test.yml # Docker compose configuration +β”œβ”€β”€ EC-TESTING-README.md # This documentation +β”œβ”€β”€ Dockerfile.admin # Admin server image +β”œβ”€β”€ Dockerfile.worker # EC worker image +β”œβ”€β”€ Dockerfile.load # Load generator image +β”œβ”€β”€ Dockerfile.monitor # Monitor service image +β”œβ”€β”€ admin-entrypoint.sh # Admin server startup script +β”œβ”€β”€ worker-entrypoint.sh # Worker startup script +β”œβ”€β”€ load-generator.go # Load generator source code +β”œβ”€β”€ load-entrypoint.sh # Load generator startup script +β”œβ”€β”€ monitor.go # Monitor service source code +└── monitor-entrypoint.sh # Monitor startup script +``` + +## πŸ—οΈ Architecture + +The testing environment includes: + +- **1 Master Server** (port 9333) - Coordinates the cluster with 50MB volume size limit +- **6 Volume Servers** (ports 8080-8085) - Distributed across 2 data centers and 3 racks for diversity +- **1 Filer** (port 8888) - Provides file system interface +- **1 Admin Server** (port 9900) - Detects volumes needing EC and manages workers +- **3 EC Workers** - Execute erasure coding tasks with different capabilities +- **1 Load Generator** - Continuously writes and deletes files to trigger EC +- **1 Monitor** (port 9999) - Tracks cluster health and EC progress + +## πŸš€ Quick Start + +### Prerequisites + +- Docker and Docker Compose installed +- GNU Make installed +- At least 4GB RAM available for containers +- Ports 8080-8085, 8888, 9333, 9900, 9999 available + +### Start the Environment + +```bash +# Navigate to the admin integration directory +cd docker/admin_integration/ + +# Show available commands +make help + +# Start the complete testing environment +make start +``` + +The `make start` command will: +1. Build all necessary Docker images +2. Start all services in the correct order +3. Wait for services to be ready +4. 
Display monitoring URLs and run health checks + +### Alternative Commands + +```bash +# Quick start aliases +make up # Same as 'make start' + +# Development mode (higher load for faster testing) +make dev-start + +# Build images without starting +make build +``` + +## πŸ“‹ Available Make Targets + +Run `make help` to see all available targets: + +### **πŸš€ Main Operations** +- `make start` - Start the complete EC testing environment +- `make stop` - Stop all services +- `make restart` - Restart all services +- `make clean` - Complete cleanup (containers, volumes, images) + +### **πŸ“Š Monitoring & Status** +- `make health` - Check health of all services +- `make status` - Show status of all containers +- `make urls` - Display all monitoring URLs +- `make monitor` - Open monitor dashboard in browser +- `make monitor-status` - Show monitor status via API +- `make volume-status` - Show volume status from master +- `make admin-status` - Show admin server status +- `make cluster-status` - Show complete cluster status + +### **πŸ“‹ Logs Management** +- `make logs` - Show logs from all services +- `make logs-admin` - Show admin server logs +- `make logs-workers` - Show all worker logs +- `make logs-worker1/2/3` - Show specific worker logs +- `make logs-load` - Show load generator logs +- `make logs-monitor` - Show monitor logs +- `make backup-logs` - Backup all logs to files + +### **βš–οΈ Scaling & Testing** +- `make scale-workers WORKERS=5` - Scale workers to 5 instances +- `make scale-load RATE=25` - Increase load generation rate +- `make test-ec` - Run focused EC test scenario + +### **πŸ”§ Development & Debug** +- `make shell-admin` - Open shell in admin container +- `make shell-worker1` - Open shell in worker container +- `make debug` - Show debug information +- `make troubleshoot` - Run troubleshooting checks + +## πŸ“Š Monitoring URLs + +| Service | URL | Description | +|---------|-----|-------------| +| Master UI | http://localhost:9333 | Cluster status and topology | +| Filer | http://localhost:8888 | File operations | +| Admin Server | http://localhost:9900/status | Task management | +| Monitor | http://localhost:9999/status | Complete cluster monitoring | +| Volume Servers | http://localhost:8080-8085/status | Individual volume server stats | + +Quick access: `make urls` or `make monitor` + +## πŸ”„ How EC Testing Works + +### 1. Continuous Load Generation +- **Write Rate**: 10 files/second (1-5MB each) +- **Delete Rate**: 2 files/second +- **Target**: Fill volumes to 50MB limit quickly + +### 2. Volume Detection +- Admin server scans master every 30 seconds +- Identifies volumes >40MB (80% of 50MB limit) +- Queues EC tasks for eligible volumes + +### 3. EC Worker Assignment +- **Worker 1**: EC specialist (max 2 concurrent tasks) +- **Worker 2**: EC + Vacuum hybrid (max 2 concurrent tasks) +- **Worker 3**: EC + Vacuum hybrid (max 1 concurrent task) + +### 4. Comprehensive EC Process +Each EC task follows 6 phases: +1. **Copy Volume Data** (5-15%) - Stream .dat/.idx files locally +2. **Mark Read-Only** (20-25%) - Ensure data consistency +3. **Local Encoding** (30-60%) - Create 14 shards (10+4 Reed-Solomon) +4. **Calculate Placement** (65-70%) - Smart rack-aware distribution +5. **Distribute Shards** (75-90%) - Upload to optimal servers +6. **Verify & Cleanup** (95-100%) - Validate and clean temporary files + +### 5. 
Real-Time Monitoring +- Volume analysis and EC candidate detection +- Worker health and task progress +- No data loss verification +- Performance metrics + +## πŸ“‹ Key Features Tested + +### βœ… EC Implementation Features +- [x] Local volume data copying with progress tracking +- [x] Local Reed-Solomon encoding (10+4 shards) +- [x] Intelligent shard placement with rack awareness +- [x] Load balancing across available servers +- [x] Backup server selection for redundancy +- [x] Detailed step-by-step progress tracking +- [x] Comprehensive error handling and recovery + +### βœ… Infrastructure Features +- [x] Multi-datacenter topology (dc1, dc2) +- [x] Rack diversity (rack1, rack2, rack3) +- [x] Volume size limits (50MB) +- [x] Worker capability matching +- [x] Health monitoring and alerting +- [x] Continuous workload simulation + +## πŸ› οΈ Common Usage Patterns + +### Basic Testing Workflow +```bash +# Start environment +make start + +# Watch progress +make monitor-status + +# Check for EC candidates +make volume-status + +# View worker activity +make logs-workers + +# Stop when done +make stop +``` + +### High-Load Testing +```bash +# Start with higher load +make dev-start + +# Scale up workers and load +make scale-workers WORKERS=5 +make scale-load RATE=50 + +# Monitor intensive EC activity +make logs-admin +``` + +### Debugging Issues +```bash +# Check port conflicts and system state +make troubleshoot + +# View specific service logs +make logs-admin +make logs-worker1 + +# Get shell access for debugging +make shell-admin +make shell-worker1 + +# Check detailed status +make debug +``` + +### Development Iteration +```bash +# Quick restart after code changes +make restart + +# Rebuild and restart +make clean +make start + +# Monitor specific components +make logs-monitor +``` + +## πŸ“ˆ Expected Results + +### Successful EC Testing Shows: +1. **Volume Growth**: Steady increase in volume sizes toward 50MB limit +2. **EC Detection**: Admin server identifies volumes >40MB for EC +3. **Task Assignment**: Workers receive and execute EC tasks +4. **Shard Distribution**: 14 shards distributed across 6 volume servers +5. **No Data Loss**: All files remain accessible during and after EC +6. 
**Performance**: EC tasks complete within estimated timeframes + +### Sample Monitor Output: +```bash +# Check current status +make monitor-status + +# Output example: +{ + "monitor": { + "uptime": "15m30s", + "master_addr": "master:9333", + "admin_addr": "admin:9900" + }, + "stats": { + "VolumeCount": 12, + "ECTasksDetected": 3, + "WorkersActive": 3 + } +} +``` + +## πŸ”§ Configuration + +### Environment Variables + +You can customize the environment by setting variables: + +```bash +# High load testing +WRITE_RATE=25 DELETE_RATE=5 make start + +# Extended test duration +TEST_DURATION=7200 make start # 2 hours +``` + +### Scaling Examples + +```bash +# Scale workers +make scale-workers WORKERS=6 + +# Increase load generation +make scale-load RATE=30 + +# Combined scaling +make scale-workers WORKERS=4 +make scale-load RATE=40 +``` + +## 🧹 Cleanup Options + +```bash +# Stop services only +make stop + +# Remove containers but keep volumes +make down + +# Remove data volumes only +make clean-volumes + +# Remove built images only +make clean-images + +# Complete cleanup (everything) +make clean +``` + +## πŸ› Troubleshooting + +### Quick Diagnostics +```bash +# Run complete troubleshooting +make troubleshoot + +# Check specific components +make health +make debug +make status +``` + +### Common Issues + +**Services not starting:** +```bash +# Check port availability +make troubleshoot + +# View startup logs +make logs-master +make logs-admin +``` + +**No EC tasks being created:** +```bash +# Check volume status +make volume-status + +# Increase load to fill volumes faster +make scale-load RATE=30 + +# Check admin detection +make logs-admin +``` + +**Workers not responding:** +```bash +# Check worker registration +make admin-status + +# View worker logs +make logs-workers + +# Restart workers +make restart +``` + +### Performance Tuning + +**For faster testing:** +```bash +make dev-start # Higher default load +make scale-load RATE=50 # Very high load +``` + +**For stress testing:** +```bash +make scale-workers WORKERS=8 +make scale-load RATE=100 +``` + +## πŸ“š Technical Details + +### Network Architecture +- Custom bridge network (172.20.0.0/16) +- Service discovery via container names +- Health checks for all services + +### Storage Layout +- Each volume server: max 100 volumes +- Data centers: dc1, dc2 +- Racks: rack1, rack2, rack3 +- Volume limit: 50MB per volume + +### EC Algorithm +- Reed-Solomon RS(10,4) +- 10 data shards + 4 parity shards +- Rack-aware distribution +- Backup server redundancy + +### Make Integration +- Color-coded output for better readability +- Comprehensive help system (`make help`) +- Parallel execution support +- Error handling and cleanup +- Cross-platform compatibility + +## 🎯 Quick Reference + +```bash +# Essential commands +make help # Show all available targets +make start # Start complete environment +make health # Check all services +make monitor # Open dashboard +make logs-admin # View admin activity +make clean # Complete cleanup + +# Monitoring +make volume-status # Check for EC candidates +make admin-status # Check task queue +make monitor-status # Full cluster status + +# Scaling & Testing +make test-ec # Run focused EC test +make scale-load RATE=X # Increase load +make troubleshoot # Diagnose issues +``` + +This environment provides a realistic testing scenario for SeaweedFS EC workers with actual data operations, comprehensive monitoring, and easy management through Make targets. 
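+## πŸ” Detection Rule as Code
+
+For reference, the volume-detection rule described above (a writable volume above 80% of the 50MB limit becomes an EC candidate) fits in a few lines of Go. This is a minimal sketch, not part of the environment itself: it reuses the struct shapes that `monitor.go` in this directory assumes for the master's `/vol/status` response, so the JSON field names are carried over from that code rather than independently verified. As background, with RS(10,4) any 10 of the 14 shards reconstruct the volume, so the layout tolerates the loss of any 4 shards at a storage overhead of 14/10 = 1.4x.
+
+```go
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+)
+
+// Minimal mirror of the fields monitor.go reads from /vol/status.
+type volumeInfo struct {
+	Id       uint32 `json:"Id"`
+	Size     uint64 `json:"Size"`
+	ReadOnly bool   `json:"ReadOnly"`
+}
+
+type volumeStatus struct {
+	Volumes []volumeInfo `json:"Volumes"`
+}
+
+func main() {
+	resp, err := http.Get("http://localhost:9333/vol/status")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer resp.Body.Close()
+
+	var status volumeStatus
+	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
+		log.Fatal(err)
+	}
+
+	// 80% of the 50MB volume size limit: above this, a writable
+	// volume is a candidate for EC conversion.
+	const threshold = 40 * 1024 * 1024
+	for _, v := range status.Volumes {
+		if v.Size > threshold && !v.ReadOnly {
+			fmt.Printf("volume %d is an EC candidate (%.1f MB)\n",
+				v.Id, float64(v.Size)/(1024*1024))
+		}
+	}
+}
+```
+
+Run it against the mapped master port while the environment is up; the same candidates appear in the monitor's `Volume analysis` log lines.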
\ No newline at end of file
diff --git a/docker/admin_integration/Makefile b/docker/admin_integration/Makefile
new file mode 100644
index 000000000..658f6864b
--- /dev/null
+++ b/docker/admin_integration/Makefile
@@ -0,0 +1,301 @@
+# SeaweedFS EC Worker Testing Environment Makefile
+# Usage: make <target>
+
+.PHONY: help start stop clean logs status monitor health up down restart scale docs test
+
+# Default target
+.DEFAULT_GOAL := help
+
+# Docker compose file
+COMPOSE_FILE := docker-compose-ec-test.yml
+
+# Color codes for output
+GREEN := \033[32m
+YELLOW := \033[33m
+BLUE := \033[34m
+RED := \033[31m
+NC := \033[0m # No Color
+
+help: ## Show this help message
+	@echo "$(BLUE)πŸ§ͺ SeaweedFS EC Worker Testing Environment$(NC)"
+	@echo "$(BLUE)===========================================$(NC)"
+	@echo ""
+	@echo "$(YELLOW)Available targets:$(NC)"
+	@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf " $(GREEN)%-15s$(NC) %s\n", $$1, $$2}' $(MAKEFILE_LIST)
+	@echo ""
+	@echo "$(YELLOW)Quick start:$(NC) make start"
+	@echo "$(YELLOW)Monitor:$(NC) make monitor"
+	@echo "$(YELLOW)Cleanup:$(NC) make clean"
+
+start: ## Start the complete EC testing environment
+	@echo "$(GREEN)πŸš€ Starting SeaweedFS EC testing environment...$(NC)"
+	@echo "$(BLUE)This will start:$(NC)"
+	@echo " β€’ 1 Master server (port 9333)"
+	@echo " β€’ 6 Volume servers (ports 8080-8085) with 50MB volume limit"
+	@echo " β€’ 1 Filer (port 8888)"
+	@echo " β€’ 1 Admin server (port 9900)"
+	@echo " β€’ 3 EC Workers"
+	@echo " β€’ 1 Load generator (continuous read/write)"
+	@echo " β€’ 1 Monitor (port 9999)"
+	@echo ""
+	@mkdir -p monitor-data admin-config
+	@chmod +x *.sh 2>/dev/null || true
+	@docker-compose -f $(COMPOSE_FILE) down -v 2>/dev/null || true
+	@docker-compose -f $(COMPOSE_FILE) up --build -d
+	@echo ""
+	@echo "$(GREEN)βœ… Environment started successfully!$(NC)"
+	@echo ""
+	@$(MAKE) urls
+	@echo ""
+	@echo "$(YELLOW)⏳ Waiting for services to be ready...$(NC)"
+	@sleep 10
+	@$(MAKE) health
+
+stop: ## Stop all services
+	@echo "$(YELLOW)πŸ›‘ Stopping all services...$(NC)"
+	@docker-compose -f $(COMPOSE_FILE) stop
+	@echo "$(GREEN)βœ… All services stopped$(NC)"
+
+down: ## Stop and remove all containers
+	@echo "$(YELLOW)πŸ›‘ Stopping and removing containers...$(NC)"
+	@docker-compose -f $(COMPOSE_FILE) down
+	@echo "$(GREEN)βœ… Containers stopped and removed$(NC)"
+
+clean: ## Stop and remove all containers, networks, volumes, and images
+	@echo "$(RED)🧹 Cleaning up entire environment...$(NC)"
+	@docker-compose -f $(COMPOSE_FILE) down -v --rmi all 2>/dev/null || true
+	@docker system prune -f
+	@echo "$(GREEN)βœ… Environment cleaned up$(NC)"
+
+restart: ## Restart all services
+	@echo "$(YELLOW)πŸ”„ Restarting all services...$(NC)"
+	@docker-compose -f $(COMPOSE_FILE) restart
+	@echo "$(GREEN)βœ… All services restarted$(NC)"
+
+up: start ## Alias for start
+
+status: ## Show status of all services
+	@echo "$(BLUE)πŸ“Š Service Status:$(NC)"
+	@docker-compose -f $(COMPOSE_FILE) ps
+
+logs: ## Show logs from all services
+	@echo "$(BLUE)πŸ“‹ Showing logs from all services (Ctrl+C to exit):$(NC)"
+	@docker-compose -f $(COMPOSE_FILE) logs -f
+
+logs-admin: ## Show admin server logs
+	@echo "$(BLUE)πŸ“‹ Admin Server Logs:$(NC)"
+	@docker-compose -f $(COMPOSE_FILE) logs -f admin
+
+logs-workers: ## Show all worker logs
+	@echo "$(BLUE)πŸ“‹ Worker Logs:$(NC)"
+	@docker-compose -f $(COMPOSE_FILE) logs -f worker1 worker2 worker3
+
+logs-worker1: ## Show worker1 logs
+	@docker-compose -f $(COMPOSE_FILE) logs -f worker1
+
+logs-worker2: ## Show worker2 logs + @docker-compose -f $(COMPOSE_FILE) logs -f worker2 + +logs-worker3: ## Show worker3 logs + @docker-compose -f $(COMPOSE_FILE) logs -f worker3 + +logs-load: ## Show load generator logs + @echo "$(BLUE)πŸ“‹ Load Generator Logs:$(NC)" + @docker-compose -f $(COMPOSE_FILE) logs -f load_generator + +logs-monitor: ## Show monitor logs + @echo "$(BLUE)πŸ“‹ Monitor Logs:$(NC)" + @docker-compose -f $(COMPOSE_FILE) logs -f monitor + +logs-master: ## Show master logs + @docker-compose -f $(COMPOSE_FILE) logs -f master + +logs-volumes: ## Show all volume server logs + @echo "$(BLUE)πŸ“‹ Volume Server Logs:$(NC)" + @docker-compose -f $(COMPOSE_FILE) logs -f volume1 volume2 volume3 volume4 volume5 volume6 + +urls: ## Show monitoring URLs + @echo "$(BLUE)πŸ“Š Monitoring URLs:$(NC)" + @echo " β€’ Master UI: http://localhost:9333" + @echo " β€’ Filer: http://localhost:8888" + @echo " β€’ Admin Server: http://localhost:9900/status" + @echo " β€’ Monitor: http://localhost:9999/status" + @echo "" + @echo "$(BLUE)πŸ“ˆ Volume Servers:$(NC)" + @echo " β€’ Volume1: http://localhost:8080/status" + @echo " β€’ Volume2: http://localhost:8081/status" + @echo " β€’ Volume3: http://localhost:8082/status" + @echo " β€’ Volume4: http://localhost:8083/status" + @echo " β€’ Volume5: http://localhost:8084/status" + @echo " β€’ Volume6: http://localhost:8085/status" + +health: ## Check health of all services + @echo "$(BLUE)πŸ” Checking service health...$(NC)" + @echo -n " Master: "; \ + if curl -s http://localhost:9333/cluster/status > /dev/null 2>&1; then \ + echo "$(GREEN)βœ… Healthy$(NC)"; \ + else \ + echo "$(RED)❌ Not responding$(NC)"; \ + fi + @echo -n " Filer: "; \ + if curl -s http://localhost:8888/ > /dev/null 2>&1; then \ + echo "$(GREEN)βœ… Healthy$(NC)"; \ + else \ + echo "$(RED)❌ Not responding$(NC)"; \ + fi + @echo -n " Admin: "; \ + if curl -s http://localhost:9900/health > /dev/null 2>&1; then \ + echo "$(GREEN)βœ… Healthy$(NC)"; \ + else \ + echo "$(RED)❌ Not responding$(NC)"; \ + fi + @echo -n " Monitor: "; \ + if curl -s http://localhost:9999/health > /dev/null 2>&1; then \ + echo "$(GREEN)βœ… Healthy$(NC)"; \ + else \ + echo "$(RED)❌ Not responding$(NC)"; \ + fi + +monitor: ## Open monitor dashboard in browser + @echo "$(BLUE)πŸ“Š Opening monitor dashboard...$(NC)" + @echo "Monitor URL: http://localhost:9999/status" + @command -v open >/dev/null 2>&1 && open http://localhost:9999/status || \ + command -v xdg-open >/dev/null 2>&1 && xdg-open http://localhost:9999/status || \ + echo "Please open http://localhost:9999/status in your browser" + +monitor-status: ## Show current monitoring status via API + @echo "$(BLUE)πŸ“Š Current Monitor Status:$(NC)" + @curl -s http://localhost:9999/status | jq . 2>/dev/null || \ + curl -s http://localhost:9999/status 2>/dev/null || \ + echo "Monitor not available" + +volume-status: ## Show volume status from master + @echo "$(BLUE)πŸ’Ύ Volume Status:$(NC)" + @curl -s http://localhost:9333/vol/status | jq . 2>/dev/null || \ + curl -s http://localhost:9333/vol/status 2>/dev/null || \ + echo "Master not available" + +admin-status: ## Show admin server status + @echo "$(BLUE)🏭 Admin Server Status:$(NC)" + @curl -s http://localhost:9900/status | jq . 2>/dev/null || \ + curl -s http://localhost:9900/status 2>/dev/null || \ + echo "Admin server not available" + +cluster-status: ## Show complete cluster status + @echo "$(BLUE)🌐 Cluster Status:$(NC)" + @curl -s http://localhost:9333/cluster/status | jq . 
2>/dev/null || \ + curl -s http://localhost:9333/cluster/status 2>/dev/null || \ + echo "Master not available" + +scale-workers: ## Scale workers (usage: make scale-workers WORKERS=5) + @echo "$(YELLOW)βš–οΈ Scaling workers to $(or $(WORKERS),3)...$(NC)" + @docker-compose -f $(COMPOSE_FILE) up -d --scale worker2=$(or $(WORKERS),3) + +scale-load: ## Restart load generator with higher rate (usage: make scale-load RATE=20) + @echo "$(YELLOW)πŸ“ˆ Scaling load generation to $(or $(RATE),20) files/sec...$(NC)" + @docker-compose -f $(COMPOSE_FILE) stop load_generator + @docker-compose -f $(COMPOSE_FILE) run -d --name temp_load_generator \ + -e WRITE_RATE=$(or $(RATE),20) -e DELETE_RATE=$(or $(shell expr $(or $(RATE),20) / 4),5) \ + load_generator + @echo "$(GREEN)βœ… Load generator restarted with higher rate$(NC)" + +test-ec: ## Run a focused EC test scenario + @echo "$(YELLOW)πŸ§ͺ Running focused EC test...$(NC)" + @$(MAKE) scale-load RATE=25 + @echo "$(BLUE)Monitoring EC detection...$(NC)" + @echo "Watch for volumes >40MB that trigger EC conversion" + @echo "Monitor at: http://localhost:9999/status" + +shell-admin: ## Open shell in admin container + @docker-compose -f $(COMPOSE_FILE) exec admin /bin/sh + +shell-worker1: ## Open shell in worker1 container + @docker-compose -f $(COMPOSE_FILE) exec worker1 /bin/sh + +shell-master: ## Open shell in master container + @docker-compose -f $(COMPOSE_FILE) exec master /bin/sh + +docs: ## Show documentation + @echo "$(BLUE)πŸ“– EC Testing Documentation:$(NC)" + @echo "" + @cat EC-TESTING-README.md + +build: ## Build all Docker images without starting + @echo "$(YELLOW)πŸ”¨ Building all Docker images...$(NC)" + @docker-compose -f $(COMPOSE_FILE) build + @echo "$(GREEN)βœ… All images built$(NC)" + +pull: ## Pull latest SeaweedFS image + @echo "$(YELLOW)πŸ“₯ Pulling latest SeaweedFS image...$(NC)" + @docker pull chrislusf/seaweedfs:latest + @echo "$(GREEN)βœ… Latest image pulled$(NC)" + +debug: ## Show debug information + @echo "$(BLUE)πŸ” Debug Information:$(NC)" + @echo "" + @echo "$(YELLOW)Docker Compose Version:$(NC)" + @docker-compose --version + @echo "" + @echo "$(YELLOW)Docker Version:$(NC)" + @docker --version + @echo "" + @echo "$(YELLOW)Current Directory:$(NC)" + @pwd + @echo "" + @echo "$(YELLOW)Available Files:$(NC)" + @ls -la *.yml *.sh *.md 2>/dev/null || echo "No config files found" + @echo "" + @echo "$(YELLOW)Running Containers:$(NC)" + @docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}" + +# Targets for development and testing +dev-start: ## Start with development settings (faster iteration) + @echo "$(YELLOW)πŸ› οΈ Starting development environment...$(NC)" + @mkdir -p monitor-data admin-config + @WRITE_RATE=50 DELETE_RATE=10 docker-compose -f $(COMPOSE_FILE) up --build -d + @echo "$(GREEN)βœ… Development environment started with high load$(NC)" + +dev-stop: stop ## Stop development environment + +# Clean specific components +clean-volumes: ## Remove only data volumes + @echo "$(YELLOW)πŸ—„οΈ Removing data volumes...$(NC)" + @docker-compose -f $(COMPOSE_FILE) down -v + @echo "$(GREEN)βœ… Data volumes removed$(NC)" + +clean-images: ## Remove built images + @echo "$(YELLOW)πŸ–ΌοΈ Removing built images...$(NC)" + @docker-compose -f $(COMPOSE_FILE) down --rmi local + @echo "$(GREEN)βœ… Built images removed$(NC)" + +# Backup and restore +backup-logs: ## Backup all service logs + @echo "$(YELLOW)πŸ’Ύ Backing up service logs...$(NC)" + @mkdir -p logs-backup + @docker-compose -f $(COMPOSE_FILE) logs admin > logs-backup/admin.log 
2>&1
+	@docker-compose -f $(COMPOSE_FILE) logs worker1 > logs-backup/worker1.log 2>&1
+	@docker-compose -f $(COMPOSE_FILE) logs worker2 > logs-backup/worker2.log 2>&1
+	@docker-compose -f $(COMPOSE_FILE) logs worker3 > logs-backup/worker3.log 2>&1
+	@docker-compose -f $(COMPOSE_FILE) logs monitor > logs-backup/monitor.log 2>&1
+	@docker-compose -f $(COMPOSE_FILE) logs load_generator > logs-backup/load_generator.log 2>&1
+	@echo "$(GREEN)βœ… Logs backed up to logs-backup/$(NC)"
+
+# Quick troubleshooting
+troubleshoot: ## Run troubleshooting checks
+	@echo "$(BLUE)πŸ”§ Running troubleshooting checks...$(NC)"
+	@echo ""
+	@echo "$(YELLOW)1. Checking required ports:$(NC)"
+	@for port in 9333 8888 9900 9999 8080 8081 8082 8083 8084 8085; do \
+		echo -n " Port $$port: "; \
+		if lsof -i :$$port >/dev/null 2>&1; then \
+			echo "$(RED)❌ In use$(NC)"; \
+		else \
+			echo "$(GREEN)βœ… Available$(NC)"; \
+		fi; \
+	done
+	@echo ""
+	@echo "$(YELLOW)2. Docker resources:$(NC)"
+	@docker system df
+	@echo ""
+	@echo "$(YELLOW)3. Service health:$(NC)"
+	@$(MAKE) health
\ No newline at end of file
diff --git a/docker/admin_integration/admin-entrypoint.sh b/docker/admin_integration/admin-entrypoint.sh
new file mode 100755
index 000000000..bafda7d47
--- /dev/null
+++ b/docker/admin_integration/admin-entrypoint.sh
@@ -0,0 +1,153 @@
+#!/bin/sh
+
+set -e
+
+echo "Starting SeaweedFS Admin Server..."
+echo "Master Address: $MASTER_ADDRESS"
+echo "Admin Port: $ADMIN_PORT"
+echo "Scan Interval: $SCAN_INTERVAL"
+
+# Wait for master to be ready
+echo "Waiting for master to be ready..."
+until curl -f http://$MASTER_ADDRESS/cluster/status > /dev/null 2>&1; do
+  echo "Master not ready, waiting..."
+  sleep 5
+done
+echo "Master is ready!"
+
+# For now, use a simple HTTP server to simulate admin functionality
+# In a real implementation, this would start the actual admin server
+cat > /tmp/admin_server.go << 'EOF'
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+	"os"
+	"time"
+)
+
+type AdminServer struct {
+	masterAddr string
+	port       string
+	startTime  time.Time
+	tasks      []Task
+	workers    []Worker
+}
+
+type Task struct {
+	ID       string    `json:"id"`
+	Type     string    `json:"type"`
+	VolumeID int       `json:"volume_id"`
+	Status   string    `json:"status"`
+	Progress float64   `json:"progress"`
+	Created  time.Time `json:"created"`
+}
+
+type Worker struct {
+	ID           string    `json:"id"`
+	Address      string    `json:"address"`
+	Capabilities []string  `json:"capabilities"`
+	Status       string    `json:"status"`
+	LastSeen     time.Time `json:"last_seen"`
+}
+
+func (s *AdminServer) healthHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(map[string]interface{}{
+		"status":  "healthy",
+		"uptime":  time.Since(s.startTime).String(),
+		"tasks":   len(s.tasks),
+		"workers": len(s.workers),
+	})
+}
+
+func (s *AdminServer) statusHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(map[string]interface{}{
+		"admin_server": "running",
+		"master_addr":  s.masterAddr,
+		"tasks":        s.tasks,
+		"workers":      s.workers,
+		"uptime":       time.Since(s.startTime).String(),
+	})
+}
+
+func (s *AdminServer) detectVolumesForEC() {
+	// Simulate volume detection logic
+	// In real implementation, this would query the master for volume status
+	ticker := time.NewTicker(30 * time.Second)
+	go func() {
+		for range ticker.C {
+			log.Println("Scanning for volumes requiring EC...")
+
+			// Check master for volume status
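+			// The response below is only used as a liveness probe; a full
+			// implementation would decode the body and compare each volume's
+			// Size against the EC threshold (80% of the 50MB limit), the way
+			// monitor.go's checkVolumeStatus does.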
+ resp, err := http.Get(fmt.Sprintf("http://%s/vol/status", s.masterAddr)) + if err != nil { + log.Printf("Error checking master: %v", err) + continue + } + resp.Body.Close() + + // Simulate detecting a volume that needs EC + if len(s.tasks) < 5 { // Don't create too many tasks + taskID := fmt.Sprintf("ec-task-%d", len(s.tasks)+1) + volumeID := 1000 + len(s.tasks) + + task := Task{ + ID: taskID, + Type: "erasure_coding", + VolumeID: volumeID, + Status: "pending", + Progress: 0.0, + Created: time.Now(), + } + + s.tasks = append(s.tasks, task) + log.Printf("Created EC task %s for volume %d", taskID, volumeID) + } + } + }() +} + +func main() { + masterAddr := os.Getenv("MASTER_ADDRESS") + if masterAddr == "" { + masterAddr = "master:9333" + } + + port := os.Getenv("ADMIN_PORT") + if port == "" { + port = "9900" + } + + server := &AdminServer{ + masterAddr: masterAddr, + port: port, + startTime: time.Now(), + tasks: make([]Task, 0), + workers: make([]Worker, 0), + } + + http.HandleFunc("/health", server.healthHandler) + http.HandleFunc("/status", server.statusHandler) + + // Start volume detection + server.detectVolumesForEC() + + log.Printf("Admin server starting on port %s", port) + log.Printf("Master address: %s", masterAddr) + + if err := http.ListenAndServe(":"+port, nil); err != nil { + log.Fatal("Server failed to start:", err) + } +} +EOF + +# Compile and run the admin server +cd /tmp +go mod init admin-server +go run admin_server.go \ No newline at end of file diff --git a/docker/admin_integration/docker-compose-ec-test.yml b/docker/admin_integration/docker-compose-ec-test.yml new file mode 100644 index 000000000..da1ab5603 --- /dev/null +++ b/docker/admin_integration/docker-compose-ec-test.yml @@ -0,0 +1,393 @@ +services: + # Master server - coordinates the cluster + master: + image: chrislusf/seaweedfs:latest + container_name: seaweed-master + ports: + - "9333:9333" + - "19333:19333" + command: > + master + -ip=master + -port=9333 + -volumeSizeLimitMB=50 + -defaultReplication=001 + volumes: + - master_data:/data + networks: + - seaweed_net + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://master:9333/cluster/status"] + interval: 10s + timeout: 5s + retries: 3 + + # Volume Server 1 + volume1: + image: chrislusf/seaweedfs:latest + container_name: seaweed-volume1 + ports: + - "8080:8080" + - "18080:18080" + command: > + volume + -mserver=master:9333 + -ip=volume1 + -port=8080 + -dir=/data + -max=100 + -dataCenter=dc1 + -rack=rack1 + volumes: + - volume1_data:/data + depends_on: + master: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/status"] + interval: 10s + timeout: 5s + retries: 3 + + # Volume Server 2 + volume2: + image: chrislusf/seaweedfs:latest + container_name: seaweed-volume2 + ports: + - "8081:8080" + - "18081:18080" + command: > + volume + -mserver=master:9333 + -ip=volume2 + -port=8080 + -dir=/data + -max=100 + -dataCenter=dc1 + -rack=rack1 + volumes: + - volume2_data:/data + depends_on: + master: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/status"] + interval: 10s + timeout: 5s + retries: 3 + + # Volume Server 3 + volume3: + image: chrislusf/seaweedfs:latest + container_name: seaweed-volume3 + ports: + - "8082:8080" + - "18082:18080" + command: > + volume + -mserver=master:9333 + -ip=volume3 + -port=8080 + -dir=/data + -max=100 + -dataCenter=dc1 + -rack=rack2 + volumes: + - 
volume3_data:/data + depends_on: + master: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/status"] + interval: 10s + timeout: 5s + retries: 3 + + # Volume Server 4 + volume4: + image: chrislusf/seaweedfs:latest + container_name: seaweed-volume4 + ports: + - "8083:8080" + - "18083:18080" + command: > + volume + -mserver=master:9333 + -ip=volume4 + -port=8080 + -dir=/data + -max=100 + -dataCenter=dc2 + -rack=rack1 + volumes: + - volume4_data:/data + depends_on: + master: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/status"] + interval: 10s + timeout: 5s + retries: 3 + + # Volume Server 5 + volume5: + image: chrislusf/seaweedfs:latest + container_name: seaweed-volume5 + ports: + - "8084:8080" + - "18084:18080" + command: > + volume + -mserver=master:9333 + -ip=volume5 + -port=8080 + -dir=/data + -max=100 + -dataCenter=dc2 + -rack=rack2 + volumes: + - volume5_data:/data + depends_on: + master: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/status"] + interval: 10s + timeout: 5s + retries: 3 + + # Volume Server 6 + volume6: + image: chrislusf/seaweedfs:latest + container_name: seaweed-volume6 + ports: + - "8085:8080" + - "18085:18080" + command: > + volume + -mserver=master:9333 + -ip=volume6 + -port=8080 + -dir=/data + -max=100 + -dataCenter=dc2 + -rack=rack3 + volumes: + - volume6_data:/data + depends_on: + master: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/status"] + interval: 10s + timeout: 5s + retries: 3 + + # Filer for easier data access + filer: + image: chrislusf/seaweedfs:latest + container_name: seaweed-filer + ports: + - "8888:8888" + - "18888:18888" + command: > + filer + -master=master:9333 + -ip=filer + -port=8888 + depends_on: + master: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8888/"] + interval: 10s + timeout: 5s + retries: 3 + + # Admin Server - manages EC tasks + admin: + build: + context: ../../ + dockerfile: docker/admin_integration/Dockerfile.admin + container_name: seaweed-admin + ports: + - "9900:9900" + environment: + - MASTER_ADDRESS=master:9333 + - ADMIN_PORT=9900 + - SCAN_INTERVAL=30s + - WORKER_TIMEOUT=5m + - TASK_TIMEOUT=30m + - MAX_RETRIES=3 + - MAX_CONCURRENT_TASKS=5 + volumes: + - admin_data:/data + - ./admin-config:/config + depends_on: + master: + condition: service_healthy + filer: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9900/health"] + interval: 15s + timeout: 5s + retries: 3 + + # EC Worker 1 + worker1: + build: + context: ../../ + dockerfile: docker/admin_integration/Dockerfile.worker + container_name: seaweed-worker1 + environment: + - ADMIN_ADDRESS=admin:9900 + - WORKER_ID=worker-1 + - WORKER_ADDRESS=worker1:9001 + - CAPABILITIES=erasure_coding + - MAX_CONCURRENT=2 + - WORK_DIR=/work + volumes: + - worker1_data:/work + depends_on: + admin: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9001/health"] + interval: 15s + timeout: 5s + retries: 3 + + # EC Worker 2 + worker2: + build: + context: ../../ + dockerfile: docker/admin_integration/Dockerfile.worker + container_name: seaweed-worker2 + environment: + - 
ADMIN_ADDRESS=admin:9900 + - WORKER_ID=worker-2 + - WORKER_ADDRESS=worker2:9001 + - CAPABILITIES=erasure_coding,vacuum + - MAX_CONCURRENT=2 + - WORK_DIR=/work + volumes: + - worker2_data:/work + depends_on: + admin: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9001/health"] + interval: 15s + timeout: 5s + retries: 3 + + # EC Worker 3 + worker3: + build: + context: ../../ + dockerfile: docker/admin_integration/Dockerfile.worker + container_name: seaweed-worker3 + environment: + - ADMIN_ADDRESS=admin:9900 + - WORKER_ID=worker-3 + - WORKER_ADDRESS=worker3:9001 + - CAPABILITIES=erasure_coding,vacuum + - MAX_CONCURRENT=1 + - WORK_DIR=/work + volumes: + - worker3_data:/work + depends_on: + admin: + condition: service_healthy + networks: + - seaweed_net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9001/health"] + interval: 15s + timeout: 5s + retries: 3 + + # Continuous Load Generator + load_generator: + build: + context: ../../ + dockerfile: docker/admin_integration/Dockerfile.load + container_name: seaweed-load + environment: + - FILER_ADDRESS=filer:8888 + - MASTER_ADDRESS=master:9333 + - WRITE_RATE=10 # files per second + - DELETE_RATE=2 # files per second + - FILE_SIZE_MIN=1MB + - FILE_SIZE_MAX=5MB + - TEST_DURATION=3600 # 1 hour + depends_on: + filer: + condition: service_healthy + admin: + condition: service_healthy + networks: + - seaweed_net + + # Monitoring and Health Check + monitor: + build: + context: ../../ + dockerfile: docker/admin_integration/Dockerfile.monitor + container_name: seaweed-monitor + ports: + - "9999:9999" + environment: + - MASTER_ADDRESS=master:9333 + - ADMIN_ADDRESS=admin:9900 + - FILER_ADDRESS=filer:8888 + - MONITOR_INTERVAL=10s + depends_on: + admin: + condition: service_healthy + networks: + - seaweed_net + volumes: + - ./monitor-data:/monitor-data + +volumes: + master_data: + volume1_data: + volume2_data: + volume3_data: + volume4_data: + volume5_data: + volume6_data: + admin_data: + worker1_data: + worker2_data: + worker3_data: + +networks: + seaweed_net: + driver: bridge + ipam: + config: + - subnet: 172.20.0.0/16 \ No newline at end of file diff --git a/docker/admin_integration/load-entrypoint.sh b/docker/admin_integration/load-entrypoint.sh new file mode 100755 index 000000000..64bf9c223 --- /dev/null +++ b/docker/admin_integration/load-entrypoint.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +set -e + +echo "Starting Load Generator..." +echo "Filer Address: $FILER_ADDRESS" +echo "Write Rate: $WRITE_RATE files/sec" +echo "Delete Rate: $DELETE_RATE files/sec" +echo "File Size Range: $FILE_SIZE_MIN - $FILE_SIZE_MAX" +echo "Test Duration: $TEST_DURATION seconds" + +# Wait for filer to be ready +echo "Waiting for filer to be ready..." +until curl -f http://$FILER_ADDRESS/ > /dev/null 2>&1; do + echo "Filer not ready, waiting..." + sleep 5 +done +echo "Filer is ready!" 
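+
+# Note: `exec` below replaces this shell with the load generator so it runs
+# as PID 1 and receives container stop signals directly.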
+ +# Start the load generator +exec ./load-generator \ No newline at end of file diff --git a/docker/admin_integration/load-generator.go b/docker/admin_integration/load-generator.go new file mode 100644 index 000000000..c52939459 --- /dev/null +++ b/docker/admin_integration/load-generator.go @@ -0,0 +1,352 @@ +package main + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "log" + "net/http" + "os" + "strconv" + "strings" + "sync" + "time" +) + +type LoadGenerator struct { + filerAddr string + masterAddr string + writeRate int + deleteRate int + fileSizeMin int64 + fileSizeMax int64 + testDuration int + collection string + + // State tracking + createdFiles []string + mutex sync.RWMutex + stats LoadStats +} + +type LoadStats struct { + FilesWritten int64 + FilesDeleted int64 + BytesWritten int64 + Errors int64 + StartTime time.Time + LastOperation time.Time +} + +// parseSize converts size strings like "1MB", "5MB" to bytes +func parseSize(sizeStr string) int64 { + sizeStr = strings.ToUpper(strings.TrimSpace(sizeStr)) + + var multiplier int64 = 1 + if strings.HasSuffix(sizeStr, "KB") { + multiplier = 1024 + sizeStr = strings.TrimSuffix(sizeStr, "KB") + } else if strings.HasSuffix(sizeStr, "MB") { + multiplier = 1024 * 1024 + sizeStr = strings.TrimSuffix(sizeStr, "MB") + } else if strings.HasSuffix(sizeStr, "GB") { + multiplier = 1024 * 1024 * 1024 + sizeStr = strings.TrimSuffix(sizeStr, "GB") + } + + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + return 1024 * 1024 // Default to 1MB + } + + return size * multiplier +} + +// generateRandomData creates random data of specified size +func (lg *LoadGenerator) generateRandomData(size int64) []byte { + data := make([]byte, size) + _, err := rand.Read(data) + if err != nil { + // Fallback to deterministic data + for i := range data { + data[i] = byte(i % 256) + } + } + return data +} + +// uploadFile uploads a file to SeaweedFS via filer +func (lg *LoadGenerator) uploadFile(filename string, data []byte) error { + url := fmt.Sprintf("http://%s/%s", lg.filerAddr, filename) + if lg.collection != "" { + url = fmt.Sprintf("http://%s/%s/%s", lg.filerAddr, lg.collection, filename) + } + + req, err := http.NewRequest("POST", url, bytes.NewReader(data)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/octet-stream") + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return fmt.Errorf("upload failed with status: %d", resp.StatusCode) + } + + return nil +} + +// deleteFile deletes a file from SeaweedFS via filer +func (lg *LoadGenerator) deleteFile(filename string) error { + url := fmt.Sprintf("http://%s/%s", lg.filerAddr, filename) + if lg.collection != "" { + url = fmt.Sprintf("http://%s/%s/%s", lg.filerAddr, lg.collection, filename) + } + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + return err + } + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound { + return fmt.Errorf("delete failed with status: %d", resp.StatusCode) + } + + return nil +} + +// writeFiles continuously writes files at the specified rate +func (lg *LoadGenerator) writeFiles() { + writeInterval := time.Second / 
time.Duration(lg.writeRate) + ticker := time.NewTicker(writeInterval) + defer ticker.Stop() + + fileCounter := 0 + + for range ticker.C { + fileCounter++ + + // Random file size between min and max + sizeDiff := lg.fileSizeMax - lg.fileSizeMin + randomSize := lg.fileSizeMin + if sizeDiff > 0 { + randomSize += int64(time.Now().UnixNano()) % sizeDiff + } + + // Generate filename + filename := fmt.Sprintf("test-data/file-%d-%d.bin", time.Now().Unix(), fileCounter) + + // Generate random data + data := lg.generateRandomData(randomSize) + + // Upload file + err := lg.uploadFile(filename, data) + if err != nil { + log.Printf("Error uploading file %s: %v", filename, err) + lg.stats.Errors++ + } else { + lg.mutex.Lock() + lg.createdFiles = append(lg.createdFiles, filename) + lg.stats.FilesWritten++ + lg.stats.BytesWritten += randomSize + lg.stats.LastOperation = time.Now() + lg.mutex.Unlock() + + log.Printf("Uploaded file: %s (size: %d bytes, total files: %d)", + filename, randomSize, lg.stats.FilesWritten) + } + } +} + +// deleteFiles continuously deletes files at the specified rate +func (lg *LoadGenerator) deleteFiles() { + deleteInterval := time.Second / time.Duration(lg.deleteRate) + ticker := time.NewTicker(deleteInterval) + defer ticker.Stop() + + for range ticker.C { + lg.mutex.Lock() + if len(lg.createdFiles) == 0 { + lg.mutex.Unlock() + continue + } + + // Pick a random file to delete + index := int(time.Now().UnixNano()) % len(lg.createdFiles) + filename := lg.createdFiles[index] + + // Remove from slice + lg.createdFiles = append(lg.createdFiles[:index], lg.createdFiles[index+1:]...) + lg.mutex.Unlock() + + // Delete file + err := lg.deleteFile(filename) + if err != nil { + log.Printf("Error deleting file %s: %v", filename, err) + lg.stats.Errors++ + } else { + lg.stats.FilesDeleted++ + lg.stats.LastOperation = time.Now() + log.Printf("Deleted file: %s (remaining files: %d)", filename, len(lg.createdFiles)) + } + } +} + +// printStats periodically prints load generation statistics +func (lg *LoadGenerator) printStats() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + uptime := time.Since(lg.stats.StartTime) + writeRate := float64(lg.stats.FilesWritten) / uptime.Seconds() + deleteRate := float64(lg.stats.FilesDeleted) / uptime.Seconds() + + lg.mutex.RLock() + pendingFiles := len(lg.createdFiles) + lg.mutex.RUnlock() + + log.Printf("STATS: Files written=%d, deleted=%d, pending=%d, errors=%d", + lg.stats.FilesWritten, lg.stats.FilesDeleted, pendingFiles, lg.stats.Errors) + log.Printf("RATES: Write=%.2f/sec, Delete=%.2f/sec, Data=%.2f MB written", + writeRate, deleteRate, float64(lg.stats.BytesWritten)/(1024*1024)) + } +} + +// checkClusterHealth periodically checks cluster status +func (lg *LoadGenerator) checkClusterHealth() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + // Check master status + resp, err := http.Get(fmt.Sprintf("http://%s/cluster/status", lg.masterAddr)) + if err != nil { + log.Printf("WARNING: Cannot reach master: %v", err) + continue + } + + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + + if err != nil { + log.Printf("WARNING: Cannot read master response: %v", err) + continue + } + + if resp.StatusCode == http.StatusOK { + log.Printf("Cluster health check: OK (response size: %d bytes)", len(body)) + } else { + log.Printf("WARNING: Cluster health check failed with status: %d", resp.StatusCode) + } + } +} + +func main() { + filerAddr := os.Getenv("FILER_ADDRESS") + 
if filerAddr == "" { + filerAddr = "filer:8888" + } + + masterAddr := os.Getenv("MASTER_ADDRESS") + if masterAddr == "" { + masterAddr = "master:9333" + } + + writeRate, _ := strconv.Atoi(os.Getenv("WRITE_RATE")) + if writeRate <= 0 { + writeRate = 10 + } + + deleteRate, _ := strconv.Atoi(os.Getenv("DELETE_RATE")) + if deleteRate <= 0 { + deleteRate = 2 + } + + fileSizeMin := parseSize(os.Getenv("FILE_SIZE_MIN")) + if fileSizeMin <= 0 { + fileSizeMin = 1024 * 1024 // 1MB + } + + fileSizeMax := parseSize(os.Getenv("FILE_SIZE_MAX")) + if fileSizeMax <= fileSizeMin { + fileSizeMax = 5 * 1024 * 1024 // 5MB + } + + testDuration, _ := strconv.Atoi(os.Getenv("TEST_DURATION")) + if testDuration <= 0 { + testDuration = 3600 // 1 hour + } + + collection := os.Getenv("COLLECTION") + + lg := &LoadGenerator{ + filerAddr: filerAddr, + masterAddr: masterAddr, + writeRate: writeRate, + deleteRate: deleteRate, + fileSizeMin: fileSizeMin, + fileSizeMax: fileSizeMax, + testDuration: testDuration, + collection: collection, + createdFiles: make([]string, 0), + stats: LoadStats{ + StartTime: time.Now(), + }, + } + + log.Printf("Starting load generator...") + log.Printf("Filer: %s", filerAddr) + log.Printf("Master: %s", masterAddr) + log.Printf("Write rate: %d files/sec", writeRate) + log.Printf("Delete rate: %d files/sec", deleteRate) + log.Printf("File size: %d - %d bytes", fileSizeMin, fileSizeMax) + log.Printf("Test duration: %d seconds", testDuration) + log.Printf("Collection: '%s'", collection) + + // Wait for filer to be ready + log.Println("Waiting for filer to be ready...") + for { + resp, err := http.Get(fmt.Sprintf("http://%s/", filerAddr)) + if err == nil && resp.StatusCode == http.StatusOK { + resp.Body.Close() + break + } + if resp != nil { + resp.Body.Close() + } + log.Println("Filer not ready, waiting...") + time.Sleep(5 * time.Second) + } + log.Println("Filer is ready!") + + // Start background goroutines + go lg.writeFiles() + go lg.deleteFiles() + go lg.printStats() + go lg.checkClusterHealth() + + // Run for specified duration + log.Printf("Load test will run for %d seconds...", testDuration) + time.Sleep(time.Duration(testDuration) * time.Second) + + log.Println("Load test completed!") + log.Printf("Final stats: Files written=%d, deleted=%d, errors=%d, total data=%.2f MB", + lg.stats.FilesWritten, lg.stats.FilesDeleted, lg.stats.Errors, + float64(lg.stats.BytesWritten)/(1024*1024)) +} diff --git a/docker/admin_integration/monitor-entrypoint.sh b/docker/admin_integration/monitor-entrypoint.sh new file mode 100755 index 000000000..fbc31fb0a --- /dev/null +++ b/docker/admin_integration/monitor-entrypoint.sh @@ -0,0 +1,38 @@ +#!/bin/sh + +set -e + +echo "Starting Cluster Monitor..." +echo "Master Address: $MASTER_ADDRESS" +echo "Admin Address: $ADMIN_ADDRESS" +echo "Filer Address: $FILER_ADDRESS" +echo "Monitor Interval: $MONITOR_INTERVAL" + +# Wait for core services to be ready +echo "Waiting for core services to be ready..." + +echo "Waiting for master..." +until curl -f http://$MASTER_ADDRESS/cluster/status > /dev/null 2>&1; do + echo "Master not ready, waiting..." + sleep 5 +done +echo "Master is ready!" + +echo "Waiting for admin..." +until curl -f http://$ADMIN_ADDRESS/health > /dev/null 2>&1; do + echo "Admin not ready, waiting..." + sleep 5 +done +echo "Admin is ready!" + +echo "Waiting for filer..." +until curl -f http://$FILER_ADDRESS/ > /dev/null 2>&1; do + echo "Filer not ready, waiting..." + sleep 5 +done +echo "Filer is ready!" + +echo "All services ready! Starting monitor..." 
+ +# Start the monitor +exec ./monitor \ No newline at end of file diff --git a/docker/admin_integration/monitor.go b/docker/admin_integration/monitor.go new file mode 100644 index 000000000..2afeab468 --- /dev/null +++ b/docker/admin_integration/monitor.go @@ -0,0 +1,366 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "time" +) + +type Monitor struct { + masterAddr string + adminAddr string + filerAddr string + interval time.Duration + startTime time.Time + stats MonitorStats +} + +type MonitorStats struct { + TotalChecks int64 + MasterHealthy int64 + AdminHealthy int64 + FilerHealthy int64 + VolumeCount int64 + LastVolumeCheck time.Time + ECTasksDetected int64 + WorkersActive int64 + LastWorkerCheck time.Time +} + +type ClusterStatus struct { + IsLeader bool `json:"IsLeader"` + Leader string `json:"Leader"` + Peers []string `json:"Peers"` +} + +type VolumeStatus struct { + Volumes []VolumeInfo `json:"Volumes"` +} + +type VolumeInfo struct { + Id uint32 `json:"Id"` + Size uint64 `json:"Size"` + Collection string `json:"Collection"` + FileCount int64 `json:"FileCount"` + DeleteCount int64 `json:"DeleteCount"` + DeletedByteCount uint64 `json:"DeletedByteCount"` + ReadOnly bool `json:"ReadOnly"` + CompactRevision uint32 `json:"CompactRevision"` + Version uint32 `json:"Version"` +} + +type AdminStatus struct { + Status string `json:"status"` + Uptime string `json:"uptime"` + Tasks int `json:"tasks"` + Workers int `json:"workers"` +} + +// checkMasterHealth checks the master server health +func (m *Monitor) checkMasterHealth() bool { + resp, err := http.Get(fmt.Sprintf("http://%s/cluster/status", m.masterAddr)) + if err != nil { + log.Printf("ERROR: Cannot reach master %s: %v", m.masterAddr, err) + return false + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("ERROR: Master returned status %d", resp.StatusCode) + return false + } + + var status ClusterStatus + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("ERROR: Cannot read master response: %v", err) + return false + } + + err = json.Unmarshal(body, &status) + if err != nil { + log.Printf("WARNING: Cannot parse master status: %v", err) + // Still consider it healthy if we got a response + return true + } + + log.Printf("Master status: Leader=%s, IsLeader=%t, Peers=%d", + status.Leader, status.IsLeader, len(status.Peers)) + + m.stats.MasterHealthy++ + return true +} + +// checkAdminHealth checks the admin server health +func (m *Monitor) checkAdminHealth() bool { + resp, err := http.Get(fmt.Sprintf("http://%s/health", m.adminAddr)) + if err != nil { + log.Printf("ERROR: Cannot reach admin %s: %v", m.adminAddr, err) + return false + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("ERROR: Admin returned status %d", resp.StatusCode) + return false + } + + var status AdminStatus + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("ERROR: Cannot read admin response: %v", err) + return false + } + + err = json.Unmarshal(body, &status) + if err != nil { + log.Printf("WARNING: Cannot parse admin status: %v", err) + return true + } + + log.Printf("Admin status: %s, Uptime=%s, Tasks=%d, Workers=%d", + status.Status, status.Uptime, status.Tasks, status.Workers) + + m.stats.AdminHealthy++ + m.stats.ECTasksDetected += int64(status.Tasks) + m.stats.WorkersActive = int64(status.Workers) + m.stats.LastWorkerCheck = time.Now() + + return true +} + +// checkFilerHealth checks the filer health +func (m *Monitor) 
checkFilerHealth() bool { + resp, err := http.Get(fmt.Sprintf("http://%s/", m.filerAddr)) + if err != nil { + log.Printf("ERROR: Cannot reach filer %s: %v", m.filerAddr, err) + return false + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("ERROR: Filer returned status %d", resp.StatusCode) + return false + } + + m.stats.FilerHealthy++ + return true +} + +// checkVolumeStatus checks volume information from master +func (m *Monitor) checkVolumeStatus() { + resp, err := http.Get(fmt.Sprintf("http://%s/vol/status", m.masterAddr)) + if err != nil { + log.Printf("ERROR: Cannot get volume status: %v", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("ERROR: Volume status returned status %d", resp.StatusCode) + return + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("ERROR: Cannot read volume status: %v", err) + return + } + + var volumeStatus VolumeStatus + err = json.Unmarshal(body, &volumeStatus) + if err != nil { + log.Printf("WARNING: Cannot parse volume status: %v", err) + return + } + + m.stats.VolumeCount = int64(len(volumeStatus.Volumes)) + m.stats.LastVolumeCheck = time.Now() + + // Analyze volumes + var readOnlyCount, fullVolumeCount, ecCandidates int + var totalSize, totalFiles uint64 + + for _, vol := range volumeStatus.Volumes { + totalSize += vol.Size + totalFiles += uint64(vol.FileCount) + + if vol.ReadOnly { + readOnlyCount++ + } + + // Volume is close to full (>40MB for 50MB limit) + if vol.Size > 40*1024*1024 { + fullVolumeCount++ + if !vol.ReadOnly { + ecCandidates++ + } + } + } + + log.Printf("Volume analysis: Total=%d, ReadOnly=%d, Full=%d, EC_Candidates=%d", + len(volumeStatus.Volumes), readOnlyCount, fullVolumeCount, ecCandidates) + log.Printf("Storage stats: Total_Size=%.2fMB, Total_Files=%d", + float64(totalSize)/(1024*1024), totalFiles) + + if ecCandidates > 0 { + log.Printf("⚠️ DETECTED %d volumes that should be EC'd!", ecCandidates) + } +} + +// healthHandler provides a health endpoint for the monitor itself +func (m *Monitor) healthHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "status": "healthy", + "uptime": time.Since(m.startTime).String(), + "checks": m.stats.TotalChecks, + "last_check": m.stats.LastVolumeCheck.Format(time.RFC3339), + }) +} + +// statusHandler provides detailed monitoring status +func (m *Monitor) statusHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "monitor": map[string]interface{}{ + "uptime": time.Since(m.startTime).String(), + "master_addr": m.masterAddr, + "admin_addr": m.adminAddr, + "filer_addr": m.filerAddr, + "interval": m.interval.String(), + }, + "stats": m.stats, + "health": map[string]interface{}{ + "master_healthy": m.stats.MasterHealthy > 0 && time.Since(m.stats.LastVolumeCheck) < 2*m.interval, + "admin_healthy": m.stats.AdminHealthy > 0 && time.Since(m.stats.LastWorkerCheck) < 2*m.interval, + "filer_healthy": m.stats.FilerHealthy > 0, + }, + }) +} + +// runMonitoring runs the main monitoring loop +func (m *Monitor) runMonitoring() { + ticker := time.NewTicker(m.interval) + defer ticker.Stop() + + log.Printf("Starting monitoring loop every %v", m.interval) + + for { + m.stats.TotalChecks++ + + log.Printf("=== Monitoring Check #%d ===", m.stats.TotalChecks) + + // Check master health + if 
+
+// healthHandler provides a health endpoint for the monitor itself
+func (m *Monitor) healthHandler(w http.ResponseWriter, r *http.Request) {
+	m.mu.Lock()
+	checks := m.stats.TotalChecks
+	lastCheck := m.stats.LastVolumeCheck
+	m.mu.Unlock()
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(map[string]interface{}{
+		"status":     "healthy",
+		"uptime":     time.Since(m.startTime).String(),
+		"checks":     checks,
+		"last_check": lastCheck.Format(time.RFC3339),
+	})
+}
+
+// statusHandler provides detailed monitoring status
+func (m *Monitor) statusHandler(w http.ResponseWriter, r *http.Request) {
+	// Snapshot the stats under the lock so encoding does not race with the
+	// monitoring loop
+	m.mu.Lock()
+	st := m.stats
+	m.mu.Unlock()
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(map[string]interface{}{
+		"monitor": map[string]interface{}{
+			"uptime":      time.Since(m.startTime).String(),
+			"master_addr": m.masterAddr,
+			"admin_addr":  m.adminAddr,
+			"filer_addr":  m.filerAddr,
+			"interval":    m.interval.String(),
+		},
+		"stats": st,
+		"health": map[string]interface{}{
+			"master_healthy": st.MasterHealthy > 0 && time.Since(st.LastVolumeCheck) < 2*m.interval,
+			"admin_healthy":  st.AdminHealthy > 0 && time.Since(st.LastWorkerCheck) < 2*m.interval,
+			"filer_healthy":  st.FilerHealthy > 0,
+		},
+	})
+}
+
+// runMonitoring runs the main monitoring loop
+func (m *Monitor) runMonitoring() {
+	ticker := time.NewTicker(m.interval)
+	defer ticker.Stop()
+
+	log.Printf("Starting monitoring loop every %v", m.interval)
+
+	for {
+		m.mu.Lock()
+		m.stats.TotalChecks++
+		checkNum := m.stats.TotalChecks
+		m.mu.Unlock()
+
+		log.Printf("=== Monitoring Check #%d ===", checkNum)
+
+		// Check master health
+		if m.checkMasterHealth() {
+			// If master is healthy, check volumes
+			m.checkVolumeStatus()
+		}
+
+		// Check admin health
+		m.checkAdminHealth()
+
+		// Check filer health
+		m.checkFilerHealth()
+
+		// Print summary (all writers run on this goroutine, so these reads are safe)
+		log.Printf("Health Summary: Master=%t, Admin=%t, Filer=%t, Volumes=%d, Workers=%d",
+			m.stats.MasterHealthy > 0,
+			m.stats.AdminHealthy > 0,
+			m.stats.FilerHealthy > 0,
+			m.stats.VolumeCount,
+			m.stats.WorkersActive)
+
+		log.Printf("=== End Check #%d ===", checkNum)
+
+		<-ticker.C
+	}
+}
+
+func main() {
+	masterAddr := os.Getenv("MASTER_ADDRESS")
+	if masterAddr == "" {
+		masterAddr = "master:9333"
+	}
+
+	adminAddr := os.Getenv("ADMIN_ADDRESS")
+	if adminAddr == "" {
+		adminAddr = "admin:9900"
+	}
+
+	filerAddr := os.Getenv("FILER_ADDRESS")
+	if filerAddr == "" {
+		filerAddr = "filer:8888"
+	}
+
+	intervalStr := os.Getenv("MONITOR_INTERVAL")
+	interval, err := time.ParseDuration(intervalStr)
+	if err != nil {
+		interval = 10 * time.Second
+	}
+
+	monitor := &Monitor{
+		masterAddr: masterAddr,
+		adminAddr:  adminAddr,
+		filerAddr:  filerAddr,
+		interval:   interval,
+		startTime:  time.Now(),
+		stats:      MonitorStats{},
+	}
+
+	log.Printf("Starting SeaweedFS Cluster Monitor")
+	log.Printf("Master: %s", masterAddr)
+	log.Printf("Admin: %s", adminAddr)
+	log.Printf("Filer: %s", filerAddr)
+	log.Printf("Interval: %v", interval)
+
+	// Setup HTTP endpoints
+	http.HandleFunc("/health", monitor.healthHandler)
+	http.HandleFunc("/status", monitor.statusHandler)
+
+	// Start HTTP server in background
+	go func() {
+		log.Println("Monitor HTTP server starting on :9999")
+		if err := http.ListenAndServe(":9999", nil); err != nil {
+			log.Printf("Monitor HTTP server error: %v", err)
+		}
+	}()
+
+	// Wait for services to be ready
+	log.Println("Waiting for services to be ready...")
+	for {
+		masterOK := false
+		adminOK := false
+		filerOK := false
+
+		// Close each response body whether or not the status is 200, so
+		// this wait loop does not leak connections
+		if resp, err := http.Get(fmt.Sprintf("http://%s/cluster/status", masterAddr)); err == nil {
+			masterOK = resp.StatusCode == http.StatusOK
+			resp.Body.Close()
+		}
+
+		if resp, err := http.Get(fmt.Sprintf("http://%s/health", adminAddr)); err == nil {
+			adminOK = resp.StatusCode == http.StatusOK
+			resp.Body.Close()
+		}
+
+		if resp, err := http.Get(fmt.Sprintf("http://%s/", filerAddr)); err == nil {
+			filerOK = resp.StatusCode == http.StatusOK
+			resp.Body.Close()
+		}
+
+		if masterOK && adminOK && filerOK {
+			log.Println("All services are ready!")
+			break
+		}
+
+		log.Printf("Services ready: Master=%t, Admin=%t, Filer=%t", masterOK, adminOK, filerOK)
+		time.Sleep(5 * time.Second)
+	}
+
+	// Start monitoring
+	monitor.runMonitoring()
+}
diff --git a/docker/admin_integration/run-ec-test.sh b/docker/admin_integration/run-ec-test.sh
new file mode 100755
index 000000000..f57b5b52e
--- /dev/null
+++ b/docker/admin_integration/run-ec-test.sh
@@ -0,0 +1,106 @@
+#!/bin/bash
+
+set -e
+
+# Colors for output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+BLUE='\033[0;34m'
+NC='\033[0m' # No Color
+
+echo -e "${BLUE}πŸ§ͺ SeaweedFS EC Worker Testing Environment${NC}"
+echo -e "${BLUE}===========================================${NC}"
+
+# Check if docker-compose is available
+if ! command -v docker-compose &> /dev/null; then
+    echo -e "${RED}❌ docker-compose is required but not installed${NC}"
+    exit 1
+fi
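+
+# Note: newer Docker installations ship Compose as the "docker compose"
+# plugin instead of the standalone docker-compose binary. If that is your
+# setup, a small shim like this keeps the rest of the script unchanged
+# (our suggestion, not part of the original check):
+#   docker-compose() { docker compose "$@"; }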
+
+# Create necessary directories
+echo -e "${YELLOW}πŸ“ Creating required directories...${NC}"
+mkdir -p monitor-data admin-config
+
+# Make scripts executable
+echo -e "${YELLOW}πŸ”§ Making scripts executable...${NC}"
+chmod +x *.sh
+
+# Stop any existing containers
+echo -e "${YELLOW}πŸ›‘ Stopping any existing containers...${NC}"
+docker-compose -f docker-compose-ec-test.yml down -v 2>/dev/null || true
+
+# Build and start the environment
+echo -e "${GREEN}πŸš€ Starting SeaweedFS EC testing environment...${NC}"
+echo -e "${BLUE}This will start:${NC}"
+echo -e " β€’ 1 Master server (port 9333)"
+echo -e " β€’ 6 Volume servers (ports 8080-8085) with 50MB volume limit"
+echo -e " β€’ 1 Filer (port 8888)"
+echo -e " β€’ 1 Admin server (port 9900)"
+echo -e " β€’ 3 EC Workers"
+echo -e " β€’ 1 Load generator (continuous read/write)"
+echo -e " β€’ 1 Monitor (port 9999)"
+echo ""
+
+docker-compose -f docker-compose-ec-test.yml up --build -d
+
+echo -e "${GREEN}βœ… Environment started successfully!${NC}"
+echo ""
+echo -e "${BLUE}πŸ“Š Monitoring URLs:${NC}"
+echo -e " β€’ Master UI: http://localhost:9333"
+echo -e " β€’ Filer: http://localhost:8888"
+echo -e " β€’ Admin Server: http://localhost:9900/status"
+echo -e " β€’ Monitor: http://localhost:9999/status"
+echo ""
+echo -e "${BLUE}πŸ“ˆ Volume Servers:${NC}"
+echo -e " β€’ Volume1: http://localhost:8080/status"
+echo -e " β€’ Volume2: http://localhost:8081/status"
+echo -e " β€’ Volume3: http://localhost:8082/status"
+echo -e " β€’ Volume4: http://localhost:8083/status"
+echo -e " β€’ Volume5: http://localhost:8084/status"
+echo -e " β€’ Volume6: http://localhost:8085/status"
+echo ""
+
+echo -e "${YELLOW}⏳ Waiting for services to be ready...${NC}"
+sleep 10
+
+# Check service health
+echo -e "${BLUE}πŸ” Checking service health...${NC}"
+
+check_service() {
+    local name=$1
+    local url=$2
+
+    if curl -s "$url" > /dev/null 2>&1; then
+        echo -e " βœ… $name: ${GREEN}Healthy${NC}"
+        return 0
+    else
+        echo -e " ❌ $name: ${RED}Not responding${NC}"
+        return 1
+    fi
+}
+
+# set -e is active, so tolerate individual failures here and keep reporting
+check_service "Master" "http://localhost:9333/cluster/status" || true
+check_service "Filer" "http://localhost:8888/" || true
+check_service "Admin" "http://localhost:9900/health" || true
+check_service "Monitor" "http://localhost:9999/health" || true
+
+echo ""
+echo -e "${GREEN}🎯 Test Environment is Ready!${NC}"
+echo ""
+echo -e "${BLUE}What's happening:${NC}"
+echo -e " 1. πŸ“ Load generator continuously writes 1-5MB files at 10 files/sec"
+echo -e " 2. πŸ—‘οΈ Load generator deletes files at 2 files/sec"
+echo -e " 3. πŸ“Š Volumes fill up to 50MB limit and trigger EC conversion (peek commands below)"
+echo -e " 4. 🏭 Admin server detects volumes needing EC and assigns to workers"
+echo -e " 5. ⚑ Workers perform comprehensive EC (copyβ†’encodeβ†’distribute)"
+echo -e " 6. πŸ“ˆ Monitor tracks all activity and volume states"
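+
+# Illustrative peek commands while the test runs (our suggestion; jq is
+# assumed on the host; /vol/status comes from the master and /status from
+# the monitor defined in monitor.go):
+#   curl -s http://localhost:9333/vol/status | jq .
+#   curl -s http://localhost:9999/status | jq .stats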
+echo ""
+echo -e "${YELLOW}πŸ“‹ Useful Commands:${NC}"
+echo -e " β€’ View logs: docker-compose -f docker-compose-ec-test.yml logs -f [service]"
+echo -e " β€’ Check worker status: docker-compose -f docker-compose-ec-test.yml logs worker1"
+echo -e " β€’ Stop environment: docker-compose -f docker-compose-ec-test.yml down -v"
+echo -e " β€’ Monitor logs: docker-compose -f docker-compose-ec-test.yml logs -f monitor"
+echo ""
+echo -e "${GREEN}πŸ”₯ The test will run for 1 hour by default${NC}"
+echo -e "${BLUE}Monitor progress at: http://localhost:9999/status${NC}"
\ No newline at end of file
diff --git a/docker/admin_integration/worker-entrypoint.sh b/docker/admin_integration/worker-entrypoint.sh
new file mode 100755
index 000000000..8cafac60a
--- /dev/null
+++ b/docker/admin_integration/worker-entrypoint.sh
@@ -0,0 +1,230 @@
+#!/bin/sh
+
+set -e
+
+echo "Starting SeaweedFS EC Worker..."
+echo "Worker ID: $WORKER_ID"
+echo "Admin Address: $ADMIN_ADDRESS"
+echo "Capabilities: $CAPABILITIES"
+
+# Wait for admin server to be ready
+echo "Waiting for admin server to be ready..."
+until curl -f http://$ADMIN_ADDRESS/health > /dev/null 2>&1; do
+    echo "Admin server not ready, waiting..."
+    sleep 5
+done
+echo "Admin server is ready!"
+
+# Create worker simulation
+cat > /tmp/worker.go << 'EOF'
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+	"os"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Worker struct {
+	id            string
+	adminAddr     string
+	address       string
+	capabilities  []string
+	maxConcurrent int
+	workDir       string
+	startTime     time.Time
+	mu            sync.Mutex // guards activeTasks and the Task fields the goroutines mutate
+	activeTasks   map[string]*Task
+}
+
+type Task struct {
+	ID       string    `json:"id"`
+	Type     string    `json:"type"`
+	VolumeID int       `json:"volume_id"`
+	Status   string    `json:"status"`
+	Progress float64   `json:"progress"`
+	Started  time.Time `json:"started"`
+}
+
+func (w *Worker) healthHandler(res http.ResponseWriter, req *http.Request) {
+	w.mu.Lock()
+	active := len(w.activeTasks)
+	w.mu.Unlock()
+
+	res.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(res).Encode(map[string]interface{}{
+		"status":       "healthy",
+		"worker_id":    w.id,
+		"uptime":       time.Since(w.startTime).String(),
+		"active_tasks": active,
+		"capabilities": w.capabilities,
+	})
+}
+
+func (w *Worker) statusHandler(res http.ResponseWriter, req *http.Request) {
+	// Snapshot the task map under the lock so encoding does not race with
+	// the goroutines that update task progress
+	w.mu.Lock()
+	tasks := make(map[string]Task, len(w.activeTasks))
+	for id, t := range w.activeTasks {
+		tasks[id] = *t
+	}
+	w.mu.Unlock()
+
+	res.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(res).Encode(map[string]interface{}{
+		"worker_id":      w.id,
+		"admin_addr":     w.adminAddr,
+		"capabilities":   w.capabilities,
+		"max_concurrent": w.maxConcurrent,
+		"active_tasks":   tasks,
+		"uptime":         time.Since(w.startTime).String(),
+	})
+}
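+
+// Example of exercising the two endpoints above once the worker is running
+// (our illustration; worker1 is the compose service name referenced
+// elsewhere in this environment, and jq is assumed on the calling side):
+//   curl -s http://worker1:9001/health | jq .
+//   curl -s http://worker1:9001/status | jq .active_tasks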
+
+func (w *Worker) simulateECTask(taskID string, volumeID int) {
+	log.Printf("Starting EC task %s for volume %d", taskID, volumeID)
+
+	task := &Task{
+		ID:       taskID,
+		Type:     "erasure_coding",
+		VolumeID: volumeID,
+		Status:   "running",
+		Progress: 0.0,
+		Started:  time.Now(),
+	}
+
+	w.mu.Lock()
+	w.activeTasks[taskID] = task
+	w.mu.Unlock()
+
+	// Simulate EC process phases
+	phases := []struct {
+		progress float64
+		phase    string
+		duration time.Duration
+	}{
+		{5.0, "Copying volume data locally", 10 * time.Second},
+		{25.0, "Marking volume read-only", 2 * time.Second},
+		{60.0, "Performing local EC encoding", 30 * time.Second},
+		{70.0, "Calculating optimal shard placement", 5 * time.Second},
+		{90.0, "Distributing shards to servers", 20 * time.Second},
+		{100.0, "Verification and cleanup", 3 * time.Second},
+	}
+
+	go func() {
+		for _, phase := range phases {
+			w.mu.Lock()
+			running := task.Status == "running"
+			w.mu.Unlock()
+			if !running {
+				break
+			}
+
+			time.Sleep(phase.duration)
+
+			w.mu.Lock()
+			task.Progress = phase.progress
+			w.mu.Unlock()
+			log.Printf("Task %s: %.1f%% - %s", taskID, phase.progress, phase.phase)
+		}
+
+		w.mu.Lock()
+		if task.Status == "running" {
+			task.Status = "completed"
+			task.Progress = 100.0
+			log.Printf("Task %s completed successfully", taskID)
+		}
+		w.mu.Unlock()
+
+		// Remove from active tasks after completion
+		time.Sleep(5 * time.Second)
+		w.mu.Lock()
+		delete(w.activeTasks, taskID)
+		w.mu.Unlock()
+	}()
+}
+
+func (w *Worker) registerWithAdmin() {
+	ticker := time.NewTicker(30 * time.Second)
+	go func() {
+		for {
+			// Register/heartbeat with admin server
+			log.Printf("Sending heartbeat to admin server...")
+
+			w.mu.Lock()
+			active := len(w.activeTasks)
+			w.mu.Unlock()
+
+			data := map[string]interface{}{
+				"worker_id":      w.id,
+				"address":        w.address,
+				"capabilities":   w.capabilities,
+				"max_concurrent": w.maxConcurrent,
+				"active_tasks":   active,
+				"status":         "active",
+			}
+
+			jsonData, _ := json.Marshal(data)
+
+			// In real implementation, this would be a proper gRPC call
+			resp, err := http.Post(
+				fmt.Sprintf("http://%s/register-worker", w.adminAddr),
+				"application/json",
+				strings.NewReader(string(jsonData)),
+			)
+			if err != nil {
+				log.Printf("Failed to register with admin: %v", err)
+			} else {
+				resp.Body.Close()
+				log.Printf("Successfully sent heartbeat to admin")
+			}
+
+			// Simulate requesting new tasks
+			if active < w.maxConcurrent {
+				// In real implementation, the worker would request tasks from
+				// the admin server; for the simulation we create one periodically
+				if active == 0 && time.Since(w.startTime) > 1*time.Minute {
+					taskID := fmt.Sprintf("%s-task-%d", w.id, time.Now().Unix())
+					volumeID := 2000 + int(time.Now().Unix()%1000)
+					w.simulateECTask(taskID, volumeID)
+				}
+			}
+
+			<-ticker.C
+		}
+	}()
+}
+
+func main() {
+	workerID := os.Getenv("WORKER_ID")
+	if workerID == "" {
+		workerID = "worker-1"
+	}
+
+	adminAddr := os.Getenv("ADMIN_ADDRESS")
+	if adminAddr == "" {
+		adminAddr = "admin:9900"
+	}
+
+	address := os.Getenv("WORKER_ADDRESS")
+	if address == "" {
+		address = "worker:9001"
+	}
+
+	capabilities := strings.Split(os.Getenv("CAPABILITIES"), ",")
+	if len(capabilities) == 0 || capabilities[0] == "" {
+		capabilities = []string{"erasure_coding"}
+	}
+
+	worker := &Worker{
+		id:            workerID,
+		adminAddr:     adminAddr,
+		address:       address,
+		capabilities:  capabilities,
+		maxConcurrent: 2,
+		workDir:       "/work",
+		startTime:     time.Now(),
+		activeTasks:   make(map[string]*Task),
+	}
+
+	http.HandleFunc("/health", worker.healthHandler)
+	http.HandleFunc("/status", worker.statusHandler)
+
+	// Start registration and heartbeat
+	worker.registerWithAdmin()
+
+	log.Printf("Worker %s starting on address %s", workerID, address)
+	log.Printf("Admin address: %s", adminAddr)
+	log.Printf("Capabilities: %v", capabilities)
+
+	port := ":9001"
+	if strings.Contains(address, ":") {
+		parts := strings.Split(address, ":")
+		port = ":" + parts[1]
+	}
+
+	if err := http.ListenAndServe(port, nil); err != nil {
+		log.Fatal("Worker failed to start:", err)
+	}
+}
+EOF
+
+# Compile and run the worker
+cd /tmp
+# go mod init fails if go.mod already exists (e.g. after a container restart),
+# so only initialize the module once
+[ -f go.mod ] || go mod init worker
+# exec so the Go process receives container stop signals directly
+exec go run worker.go
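+
+# Quick smoke test once the worker is up (our illustration; curl is baked
+# into the worker image, so this works even when port 9001 is not published
+# on the host):
+#   docker-compose -f docker-compose-ec-test.yml exec worker1 \
+#       curl -s http://localhost:9001/health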