|
|
@ -3,10 +3,13 @@ package weed_server |
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/operation" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/security" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/storage/needle" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/storage/types" |
|
|
|
"sync" |
|
|
|
) |
|
|
|
|
|
|
|
func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) { |
|
|
@ -30,16 +33,48 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser |
|
|
|
return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr) |
|
|
|
} |
|
|
|
|
|
|
|
n := new(needle.Needle) |
|
|
|
n.Id = types.NeedleId(req.NeedleId) |
|
|
|
n.Cookie = types.Cookie(req.Cookie) |
|
|
|
n.Data, n.DataSize = data, uint32(len(data)) |
|
|
|
// copied from *Needle.prepareWriteBuffer()
|
|
|
|
n.Size = 4 + types.Size(n.DataSize) + 1 |
|
|
|
n.Checksum = needle.NewCRC(n.Data) |
|
|
|
if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil { |
|
|
|
return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err) |
|
|
|
var wg sync.WaitGroup |
|
|
|
wg.Add(1) |
|
|
|
go func() { |
|
|
|
defer wg.Done() |
|
|
|
n := new(needle.Needle) |
|
|
|
n.Id = types.NeedleId(req.NeedleId) |
|
|
|
n.Cookie = types.Cookie(req.Cookie) |
|
|
|
n.Data, n.DataSize = data, uint32(len(data)) |
|
|
|
// copied from *Needle.prepareWriteBuffer()
|
|
|
|
n.Size = 4 + types.Size(n.DataSize) + 1 |
|
|
|
n.Checksum = needle.NewCRC(n.Data) |
|
|
|
if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil { |
|
|
|
if err == nil { |
|
|
|
err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, err) |
|
|
|
} |
|
|
|
} |
|
|
|
}() |
|
|
|
if len(req.Replicas)>0{ |
|
|
|
fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie) |
|
|
|
for _, replica := range req.Replicas { |
|
|
|
wg.Add(1) |
|
|
|
go func(targetVolumeServer string) { |
|
|
|
defer wg.Done() |
|
|
|
uploadOption := &operation.UploadOption{ |
|
|
|
UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()), |
|
|
|
Filename: "", |
|
|
|
Cipher: false, |
|
|
|
IsInputCompressed: false, |
|
|
|
MimeType: "", |
|
|
|
PairMap: nil, |
|
|
|
Jwt: security.EncodedJwt(req.Auth), |
|
|
|
} |
|
|
|
if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil { |
|
|
|
if err == nil { |
|
|
|
err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err) |
|
|
|
} |
|
|
|
} |
|
|
|
}(replica.Url) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return resp, nil |
|
|
|
wg.Wait() |
|
|
|
|
|
|
|
return resp, err |
|
|
|
} |