|
@ -2,6 +2,7 @@ package storage |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"errors" |
|
|
"errors" |
|
|
|
|
|
"fmt" |
|
|
"log" |
|
|
"log" |
|
|
"os" |
|
|
"os" |
|
|
"path" |
|
|
"path" |
|
@ -10,6 +11,7 @@ import ( |
|
|
|
|
|
|
|
|
const ( |
|
|
const ( |
|
|
SuperBlockSize = 8 |
|
|
SuperBlockSize = 8 |
|
|
|
|
|
Version = 1 |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type Volume struct { |
|
|
type Volume struct { |
|
@ -20,6 +22,8 @@ type Volume struct { |
|
|
|
|
|
|
|
|
replicaType ReplicationType |
|
|
replicaType ReplicationType |
|
|
|
|
|
|
|
|
|
|
|
version uint8 |
|
|
|
|
|
|
|
|
accessLock sync.Mutex |
|
|
accessLock sync.Mutex |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -28,23 +32,26 @@ func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v |
|
|
v.load() |
|
|
v.load() |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
func (v *Volume) load() { |
|
|
|
|
|
|
|
|
func (v *Volume) load() error { |
|
|
var e error |
|
|
var e error |
|
|
fileName := path.Join(v.dir, v.Id.String()) |
|
|
fileName := path.Join(v.dir, v.Id.String()) |
|
|
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) |
|
|
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) |
|
|
if e != nil { |
|
|
if e != nil { |
|
|
log.Fatalf("New Volume [ERROR] %s\n", e) |
|
|
|
|
|
|
|
|
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) |
|
|
} |
|
|
} |
|
|
if v.replicaType == CopyNil { |
|
|
if v.replicaType == CopyNil { |
|
|
v.readSuperBlock() |
|
|
|
|
|
|
|
|
if e = v.readSuperBlock(); e != nil { |
|
|
|
|
|
return e |
|
|
|
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
v.maybeWriteSuperBlock() |
|
|
v.maybeWriteSuperBlock() |
|
|
} |
|
|
} |
|
|
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) |
|
|
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) |
|
|
if ie != nil { |
|
|
if ie != nil { |
|
|
log.Fatalf("Write Volume Index [ERROR] %s\n", ie) |
|
|
|
|
|
|
|
|
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) |
|
|
} |
|
|
} |
|
|
v.nm = LoadNeedleMap(indexFile) |
|
|
v.nm = LoadNeedleMap(indexFile) |
|
|
|
|
|
return nil |
|
|
} |
|
|
} |
|
|
func (v *Volume) Size() int64 { |
|
|
func (v *Volume) Size() int64 { |
|
|
stat, e := v.dataFile.Stat() |
|
|
stat, e := v.dataFile.Stat() |
|
@ -61,17 +68,23 @@ func (v *Volume) maybeWriteSuperBlock() { |
|
|
stat, _ := v.dataFile.Stat() |
|
|
stat, _ := v.dataFile.Stat() |
|
|
if stat.Size() == 0 { |
|
|
if stat.Size() == 0 { |
|
|
header := make([]byte, SuperBlockSize) |
|
|
header := make([]byte, SuperBlockSize) |
|
|
header[0] = 1 |
|
|
|
|
|
|
|
|
header[0] = Version |
|
|
header[1] = v.replicaType.Byte() |
|
|
header[1] = v.replicaType.Byte() |
|
|
v.dataFile.Write(header) |
|
|
v.dataFile.Write(header) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
func (v *Volume) readSuperBlock() { |
|
|
|
|
|
|
|
|
func (v *Volume) readSuperBlock() error { |
|
|
v.dataFile.Seek(0, 0) |
|
|
v.dataFile.Seek(0, 0) |
|
|
header := make([]byte, SuperBlockSize) |
|
|
header := make([]byte, SuperBlockSize) |
|
|
if _, error := v.dataFile.Read(header); error == nil { |
|
|
|
|
|
v.replicaType, _ = NewReplicationTypeFromByte(header[1]) |
|
|
|
|
|
|
|
|
if _, e := v.dataFile.Read(header); e != nil { |
|
|
|
|
|
return fmt.Errorf("cannot read superblock: %s", e) |
|
|
} |
|
|
} |
|
|
|
|
|
v.version = header[0] |
|
|
|
|
|
var err error |
|
|
|
|
|
if v.replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { |
|
|
|
|
|
return fmt.Errorf("cannot read replica type: %s", err) |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
} |
|
|
} |
|
|
func (v *Volume) NeedToReplicate() bool { |
|
|
func (v *Volume) NeedToReplicate() bool { |
|
|
return v.replicaType.GetCopyCount() > 1 |
|
|
return v.replicaType.GetCopyCount() > 1 |
|
@ -123,9 +136,16 @@ func (v *Volume) commitCompact() (int, error) { |
|
|
v.accessLock.Lock() |
|
|
v.accessLock.Lock() |
|
|
defer v.accessLock.Unlock() |
|
|
defer v.accessLock.Unlock() |
|
|
v.dataFile.Close() |
|
|
v.dataFile.Close() |
|
|
os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")) |
|
|
|
|
|
os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")) |
|
|
|
|
|
v.load() |
|
|
|
|
|
|
|
|
var e error |
|
|
|
|
|
if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil { |
|
|
|
|
|
return 0, e |
|
|
|
|
|
} |
|
|
|
|
|
if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil { |
|
|
|
|
|
return 0, e |
|
|
|
|
|
} |
|
|
|
|
|
if e = v.load(); e != nil { |
|
|
|
|
|
return 0, e |
|
|
|
|
|
} |
|
|
return 0, nil |
|
|
return 0, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|