diff --git a/weed/command/command.go b/weed/command/command.go index e4695a199..b2cef4540 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -32,6 +32,7 @@ var Commands = []*Command{ cmdIam, cmdMaster, cmdMasterFollower, + cmdMini, cmdMount, cmdMqAgent, cmdMqBroker, diff --git a/weed/command/mini.go b/weed/command/mini.go new file mode 100644 index 000000000..264370069 --- /dev/null +++ b/weed/command/mini.go @@ -0,0 +1,685 @@ +package command + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + iam_pb "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/worker" + "github.com/seaweedfs/seaweedfs/weed/worker/types" + + // Import task packages to trigger their auto-registration + _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" + _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" + _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" +) + +type MiniOptions struct { + cpuprofile *string + memprofile *string + debug *bool + debugPort *int + v VolumeServerOptions +} + +const ( + miniVolumeMaxDataVolumeCounts = "0" // auto-configured based on free disk space + miniVolumeMinFreeSpace = "1" // 1% minimum free space +) + +var ( + miniOptions MiniOptions + miniMasterOptions MasterOptions + miniFilerOptions FilerOptions + miniS3Options S3Options + miniWebDavOptions WebDavOption + miniAdminOptions AdminOptions + createdInitialIAM bool // Track if initial IAM config was created from env vars +) + +func init() { + cmdMini.Run = runMini // break init cycle +} + +var cmdMini = &Command{ + UsageLine: "mini -dir=/tmp", + Short: "start a complete SeaweedFS setup optimized for S3 beginners and small/dev use cases", + Long: `start a complete SeaweedFS setup with all components optimized for small/dev use cases + +This command starts all components in one process (master, volume, filer, +S3 gateway, WebDAV gateway, and Admin UI). + +All settings are optimized for small/dev use cases: +- Volume size limit: 128MB (small files) +- Volume max: 0 (auto-configured based on free disk space) +- Pre-stop seconds: 1 (faster shutdown) +- Master peers: none (single master mode) + +This is perfect for: +- Development and testing +- Learning SeaweedFS +- Small deployments +- Local S3-compatible storage + +Example Usage: + weed mini # Use current directory + weed mini -dir=/data # Custom data directory + weed mini -dir=/data -master.port=9444 # Custom master port + +After starting, you can access: +- Master UI: http://localhost:9333 +- Volume Server: http://localhost:9340 +- Filer UI: http://localhost:8888 +- S3 Endpoint: http://localhost:8333 +- WebDAV: http://localhost:7333 +- Admin UI: http://localhost:23646 + +S3 Access: +The S3 endpoint is available at http://localhost:8333. For client +configuration and IAM setup, see the project documentation or use the +Admin UI (http://localhost:23646) to manage users and policies. + +`, +} + +var ( + miniIp = cmdMini.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier") + miniBindIp = cmdMini.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.") + miniTimeout = cmdMini.Flag.Int("idleTimeout", 30, "connection idle seconds") + miniDataCenter = cmdMini.Flag.String("dataCenter", "", "current volume server's data center name") + miniRack = cmdMini.Flag.String("rack", "", "current volume server's rack name") + miniWhiteListOption = cmdMini.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + miniDisableHttp = cmdMini.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") + miniDataFolders = cmdMini.Flag.String("dir", ".", "directory to store data files") + miniMetricsHttpPort = cmdMini.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + miniMetricsHttpIp = cmdMini.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.") + miniS3Config = cmdMini.Flag.String("s3.config", "", "path to the S3 config file") + miniIamConfig = cmdMini.Flag.String("s3.iam.config", "", "path to the advanced IAM config file for S3") + miniS3AllowDeleteBucketNotEmpty = cmdMini.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") +) + +// initMiniCommonFlags initializes common mini flags +func initMiniCommonFlags() { + miniOptions.cpuprofile = cmdMini.Flag.String("cpuprofile", "", "cpu profile output file") + miniOptions.memprofile = cmdMini.Flag.String("memprofile", "", "memory profile output file") + miniOptions.debug = cmdMini.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:6060/debug/pprof/goroutine?debug=2") + miniOptions.debugPort = cmdMini.Flag.Int("debug.port", 6060, "http port for debugging") +} + +// initMiniMasterFlags initializes Master server flag options +func initMiniMasterFlags() { + miniMasterOptions.port = cmdMini.Flag.Int("master.port", 9333, "master server http listen port") + miniMasterOptions.portGrpc = cmdMini.Flag.Int("master.port.grpc", 0, "master server grpc listen port") + miniMasterOptions.metaFolder = cmdMini.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") + miniMasterOptions.peers = cmdMini.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list (default: none for single master)") + miniMasterOptions.volumeSizeLimitMB = cmdMini.Flag.Uint("master.volumeSizeLimitMB", 128, "Master stops directing writes to oversized volumes (default: 128MB for mini)") + miniMasterOptions.volumePreallocate = cmdMini.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") + miniMasterOptions.maxParallelVacuumPerServer = cmdMini.Flag.Int("master.maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel on one volume server") + miniMasterOptions.defaultReplication = cmdMini.Flag.String("master.defaultReplication", "", "Default replication type if not specified.") + miniMasterOptions.garbageThreshold = cmdMini.Flag.Float64("master.garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") + miniMasterOptions.metricsAddress = cmdMini.Flag.String("master.metrics.address", "", "Prometheus gateway address") + miniMasterOptions.metricsIntervalSec = cmdMini.Flag.Int("master.metrics.intervalSeconds", 15, "Prometheus push interval in seconds") + miniMasterOptions.raftResumeState = cmdMini.Flag.Bool("master.resumeState", false, "resume previous state on start master server") + miniMasterOptions.heartbeatInterval = cmdMini.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") + miniMasterOptions.electionTimeout = cmdMini.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers") + miniMasterOptions.raftHashicorp = cmdMini.Flag.Bool("master.raftHashicorp", false, "use hashicorp raft") + miniMasterOptions.raftBootstrap = cmdMini.Flag.Bool("master.raftBootstrap", false, "whether to bootstrap the Raft cluster") + miniMasterOptions.telemetryUrl = cmdMini.Flag.String("master.telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL") + miniMasterOptions.telemetryEnabled = cmdMini.Flag.Bool("master.telemetry", false, "enable telemetry reporting") +} + +// initMiniFilerFlags initializes Filer server flag options +func initMiniFilerFlags() { + miniFilerOptions.filerGroup = cmdMini.Flag.String("filer.filerGroup", "", "share metadata with other filers in the same filerGroup") + miniFilerOptions.collection = cmdMini.Flag.String("filer.collection", "", "all data will be stored in this collection") + miniFilerOptions.port = cmdMini.Flag.Int("filer.port", 8888, "filer server http listen port") + miniFilerOptions.portGrpc = cmdMini.Flag.Int("filer.port.grpc", 0, "filer server grpc listen port") + miniFilerOptions.publicPort = cmdMini.Flag.Int("filer.port.public", 0, "filer server public http listen port") + miniFilerOptions.defaultReplicaPlacement = cmdMini.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.") + miniFilerOptions.disableDirListing = cmdMini.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") + miniFilerOptions.maxMB = cmdMini.Flag.Int("filer.maxMB", 4, "split files larger than the limit") + miniFilerOptions.dirListingLimit = cmdMini.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size") + miniFilerOptions.cipher = cmdMini.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") + miniFilerOptions.saveToFilerLimit = cmdMini.Flag.Int("filer.saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") + miniFilerOptions.concurrentUploadLimitMB = cmdMini.Flag.Int("filer.concurrentUploadLimitMB", 0, "limit total concurrent upload size") + miniFilerOptions.concurrentFileUploadLimit = cmdMini.Flag.Int("filer.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads") + miniFilerOptions.localSocket = cmdMini.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-.sock") + miniFilerOptions.showUIDirectoryDelete = cmdMini.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button") + miniFilerOptions.downloadMaxMBps = cmdMini.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second") + miniFilerOptions.diskType = cmdMini.Flag.String("filer.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") + miniFilerOptions.allowedOrigins = cmdMini.Flag.String("filer.allowedOrigins", "*", "comma separated list of allowed origins") + miniFilerOptions.exposeDirectoryData = cmdMini.Flag.Bool("filer.exposeDirectoryData", true, "whether to return directory metadata and content in Filer UI") + miniFilerOptions.tusBasePath = cmdMini.Flag.String("filer.tusBasePath", "/.tus", "TUS resumable upload endpoint base path") +} + +// initMiniVolumeFlags initializes Volume server flag options +func initMiniVolumeFlags() { + miniOptions.v.port = cmdMini.Flag.Int("volume.port", 9340, "volume server http listen port") + miniOptions.v.portGrpc = cmdMini.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port") + miniOptions.v.publicPort = cmdMini.Flag.Int("volume.port.public", 0, "volume server public port") + miniOptions.v.id = cmdMini.Flag.String("volume.id", "", "volume server id. If empty, default to ip:port") + miniOptions.v.publicUrl = cmdMini.Flag.String("volume.publicUrl", "", "publicly accessible address") + miniOptions.v.indexType = cmdMini.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") + miniOptions.v.diskType = cmdMini.Flag.String("volume.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") + miniOptions.v.fixJpgOrientation = cmdMini.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") + miniOptions.v.readMode = cmdMini.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.") + miniOptions.v.compactionMBPerSecond = cmdMini.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") + miniOptions.v.maintenanceMBPerSecond = cmdMini.Flag.Int("volume.maintenanceMBps", 0, "limit maintenance IO rate in MB/s") + miniOptions.v.fileSizeLimitMB = cmdMini.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") + miniOptions.v.ldbTimeout = cmdMini.Flag.Int64("volume.index.leveldbTimeout", 0, "alive time for leveldb") + miniOptions.v.concurrentUploadLimitMB = cmdMini.Flag.Int("volume.concurrentUploadLimitMB", 0, "limit total concurrent upload size") + miniOptions.v.concurrentDownloadLimitMB = cmdMini.Flag.Int("volume.concurrentDownloadLimitMB", 0, "limit total concurrent download size") + miniOptions.v.pprof = cmdMini.Flag.Bool("volume.pprof", false, "enable pprof http handlers") + miniOptions.v.idxFolder = cmdMini.Flag.String("volume.dir.idx", "", "directory to store .idx files") + miniOptions.v.inflightUploadDataTimeout = cmdMini.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout") + miniOptions.v.inflightDownloadDataTimeout = cmdMini.Flag.Duration("volume.inflightDownloadDataTimeout", 60*time.Second, "inflight download data wait timeout") + miniOptions.v.hasSlowRead = cmdMini.Flag.Bool("volume.hasSlowRead", true, "if true, prevents slow reads from blocking other requests") + miniOptions.v.readBufferSizeMB = cmdMini.Flag.Int("volume.readBufferSizeMB", 4, "read buffer size in MB") + miniOptions.v.preStopSeconds = cmdMini.Flag.Int("volume.preStopSeconds", 1, "number of seconds between stop send heartbeats and stop volume server (default: 1 for mini)") +} + +// initMiniS3Flags initializes S3 server flag options +func initMiniS3Flags() { + miniS3Options.port = cmdMini.Flag.Int("s3.port", 8333, "s3 server http listen port") + miniS3Options.portHttps = cmdMini.Flag.Int("s3.port.https", 0, "s3 server https listen port") + miniS3Options.portGrpc = cmdMini.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port") + miniS3Options.domainName = cmdMini.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") + miniS3Options.allowedOrigins = cmdMini.Flag.String("s3.allowedOrigins", "*", "comma separated list of allowed origins") + miniS3Options.tlsPrivateKey = cmdMini.Flag.String("s3.key.file", "", "path to the TLS private key file") + miniS3Options.tlsCertificate = cmdMini.Flag.String("s3.cert.file", "", "path to the TLS certificate file") + miniS3Options.tlsCACertificate = cmdMini.Flag.String("s3.cacert.file", "", "path to the TLS CA certificate file") + miniS3Options.tlsVerifyClientCert = cmdMini.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate") + miniS3Options.metricsHttpPort = cmdMini.Flag.Int("s3.metricsPort", 0, "Prometheus metrics listen port") + miniS3Options.metricsHttpIp = cmdMini.Flag.String("s3.metricsIp", "", "metrics listen ip") + miniS3Options.localFilerSocket = cmdMini.Flag.String("s3.localFilerSocket", "", "local filer socket path") + miniS3Options.localSocket = cmdMini.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-.sock") + miniS3Options.idleTimeout = cmdMini.Flag.Int("s3.idleTimeout", 120, "connection idle seconds") + miniS3Options.concurrentUploadLimitMB = cmdMini.Flag.Int("s3.concurrentUploadLimitMB", 0, "limit total concurrent upload size") + miniS3Options.concurrentFileUploadLimit = cmdMini.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads") + miniS3Options.enableIam = cmdMini.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same port") + miniS3Options.dataCenter = cmdMini.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center") + miniS3Options.config = miniS3Config + miniS3Options.iamConfig = miniIamConfig + miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") + miniS3Options.allowDeleteBucketNotEmpty = miniS3AllowDeleteBucketNotEmpty + miniS3Options.debug = cmdMini.Flag.Bool("s3.debug", false, "serves runtime profiling data via pprof") + miniS3Options.debugPort = cmdMini.Flag.Int("s3.debug.port", 6060, "http port for debugging") +} + +// initMiniWebDAVFlags initializes WebDAV server flag options +func initMiniWebDAVFlags() { + miniWebDavOptions.port = cmdMini.Flag.Int("webdav.port", 7333, "webdav server http listen port") + miniWebDavOptions.collection = cmdMini.Flag.String("webdav.collection", "", "collection to create the files") + miniWebDavOptions.replication = cmdMini.Flag.String("webdav.replication", "", "replication to create the files") + miniWebDavOptions.disk = cmdMini.Flag.String("webdav.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") + miniWebDavOptions.tlsPrivateKey = cmdMini.Flag.String("webdav.key.file", "", "path to the TLS private key file") + miniWebDavOptions.tlsCertificate = cmdMini.Flag.String("webdav.cert.file", "", "path to the TLS certificate file") + miniWebDavOptions.cacheDir = cmdMini.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") + miniWebDavOptions.cacheSizeMB = cmdMini.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") + miniWebDavOptions.maxMB = cmdMini.Flag.Int("webdav.maxMB", 4, "split files larger than the limit") + miniWebDavOptions.filerRootPath = cmdMini.Flag.String("webdav.filer.path", "/", "use this remote path from filer server") +} + +// initMiniAdminFlags initializes Admin server flag options +func initMiniAdminFlags() { + miniAdminOptions.port = cmdMini.Flag.Int("admin.port", 23646, "admin server http listen port") + miniAdminOptions.grpcPort = cmdMini.Flag.Int("admin.port.grpc", 0, "admin server grpc listen port (default: admin http port + 10000)") + miniAdminOptions.master = cmdMini.Flag.String("admin.master", "", "master server address (automatically set)") + miniAdminOptions.dataDir = cmdMini.Flag.String("admin.dataDir", "", "directory to store admin configuration and data files") + miniAdminOptions.adminUser = cmdMini.Flag.String("admin.user", "admin", "admin interface username") + miniAdminOptions.adminPassword = cmdMini.Flag.String("admin.password", "", "admin interface password (if empty, auth is disabled)") +} + +func init() { + // Initialize common flags + initMiniCommonFlags() + + // Initialize component-specific flags + initMiniMasterFlags() + initMiniFilerFlags() + initMiniVolumeFlags() + initMiniS3Flags() + initMiniWebDAVFlags() + initMiniAdminFlags() +} + +func runMini(cmd *Command, args []string) bool { + + if *miniOptions.debug { + grace.StartDebugServer(*miniOptions.debugPort) + } + + util.LoadSecurityConfiguration() + util.LoadConfiguration("master", false) + + grace.SetupProfiling(*miniOptions.cpuprofile, *miniOptions.memprofile) + + // Set master.peers to "none" if not specified (single master mode) + if *miniMasterOptions.peers == "" { + *miniMasterOptions.peers = "none" + } + + // Validate and complete the peer list + _, peerList := checkPeers(*miniIp, *miniMasterOptions.port, *miniMasterOptions.portGrpc, *miniMasterOptions.peers) + actualPeersForComponents := strings.Join(pb.ToAddressStrings(peerList), ",") + + if *miniBindIp == "" { + *miniBindIp = *miniIp + } + + if *miniMetricsHttpIp == "" { + *miniMetricsHttpIp = *miniBindIp + } + + // ip address + miniMasterOptions.ip = miniIp + miniMasterOptions.ipBind = miniBindIp + miniFilerOptions.masters = pb.ServerAddresses(actualPeersForComponents).ToServiceDiscovery() + miniFilerOptions.ip = miniIp + miniFilerOptions.bindIp = miniBindIp + miniS3Options.bindIp = miniBindIp + miniWebDavOptions.ipBind = miniBindIp + miniOptions.v.ip = miniIp + miniOptions.v.bindIp = miniBindIp + miniOptions.v.masters = pb.ServerAddresses(actualPeersForComponents).ToAddresses() + miniOptions.v.idleConnectionTimeout = miniTimeout + miniOptions.v.dataCenter = miniDataCenter + miniOptions.v.rack = miniRack + + miniMasterOptions.whiteList = miniWhiteListOption + + miniFilerOptions.dataCenter = miniDataCenter + miniFilerOptions.rack = miniRack + miniS3Options.dataCenter = miniDataCenter + miniFilerOptions.disableHttp = miniDisableHttp + miniMasterOptions.disableHttp = miniDisableHttp + + filerAddress := string(pb.NewServerAddress(*miniIp, *miniFilerOptions.port, *miniFilerOptions.portGrpc)) + miniS3Options.filer = &filerAddress + miniWebDavOptions.filer = &filerAddress + + go stats_collect.StartMetricsServer(*miniMetricsHttpIp, *miniMetricsHttpPort) + + if *miniMasterOptions.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { + glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000") + } + + if *miniMasterOptions.metaFolder == "" { + *miniMasterOptions.metaFolder = *miniDataFolders + } + if err := util.TestFolderWritable(util.ResolvePath(*miniMasterOptions.metaFolder)); err != nil { + glog.Fatalf("Check Meta Folder (-dir=\"%s\") Writable: %s", *miniMasterOptions.metaFolder, err) + } + miniFilerOptions.defaultLevelDbDirectory = miniMasterOptions.metaFolder + + miniWhiteList := util.StringSplit(*miniWhiteListOption, ",") + + // Start all services with proper dependency coordination + // This channel will be closed when all services are fully ready + allServicesReady := make(chan struct{}) + startMiniServices(miniWhiteList, allServicesReady) + + // Wait for all services to be fully running before printing welcome message + <-allServicesReady + + // Print welcome message after all services are running + printWelcomeMessage() + + select {} +} + +// startMiniServices starts all mini services with proper dependency coordination +func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { + // Start Master server (no dependencies) + go startMiniService("Master", func() { + startMaster(miniMasterOptions, miniWhiteList) + }, *miniMasterOptions.port) + + // Wait for master to be ready + waitForServiceReady("Master", *miniMasterOptions.port) + + // Start Volume server (depends on master) + go startMiniService("Volume", func() { + minFreeSpaces := util.MustParseMinFreeSpace(miniVolumeMinFreeSpace, "") + miniOptions.v.startVolumeServer(*miniDataFolders, miniVolumeMaxDataVolumeCounts, *miniWhiteListOption, minFreeSpaces) + }, *miniOptions.v.port) + + // Wait for volume to be ready + waitForServiceReady("Volume", *miniOptions.v.port) + + // Start Filer (depends on master and volume) + go startMiniService("Filer", func() { + miniFilerOptions.startFiler() + }, *miniFilerOptions.port) + + // Wait for filer to be ready + waitForServiceReady("Filer", *miniFilerOptions.port) + + // Start S3 and WebDAV in parallel (both depend on filer) + go startMiniService("S3", func() { + startS3Service() + }, *miniS3Options.port) + + go startMiniService("WebDAV", func() { + miniWebDavOptions.startWebDav() + }, *miniWebDavOptions.port) + + // Wait for both S3 and WebDAV to be ready + waitForServiceReady("S3", *miniS3Options.port) + waitForServiceReady("WebDAV", *miniWebDavOptions.port) + + // Start Admin with worker (depends on master, filer, S3, WebDAV) + go startMiniAdminWithWorker(allServicesReady) +} + +// startMiniService starts a service in a goroutine with logging +func startMiniService(name string, fn func(), port int) { + glog.Infof("%s service starting...", name) + fn() +} + +// waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections +func waitForServiceReady(name string, port int) { + address := fmt.Sprintf("http://127.0.0.1:%d", port) + maxAttempts := 30 // 30 * 200ms = 6 seconds max wait + attempt := 0 + client := &http.Client{ + Timeout: 200 * time.Millisecond, + } + + for attempt < maxAttempts { + resp, err := client.Get(address) + if err == nil { + resp.Body.Close() + glog.Infof("%s service is ready at %s", name, address) + return + } + attempt++ + time.Sleep(200 * time.Millisecond) + } + + // Service failed to become ready, log warning but don't fail startup + // (services may still work even if health check endpoint isn't responding immediately) + glog.Warningf("Health check for %s failed (service may still be functional, retries may succeed)", name) +} + +// startS3Service initializes and starts the S3 server +func startS3Service() { + // Use existing AWS env vars if present (no new env vars). + accessKey := os.Getenv("AWS_ACCESS_KEY_ID") + secretKey := os.Getenv("AWS_SECRET_ACCESS_KEY") + + if accessKey != "" && secretKey != "" { + user := "mini" + iamCfg := &iam_pb.S3ApiConfiguration{} + ident := &iam_pb.Identity{Name: user} + ident.Credentials = append(ident.Credentials, &iam_pb.Credential{AccessKey: accessKey, SecretKey: secretKey}) + iamCfg.Identities = append(iamCfg.Identities, ident) + + iamPath := filepath.Join(*miniDataFolders, "iam_config.json") + + // Check if IAM config file already exists + if _, err := os.Stat(iamPath); err == nil { + // File exists, skip writing to preserve existing configuration + glog.V(1).Infof("IAM config file already exists at %s, preserving existing configuration", iamPath) + *miniIamConfig = iamPath + } else if os.IsNotExist(err) { + // File does not exist, create and write new configuration + f, err := os.OpenFile(iamPath, os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + glog.Fatalf("failed to create IAM config file %s: %v", iamPath, err) + } + defer f.Close() + if err := filer.ProtoToText(f, iamCfg); err != nil { + glog.Fatalf("failed to write IAM config to %s: %v", iamPath, err) + } + *miniIamConfig = iamPath + createdInitialIAM = true // Mark that we created initial IAM config + glog.V(1).Infof("Created initial IAM config at %s", iamPath) + } else { + // Error checking file existence + glog.Fatalf("failed to check IAM config file existence at %s: %v", iamPath, err) + } + } + + miniS3Options.localFilerSocket = miniFilerOptions.localSocket + miniS3Options.startS3Server() +} + +// startMiniAdminWithWorker starts the admin server with one worker +func startMiniAdminWithWorker(allServicesReady chan struct{}) { + defer close(allServicesReady) // Ensure channel is always closed on all paths + + ctx := context.Background() + + // Prepare master address + masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port) + + // Set admin options + *miniAdminOptions.master = masterAddr + if *miniAdminOptions.grpcPort == 0 { + *miniAdminOptions.grpcPort = *miniAdminOptions.port + 10000 + } + + // Create data directory if specified + if *miniAdminOptions.dataDir == "" { + // Use a subdirectory in the main data folder + *miniAdminOptions.dataDir = filepath.Join(*miniDataFolders, "admin") + } + + // Start admin server in background + go func() { + if err := startAdminServer(ctx, miniAdminOptions); err != nil { + glog.Errorf("Admin server error: %v", err) + } + }() + + // Wait for admin server's HTTP port to be ready before launching worker + adminAddr := fmt.Sprintf("http://127.0.0.1:%d", *miniAdminOptions.port) + glog.V(1).Infof("Waiting for admin server to be ready at %s...", adminAddr) + if err := waitForAdminServerReady(adminAddr); err != nil { + glog.Fatalf("Admin server readiness check failed: %v", err) + } + + // Start worker after admin server is ready + startMiniWorker() + + // Wait for worker to be ready by polling its gRPC port + workerGrpcAddr := fmt.Sprintf("127.0.0.1:%d", *miniAdminOptions.grpcPort) + waitForWorkerReady(workerGrpcAddr) +} + +// waitForAdminServerReady pings the admin server HTTP endpoint to check if it's ready +func waitForAdminServerReady(adminAddr string) error { + maxAttempts := 40 // 40 * 500ms = 20 seconds max wait + attempt := 0 + client := &http.Client{ + Timeout: 500 * time.Millisecond, + } + + for attempt < maxAttempts { + resp, err := client.Get(adminAddr) + if err == nil { + resp.Body.Close() + glog.V(1).Infof("Admin server is ready at %s", adminAddr) + return nil + } + attempt++ + time.Sleep(500 * time.Millisecond) + } + + return fmt.Errorf("admin server did not become ready at %s after %d attempts", adminAddr, maxAttempts) +} + +// waitForWorkerReady polls the worker's gRPC port to ensure the worker has fully initialized +func waitForWorkerReady(workerGrpcAddr string) { + maxAttempts := 30 // 30 * 200ms = 6 seconds max wait + attempt := 0 + + // Worker gRPC server doesn't have an HTTP endpoint, so we'll use a simple TCP connection attempt + // as a synchronization point to ensure the worker has started listening + for attempt < maxAttempts { + conn, err := net.DialTimeout("tcp", workerGrpcAddr, 200*time.Millisecond) + if err == nil { + conn.Close() + glog.V(1).Infof("Worker is ready at %s", workerGrpcAddr) + return + } + attempt++ + time.Sleep(200 * time.Millisecond) + } + + // Worker readiness check failed, but log as warning since worker may still be functional + glog.Warningf("Worker readiness check timed out at %s (worker may still be functional)", workerGrpcAddr) +} + +// startMiniWorker starts a single worker for the admin server +func startMiniWorker() { + glog.Infof("Starting maintenance worker for admin server") + + adminAddr := fmt.Sprintf("%s:%d", *miniIp, *miniAdminOptions.port) + capabilities := "vacuum,ec,balance" + + // Use worker directory under main data folder + workerDir := filepath.Join(*miniDataFolders, "worker") + if err := os.MkdirAll(workerDir, 0755); err != nil { + glog.Fatalf("Failed to create worker directory: %v", err) + } + + glog.Infof("Worker connecting to admin server: %s", adminAddr) + glog.Infof("Worker capabilities: %s", capabilities) + glog.Infof("Worker directory: %s", workerDir) + + // Parse capabilities + capabilitiesParsed := parseCapabilities(capabilities) + if len(capabilitiesParsed) == 0 { + glog.Fatalf("No valid capabilities for worker") + } + + // Create task directories + for _, capability := range capabilitiesParsed { + taskDir := filepath.Join(workerDir, string(capability)) + if err := os.MkdirAll(taskDir, 0755); err != nil { + glog.Fatalf("Failed to create task directory %s: %v", taskDir, err) + } + } + + // Load security configuration for gRPC communication + util.LoadConfiguration("security", false) + + // Create gRPC dial option using TLS configuration + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") + + // Create worker configuration + config := &types.WorkerConfig{ + AdminServer: adminAddr, + Capabilities: capabilitiesParsed, + MaxConcurrent: 2, + HeartbeatInterval: 30 * time.Second, + TaskRequestInterval: 5 * time.Second, + BaseWorkingDir: workerDir, + GrpcDialOption: grpcDialOption, + } + + // Create worker instance + workerInstance, err := worker.NewWorker(config) + if err != nil { + glog.Fatalf("Failed to create worker: %v", err) + } + + // Create admin client + adminClient, err := worker.CreateAdminClient(adminAddr, workerInstance.ID(), grpcDialOption) + if err != nil { + glog.Fatalf("Failed to create admin client: %v", err) + } + + // Set admin client + workerInstance.SetAdminClient(adminClient) + + // Start the worker + err = workerInstance.Start() + if err != nil { + glog.Fatalf("Failed to start worker: %v", err) + } + + glog.Infof("Maintenance worker %s started successfully", workerInstance.ID()) +} + +const welcomeMessageTemplate = ` +╔═══════════════════════════════════════════════════════════════════════════════╗ +║ SeaweedFS Mini - All-in-One Mode ║ +╚═══════════════════════════════════════════════════════════════════════════════╝ + + All components are running and ready to use: + + Master UI: http://%s:%d + Filer UI: http://%s:%d + S3 Endpoint: http://%s:%d + WebDAV: http://%s:%d + Admin UI: http://%s:%d + Volume Server: http://%s:%d + + Optimized Settings: + • Volume size limit: 128MB + • Volume max: auto (based on free disk space) + • Pre-stop seconds: 1 (faster shutdown) + • Master peers: none (single master mode) + • Admin UI for management and maintenance tasks + + Data Directory: %s + + Press Ctrl+C to stop all components +` + +const credentialsInstructionTemplate = ` + To create S3 credentials, you have two options: + + Option 1: Use environment variables (recommended for quick setup) + export AWS_ACCESS_KEY_ID=your-access-key + export AWS_SECRET_ACCESS_KEY=your-secret-key + weed mini -dir=/data + This will create initial credentials for the 'mini' user. + + Option 2: Use the Admin UI + Open: http://%s:%d + Add a new identity to create S3 credentials. +` + +const credentialsCreatedMessage = ` + Initial S3 credentials created: + user: mini + Note: credentials have been written to the IAM configuration file. +` + +// printWelcomeMessage prints the welcome message after all services are running +func printWelcomeMessage() { + fmt.Printf(welcomeMessageTemplate, + *miniIp, *miniMasterOptions.port, + *miniIp, *miniFilerOptions.port, + *miniIp, *miniS3Options.port, + *miniIp, *miniWebDavOptions.port, + *miniIp, *miniAdminOptions.port, + *miniIp, *miniOptions.v.port, + *miniDataFolders, + ) + + if createdInitialIAM { + fmt.Print(credentialsCreatedMessage) + } else { + fmt.Printf(credentialsInstructionTemplate, *miniIp, *miniAdminOptions.port) + } + fmt.Println("") +} diff --git a/weed/command/s3.go b/weed/command/s3.go index 940536176..d7140ded5 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -246,7 +246,7 @@ func (s3opt *S3Options) startS3Server() bool { return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filers %v grpc address", filerAddresses) + glog.V(2).Infof("wait to connect to filers %v grpc address", filerAddresses) time.Sleep(time.Second) } else { glog.V(0).Infof("connected to filers %v", filerAddresses) diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 5ad0ca225..31a6f5f47 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -3,13 +3,14 @@ package command import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/util/version" "net/http" "os" "os/user" "strconv" "time" + "github.com/seaweedfs/seaweedfs/weed/util/version" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -102,7 +103,7 @@ func (wo *WebDavOption) startWebDav() bool { return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress()) + glog.V(2).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress()) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 14b74c037..df6445327 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -201,11 +201,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn) for _, v := range newVolumes { - glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url()) + glog.V(1).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url()) message.NewVids = append(message.NewVids, uint32(v.Id)) } for _, v := range deletedVolumes { - glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url()) + glog.V(1).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url()) message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) } } diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 28eabd719..4fcc066fc 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -197,7 +197,7 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne l.SetVolume(vid, v) size, _, _ := v.FileStat() - glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s disk_id=%d", + glog.V(2).Infof("data file %s, replication=%s v=%d size=%d ttl=%s disk_id=%d", l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String(), diskId) return true } @@ -257,10 +257,10 @@ func (l *DiskLocation) loadExistingVolumesWithId(needleMapKind NeedleMapKind, ld } } l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout, diskId) - glog.V(0).Infof("Store started on dir: %s with %d volumes max %d (disk ID: %d)", l.Directory, len(l.volumes), l.MaxVolumeCount, diskId) + glog.V(2).Infof("Store started on dir: %s with %d volumes max %d (disk ID: %d)", l.Directory, len(l.volumes), l.MaxVolumeCount, diskId) l.loadAllEcShards() - glog.V(0).Infof("Store started on dir: %s with %d ec shards (disk ID: %d)", l.Directory, len(l.ecVolumes), diskId) + glog.V(2).Infof("Store started on dir: %s with %d ec shards (disk ID: %d)", l.Directory, len(l.ecVolumes), diskId) } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 4f550a949..65fe143bd 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -108,7 +108,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if err == nil { v.volumeInfo.Version = uint32(v.SuperBlock.Version) } - glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version) + glog.V(2).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version) if v.HasRemoteFile() { // maybe temporary network problem glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err) @@ -149,7 +149,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind // storage tier, and download to local storage, which may cause the // capactiy overloading. if !v.HasRemoteFile() { - glog.V(0).Infof("checking volume data integrity for volume %d", v.Id) + glog.V(2).Infof("checking volume data integrity for volume %d", v.Id) if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil { v.noWriteOrDelete = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err) @@ -164,10 +164,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind switch needleMapKind { case NeedleMapInMemory: if v.tmpNm != nil { - glog.V(0).Infof("updating memory compact index %s ", v.FileName(".idx")) + glog.V(2).Infof("updating memory compact index %s ", v.FileName(".idx")) err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0) } else { - glog.V(0).Infoln("loading memory index", v.FileName(".idx"), "to memory") + glog.V(2).Infoln("loading memory index", v.FileName(".idx"), "to memory") if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil { glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err) } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 3200879e5..ecbacef75 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -414,7 +414,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool { return false } } - glog.V(0).Infoln("Volume", vid, "becomes writable") + glog.V(1).Infoln("Volume", vid, "becomes writable") vl.writables = append(vl.writables, vid) return true }