From c359a5117f9400a08b18f01daaaea13b09e3f235 Mon Sep 17 00:00:00 2001 From: chrislusf Date: Wed, 30 Mar 2016 12:34:11 -0700 Subject: [PATCH] add lock to dir children map fix https://github.com/chrislusf/seaweedfs/issues/273 --- go/filer/embedded_filer/directory_in_map.go | 60 +++++++++++++++------ 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/go/filer/embedded_filer/directory_in_map.go b/go/filer/embedded_filer/directory_in_map.go index a1679f93e..b142639ab 100644 --- a/go/filer/embedded_filer/directory_in_map.go +++ b/go/filer/embedded_filer/directory_in_map.go @@ -17,12 +17,43 @@ import ( var writeLock sync.Mutex //serialize changes to dir.log type DirectoryEntryInMap struct { + sync.Mutex Name string Parent *DirectoryEntryInMap - SubDirectories map[string]*DirectoryEntryInMap + subDirectories map[string]*DirectoryEntryInMap Id filer.DirectoryId } +func (de *DirectoryEntryInMap) getChild(dirName string) (*DirectoryEntryInMap, bool) { + de.Lock() + defer de.Unlock() + child, ok := de.subDirectories[dirName] + return child, ok +} +func (de *DirectoryEntryInMap) addChild(dirName string, child *DirectoryEntryInMap) { + de.Lock() + defer de.Unlock() + de.subDirectories[dirName] = child +} +func (de *DirectoryEntryInMap) removeChild(dirName string) { + de.Lock() + defer de.Unlock() + delete(de.subDirectories, dirName) +} +func (de *DirectoryEntryInMap) hasChildren() bool { + de.Lock() + defer de.Unlock() + return len(de.subDirectories) > 0 +} +func (de *DirectoryEntryInMap) children() (dirNames []filer.DirectoryEntry) { + de.Lock() + defer de.Unlock() + for k, v := range de.subDirectories { + dirNames = append(dirNames, filer.DirectoryEntry{Name: k, Id: v.Id}) + } + return dirNames +} + type DirectoryManagerInMap struct { Root *DirectoryEntryInMap max filer.DirectoryId @@ -33,7 +64,7 @@ type DirectoryManagerInMap struct { func (dm *DirectoryManagerInMap) NewDirectoryEntryInMap(parent *DirectoryEntryInMap, name string) (d *DirectoryEntryInMap, err error) { writeLock.Lock() defer writeLock.Unlock() - d = &DirectoryEntryInMap{Name: name, Parent: parent, SubDirectories: make(map[string]*DirectoryEntryInMap)} + d = &DirectoryEntryInMap{Name: name, Parent: parent, subDirectories: make(map[string]*DirectoryEntryInMap)} var parts []string for p := d; p != nil && p.Name != ""; p = p.Parent { parts = append(parts, p.Name) @@ -60,7 +91,7 @@ func (dm *DirectoryManagerInMap) log(words ...string) { func NewDirectoryManagerInMap(dirLogFile string) (dm *DirectoryManagerInMap, err error) { dm = &DirectoryManagerInMap{} //dm.Root do not use NewDirectoryEntryInMap, since dm.max will be changed - dm.Root = &DirectoryEntryInMap{SubDirectories: make(map[string]*DirectoryEntryInMap)} + dm.Root = &DirectoryEntryInMap{subDirectories: make(map[string]*DirectoryEntryInMap)} if dm.logFile, err = os.OpenFile(dirLogFile, os.O_RDWR|os.O_CREATE, 0644); err != nil { return nil, fmt.Errorf("cannot write directory log file %s: %v", dirLogFile, err) } @@ -135,7 +166,7 @@ func (dm *DirectoryManagerInMap) findDirectory(dirPath string) (*DirectoryEntryI parts := strings.Split(dirPath, "/") dir := dm.Root for i := 1; i < len(parts); i++ { - if sub, ok := dir.SubDirectories[parts[i]]; ok { + if sub, ok := dir.getChild(parts[i]); ok { dir = sub } else { return dm.Root, fmt.Errorf("Directory %s Not Found", dirPath) @@ -159,7 +190,7 @@ func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId filer.Direc parts := strings.Split(dirPath, "/") dir := dm.Root for i := 1; i < len(parts); i++ { - sub, ok := dir.SubDirectories[parts[i]] + sub, ok := dir.getChild(parts[i]) if !ok { if i != len(parts)-1 { return fmt.Errorf("%s should be created after parent %s", dirPath, parts[i]) @@ -172,7 +203,7 @@ func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId filer.Direc if sub.Id != dirId { return fmt.Errorf("%s should be have id %v instead of %v", dirPath, sub.Id, dirId) } - dir.SubDirectories[parts[i]] = sub + dir.addChild(parts[i], sub) } dir = sub } @@ -187,14 +218,14 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn parts := strings.Split(dirPath, "/") dir = dm.Root for i := 1; i < len(parts); i++ { - sub, ok := dir.SubDirectories[parts[i]] + sub, ok := dir.getChild(parts[i]) if !ok { var err error sub, err = dm.NewDirectoryEntryInMap(dir, parts[i]) if err != nil { return nil, false } - dir.SubDirectories[parts[i]] = sub + dir.addChild(parts[i], sub) created = true } dir = sub @@ -219,11 +250,11 @@ func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParent return pe } dm.log("mov", oldDirPath, newParentDirPath, newName) - delete(oldDir.Parent.SubDirectories, oldDir.Name) + oldDir.Parent.removeChild(oldDir.Name) if newName == "" { newName = oldDir.Name } - parentDir.SubDirectories[newName] = oldDir + parentDir.addChild(newName, oldDir) oldDir.Name = newName oldDir.Parent = parentDir return nil @@ -234,10 +265,7 @@ func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []fil if e != nil { return dirNames, e } - for k, v := range d.SubDirectories { - dirNames = append(dirNames, filer.DirectoryEntry{Name: k, Id: v.Id}) - } - return dirNames, nil + return d.children(), nil } func (dm *DirectoryManagerInMap) DeleteDirectory(dirPath string) error { writeLock.Lock() @@ -249,10 +277,10 @@ func (dm *DirectoryManagerInMap) DeleteDirectory(dirPath string) error { if e != nil { return e } - if len(d.SubDirectories) != 0 { + if d.hasChildren() { return fmt.Errorf("dir %s still has sub directories", dirPath) } - delete(d.Parent.SubDirectories, d.Name) + d.Parent.removeChild(d.Name) d.Parent = nil dm.log("del", dirPath) return nil