Browse Source

chore: simultaneously write to the replica and disk

pull/5494/head
Konstantin Lebedev 1 year ago
parent
commit
229d876842
  1. 2
      weed/filer/filer_notify_append.go
  2. 6
      weed/operation/upload_content.go
  3. 2
      weed/server/volume_grpc_remote.go
  4. 10
      weed/stats/metrics_names.go
  5. 140
      weed/topology/store_replicate.go

2
weed/filer/filer_notify_append.go

@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@ -76,6 +77,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
MimeType: "",
PairMap: nil,
Jwt: assignResult.Auth,
HandlerCounter: stats.FilerHandlerCounter,
}
uploadResult, err := operation.UploadData(data, uploadOption)
if err != nil {

6
weed/operation/upload_content.go

@ -16,6 +16,7 @@ import (
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
@ -34,6 +35,7 @@ type UploadOption struct {
RetryForever bool
Md5 string
BytesBuffer *bytes.Buffer
HandlerCounter *prometheus.CounterVec
}
type UploadResult struct {
@ -170,6 +172,9 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR
return
}
glog.Warningf("uploading %d to %s: %v", i, option.UploadUrl, err)
if option.HandlerCounter != nil {
option.HandlerCounter.WithLabelValues(stats.RepeatErrorUploadData).Inc()
}
}
return
}
@ -263,7 +268,6 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
PairMap: option.PairMap,
Jwt: option.Jwt,
Md5: option.Md5,
BytesBuffer: option.BytesBuffer,
})
if uploadResult == nil {
return

2
weed/server/volume_grpc_remote.go

@ -7,6 +7,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"sync"
@ -69,6 +70,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
MimeType: "",
PairMap: nil,
Jwt: security.EncodedJwt(req.Auth),
HandlerCounter: stats.VolumeServerHandlerCounter,
}
if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
if err == nil {

10
weed/stats/metrics_names.go

@ -14,9 +14,12 @@ const (
ErrorGetInternal = "errorGetInternal"
// master topology
ErrorWriteToLocalDisk = "errorWriteToLocalDisk"
ErrorUnmarshalPairs = "errorUnmarshalPairs"
ErrorWriteToReplicas = "errorWriteToReplicas"
ErrorWriteToLocalDisk = "errorWriteToLocalDisk"
ErrorWriteVolumeNotFound = "errorWriteVolumeNotFound"
ErrorWriteReplicasNotFound = "errorWriteReplicasNotFound"
ErrorUnmarshalPairs = "errorUnmarshalPairs"
ErrorWriteToReplicas = "errorWriteToReplicas"
// master client
FailedToKeepConnected = "failedToKeepConnected"
@ -40,6 +43,7 @@ const (
ErrorReadInternal = "read.internal.error"
ErrorWriteEntry = "write.entry.failed"
RepeatErrorUploadContent = "upload.content.repeat.failed"
RepeatErrorUploadData = "upload.data.repeat.failed"
ErrorChunkAssign = "chunkAssign.failed"
ErrorReadCache = "read.cache.failed"
ErrorReadStream = "read.stream.failed"

140
weed/topology/store_replicate.go

@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"net/http"
"net/url"
@ -23,6 +24,11 @@ import (
)
func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
if s.GetVolume(volumeId) == nil {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteVolumeNotFound).Inc()
err = fmt.Errorf("volume %d not found", volumeId)
return true, err
}
//check JWT
jwt := security.GetJwt(r)
@ -34,7 +40,8 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
remoteLocations, err = GetWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
if err != nil {
glog.V(0).Infoln(err)
return
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteReplicasNotFound).Inc()
return true, err
}
}
@ -55,69 +62,84 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
return
}
}
group := errgroup.Group{}
if len(remoteLocations) > 0 { //send to other replica locations
start := time.Now()
err = DistributedOperation(remoteLocations, func(location operation.Location) error {
u := url.URL{
Scheme: "http",
Host: location.Url,
Path: r.URL.Path,
}
q := url.Values{
"type": {"replicate"},
"ttl": {n.Ttl.String()},
}
if n.LastModified > 0 {
q.Set("ts", strconv.FormatUint(n.LastModified, 10))
}
if n.IsChunkedManifest() {
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
pairMap := make(map[string]string)
if n.HasPairs() {
tmpMap := make(map[string]string)
err := json.Unmarshal(n.Pairs, &tmpMap)
if err != nil {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
glog.V(0).Infoln("Unmarshal pairs error:", err)
group.Go(func() error {
startWriteToReplicas := time.Now()
defer stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(startWriteToReplicas).Seconds())
return DistributedOperation(remoteLocations, func(location operation.Location) error {
u := url.URL{
Scheme: "http",
Host: location.Url,
Path: r.URL.Path,
}
for k, v := range tmpMap {
pairMap[needle.PairNamePrefix+k] = v
q := url.Values{
"type": {"replicate"},
"ttl": {n.Ttl.String()},
}
if n.LastModified > 0 {
q.Set("ts", strconv.FormatUint(n.LastModified, 10))
}
if n.IsChunkedManifest() {
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
pairMap := make(map[string]string)
if n.HasPairs() {
tmpMap := make(map[string]string)
jsonErr := json.Unmarshal(n.Pairs, &tmpMap)
if jsonErr != nil {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
glog.V(0).Infoln("Unmarshal pairs error:", jsonErr)
}
for k, v := range tmpMap {
pairMap[needle.PairNamePrefix+k] = v
}
}
}
bytesBuffer := buffer_pool.SyncPoolGetBuffer()
defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
// volume server do not know about encryption
// TODO optimize here to compress data only once
uploadOption := &operation.UploadOption{
UploadUrl: u.String(),
Filename: string(n.Name),
Cipher: false,
IsInputCompressed: n.IsCompressed(),
MimeType: string(n.Mime),
PairMap: pairMap,
Jwt: jwt,
Md5: contentMd5,
BytesBuffer: bytesBuffer,
}
_, err := operation.UploadData(n.Data, uploadOption)
if err != nil {
glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
}
return err
bytesBuffer := buffer_pool.SyncPoolGetBuffer()
defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
// volume server do not know about encryption
// TODO optimize here to compress data only once
uploadOption := &operation.UploadOption{
UploadUrl: u.String(),
Filename: string(n.Name),
Cipher: false,
IsInputCompressed: n.IsCompressed(),
MimeType: string(n.Mime),
PairMap: pairMap,
Jwt: jwt,
Md5: contentMd5,
BytesBuffer: bytesBuffer,
HandlerCounter: stats.VolumeServerHandlerCounter,
}
_, errUload := operation.UploadData(n.Data, uploadOption)
if errUload != nil {
glog.Errorf("replication-UploadData, err:%v, url:%s", errUload, u.String())
}
return errUload
})
})
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds())
if err != nil {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.V(0).Infoln(err)
return false, err
}
}
start := time.Now()
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds())
if err != nil {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc()
err = fmt.Errorf("failed to write to local disk: %v", err)
glog.Errorln(err)
return true, err
}
if err = group.Wait(); err != nil {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.Errorln(err)
return
}
return
}

Loading…
Cancel
Save