From e63317fb08fd74a0728fe9463a3d80e2cbda2473 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 20 Jun 2019 00:55:30 -0700 Subject: [PATCH] ec deletion code complete, not tested yet --- weed/server/volume_server_handlers_write.go | 15 ++- weed/storage/store_ec_delete.go | 105 ++++++++++++++++++++ 2 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 weed/storage/store_ec_delete.go diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 09b4521fb..38fd970d7 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "errors" "fmt" "net/http" @@ -89,6 +90,14 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { cookie := n.Cookie + ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId) + + if hasEcVolume { + count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie); + writeDeleteResult(err, count, w, r) + return + } + _, ok := vs.store.ReadVolumeNeedle(volumeId, n) if ok != nil { m := make(map[string]uint32) @@ -129,6 +138,11 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { _, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r) + writeDeleteResult(err, count, w, r) + +} + +func writeDeleteResult(err error, count int64, w http.ResponseWriter, r *http.Request) { if err == nil { m := make(map[string]int64) m["size"] = count @@ -136,7 +150,6 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } else { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err)) } - } func setEtag(w http.ResponseWriter, etag string) { diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go new file mode 100644 index 000000000..e027d2887 --- /dev/null +++ b/weed/storage/store_ec_delete.go @@ -0,0 +1,105 @@ +package storage + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) { + + count, err := s.ReadEcShardNeedle(ctx, ecVolume.VolumeId, n) + + if err != nil { + return 0, err + } + + if cookie != n.Cookie { + return 0, fmt.Errorf("unexpected cookie %x", cookie) + } + + if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx, ecVolume, n.Id); err != nil { + return 0, err + } + + return int64(count), nil + +} + +func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { + + _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version) + + if len(intervals) == 0 { + return erasure_coding.NotFoundError + } + + shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) + + hasDeletionSuccess := false + err = s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId) + if err == nil { + hasDeletionSuccess = true + } + + for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ { + if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId); parityDeletionError == nil { + hasDeletionSuccess = true + } + } + + if hasDeletionSuccess { + return nil + } + + return err + +} + +func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { + + ecVolume.ShardLocationsLock.RLock() + sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId] + ecVolume.ShardLocationsLock.RUnlock() + + if !hasShardLocations { + return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId) + } + + for _, sourceDataNode := range sourceDataNodes { + glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode) + err := s.doDeleteNeedleFromRemoteEcShard(ctx, sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId) + if err != nil { + return err + } + glog.V(1).Infof("delete from remote ec shard %d.%d from %s: %v", ecVolume.VolumeId, shardId, sourceDataNode, err) + } + + return nil + +} + +func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { + + return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + + // copy data slice + _, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{ + VolumeId: uint32(vid), + Collection: collection, + FileKey: uint64(needleId), + Version: uint32(version), + }) + if err != nil { + return fmt.Errorf("failed to delete from ec shard %d on %s: %v", vid, sourceDataNode, err) + } + return nil + }) + +}