Browse Source

fix(volume): to avoid duplicate write a same needle (#6138)

fix WriteNeedleBlob to avoid duplicate write a same needle

Co-authored-by: 邓书东 <shudong_deng@hhnb2024010108.intsig.com>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
pull/6142/head
dsd 2 months ago
committed by GitHub
parent
commit
1e13b6879c
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 16
      weed/storage/volume_write.go

16
weed/storage/volume_write.go

@ -4,11 +4,12 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"os"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
. "github.com/seaweedfs/seaweedfs/weed/storage/types" . "github.com/seaweedfs/seaweedfs/weed/storage/types"
"os"
) )
var ErrorNotFound = errors.New("not found") var ErrorNotFound = errors.New("not found")
@ -323,6 +324,19 @@ func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size
return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize()) return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
} }
nv, ok := v.nm.Get(needleId)
if ok && nv.Size == size {
oldNeedle := new(needle.Needle)
err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
if err == nil {
newNeedle := new(needle.Needle)
err = newNeedle.ReadBytes(needleBlob, nv.Offset.ToActualOffset(), size, v.Version())
if err == nil && oldNeedle.Cookie == newNeedle.Cookie && oldNeedle.Checksum == newNeedle.Checksum && bytes.Equal(oldNeedle.Data, newNeedle.Data) {
glog.V(0).Infof("needle %v already exists", needleId)
return nil
}
}
}
appendAtNs := needle.GetAppendAtNs(v.lastAppendAtNs) appendAtNs := needle.GetAppendAtNs(v.lastAppendAtNs)
offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version()) offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())

Loading…
Cancel
Save