@ -180,11 +180,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt . Errorf ( "oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d" , oldDatFileName , oldDatCompactRevision , newDatFileName , newDatCompactRevision )
}
idx_entry_b ytes := make ( [ ] byte , NeedleIdSize + OffsetSize + SizeSize )
for key , incre_idx_e ntry := range incrementedHasUpdatedIndexEntry {
NeedleIdToBytes ( idx_entry_b ytes [ 0 : NeedleIdSize ] , key )
OffsetToBytes ( idx_entry_b ytes [ NeedleIdSize : NeedleIdSize + OffsetSize ] , incre_idx_e ntry . offset )
util . Uint32toBytes ( idx_entry_b ytes [ NeedleIdSize + OffsetSize : NeedleIdSize + OffsetSize + SizeSize ] , incre_idx_e ntry . size )
idxEntryB ytes := make ( [ ] byte , NeedleIdSize + OffsetSize + SizeSize )
for key , increIdxE ntry := range incrementedHasUpdatedIndexEntry {
NeedleIdToBytes ( idxEntryB ytes [ 0 : NeedleIdSize ] , key )
OffsetToBytes ( idxEntryB ytes [ NeedleIdSize : NeedleIdSize + OffsetSize ] , increIdxE ntry . offset )
util . Uint32toBytes ( idxEntryB ytes [ NeedleIdSize + OffsetSize : NeedleIdSize + OffsetSize + SizeSize ] , increIdxE ntry . size )
var offset int64
if offset , err = dst . Seek ( 0 , 2 ) ; err != nil {
@ -201,16 +201,16 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
//updated needle
if incre_idx_e ntry . offset != 0 && incre_idx_e ntry . size != 0 && incre_idx_e ntry . size != TombstoneFileSize {
if increIdxE ntry . offset != 0 && increIdxE ntry . size != 0 && increIdxE ntry . size != TombstoneFileSize {
//even the needle cache in memory is hit, the need_bytes is correct
glog . V ( 4 ) . Infof ( "file %d offset %d size %d" , key , int64 ( incre_idx_e ntry . offset ) * NeedlePaddingSize , incre_idx_e ntry . size )
var needle_b ytes [ ] byte
needle_b ytes , err = ReadNeedleBlob ( oldDatFile , int64 ( incre_idx_e ntry . offset ) * NeedlePaddingSize , incre_idx_e ntry . size , v . Version ( ) )
glog . V ( 4 ) . Infof ( "file %d offset %d size %d" , key , int64 ( increIdxE ntry . offset ) * NeedlePaddingSize , increIdxE ntry . size )
var needleB ytes [ ] byte
needleB ytes , err = ReadNeedleBlob ( oldDatFile , int64 ( increIdxE ntry . offset ) * NeedlePaddingSize , increIdxE ntry . size , v . Version ( ) )
if err != nil {
return fmt . Errorf ( "ReadNeedleBlob %s key %d offset %d size %d failed: %v" , oldDatFile . Name ( ) , key , int64 ( incre_idx_e ntry . offset ) * NeedlePaddingSize , incre_idx_e ntry . size , err )
return fmt . Errorf ( "ReadNeedleBlob %s key %d offset %d size %d failed: %v" , oldDatFile . Name ( ) , key , int64 ( increIdxE ntry . offset ) * NeedlePaddingSize , increIdxE ntry . size , err )
}
dst . Write ( needle_b ytes )
util . Uint32toBytes ( idx_entry_b ytes [ 8 : 12 ] , uint32 ( offset / NeedlePaddingSize ) )
dst . Write ( needleB ytes )
util . Uint32toBytes ( idxEntryB ytes [ 8 : 12 ] , uint32 ( offset / NeedlePaddingSize ) )
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
fakeDelNeedle := new ( Needle )
@ -221,19 +221,59 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if err != nil {
return fmt . Errorf ( "append deleted %d failed: %v" , key , err )
}
util . Uint32toBytes ( idx_entry_b ytes [ 8 : 12 ] , uint32 ( 0 ) )
util . Uint32toBytes ( idxEntryB ytes [ 8 : 12 ] , uint32 ( 0 ) )
}
if _ , err := idx . Seek ( 0 , 2 ) ; err != nil {
return fmt . Errorf ( "cannot seek end of indexfile %s: %v" ,
newIdxFileName , err )
}
_ , err = idx . Write ( idx_entry_b ytes )
_ , err = idx . Write ( idxEntryB ytes )
}
return nil
}
type VolumeFileScanner4Vacuum struct {
version Version
v * Volume
dst * os . File
nm * NeedleMap
newOffset int64
now uint64
}
func ( scanner * VolumeFileScanner4Vacuum ) VisitSuperBlock ( superBlock SuperBlock ) error {
scanner . version = superBlock . Version ( )
superBlock . CompactRevision ++
_ , err := scanner . dst . Write ( superBlock . Bytes ( ) )
scanner . newOffset = int64 ( superBlock . BlockSize ( ) )
return err
}
func ( scanner * VolumeFileScanner4Vacuum ) ReadNeedleBody ( ) bool {
return true
}
func ( scanner * VolumeFileScanner4Vacuum ) VisitNeedle ( n * Needle , offset int64 ) error {
if n . HasTtl ( ) && scanner . now >= n . LastModified + uint64 ( scanner . v . Ttl . Minutes ( ) * 60 ) {
return nil
}
nv , ok := scanner . v . nm . Get ( n . Id )
glog . V ( 4 ) . Infoln ( "needle expected offset " , offset , "ok" , ok , "nv" , nv )
if ok && int64 ( nv . Offset ) * NeedlePaddingSize == offset && nv . Size > 0 {
if err := scanner . nm . Put ( n . Id , Offset ( scanner . newOffset / NeedlePaddingSize ) , n . Size ) ; err != nil {
return fmt . Errorf ( "cannot put needle: %s" , err )
}
if _ , _ , _ , err := n . Append ( scanner . dst , scanner . v . Version ( ) ) ; err != nil {
return fmt . Errorf ( "cannot append needle: %s" , err )
}
scanner . newOffset += n . DiskSize ( scanner . version )
glog . V ( 4 ) . Infoln ( "saving key" , n . Id , "volume offset" , offset , "=>" , scanner . newOffset , "data_size" , n . Size )
}
return nil
}
func ( v * Volume ) copyDataAndGenerateIndexFile ( dstName , idxName string , preallocate int64 ) ( err error ) {
var (
dst , idx * os . File
@ -248,38 +288,13 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
}
defer idx . Close ( )
nm := NewBtreeNeedleMap ( idx )
new_offset := int64 ( 0 )
now := uint64 ( time . Now ( ) . Unix ( ) )
var version Version
err = ScanVolumeFile ( v . dir , v . Collection , v . Id , v . needleMapKind ,
func ( superBlock SuperBlock ) error {
version = superBlock . Version ( )
superBlock . CompactRevision ++
_ , err = dst . Write ( superBlock . Bytes ( ) )
new_offset = int64 ( superBlock . BlockSize ( ) )
return err
} , true , func ( n * Needle , offset int64 ) error {
if n . HasTtl ( ) && now >= n . LastModified + uint64 ( v . Ttl . Minutes ( ) * 60 ) {
return nil
}
nv , ok := v . nm . Get ( n . Id )
glog . V ( 4 ) . Infoln ( "needle expected offset " , offset , "ok" , ok , "nv" , nv )
if ok && int64 ( nv . Offset ) * NeedlePaddingSize == offset && nv . Size > 0 {
if err = nm . Put ( n . Id , Offset ( new_offset / NeedlePaddingSize ) , n . Size ) ; err != nil {
return fmt . Errorf ( "cannot put needle: %s" , err )
}
if _ , _ , _ , err := n . Append ( dst , v . Version ( ) ) ; err != nil {
return fmt . Errorf ( "cannot append needle: %s" , err )
}
new_offset += n . DiskSize ( version )
glog . V ( 4 ) . Infoln ( "saving key" , n . Id , "volume offset" , offset , "=>" , new_offset , "data_size" , n . Size )
}
return nil
} )
scanner := & VolumeFileScanner4Vacuum {
v : v ,
now : uint64 ( time . Now ( ) . Unix ( ) ) ,
nm : NewBtreeNeedleMap ( idx ) ,
dst : dst ,
}
err = ScanVolumeFile ( v . dir , v . Collection , v . Id , v . needleMapKind , scanner )
return
}
@ -307,7 +322,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
v . SuperBlock . CompactRevision ++
dst . Write ( v . SuperBlock . Bytes ( ) )
new_o ffset := int64 ( v . SuperBlock . BlockSize ( ) )
newO ffset := int64 ( v . SuperBlock . BlockSize ( ) )
WalkIndexFile ( oldIndexFile , func ( key NeedleId , offset Offset , size uint32 ) error {
if offset == 0 || size == TombstoneFileSize {
@ -328,14 +343,14 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
glog . V ( 4 ) . Infoln ( "needle expected offset " , offset , "ok" , ok , "nv" , nv )
if nv . Offset == offset && nv . Size > 0 {
if err = nm . Put ( n . Id , Offset ( new_o ffset / NeedlePaddingSize ) , n . Size ) ; err != nil {
if err = nm . Put ( n . Id , Offset ( newO ffset / NeedlePaddingSize ) , n . Size ) ; err != nil {
return fmt . Errorf ( "cannot put needle: %s" , err )
}
if _ , _ , _ , err = n . Append ( dst , v . Version ( ) ) ; err != nil {
return fmt . Errorf ( "cannot append needle: %s" , err )
}
new_o ffset += n . DiskSize ( v . Version ( ) )
glog . V ( 3 ) . Infoln ( "saving key" , n . Id , "volume offset" , offset , "=>" , new_o ffset , "data_size" , n . Size )
newO ffset += n . DiskSize ( v . Version ( ) )
glog . V ( 3 ) . Infoln ( "saving key" , n . Id , "volume offset" , offset , "=>" , newO ffset , "data_size" , n . Size )
}
return nil
} )