Browse Source

Merge branch 'master' into mq

pull/5890/head
chrislu 9 months ago
parent
commit
4093115ca9
  1. 4
      k8s/charts/seaweedfs/templates/filer-statefulset.yaml
  2. 4
      k8s/charts/seaweedfs/templates/master-statefulset.yaml
  3. 6
      weed/mount/inode_to_path.go
  4. 23
      weed/mount/weedfs.go
  5. 6
      weed/mount/weedfs_file_sync.go
  6. 7
      weed/server/master_grpc_server.go

4
k8s/charts/seaweedfs/templates/filer-statefulset.yaml

@ -129,7 +129,7 @@ spec:
- "-ec" - "-ec"
- | - |
exec /usr/bin/weed \ exec /usr/bin/weed \
{{- if or (eq .Values.filer.logs.type "hostPath") (eq .Values.filer.logs.type "emptyDir") }}
{{- if or (eq .Values.filer.logs.type "hostPath") (eq .Values.filer.logs.type "persistentVolumeClaim") (eq .Values.filer.logs.type "emptyDir") }}
-logdir=/logs \ -logdir=/logs \
{{- else }} {{- else }}
-logtostderr=true \ -logtostderr=true \
@ -197,7 +197,7 @@ spec:
{{- end }} {{- end }}
-master={{ if .Values.global.masterServer }}{{.Values.global.masterServer}}{{ else }}{{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master.{{ $.Release.Namespace }}:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }}{{ end }} -master={{ if .Values.global.masterServer }}{{.Values.global.masterServer}}{{ else }}{{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master.{{ $.Release.Namespace }}:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }}{{ end }}
volumeMounts: volumeMounts:
{{- if or (eq .Values.filer.logs.type "hostPath") (eq .Values.filer.logs.type "emptyDir") }}
{{- if (or (eq .Values.filer.logs.type "hostPath") (eq .Values.filer.logs.type "persistentVolumeClaim") (eq .Values.filer.logs.type "emptyDir")) }}
- name: seaweedfs-filer-log-volume - name: seaweedfs-filer-log-volume
mountPath: "/logs/" mountPath: "/logs/"
{{- end }} {{- end }}

4
k8s/charts/seaweedfs/templates/master-statefulset.yaml

@ -110,7 +110,7 @@ spec:
- "-ec" - "-ec"
- | - |
exec /usr/bin/weed \ exec /usr/bin/weed \
{{- if or (eq .Values.master.logs.type "hostPath") (eq .Values.master.logs.type "emptyDir") }}
{{- if or (eq .Values.master.logs.type "hostPath") (eq .Values.master.logs.type "persistentVolumeClaim") (eq .Values.master.logs.type "emptyDir") }}
-logdir=/logs \ -logdir=/logs \
{{- else }} {{- else }}
-logtostderr=true \ -logtostderr=true \
@ -158,7 +158,7 @@ spec:
volumeMounts: volumeMounts:
- name : data-{{ .Release.Namespace }} - name : data-{{ .Release.Namespace }}
mountPath: /data mountPath: /data
{{- if or (eq .Values.master.logs.type "hostPath") (eq .Values.master.logs.type "emptyDir") }}
{{- if or (eq .Values.master.logs.type "hostPath") (eq .Values.master.logs.type "persistentVolumeClaim") (eq .Values.master.logs.type "emptyDir") }}
- name: seaweedfs-master-log-volume - name: seaweedfs-master-log-volume
mountPath: "/logs/" mountPath: "/logs/"
{{- end }} {{- end }}

6
weed/mount/inode_to_path.go

@ -114,9 +114,9 @@ func (i *InodeToPath) AllocateInode(path util.FullPath, unixTime int64) uint64 {
return inode return inode
} }
func (i *InodeToPath) GetInode(path util.FullPath) uint64 {
func (i *InodeToPath) GetInode(path util.FullPath) (uint64, bool) {
if path == "/" { if path == "/" {
return 1
return 1, true
} }
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
@ -125,7 +125,7 @@ func (i *InodeToPath) GetInode(path util.FullPath) uint64 {
// glog.Fatalf("GetInode unknown inode for %s", path) // glog.Fatalf("GetInode unknown inode for %s", path)
// this could be the parent for mount point // this could be the parent for mount point
} }
return inode
return inode, found
} }
func (i *InodeToPath) GetPath(inode uint64) (util.FullPath, fuse.Status) { func (i *InodeToPath) GetPath(inode uint64) (util.FullPath, fuse.Status) {

23
weed/mount/weedfs.go

@ -105,6 +105,29 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}, func(path util.FullPath) bool { }, func(path util.FullPath) bool {
return wfs.inodeToPath.IsChildrenCached(path) return wfs.inodeToPath.IsChildrenCached(path)
}, func(filePath util.FullPath, entry *filer_pb.Entry) { }, func(filePath util.FullPath, entry *filer_pb.Entry) {
// Find inode if it is not a deleted path
if inode, inode_found := wfs.inodeToPath.GetInode(filePath); inode_found {
// Find open file handle
if fh, fh_found := wfs.fhmap.FindFileHandle(inode); fh_found {
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
// Recreate dirty pages
fh.dirtyPages.Destroy()
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
// Update handle entry
newentry, status := wfs.maybeLoadEntry(filePath)
if status == fuse.OK {
if fh.GetEntry() != newentry {
fh.SetEntry(newentry)
}
}
}
}
}) })
grace.OnInterrupt(func() { grace.OnInterrupt(func() {
wfs.metaCache.Shutdown() wfs.metaCache.Shutdown()

6
weed/mount/weedfs_file_sync.go

@ -104,9 +104,6 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
} }
} }
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("doFlush", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
if !fh.dirtyMetadata { if !fh.dirtyMetadata {
return fuse.OK return fuse.OK
} }
@ -115,6 +112,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
return fuse.Status(syscall.ENOSPC) return fuse.Status(syscall.ENOSPC)
} }
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("doFlush", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
fh.entryLock.Lock() fh.entryLock.Lock()
defer fh.entryLock.Unlock() defer fh.entryLock.Unlock()

7
weed/server/master_grpc_server.go

@ -361,8 +361,7 @@ func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress p
// the KeepConnected loop is no longer listening on this channel but we're // the KeepConnected loop is no longer listening on this channel but we're
// trying to send to it in SendHeartbeat and so we can't lock the // trying to send to it in SendHeartbeat and so we can't lock the
// clientChansLock to remove the channel and we're stuck writing to it // clientChansLock to remove the channel and we're stuck writing to it
// 100 is probably overkill
messageChan = make(chan *master_pb.KeepConnectedResponse, 100)
messageChan = make(chan *master_pb.KeepConnectedResponse, 10000)
ms.clientChansLock.Lock() ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan ms.clientChans[clientName] = messageChan
@ -374,8 +373,10 @@ func (ms *MasterServer) deleteClient(clientName string) {
glog.V(0).Infof("- client %v", clientName) glog.V(0).Infof("- client %v", clientName)
ms.clientChansLock.Lock() ms.clientChansLock.Lock()
// close message chan, so that the KeepConnected go routine can exit // close message chan, so that the KeepConnected go routine can exit
close(ms.clientChans[clientName])
if clientChan, ok := ms.clientChans[clientName]; ok {
close(clientChan)
delete(ms.clientChans, clientName) delete(ms.clientChans, clientName)
}
ms.clientChansLock.Unlock() ms.clientChansLock.Unlock()
} }

Loading…
Cancel
Save