From 784daea1fa99d2e626e39824563b957703f98e70 Mon Sep 17 00:00:00 2001
From: chrislu <chris.lu@gmail.com>
Date: Sun, 19 Mar 2023 18:30:13 -0700
Subject: [PATCH] fix volume not found if marked as read only

fix https://github.com/seaweedfs/seaweedfs/issues/4088
---
 weed/server/master_grpc_server_volume.go | 11 +++++++++--
 weed/topology/volume_layout.go           | 25 ++++++++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 77154972b..06dbb4bb5 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -309,12 +309,19 @@ func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.V
 	replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(req.ReplicaPlacement))
 	vl := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, needle.LoadTTLFromUint32(req.Ttl), types.ToDiskType(req.DiskType))
 	dataNodes := ms.Topo.Lookup(req.Collection, needle.VolumeId(req.VolumeId))
+	if req.IsReadonly {
+		for _, dn := range dataNodes {
+			if dn.Ip == req.Ip && dn.Port == int(req.Port) {
+
+			}
+		}
+	}
 	for _, dn := range dataNodes {
 		if dn.Ip == req.Ip && dn.Port == int(req.Port) {
 			if req.IsReadonly {
-				vl.SetVolumeUnavailable(dn, needle.VolumeId(req.VolumeId))
+				vl.SetVolumeReadOnly(dn, needle.VolumeId(req.VolumeId))
 			} else {
-				vl.SetVolumeAvailable(dn, needle.VolumeId(req.VolumeId), false)
+				vl.SetVolumeWritable(dn, needle.VolumeId(req.VolumeId))
 			}
 		}
 	}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 377122cae..d1bc28ef8 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -395,6 +395,31 @@ func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool {
 	return true
 }
 
+func (vl *VolumeLayout) SetVolumeReadOnly(dn *DataNode, vid needle.VolumeId) bool {
+	vl.accessLock.Lock()
+	defer vl.accessLock.Unlock()
+
+	if _, ok := vl.vid2location[vid]; ok {
+		vl.readonlyVolumes.Add(vid, dn)
+		return vl.removeFromWritable(vid)
+	}
+	return true
+}
+
+func (vl *VolumeLayout) SetVolumeWritable(dn *DataNode, vid needle.VolumeId) bool {
+	vl.accessLock.Lock()
+	defer vl.accessLock.Unlock()
+
+	if _, ok := vl.vid2location[vid]; ok {
+		vl.readonlyVolumes.Remove(vid, dn)
+	}
+
+	if vl.enoughCopies(vid) {
+		return vl.setVolumeWritable(vid)
+	}
+	return false
+}
+
 func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool {
 	vl.accessLock.Lock()
 	defer vl.accessLock.Unlock()