diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index 66ce24871..ccd14712e 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/stats"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
@@ -76,6 +77,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
 		MimeType:          "",
 		PairMap:           nil,
 		Jwt:               assignResult.Auth,
+		HandlerCounter:    stats.FilerHandlerCounter,
 	}
 	uploadResult, err := operation.UploadData(data, uploadOption)
 	if err != nil {
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index a1df07d7e..39407b1f7 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -16,6 +16,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/security"
@@ -34,6 +35,7 @@ type UploadOption struct {
 	RetryForever      bool
 	Md5               string
 	BytesBuffer       *bytes.Buffer
+	HandlerCounter    *prometheus.CounterVec
 }
 
 type UploadResult struct {
@@ -170,6 +172,9 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR
 			return
 		}
 		glog.Warningf("uploading %d to %s: %v", i, option.UploadUrl, err)
+		if option.HandlerCounter != nil {
+			option.HandlerCounter.WithLabelValues(stats.RepeatErrorUploadData).Inc()
+		}
 	}
 	return
 }
@@ -263,7 +268,6 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
 		PairMap:           option.PairMap,
 		Jwt:               option.Jwt,
 		Md5:               option.Md5,
-		BytesBuffer:       option.BytesBuffer,
 	})
 	if uploadResult == nil {
 		return
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 64254b3b8..ade3bd797 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -7,6 +7,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
 	"github.com/seaweedfs/seaweedfs/weed/security"
+	"github.com/seaweedfs/seaweedfs/weed/stats"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"sync"
@@ -69,6 +70,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
 		MimeType:          "",
 		PairMap:           nil,
 		Jwt:               security.EncodedJwt(req.Auth),
+		HandlerCounter:    stats.VolumeServerHandlerCounter,
 	}
 	if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
 		if err == nil {
diff --git a/weed/stats/metrics_names.go b/weed/stats/metrics_names.go
index c0a6e99be..ae8842c06 100644
--- a/weed/stats/metrics_names.go
+++ b/weed/stats/metrics_names.go
@@ -14,9 +14,12 @@ const (
 	ErrorGetInternal = "errorGetInternal"
 
 	// master topology
-	ErrorWriteToLocalDisk = "errorWriteToLocalDisk"
-	ErrorUnmarshalPairs   = "errorUnmarshalPairs"
-	ErrorWriteToReplicas  = "errorWriteToReplicas"
+	ErrorWriteToLocalDisk      = "errorWriteToLocalDisk"
+	ErrorWriteVolumeNotFound   = "errorWriteVolumeNotFound"
+	ErrorWriteReplicasNotFound = "errorWriteReplicasNotFound"
+
+	ErrorUnmarshalPairs  = "errorUnmarshalPairs"
+	ErrorWriteToReplicas = "errorWriteToReplicas"
 
 	// master client
 	FailedToKeepConnected = "failedToKeepConnected"
@@ -40,6 +43,7 @@ const (
 	ErrorReadInternal        = "read.internal.error"
 	ErrorWriteEntry          = "write.entry.failed"
 	RepeatErrorUploadContent = "upload.content.repeat.failed"
+	RepeatErrorUploadData    = "upload.data.repeat.failed"
 	ErrorChunkAssign         = "chunkAssign.failed"
 	ErrorReadCache           = "read.cache.failed"
 	ErrorReadStream          = "read.stream.failed"
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index 82c2db79c..ccd916a1f 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 	"net/http"
 	"net/url"
@@ -23,6 +24,11 @@ import (
 )
 
 func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
+	if s.GetVolume(volumeId) == nil {
+		stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteVolumeNotFound).Inc()
+		err = fmt.Errorf("volume %d not found", volumeId)
+		return true, err
+	}
 
 	//check JWT
 	jwt := security.GetJwt(r)
@@ -34,7 +40,8 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
 		remoteLocations, err = GetWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
 		if err != nil {
 			glog.V(0).Infoln(err)
-			return
+			stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteReplicasNotFound).Inc()
+			return true, err
 		}
 	}
 
@@ -55,69 +62,86 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
 			return
 		}
 	}
-
+	group := errgroup.Group{}
 	if len(remoteLocations) > 0 { //send to other replica locations
-		start := time.Now()
-		err = DistributedOperation(remoteLocations, func(location operation.Location) error {
-			u := url.URL{
-				Scheme: "http",
-				Host:   location.Url,
-				Path:   r.URL.Path,
-			}
-			q := url.Values{
-				"type": {"replicate"},
-				"ttl":  {n.Ttl.String()},
-			}
-			if n.LastModified > 0 {
-				q.Set("ts", strconv.FormatUint(n.LastModified, 10))
-			}
-			if n.IsChunkedManifest() {
-				q.Set("cm", "true")
-			}
-			u.RawQuery = q.Encode()
-
-			pairMap := make(map[string]string)
-			if n.HasPairs() {
-				tmpMap := make(map[string]string)
-				err := json.Unmarshal(n.Pairs, &tmpMap)
-				if err != nil {
-					stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
-					glog.V(0).Infoln("Unmarshal pairs error:", err)
+		group.Go(func() error {
+			startWriteToReplicas := time.Now()
+			// Wrap Observe in a closure: deferred call arguments are evaluated
+			// at defer time, so a bare defer would observe ~0s elapsed.
+			defer func() {
+				stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(startWriteToReplicas).Seconds())
+			}()
+			return DistributedOperation(remoteLocations, func(location operation.Location) error {
+				u := url.URL{
+					Scheme: "http",
+					Host:   location.Url,
+					Path:   r.URL.Path,
 				}
-			for k, v := range tmpMap {
-				pairMap[needle.PairNamePrefix+k] = v
+				q := url.Values{
+					"type": {"replicate"},
+					"ttl":  {n.Ttl.String()},
+				}
+				if n.LastModified > 0 {
+					q.Set("ts", strconv.FormatUint(n.LastModified, 10))
+				}
+				if n.IsChunkedManifest() {
+					q.Set("cm", "true")
+				}
+				u.RawQuery = q.Encode()
+
+				pairMap := make(map[string]string)
+				if n.HasPairs() {
+					tmpMap := make(map[string]string)
+					jsonErr := json.Unmarshal(n.Pairs, &tmpMap)
+					if jsonErr != nil {
+						stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
+						glog.V(0).Infoln("Unmarshal pairs error:", jsonErr)
+					}
+					for k, v := range tmpMap {
+						pairMap[needle.PairNamePrefix+k] = v
+					}
 				}
-			bytesBuffer := buffer_pool.SyncPoolGetBuffer()
-			defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
-
-			// volume server do not know about encryption
-			// TODO optimize here to compress data only once
-			uploadOption := &operation.UploadOption{
-				UploadUrl:         u.String(),
-				Filename:          string(n.Name),
-				Cipher:            false,
-				IsInputCompressed: n.IsCompressed(),
-				MimeType:          string(n.Mime),
-				PairMap:           pairMap,
-				Jwt:               jwt,
-				Md5:               contentMd5,
-				BytesBuffer:       bytesBuffer,
-			}
-			_, err := operation.UploadData(n.Data, uploadOption)
-			if err != nil {
-				glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
-			}
-			return err
+				bytesBuffer := buffer_pool.SyncPoolGetBuffer()
+				defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
+
+				// volume server do not know about encryption
+				// TODO optimize here to compress data only once
+				uploadOption := &operation.UploadOption{
+					UploadUrl:         u.String(),
+					Filename:          string(n.Name),
+					Cipher:            false,
+					IsInputCompressed: n.IsCompressed(),
+					MimeType:          string(n.Mime),
+					PairMap:           pairMap,
+					Jwt:               jwt,
+					Md5:               contentMd5,
+					BytesBuffer:       bytesBuffer,
+					HandlerCounter:    stats.VolumeServerHandlerCounter,
+				}
+
+				_, errUpload := operation.UploadData(n.Data, uploadOption)
+				if errUpload != nil {
+					glog.Errorf("replication-UploadData, err:%v, url:%s", errUpload, u.String())
+				}
+				return errUpload
+			})
 		})
-		stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds())
-		if err != nil {
-			stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
-			err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
-			glog.V(0).Infoln(err)
-			return false, err
-		}
+	}
+
+	start := time.Now()
+	isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
+	stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds())
+	if err != nil {
+		stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc()
+		err = fmt.Errorf("failed to write to local disk: %v", err)
+		glog.Errorln(err)
+		return true, err
+	}
+
+	if err = group.Wait(); err != nil {
+		stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
+		err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
+		glog.Errorln(err)
+		return
 	}
 	return
 }