Browse Source

Merge branch 'master' into rust-volume-server

rust-volume-server
Chris Lu 1 day ago
committed by GitHub
parent
commit
6ff96b9880
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 44
      .github/workflows/claude-code-review.yml
  2. 50
      .github/workflows/claude.yml
  3. 8
      k8s/charts/seaweedfs/templates/all-in-one/all-in-one-deployment.yaml
  4. 2
      k8s/charts/seaweedfs/templates/all-in-one/all-in-one-service.yml
  5. 6
      k8s/charts/seaweedfs/templates/shared/post-install-bucket-hook.yaml
  6. 10
      k8s/charts/seaweedfs/templates/volume/volume-ingress.yaml
  7. 4
      k8s/charts/seaweedfs/templates/volume/volume-statefulset.yaml
  8. 13
      k8s/charts/seaweedfs/values.yaml
  9. 83
      weed/admin/dash/cluster_topology.go
  10. 13
      weed/admin/dash/volume_management.go
  11. 14
      weed/command/fuse_std.go
  12. 6
      weed/command/mount.go
  13. 2
      weed/command/mount_darwin.go
  14. 2
      weed/command/mount_freebsd.go
  15. 10
      weed/command/mount_linux.go
  16. 12
      weed/command/mount_std.go
  17. 20
      weed/glog/glog.go
  18. 35
      weed/glog/glog_file.go
  19. 57
      weed/glog/glog_test.go
  20. 28
      weed/s3api/s3api_object_handlers_put.go
  21. 8
      weed/server/master_ui/master.html
  22. 8
      weed/server/master_ui/masterNewRaft.html

44
.github/workflows/claude-code-review.yml

@ -0,0 +1,44 @@
name: Claude Code Review
on:
pull_request:
types: [opened, synchronize, ready_for_review, reopened]
# Optional: Only run on specific file changes
# paths:
# - "src/**/*.ts"
# - "src/**/*.tsx"
# - "src/**/*.js"
# - "src/**/*.jsx"
jobs:
claude-review:
# Optional: Filter by PR author
# if: |
# github.event.pull_request.user.login == 'external-contributor' ||
# github.event.pull_request.user.login == 'new-developer' ||
# github.event.pull_request.author_association == 'FIRST_TIME_CONTRIBUTOR'
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: read
issues: read
id-token: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Run Claude Code Review
id: claude-review
uses: anthropics/claude-code-action@v1
with:
claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
plugin_marketplaces: 'https://github.com/anthropics/claude-code.git'
plugins: 'code-review@claude-code-plugins'
prompt: '/code-review:code-review ${{ github.repository }}/pull/${{ github.event.pull_request.number }}'
# See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
# or https://code.claude.com/docs/en/cli-reference for available options

50
.github/workflows/claude.yml

@ -0,0 +1,50 @@
name: Claude Code
on:
issue_comment:
types: [created]
pull_request_review_comment:
types: [created]
issues:
types: [opened, assigned]
pull_request_review:
types: [submitted]
jobs:
claude:
if: |
(github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude')) ||
(github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude')) ||
(github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude')) ||
(github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude')))
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: read
issues: read
id-token: write
actions: read # Required for Claude to read CI results on PRs
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Run Claude Code
id: claude
uses: anthropics/claude-code-action@v1
with:
claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
# This is an optional setting that allows Claude to read CI results on PRs
additional_permissions: |
actions: read
# Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it.
# prompt: 'Update the pull request description to include a summary of changes.'
# Optional: Add claude_args to customize behavior and configuration
# See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
# or https://code.claude.com/docs/en/cli-reference for available options
# claude_args: '--allowed-tools Bash(gh pr:*)'

8
k8s/charts/seaweedfs/templates/all-in-one/all-in-one-deployment.yaml

@ -168,8 +168,12 @@ spec:
{{- if .Values.allInOne.disableHttp }}
-disableHttp={{ .Values.allInOne.disableHttp }} \
{{- end }}
{{- if and (.Values.volume.dataDirs) (index .Values.volume.dataDirs 0 "maxVolumes") }}
-volume.max={{ index .Values.volume.dataDirs 0 "maxVolumes" }} \
{{- if .Values.volume.dataDirs }}
{{- with (first .Values.volume.dataDirs) }}
{{- if and (hasKey . "maxVolumes") (ne .maxVolumes nil) }}
-volume.max={{ .maxVolumes }} \
{{- end }}
{{- end }}
{{- end }}
-master.port={{ .Values.master.port }} \
{{- if .Values.global.enableReplication }}

2
k8s/charts/seaweedfs/templates/all-in-one/all-in-one-service.yml

@ -73,7 +73,7 @@ spec:
targetPort: {{ .Values.allInOne.sftp.port | default .Values.sftp.port }}
protocol: TCP
{{- end }}
# Server metrics port (single metrics endpoint for all services)
{{- if .Values.allInOne.metricsPort }}
- name: "server-metrics"

6
k8s/charts/seaweedfs/templates/shared/post-install-bucket-hook.yaml

@ -64,9 +64,11 @@ spec:
{{- if .Values.filer.podSecurityContext.enabled }}
securityContext: {{- omit .Values.filer.podSecurityContext "enabled" | toYaml | nindent 8 }}
{{- end }}
{{- include "seaweedfs.imagePullSecrets" $ | nindent 6 }}
containers:
- name: post-install-job
image: {{ template "master.image" . }}
imagePullPolicy: {{ $.Values.global.imagePullPolicy | default "IfNotPresent" }}
env:
- name: WEED_CLUSTER_DEFAULT
value: "sw"
@ -187,6 +189,10 @@ spec:
{{- end }}
- containerPort: {{ .Values.master.grpcPort }}
#name: swfs-master-grpc
{{- with coalesce .Values.allInOne.s3.createBucketsHook.resources .Values.s3.createBucketsHook.resources .Values.filer.s3.createBucketsHook.resources }}
resources:
{{- toYaml . | nindent 10 }}
{{- end }}
{{- if .Values.filer.containerSecurityContext.enabled }}
securityContext: {{- omit .Values.filer.containerSecurityContext "enabled" | toYaml | nindent 12 }}
{{- end }}

10
k8s/charts/seaweedfs/templates/volume/volume-ingress.yaml

@ -1,4 +1,8 @@
{{- if and .Values.volume.enabled .Values.volume.ingress.enabled }}
{{- /* Volume ingress works for both normal mode (volume.enabled) and all-in-one mode (allInOne.enabled) */}}
{{- $volumeEnabled := or .Values.volume.enabled .Values.allInOne.enabled }}
{{- if and $volumeEnabled .Values.volume.ingress.enabled }}
{{- /* Determine service name based on deployment mode */}}
{{- $serviceName := ternary (include "seaweedfs.componentName" (list . "all-in-one")) (include "seaweedfs.componentName" (list . "volume")) .Values.allInOne.enabled }}
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion }}
apiVersion: networking.k8s.io/v1
{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion }}
@ -42,11 +46,11 @@ spec:
backend:
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion }}
service:
name: {{ include "seaweedfs.componentName" (list . "volume") }}
name: {{ $serviceName }}
port:
number: {{ .Values.volume.port }}
{{- else }}
serviceName: {{ include "seaweedfs.componentName" (list . "volume") }}
serviceName: {{ $serviceName }}
servicePort: {{ .Values.volume.port }}
{{- end }}
{{- end }}

4
k8s/charts/seaweedfs/templates/volume/volume-statefulset.yaml

@ -86,6 +86,10 @@ spec:
- name: {{ $dir.name }}
mountPath: /{{ $dir.name }}
{{- end }}
{{- with $volume.idxVolMoveResources }}
resources:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- if $volume.containerSecurityContext.enabled }}
securityContext: {{- omit $volume.containerSecurityContext "enabled" | toYaml | nindent 12 }}
{{- end }}

13
k8s/charts/seaweedfs/values.yaml

@ -393,6 +393,12 @@ volume:
idx: {}
# Resource requests, limits, etc. for the vol-move-idx initContainer. This
# should map directly to the value of the resources field for a PodSpec,
# formatted as a multi-line string. By default no direct resource request
# is made.
idxVolMoveResources: {}
logs: {}
# limit background compaction or copying speed in mega bytes per second
@ -909,6 +915,8 @@ filer:
# versioning: Enabled
# - name: bucket-b
# anonymousRead: false
createBucketsHook:
resources: {}
s3:
enabled: false
@ -1076,6 +1084,9 @@ s3:
failureThreshold: 100
timeoutSeconds: 10
createBucketsHook:
resources: {}
ingress:
enabled: false
className: ""
@ -1479,6 +1490,8 @@ allInOne:
# versioning: Enabled
# - name: bucket-b
# anonymousRead: false
createBucketsHook:
resources: {}
# SFTP server configuration
# Note: Most parameters below default to null, which means they inherit from

83
weed/admin/dash/cluster_topology.go

@ -2,13 +2,20 @@ package dash
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
var dirStatusClient = &http.Client{
Timeout: 5 * time.Second,
}
// GetClusterTopology returns the current cluster topology with caching
func (s *AdminServer) GetClusterTopology() (*ClusterTopology, error) {
now := time.Now()
@ -35,8 +42,71 @@ func (s *AdminServer) GetClusterTopology() (*ClusterTopology, error) {
return topology, nil
}
// fetchPublicUrlMap queries the master's /dir/status HTTP endpoint and returns
// a map from data node ID (ip:port) to its PublicUrl.
func (s *AdminServer) fetchPublicUrlMap() map[string]string {
currentMaster := s.masterClient.GetMaster(context.Background())
if currentMaster == "" {
return nil
}
url := fmt.Sprintf("http://%s/dir/status", currentMaster.ToHttpAddress())
resp, err := dirStatusClient.Get(url)
if err != nil {
glog.V(1).Infof("Failed to fetch /dir/status from %s: %v", currentMaster, err)
return nil
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
glog.V(1).Infof("Non-OK response from /dir/status: %d", resp.StatusCode)
return nil
}
body, err := io.ReadAll(resp.Body)
if err != nil {
glog.V(1).Infof("Failed to read /dir/status response body: %v", err)
return nil
}
// Parse the JSON response to extract PublicUrl for each data node
var status struct {
Topology struct {
DataCenters []struct {
Racks []struct {
DataNodes []struct {
Url string `json:"Url"`
PublicUrl string `json:"PublicUrl"`
} `json:"DataNodes"`
} `json:"Racks"`
} `json:"DataCenters"`
} `json:"Topology"`
}
if err := json.Unmarshal(body, &status); err != nil {
glog.V(1).Infof("Failed to parse /dir/status response: %v", err)
return nil
}
publicUrls := make(map[string]string)
for _, dc := range status.Topology.DataCenters {
for _, rack := range dc.Racks {
for _, dn := range rack.DataNodes {
if dn.PublicUrl != "" {
publicUrls[dn.Url] = dn.PublicUrl
}
}
}
}
return publicUrls
}
// getTopologyViaGRPC gets topology using gRPC (original method)
func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error {
// Fetch public URL mapping from master HTTP API
// The gRPC DataNodeInfo does not include PublicUrl, so we supplement it.
publicUrls := s.fetchPublicUrlMap()
// Get cluster status from master
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
@ -85,12 +155,23 @@ func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error {
}
}
// Look up PublicUrl from master HTTP API
// Use node.Address (ip:port) as the key, matching the Url field in /dir/status
nodeAddr := node.Address
if nodeAddr == "" {
nodeAddr = node.Id
}
publicUrl := publicUrls[nodeAddr]
if publicUrl == "" {
publicUrl = nodeAddr
}
vs := VolumeServer{
ID: node.Id,
Address: node.Id,
DataCenter: dc.Id,
Rack: rack.Id,
PublicURL: node.Id,
PublicURL: publicUrl,
Volumes: int(totalVolumes),
MaxVolumes: int(totalMaxVolumes),
DiskUsage: totalSize,

13
weed/admin/dash/volume_management.go

@ -413,6 +413,9 @@ func (s *AdminServer) VacuumVolume(volumeID int, server string) error {
func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, error) {
var volumeServerMap map[string]*VolumeServer
// Fetch public URL mapping from master HTTP API
publicUrls := s.fetchPublicUrlMap()
// Make only ONE VolumeList call and use it for both topology building AND EC shard processing
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
@ -436,8 +439,18 @@ func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, erro
for _, node := range rack.DataNodeInfos {
// Initialize volume server if not exists
if volumeServerMap[node.Id] == nil {
// Look up PublicUrl from master HTTP API
nodeAddr := node.Address
if nodeAddr == "" {
nodeAddr = node.Id
}
publicUrl := publicUrls[nodeAddr]
if publicUrl == "" {
publicUrl = nodeAddr
}
volumeServerMap[node.Id] = &VolumeServer{
Address: node.Id,
PublicURL: publicUrl,
DataCenter: dc.Id,
Rack: rack.Id,
Volumes: 0,

14
weed/command/fuse_std.go

@ -12,6 +12,7 @@ import (
"syscall"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
@ -267,6 +268,19 @@ func runFuse(cmd *Command, args []string) bool {
fmt.Fprintf(os.Stderr, "failed to parse 'sys.novncache' value %q: %v\n", parameter.value, err)
return false
}
case "autofs":
if parsed, err := strconv.ParseBool(parameter.value); err == nil {
mountOptions.hasAutofs = &parsed
} else {
fmt.Fprintf(os.Stderr, "failed to parse 'autofs' value %q: %v\n", parameter.value, err)
return false
}
case "_netdev":
// _netdev is used for systemd/fstab parser to signify that this is a network mount but systemd
// mount sometimes can't strip them off. Meanwhile, fuse3 would refuse to run with _netdev, we
// strip them here if it fails to be stripped by the caller.
//(See https://github.com/seaweedfs/seaweedfs/wiki/fstab/948a70df5c0d9d2d27561b96de53bde07a29d2db)
glog.V(0).Infof("ignoring _netdev mount option")
default:
t := parameter.name
if parameter.value != "true" {

6
weed/command/mount.go

@ -58,6 +58,11 @@ type MountOptions struct {
// macOS-specific FUSE options
novncache *bool
// if true, we assume autofs exists over current mount point. Autofs (the kernel one, used by systemd automount)
// is expected to be mounted as a shim between auto-mounted fs and original mount point to provide auto mount.
// with this option, we ignore autofs mounted on the same point.
hasAutofs *bool
}
var (
@ -98,6 +103,7 @@ func init() {
mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging")
mountOptions.localSocket = cmdMount.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-<mount_dir_hash>.sock")
mountOptions.disableXAttr = cmdMount.Flag.Bool("disableXAttr", false, "disable xattr")
mountOptions.hasAutofs = cmdMount.Flag.Bool("autofs", false, "ignore autofs mounted on the same mountpoint (useful when systemd.automount and autofs is used)")
mountOptions.fuseCommandPid = 0
// Periodic metadata flush to protect against orphan chunk cleanup

2
weed/command/mount_darwin.go

@ -1,5 +1,5 @@
package command
func checkMountPointAvailable(dir string) bool {
func checkMountPointAvailable(dir string, skipAutofs bool) bool {
return true
}

2
weed/command/mount_freebsd.go

@ -1,5 +1,5 @@
package command
func checkMountPointAvailable(dir string) bool {
func checkMountPointAvailable(dir string, skipAutofs bool) bool {
return true
}

10
weed/command/mount_linux.go

@ -69,7 +69,7 @@ type Info struct {
// Mounted determines if a specified mountpoint has been mounted.
// On Linux it looks at /proc/self/mountinfo and on Solaris at mnttab.
func mounted(mountPoint string) (bool, error) {
func mounted(mountPoint string, skipAutofs bool) (bool, error) {
entries, err := parseMountTable()
if err != nil {
return false, err
@ -78,6 +78,10 @@ func mounted(mountPoint string) (bool, error) {
// Search the table for the mountPoint
for _, e := range entries {
if e.Mountpoint == mountPoint {
// Check if the mountpoint is autofs
if skipAutofs && e.Fstype == "autofs" {
continue
}
return true, nil
}
}
@ -137,13 +141,13 @@ func parseInfoFile(r io.Reader) ([]*Info, error) {
return out, nil
}
func checkMountPointAvailable(dir string) bool {
func checkMountPointAvailable(dir string, skipAutofs bool) bool {
mountPoint := dir
if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") {
mountPoint = mountPoint[0 : len(mountPoint)-1]
}
if mounted, err := mounted(mountPoint); err != nil || mounted {
if mounted, err := mounted(mountPoint, skipAutofs); err != nil || mounted {
if err != nil {
glog.Errorf("check %s: %v", mountPoint, err)
}

12
weed/command/mount_std.go

@ -223,13 +223,21 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
// Ensure target mount point availability
if isValid := checkMountPointAvailable(dir); !isValid {
skipAutofs := option.hasAutofs != nil && *option.hasAutofs
if isValid := checkMountPointAvailable(dir, skipAutofs); !isValid {
glog.Fatalf("Target mount point is not available: %s, please check!", dir)
return true
}
serverFriendlyName := strings.ReplaceAll(*option.filer, ",", "+")
// When autofs/systemd-mount is used, FsName must be "fuse" so util-linux/mount can recognize
// it as a pseudo filesystem. Otherwise, preserve the descriptive name for mount/df output.
fsName := serverFriendlyName + ":" + filerMountRootPath
if skipAutofs {
fsName = "fuse"
}
// mount fuse
fuseMountOptions := &fuse.MountOptions{
AllowOther: *option.allowOthers,
@ -239,7 +247,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
MaxReadAhead: 1024 * 1024 * 2,
IgnoreSecurityLabels: false,
RememberInodes: false,
FsName: serverFriendlyName + ":" + filerMountRootPath,
FsName: fsName,
Name: "seaweedfs",
SingleThreaded: false,
DisableXAttrs: *option.disableXAttr,

20
weed/glog/glog.go

@ -811,11 +811,12 @@ func (l *loggingT) exit(err error) {
// file rotation. There are conflicting methods, so the file cannot be embedded.
// l.mu is held for all its methods.
type syncBuffer struct {
logger *loggingT
logger *loggingT
*bufio.Writer
file *os.File
sev severity
nbytes uint64 // The number of bytes written to this file
file *os.File
sev severity
nbytes uint64 // The number of bytes written to this file
createdAt time.Time // When the current log file was opened (used for time-based rotation)
}
func (sb *syncBuffer) Sync() error {
@ -830,8 +831,14 @@ func (sb *syncBuffer) Write(p []byte) (n int, err error) {
if sb.Writer == nil {
return 0, errors.New("log writer is nil")
}
if sb.nbytes+uint64(len(p)) >= MaxSize {
if err := sb.rotateFile(time.Now()); err != nil {
now := timeNow()
// Size-based rotation: rotate when the file would exceed MaxSize.
sizeRotation := sb.nbytes+uint64(len(p)) >= MaxSize
// Time-based rotation: rotate when the file is older than --log_rotate_hours.
h := LogRotateHours()
timeRotation := h > 0 && !sb.createdAt.IsZero() && now.Sub(sb.createdAt) >= time.Duration(h)*time.Hour
if sizeRotation || timeRotation {
if err := sb.rotateFile(now); err != nil {
sb.logger.exit(err)
return 0, err
}
@ -853,6 +860,7 @@ func (sb *syncBuffer) rotateFile(now time.Time) error {
var err error
sb.file, _, err = create(severityName[sb.sev], now)
sb.nbytes = 0
sb.createdAt = now
if err != nil {
return err
}

35
weed/glog/glog_file.go

@ -33,7 +33,11 @@ import (
)
// MaxSize is the maximum size of a log file in bytes.
// It is initialized from the --log_max_size_mb flag when the first log file is created.
var MaxSize uint64 = 1024 * 1024 * 1800
// MaxFileCount is the maximum number of log files retained per severity level.
// It is initialized from the --log_max_files flag when the first log file is created.
var MaxFileCount = 5
// logDirs lists the candidate directories for new log files.
@ -43,7 +47,32 @@ var logDirs []string
// See createLogDirs for the full list of possible destinations.
var logDir = flag.String("logdir", "", "If non-empty, write log files in this directory")
// logMaxSizeMB controls the maximum size of each log file in megabytes.
// When a log file reaches this size it is closed and a new file is created.
// Defaults to 1800 MB. Set to 0 to use the compiled-in default.
var logMaxSizeMB = flag.Uint64("log_max_size_mb", 1800, "Maximum size in megabytes of each log file before it is rotated (0 = use default of 1800 MB)")
// logMaxFiles controls how many log files are kept per severity level.
// Older files are deleted when the limit is exceeded.
// Defaults to 5.
var logMaxFiles = flag.Int("log_max_files", 5, "Maximum number of log files to keep per severity level before older ones are deleted (0 = use default of 5)")
// logRotateHours controls time-based log rotation.
// When non-zero, each log file is rotated after the given number of hours
// regardless of its size. This prevents log files from accumulating in
// long-running deployments even when log volume is low.
// The default is 168 hours (7 days). Set to 0 to disable time-based rotation.
var logRotateHours = flag.Int("log_rotate_hours", 168, "Rotate log files after this many hours (default: 168 = 7 days, 0 = disabled)")
func createLogDirs() {
// Apply flag values now that flags have been parsed.
if *logMaxSizeMB > 0 {
MaxSize = *logMaxSizeMB * 1024 * 1024
}
if *logMaxFiles > 0 {
MaxFileCount = *logMaxFiles
}
if *logDir != "" {
logDirs = append(logDirs, *logDir)
} else {
@ -51,6 +80,12 @@ func createLogDirs() {
}
}
// LogRotateHours returns the configured time-based rotation interval.
// This is used by syncBuffer to decide when to rotate open log files.
func LogRotateHours() int {
return *logRotateHours
}
var (
pid = os.Getpid()
program = filepath.Base(os.Args[0])

57
weed/glog/glog_test.go

@ -333,10 +333,13 @@ func TestRollover(t *testing.T) {
logExitFunc = func(e error) {
err = e
}
Info("x") // Be sure we have a file (also triggers createLogDirs via sync.Once).
// Set MaxSize after the first Info call so that createLogDirs (which
// overwrites MaxSize from the flag default) has already executed.
defer func(previous uint64) { MaxSize = previous }(MaxSize)
MaxSize = 512
Info("x") // Be sure we have a file.
info, ok := logging.file[infoLog].(*syncBuffer)
if !ok {
t.Fatal("info wasn't created")
@ -371,6 +374,56 @@ func TestRollover(t *testing.T) {
}
}
func TestTimeBasedRollover(t *testing.T) {
setFlags()
var err error
defer func(previous func(error)) { logExitFunc = previous }(logExitFunc)
logExitFunc = func(e error) {
err = e
}
// Disable size-based rotation by setting a very large MaxSize.
defer func(previous uint64) { MaxSize = previous }(MaxSize)
MaxSize = 1024 * 1024 * 1024
// Enable time-based rotation with a 1-hour interval.
defer func(previous int) { *logRotateHours = previous }(*logRotateHours)
*logRotateHours = 1
Info("x") // Create initial file.
info, ok := logging.file[infoLog].(*syncBuffer)
if !ok {
t.Fatal("info wasn't created")
}
if err != nil {
t.Fatalf("info has initial error: %v", err)
}
fname0 := info.file.Name()
createdAt := info.createdAt
// Mock time to 30 minutes after file creation — should NOT rotate.
defer func(previous func() time.Time) { timeNow = previous }(timeNow)
timeNow = func() time.Time { return createdAt.Add(30 * time.Minute) }
Info("still within interval")
if err != nil {
t.Fatalf("error after write within interval: %v", err)
}
if info.file.Name() != fname0 {
t.Error("file rotated before interval elapsed")
}
// Advance mock time past the 1-hour interval — should rotate.
timeNow = func() time.Time { return createdAt.Add(61 * time.Minute) }
Info("past interval")
if err != nil {
t.Fatalf("error after time-based rotation: %v", err)
}
fname1 := info.file.Name()
if fname0 == fname1 {
t.Error("file did not rotate after interval elapsed")
}
}
func TestLogBacktraceAt(t *testing.T) {
setFlags()
defer logging.swap(logging.newBuffers())

28
weed/s3api/s3api_object_handlers_put.go

@ -139,6 +139,22 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
fullDirPath = fullDirPath + "/" + dirName
}
// Read any content through dataReader (handles chunked encoding properly)
var dirContent []byte
if r.ContentLength != 0 {
var readErr error
dirContent, readErr = io.ReadAll(dataReader)
if readErr != nil {
glog.Errorf("PutObjectHandler: failed to read directory marker content %s/%s: %v", bucket, object, readErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
// Compute MD5 for ETag (md5.Sum of nil/empty = MD5 of empty content)
dirMd5 := md5.Sum(dirContent)
dirEtag := fmt.Sprintf("%x", dirMd5)
glog.Infof("PutObjectHandler: explicit directory marker %s/%s (contentType=%q, len=%d)",
bucket, object, objectContentType, r.ContentLength)
if err := s3a.mkdir(
@ -147,10 +163,17 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
if objectContentType == "" {
objectContentType = s3_constants.FolderMimeType
}
if r.ContentLength > 0 {
entry.Content, _ = io.ReadAll(r.Body)
if len(dirContent) > 0 {
entry.Content = dirContent
}
entry.Attributes.Mime = objectContentType
entry.Attributes.Md5 = dirMd5[:]
// Store ETag in extended attributes for consistency with regular objects
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtETagKey] = []byte(dirEtag)
// Set object owner for directory objects (same as regular objects)
s3a.setObjectOwnerFromRequest(r, bucket, entry)
@ -158,6 +181,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
setEtag(w, dirEtag)
} else {
// Get detailed versioning state for the bucket
versioningState, err := s3a.getVersioningState(bucket)

8
weed/server/master_ui/master.html

@ -88,9 +88,11 @@
<tr>
<td><code>{{ $dc.Id }}</code></td>
<td>{{ $rack.Id }}</td>
<td><a href="{{ url $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
{{ if ne $dn.PublicUrl $dn.Url }}
/ <a href="{{ url $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
<td>
{{ if and (ne $dn.PublicUrl "") (ne $dn.PublicUrl $dn.Url) }}
<a href="{{ url $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
{{ else }}
<a href="{{ url $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
{{ end }}
</td>
<td>{{ $dn.Volumes }}</td>

8
weed/server/master_ui/masterNewRaft.html

@ -101,9 +101,11 @@
<tr>
<td><code>{{ $dc.Id }}</code></td>
<td>{{ $rack.Id }}</td>
<td><a href="{{ url $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
{{ if ne $dn.PublicUrl $dn.Url }}
/ <a href="{{ url $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
<td>
{{ if and (ne $dn.PublicUrl "") (ne $dn.PublicUrl $dn.Url) }}
<a href="{{ url $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
{{ else }}
<a href="{{ url $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
{{ end }}
</td>
<td>{{ $dn.Volumes }}</td>

Loading…
Cancel
Save