diff --git a/test/s3/iam/iam_config.json b/test/s3/iam/iam_config.json index 0f77c8ded..1dcc7390c 100644 --- a/test/s3/iam/iam_config.json +++ b/test/s3/iam/iam_config.json @@ -217,7 +217,29 @@ { "Effect": "Allow", "Action": [ - "s3:*" + "s3:CreateBucket", + "s3:DeleteBucket", + "s3:PutObject", + "s3:PutObjectAcl", + "s3:DeleteObject", + "s3:DeleteObjectVersion", + "s3:InitiateMultipartUpload", + "s3:UploadPart", + "s3:CompleteMultipartUpload", + "s3:AbortMultipartUpload", + "s3:ListMultipartUploadParts" + ], + "Resource": [ + "arn:seaweed:s3:::*", + "arn:seaweed:s3:::*/*" + ] + }, + { + "Effect": "Deny", + "Action": [ + "s3:GetObject", + "s3:ListObjects", + "s3:ListBucket" ], "Resource": [ "arn:seaweed:s3:::*", diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index f7af4e0a5..2a71c6e23 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -184,11 +184,22 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum //find main datacenter and other data centers rp := option.ReplicaPlacement + // Track tentative reservations to make the process atomic + var tentativeReservation *VolumeGrowReservation + // Select appropriate functions based on useReservations flag var availableSpaceFunc func(Node, *VolumeGrowOption) int64 var reserveOneVolumeFunc func(Node, int64, *VolumeGrowOption) (*DataNode, error) if useReservations { + // Initialize tentative reservation tracking + tentativeReservation = &VolumeGrowReservation{ + servers: make([]*DataNode, 0), + reservationIds: make([]string, 0), + diskType: option.DiskType, + } + + // For reservations, we make actual reservations during node selection availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 { return node.AvailableSpaceForReservation(option) } @@ -206,8 +217,8 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum // Ensure cleanup of partial reservations on error defer func() { - if err != nil && reservation != nil { - reservation.releaseAllReservations() + if err != nil && tentativeReservation != nil { + tentativeReservation.releaseAllReservations() } }() mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error { @@ -273,7 +284,21 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) } - if availableSpaceFunc(node, option) < 1 { + + if useReservations { + // For reservations, atomically check and reserve capacity + if node.IsDataNode() { + reservationId, success := node.TryReserveCapacity(option.DiskType, 1) + if !success { + return fmt.Errorf("Cannot reserve capacity on node %s", node.Id()) + } + // Track the reservation for later cleanup if needed + tentativeReservation.servers = append(tentativeReservation.servers, node.(*DataNode)) + tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId) + } else if availableSpaceFunc(node, option) < 1 { + return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1) + } + } else if availableSpaceFunc(node, option) < 1 { return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1) } return nil @@ -290,6 +315,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum r := rand.Int64N(availableSpaceFunc(rack, option)) if server, e := reserveOneVolumeFunc(rack, r, option); e == nil { servers = append(servers, server) + + // If using reservations, also make a reservation on the selected server + if useReservations { + reservationId, success := server.TryReserveCapacity(option.DiskType, 1) + if !success { + return servers, nil, fmt.Errorf("failed to reserve capacity on server %s from other rack", server.Id()) + } + tentativeReservation.servers = append(tentativeReservation.servers, server) + tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId) + } } else { return servers, nil, e } @@ -298,28 +333,24 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum r := rand.Int64N(availableSpaceFunc(datacenter, option)) if server, e := reserveOneVolumeFunc(datacenter, r, option); e == nil { servers = append(servers, server) + + // If using reservations, also make a reservation on the selected server + if useReservations { + reservationId, success := server.TryReserveCapacity(option.DiskType, 1) + if !success { + return servers, nil, fmt.Errorf("failed to reserve capacity on server %s from other datacenter", server.Id()) + } + tentativeReservation.servers = append(tentativeReservation.servers, server) + tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId) + } } else { return servers, nil, e } } - // If reservations are requested, try to reserve capacity on each server - if useReservations { - reservation = &VolumeGrowReservation{ - servers: servers, - reservationIds: make([]string, len(servers)), - diskType: option.DiskType, - } - - // Try to reserve capacity on each server - for i, server := range servers { - reservationId, success := server.TryReserveCapacity(option.DiskType, 1) - if !success { - return servers, nil, fmt.Errorf("failed to reserve capacity on server %s", server.Id()) - } - reservation.reservationIds[i] = reservationId - } - + // If reservations were made, return the tentative reservation + if useReservations && tentativeReservation != nil { + reservation = tentativeReservation glog.V(1).Infof("Successfully reserved capacity on %d servers for volume creation", len(servers)) }