@ -21,8 +21,9 @@ func (v *Volume) Compact() error {
//glog.V(3).Infof("Got Compaction lock...")
filePath := v . FileName ( )
v . lastCompactingIndexOffset = v . nm . IndexFileSize ( )
glog . V ( 3 ) . Infof ( "creating copies for volume %d ,last offset %d..." , v . Id , v . lastCompactingIndexOffset )
v . lastCompactIndexOffset = v . nm . IndexFileSize ( )
v . lastCompactRevision = v . SuperBlock . CompactRevision
glog . V ( 3 ) . Infof ( "creating copies for volume %d ,last offset %d..." , v . Id , v . lastCompactIndexOffset )
return v . copyDataAndGenerateIndexFile ( filePath + ".cpd" , filePath + ".cpx" )
}
@ -71,6 +72,21 @@ func (v *Volume) commitCompact() error {
return nil
}
// fetchCompactRevisionFromDatFile reads the super block stored at the start
// of a volume .dat file and returns its CompactRevision counter.
//
// It uses ReadAt rather than Seek+Read: ReadAt reads exactly
// len(header) bytes or returns an error, so a short read can never hand a
// partially-filled buffer to ParseSuperBlock (plain Read may legally return
// fewer bytes with a nil error). ReadAt also leaves the file's seek offset
// untouched, so callers that position the file themselves are unaffected.
func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
	header := make([]byte, SuperBlockSize)
	if _, e := file.ReadAt(header, 0); e != nil {
		return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e)
	}
	superBlock, err := ParseSuperBlock(header)
	if err != nil {
		return 0, err
	}
	return superBlock.CompactRevision, nil
}
func ( v * Volume ) makeupDiff ( newDatFileName , newIdxFileName , oldDatFileName , oldIdxFileName string ) ( err error ) {
var indexSize int64
@ -83,56 +99,67 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if indexSize , err = verifyIndexFileIntegrity ( oldIdxFile ) ; err != nil {
return fmt . Errorf ( "verifyIndexFileIntegrity %s failed: %v" , oldIdxFileName , err )
}
if indexSize == 0 || uint64 ( indexSize ) <= v . lastCompacting IndexOffset {
if indexSize == 0 || uint64 ( indexSize ) <= v . lastCompactIndexOffset {
return nil
}
v . incrementedHasUpdatedIndexEntry = make ( map [ uint64 ] keyField )
for idx_offset := indexSize ; uint64 ( idx_offset ) >= v . lastCompactingIndexOffset ; idx_offset -= NeedleIndexSize {
oldDatCompactRevision , err := fetchCompactRevisionFromDatFile ( oldDatFile )
if err != nil {
return
}
if oldDatCompactRevision != v . lastCompactRevision {
return fmt . Errorf ( "current old dat file's compact revision %d is not the expected one %d" , oldDatCompactRevision , v . lastCompactRevision )
}
type keyField struct {
offset uint32
size uint32
}
incrementedHasUpdatedIndexEntry := make ( map [ uint64 ] keyField )
for idx_offset := indexSize - NeedleIndexSize ; uint64 ( idx_offset ) >= v . lastCompactIndexOffset ; idx_offset -= NeedleIndexSize {
var IdxEntry [ ] byte
if IdxEntry , err = readIndexEntryAtOffset ( oldIdxFile , idx_offset ) ; err != nil {
return fmt . Errorf ( "readIndexEntry %s at offset %d failed: %v" , oldIdxFileName , idx_offset , err )
}
key , offset , size := idxFileEntry ( IdxEntry )
if _ , found := v . incrementedHasUpdatedIndexEntry [ key ] ; ! found {
v . incrementedHasUpdatedIndexEntry [ key ] = keyField {
if _ , found := incrementedHasUpdatedIndexEntry [ key ] ; ! found {
incrementedHasUpdatedIndexEntry [ key ] = keyField {
offset : offset ,
size : size ,
}
} else {
continue
}
}
if len ( v . incrementedHasUpdatedIndexEntry ) > 0 {
if len ( incrementedHasUpdatedIndexEntry ) > 0 {
var (
dst , idx * os . File
)
if dst , err = os . OpenFile ( newDatFileName , os . O_WRONLY , 0644 ) ; err != nil {
if dst , err = os . OpenFile ( newDatFileName , os . O_RD WR , 0644 ) ; err != nil {
return
}
defer dst . Close ( )
if idx , err = os . OpenFile ( newIdxFileName , os . O_WRONLY , 0644 ) ; err != nil {
if idx , err = os . OpenFile ( newIdxFileName , os . O_RD WR , 0644 ) ; err != nil {
return
}
defer idx . Close ( )
var newDatCompactRevision uint16
newDatCompactRevision , err = fetchCompactRevisionFromDatFile ( dst )
if err != nil {
return
}
if oldDatCompactRevision + 1 != newDatCompactRevision {
return fmt . Errorf ( "oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d" , oldDatFileName , oldDatCompactRevision , newDatFileName , newDatCompactRevision )
}
idx_entry_bytes := make ( [ ] byte , 16 )
for key , incre_idx_entry := range v . incrementedHasUpdatedIndexEntry {
for key , incre_idx_entry := range incrementedHasUpdatedIndexEntry {
util . Uint64toBytes ( idx_entry_bytes [ 0 : 8 ] , key )
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , incre_idx_entry . offset )
util . Uint32toBytes ( idx_entry_bytes [ 12 : 16 ] , incre_idx_entry . size )
if _ , err := idx . Seek ( 0 , 2 ) ; err != nil {
return fmt . Errorf ( "cannot seek end of indexfile %s: %v" ,
newIdxFileName , err )
}
_ , err = idx . Write ( idx_entry_bytes )
//even the needle cache in memory is hit, the need_bytes is correct
needle_bytes , _ , _ := ReadNeedleBlob ( dst , int64 ( incre_idx_entry . offset ) * NeedlePaddingSize , incre_idx_entry . size )
var offset int64
if offset , err = dst . Seek ( 0 , 2 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to seek the end of file: %v" , err )
@ -146,7 +173,34 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return
}
}
dst . Write ( needle_bytes )
//updated needle
if incre_idx_entry . offset != 0 && incre_idx_entry . size != 0 {
//even the needle cache in memory is hit, the need_bytes is correct
var needle_bytes [ ] byte
needle_bytes , _ , err = ReadNeedleBlob ( oldDatFile , int64 ( incre_idx_entry . offset ) * NeedlePaddingSize , incre_idx_entry . size )
if err != nil {
return
}
dst . Write ( needle_bytes )
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , uint32 ( offset / NeedlePaddingSize ) )
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
fakeDelNeedle := new ( Needle )
fakeDelNeedle . Id = key
fakeDelNeedle . Cookie = 0x12345678
_ , err = fakeDelNeedle . Append ( dst , v . Version ( ) )
if err != nil {
return
}
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , uint32 ( 0 ) )
}
if _ , err := idx . Seek ( 0 , 2 ) ; err != nil {
return fmt . Errorf ( "cannot seek end of indexfile %s: %v" ,
newIdxFileName , err )
}
_ , err = idx . Write ( idx_entry_bytes )
}
}