You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
288 lines
7.9 KiB
288 lines
7.9 KiB
package weed_server
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
|
|
"github.com/gorilla/mux"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/blockapi"
|
|
)
|
|
|
|
// blockTestServer creates a MasterServer + httptest.Server with block routes registered.
|
|
func blockTestServer(t *testing.T) (*MasterServer, *httptest.Server) {
|
|
t.Helper()
|
|
ms := &MasterServer{
|
|
blockRegistry: NewBlockVolumeRegistry(),
|
|
blockAssignmentQueue: NewBlockAssignmentQueue(),
|
|
}
|
|
ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
|
|
return &blockAllocResult{
|
|
Path: fmt.Sprintf("/data/%s.blk", name),
|
|
IQN: fmt.Sprintf("iqn.2024.test:%s", name),
|
|
ISCSIAddr: server + ":3260",
|
|
}, nil
|
|
}
|
|
ms.blockVSDelete = func(ctx context.Context, server string, name string) error {
|
|
return nil
|
|
}
|
|
ms.blockRegistry.MarkBlockCapable("vs1:9333")
|
|
|
|
r := mux.NewRouter()
|
|
r.HandleFunc("/block/volume", ms.blockVolumeCreateHandler).Methods("POST")
|
|
r.HandleFunc("/block/volume/{name}", ms.blockVolumeDeleteHandler).Methods("DELETE")
|
|
r.HandleFunc("/block/volume/{name}", ms.blockVolumeLookupHandler).Methods("GET")
|
|
r.HandleFunc("/block/volumes", ms.blockVolumeListHandler).Methods("GET")
|
|
r.HandleFunc("/block/assign", ms.blockAssignHandler).Methods("POST")
|
|
r.HandleFunc("/block/servers", ms.blockServersHandler).Methods("GET")
|
|
|
|
ts := httptest.NewServer(r)
|
|
t.Cleanup(ts.Close)
|
|
return ms, ts
|
|
}
|
|
|
|
func TestBlockVolumeCreateHandler(t *testing.T) {
|
|
_, ts := blockTestServer(t)
|
|
|
|
body, _ := json.Marshal(blockapi.CreateVolumeRequest{
|
|
Name: "vol1",
|
|
SizeBytes: 1 << 30,
|
|
})
|
|
resp, err := http.Post(ts.URL+"/block/volume", "application/json", bytes.NewReader(body))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
|
}
|
|
|
|
var info blockapi.VolumeInfo
|
|
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if info.Name != "vol1" {
|
|
t.Errorf("expected name vol1, got %s", info.Name)
|
|
}
|
|
if info.VolumeServer == "" {
|
|
t.Error("expected non-empty volume server")
|
|
}
|
|
}
|
|
|
|
func TestBlockVolumeListHandler(t *testing.T) {
|
|
ms, ts := blockTestServer(t)
|
|
|
|
// Create two volumes via gRPC.
|
|
for _, name := range []string{"alpha", "beta"} {
|
|
ms.blockRegistry.Register(&BlockVolumeEntry{
|
|
Name: name,
|
|
VolumeServer: "vs1:9333",
|
|
SizeBytes: 1 << 20,
|
|
Status: StatusActive,
|
|
})
|
|
}
|
|
|
|
resp, err := http.Get(ts.URL + "/block/volumes")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
|
}
|
|
|
|
var infos []blockapi.VolumeInfo
|
|
if err := json.NewDecoder(resp.Body).Decode(&infos); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(infos) != 2 {
|
|
t.Fatalf("expected 2 volumes, got %d", len(infos))
|
|
}
|
|
if infos[0].Name != "alpha" || infos[1].Name != "beta" {
|
|
t.Errorf("expected sorted [alpha, beta], got [%s, %s]", infos[0].Name, infos[1].Name)
|
|
}
|
|
}
|
|
|
|
func TestBlockVolumeLookupHandler(t *testing.T) {
|
|
ms, ts := blockTestServer(t)
|
|
|
|
ms.blockRegistry.Register(&BlockVolumeEntry{
|
|
Name: "vol1",
|
|
VolumeServer: "vs1:9333",
|
|
SizeBytes: 1 << 30,
|
|
Epoch: 3,
|
|
Role: 1, // RolePrimary
|
|
Status: StatusActive,
|
|
ReplicaServer: "vs2:9333",
|
|
})
|
|
|
|
resp, err := http.Get(ts.URL + "/block/volume/vol1")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
|
}
|
|
|
|
var info blockapi.VolumeInfo
|
|
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if info.Epoch != 3 {
|
|
t.Errorf("expected epoch 3, got %d", info.Epoch)
|
|
}
|
|
if info.Role != "primary" {
|
|
t.Errorf("expected role primary, got %s", info.Role)
|
|
}
|
|
if info.ReplicaServer != "vs2:9333" {
|
|
t.Errorf("expected replica vs2:9333, got %s", info.ReplicaServer)
|
|
}
|
|
|
|
// Not found case.
|
|
resp2, err := http.Get(ts.URL + "/block/volume/nonexistent")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer resp2.Body.Close()
|
|
if resp2.StatusCode != http.StatusNotFound {
|
|
t.Fatalf("expected 404, got %d", resp2.StatusCode)
|
|
}
|
|
}
|
|
|
|
func TestBlockVolumeDeleteHandler(t *testing.T) {
|
|
ms, ts := blockTestServer(t)
|
|
|
|
ms.blockRegistry.Register(&BlockVolumeEntry{
|
|
Name: "vol1",
|
|
VolumeServer: "vs1:9333",
|
|
SizeBytes: 1 << 30,
|
|
Status: StatusActive,
|
|
})
|
|
|
|
req, _ := http.NewRequest("DELETE", ts.URL+"/block/volume/vol1", nil)
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
|
}
|
|
|
|
// Verify it's gone.
|
|
if _, ok := ms.blockRegistry.Lookup("vol1"); ok {
|
|
t.Error("expected vol1 to be unregistered")
|
|
}
|
|
}
|
|
|
|
func TestBlockAssignHandler(t *testing.T) {
|
|
ms, ts := blockTestServer(t)
|
|
|
|
ms.blockRegistry.Register(&BlockVolumeEntry{
|
|
Name: "vol1",
|
|
VolumeServer: "vs1:9333",
|
|
Path: "/data/vol1.blk",
|
|
SizeBytes: 1 << 30,
|
|
Status: StatusActive,
|
|
})
|
|
|
|
body, _ := json.Marshal(blockapi.AssignRequest{
|
|
Name: "vol1",
|
|
Epoch: 5,
|
|
Role: "primary",
|
|
LeaseTTLMs: 30000,
|
|
})
|
|
resp, err := http.Post(ts.URL+"/block/assign", "application/json", bytes.NewReader(body))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
|
}
|
|
|
|
// Verify assignment was queued.
|
|
pending := ms.blockAssignmentQueue.Peek("vs1:9333")
|
|
if len(pending) != 1 {
|
|
t.Fatalf("expected 1 pending assignment, got %d", len(pending))
|
|
}
|
|
if pending[0].Epoch != 5 {
|
|
t.Errorf("expected epoch 5, got %d", pending[0].Epoch)
|
|
}
|
|
|
|
// Not-found case.
|
|
body2, _ := json.Marshal(blockapi.AssignRequest{Name: "missing", Epoch: 1, Role: "primary"})
|
|
resp2, err := http.Post(ts.URL+"/block/assign", "application/json", bytes.NewReader(body2))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer resp2.Body.Close()
|
|
if resp2.StatusCode != http.StatusNotFound {
|
|
t.Fatalf("expected 404, got %d", resp2.StatusCode)
|
|
}
|
|
}
|
|
|
|
func TestBlockServersHandler(t *testing.T) {
|
|
_, ts := blockTestServer(t)
|
|
|
|
resp, err := http.Get(ts.URL + "/block/servers")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
|
}
|
|
|
|
var servers []blockapi.ServerInfo
|
|
if err := json.NewDecoder(resp.Body).Decode(&servers); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(servers) != 1 {
|
|
t.Fatalf("expected 1 server, got %d", len(servers))
|
|
}
|
|
if servers[0].Address != "vs1:9333" {
|
|
t.Errorf("expected vs1:9333, got %s", servers[0].Address)
|
|
}
|
|
}
|
|
|
|
func TestListAll(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.Register(&BlockVolumeEntry{Name: "charlie", VolumeServer: "vs1:9333"})
|
|
r.Register(&BlockVolumeEntry{Name: "alpha", VolumeServer: "vs1:9333"})
|
|
r.Register(&BlockVolumeEntry{Name: "bravo", VolumeServer: "vs1:9333"})
|
|
|
|
entries := r.ListAll()
|
|
if len(entries) != 3 {
|
|
t.Fatalf("expected 3 entries, got %d", len(entries))
|
|
}
|
|
if entries[0].Name != "alpha" || entries[1].Name != "bravo" || entries[2].Name != "charlie" {
|
|
t.Errorf("expected sorted order, got %s %s %s", entries[0].Name, entries[1].Name, entries[2].Name)
|
|
}
|
|
}
|
|
|
|
func TestServerSummaries(t *testing.T) {
|
|
r := NewBlockVolumeRegistry()
|
|
r.MarkBlockCapable("vs1:9333")
|
|
r.MarkBlockCapable("vs2:9333")
|
|
r.Register(&BlockVolumeEntry{Name: "vol1", VolumeServer: "vs1:9333"})
|
|
r.Register(&BlockVolumeEntry{Name: "vol2", VolumeServer: "vs1:9333"})
|
|
r.Register(&BlockVolumeEntry{Name: "vol3", VolumeServer: "vs2:9333"})
|
|
|
|
summaries := r.ServerSummaries()
|
|
if len(summaries) != 2 {
|
|
t.Fatalf("expected 2 summaries, got %d", len(summaries))
|
|
}
|
|
// Sorted by address.
|
|
if summaries[0].Address != "vs1:9333" || summaries[0].VolumeCount != 2 {
|
|
t.Errorf("vs1: got %+v", summaries[0])
|
|
}
|
|
if summaries[1].Address != "vs2:9333" || summaries[1].VolumeCount != 1 {
|
|
t.Errorf("vs2: got %+v", summaries[1])
|
|
}
|
|
}
|