From 25975bacfb71554d2e270900b4eff6b23bd0a920 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 3 Jan 2026 14:41:25 -0800 Subject: [PATCH] fix(gcs): resolve credential conflict and improve backup logging (#7951) * fix(gcs): resolve credential conflict and improve backup logging - Workaround GCS SDK's "multiple credential options" error by manually constructing an authenticated HTTP client. - Include source entry path in filer backup error logs for better visibility on missing volumes/404s. * fix: address PR review feedback - Add nil check for EventNotification in getSourceKey - Avoid reassigning google_application_credentials parameter in gcs_sink.go * fix(gcs): return errors instead of calling glog.Fatalf in initialize Adheres to Go best practices and allows for more graceful failure handling by callers. * read from bind ip --- weed/command/filer_backup.go | 18 ++++++++++-- weed/command/mini.go | 14 +++++---- weed/replication/sink/gcssink/gcs_sink.go | 35 ++++++++++++++++------- 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 3387fccbf..0d24f6a24 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -148,13 +148,13 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti } // ignore HTTP 404 from remote reads if errors.Is(err, http.ErrNotFound) { - glog.V(0).Infof("got 404 error, ignore it: %s", err.Error()) + glog.V(0).Infof("got 404 error for %s, ignore it: %s", getSourceKey(resp), err.Error()) return nil } // also ignore missing volume/lookup errors coming from LookupFileId or vid map errStr := err.Error() if strings.Contains(errStr, "LookupFileId") || (strings.Contains(errStr, "volume id") && strings.Contains(errStr, "not found")) { - glog.V(0).Infof("got missing-volume error, ignore it: %s", errStr) + glog.V(0).Infof("got missing-volume error for %s, ignore it: %s", getSourceKey(resp), errStr) return nil } return err @@ -206,3 +206,17 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) } + +func getSourceKey(resp *filer_pb.SubscribeMetadataResponse) string { + if resp == nil || resp.EventNotification == nil { + return "" + } + message := resp.EventNotification + if message.NewEntry != nil { + return string(util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + } + if message.OldEntry != nil { + return string(util.FullPath(resp.Directory).Child(message.OldEntry.Name)) + } + return "" +} diff --git a/weed/command/mini.go b/weed/command/mini.go index 39a271ee9..a75d631d8 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -918,6 +918,9 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { ctx := context.Background() + // Determine bind IP for health checks + bindIp := getBindIp() + // Prepare master address masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port) @@ -961,7 +964,7 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { }() // Wait for admin server's HTTP port to be ready before launching worker - adminAddr := fmt.Sprintf("http://127.0.0.1:%d", *miniAdminOptions.port) + adminAddr := fmt.Sprintf("http://%s:%d", bindIp, *miniAdminOptions.port) glog.V(1).Infof("Waiting for admin server to be ready at %s...", adminAddr) if err := waitForAdminServerReady(adminAddr); err != nil { glog.Fatalf("Admin server readiness check failed: %v", err) @@ -971,20 +974,21 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { startMiniWorker() // Wait for worker to be ready by polling its gRPC port - workerGrpcAddr := fmt.Sprintf("127.0.0.1:%d", *miniAdminOptions.grpcPort) + workerGrpcAddr := fmt.Sprintf("%s:%d", bindIp, *miniAdminOptions.grpcPort) waitForWorkerReady(workerGrpcAddr) } // waitForAdminServerReady pings the admin server HTTP endpoint to check if it's ready func waitForAdminServerReady(adminAddr string) error { - maxAttempts := 40 // 40 * 500ms = 20 seconds max wait + healthAddr := fmt.Sprintf("%s/health", adminAddr) + maxAttempts := 60 // 60 * 500ms = 30 seconds max wait attempt := 0 client := &http.Client{ - Timeout: 500 * time.Millisecond, + Timeout: 1 * time.Second, } for attempt < maxAttempts { - resp, err := client.Get(adminAddr) + resp, err := client.Get(healthAddr) if err == nil { resp.Body.Close() glog.V(1).Infof("Admin server is ready at %s", adminAddr) diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index 1a930fd4a..1c4ae9e9b 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -4,14 +4,15 @@ import ( "context" "fmt" "os" - - "github.com/seaweedfs/seaweedfs/weed/replication/repl_util" + "strings" "cloud.google.com/go/storage" + "github.com/seaweedfs/seaweedfs/weed/replication/repl_util" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" "google.golang.org/api/option" "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/replication/sink" "github.com/seaweedfs/seaweedfs/weed/replication/source" @@ -60,16 +61,30 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str g.dir = dir // Creates a client. - if google_application_credentials == "" { - var found bool - google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") - if !found { - glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") + var clientOpts []option.ClientOption + if google_application_credentials != "" { + var data []byte + var err error + if strings.HasPrefix(google_application_credentials, "{") { + data = []byte(google_application_credentials) + } else { + googleCredentialsPath := util.ResolvePath(google_application_credentials) + data, err = os.ReadFile(googleCredentialsPath) + if err != nil { + return fmt.Errorf("failed to read credentials file %s: %v", googleCredentialsPath, err) + } + } + creds, err := google.CredentialsFromJSON(context.Background(), data, storage.ScopeFullControl) + if err != nil { + return fmt.Errorf("failed to parse credentials: %v", err) } + httpClient := oauth2.NewClient(context.Background(), creds.TokenSource) + clientOpts = append(clientOpts, option.WithHTTPClient(httpClient), option.WithoutAuthentication()) } - client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials)) + client, err := storage.NewClient(context.Background(), clientOpts...) if err != nil { - glog.Fatalf("Failed to create client: %v", err) + return fmt.Errorf("failed to create client with credentials \"%s\" env \"%s\": %v", + google_application_credentials, os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"), err) } g.client = client