@ -119,7 +119,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
oldDatCompactRevision , err := fetchCompactRevisionFromDatFile ( oldDatFile )
if err != nil {
return
return fmt . Errorf ( "fetchCompactRevisionFromDatFile src %s failed: %v" , oldDatFile . Name ( ) , err )
}
if oldDatCompactRevision != v . lastCompactRevision {
return fmt . Errorf ( "current old dat file's compact revision %d is not the expected one %d" , oldDatCompactRevision , v . lastCompactRevision )
@ -137,6 +137,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt . Errorf ( "readIndexEntry %s at offset %d failed: %v" , oldIdxFileName , idx_offset , err )
}
key , offset , size := idxFileEntry ( IdxEntry )
glog . V ( 0 ) . Infof ( "key %d offset %d size %d" , key , offset , size )
if _ , found := incrementedHasUpdatedIndexEntry [ key ] ; ! found {
incrementedHasUpdatedIndexEntry [ key ] = keyField {
offset : offset ,
@ -145,77 +146,82 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
}
if len ( incrementedHasUpdatedIndexEntry ) > 0 {
var (
dst , idx * os . File
)
if dst , err = os . OpenFile ( newDatFileName , os . O_RDWR , 0644 ) ; err != nil {
return
}
defer dst . Close ( )
// no updates during commit step
if len ( incrementedHasUpdatedIndexEntry ) == 0 {
return nil
}
if idx , err = os . OpenFile ( newIdxFileName , os . O_RDWR , 0644 ) ; err != nil {
return
}
defer idx . Close ( )
// deal with updates during commit step
var (
dst , idx * os . File
)
if dst , err = os . OpenFile ( newDatFileName , os . O_RDWR , 0644 ) ; err != nil {
return fmt . Errorf ( "open dat file %s failed: %v" , newDatFileName , err )
}
defer dst . Close ( )
var newDatCompactRevision uint16
newDatCompactRevision , err = fetchCompactRevisionFromDatFile ( dst )
if err != nil {
return
}
if oldDatCompactRevision + 1 != newDatCompactRevision {
return fmt . Errorf ( "oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d" , oldDatFileName , oldDatCompactRevision , newDatFileName , newDatCompactRevision )
}
if idx , err = os . OpenFile ( newIdxFileName , os . O_RDWR , 0644 ) ; err != nil {
return fmt . Errorf ( "open idx file %s failed: %v" , newIdxFileName , err )
}
defer idx . Close ( )
var newDatCompactRevision uint16
newDatCompactRevision , err = fetchCompactRevisionFromDatFile ( dst )
if err != nil {
return fmt . Errorf ( "fetchCompactRevisionFromDatFile dst %s failed: %v" , dst . Name ( ) , err )
}
if oldDatCompactRevision + 1 != newDatCompactRevision {
return fmt . Errorf ( "oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d" , oldDatFileName , oldDatCompactRevision , newDatFileName , newDatCompactRevision )
}
idx_entry_bytes := make ( [ ] byte , 16 )
for key , incre_idx_entry := range incrementedHasUpdatedIndexEntry {
util . Uint64toBytes ( idx_entry_bytes [ 0 : 8 ] , key )
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , incre_idx_entry . offset )
util . Uint32toBytes ( idx_entry_bytes [ 12 : 16 ] , incre_idx_entry . size )
idx_entry_bytes := make ( [ ] byte , 16 )
for key , incre_idx_entry := range incrementedHasUpdatedIndexEntry {
util . Uint64toBytes ( idx_entry_bytes [ 0 : 8 ] , key )
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , incre_idx_entry . offset )
util . Uint32toBytes ( idx_entry_bytes [ 12 : 16 ] , incre_idx_entry . size )
var offset int64
if offset , err = dst . Seek ( 0 , 2 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to seek the end of file: %v" , err )
var offset int64
if offset , err = dst . Seek ( 0 , 2 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to seek the end of file: %v" , err )
return
}
//ensure file writing starting from aligned positions
if offset % NeedlePaddingSize != 0 {
offset = offset + ( NeedlePaddingSize - offset % NeedlePaddingSize )
if offset , err = v . dataFile . Seek ( offset , 0 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to align in datafile %s: %v" , v . dataFile . Name ( ) , err )
return
}
//ensure file writing starting from aligned positions
if offset % NeedlePaddingSize != 0 {
offset = offset + ( NeedlePaddingSize - offset % NeedlePaddingSize )
if offset , err = v . dataFile . Seek ( offset , 0 ) ; err != nil {
glog . V ( 0 ) . Infof ( "failed to align in datafile %s: %v" , v . dataFile . Name ( ) , err )
return
}
}
}
//updated needle
if incre_idx_entry . offset != 0 && incre_idx_entry . size != 0 {
//even the needle cache in memory is hit, the need_bytes is correct
var needle_bytes [ ] byte
needle_bytes , err = ReadNeedleBlob ( oldDatFile , int64 ( incre_idx_entry . offset ) * NeedlePaddingSize , incre_idx_entry . size )
if err != nil {
return
}
dst . Write ( needle_bytes )
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , uint32 ( offset / NeedlePaddingSize ) )
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
fakeDelNeedle := new ( Needle )
fakeDelNeedle . Id = key
fakeDelNeedle . Cookie = 0x12345678
_ , _ , err = fakeDelNeedle . Append ( dst , v . Version ( ) )
if err != nil {
return
}
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , uint32 ( 0 ) )
//updated needle
if incre_idx_entry . offset != 0 && incre_idx_entry . size != 0 && incre_idx_entry . size != TombstoneFileSize {
//even the needle cache in memory is hit, the need_bytes is correct
glog . V ( 0 ) . Infof ( "file %d offset %d size %d" , key , int64 ( incre_idx_entry . offset ) * NeedlePaddingSize , incre_idx_entry . size )
var needle_bytes [ ] byte
needle_bytes , err = ReadNeedleBlob ( oldDatFile , int64 ( incre_idx_entry . offset ) * NeedlePaddingSize , incre_idx_entry . size )
if err != nil {
return fmt . Errorf ( "ReadNeedleBlob %s key %d offset %d size %d failed: %v" , oldDatFile . Name ( ) , key , int64 ( incre_idx_entry . offset ) * NeedlePaddingSize , incre_idx_entry . size , err )
}
if _ , err := idx . Seek ( 0 , 2 ) ; err != nil {
return fmt . Errorf ( "cannot seek end of indexfile %s: %v" ,
newIdxFileName , err )
dst . Write ( needle_bytes )
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , uint32 ( offset / NeedlePaddingSize ) )
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
fakeDelNeedle := new ( Needle )
fakeDelNeedle . Id = key
fakeDelNeedle . Cookie = 0x12345678
_ , _ , err = fakeDelNeedle . Append ( dst , v . Version ( ) )
if err != nil {
return fmt . Errorf ( "append deleted %d failed: %v" , key , err )
}
_ , err = idx . Write ( idx_entry_bytes )
util . Uint32toBytes ( idx_entry_bytes [ 8 : 12 ] , uint32 ( 0 ) )
}
if _ , err := idx . Seek ( 0 , 2 ) ; err != nil {
return fmt . Errorf ( "cannot seek end of indexfile %s: %v" ,
newIdxFileName , err )
}
_ , err = idx . Write ( idx_entry_bytes )
}
return nil