package topology

import (
	"encoding/json"
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/sequence"
	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)

var topologyLayout = `
{
  "dc1":{
    "rack1":{
      "server111":{
        "volumes":[
          {"id":1, "size":12312},
          {"id":2, "size":12312},
          {"id":3, "size":12312}
        ],
        "limit":3
      },
      "server112":{
        "volumes":[
          {"id":4, "size":12312},
          {"id":5, "size":12312},
          {"id":6, "size":12312}
        ],
        "limit":10
      }
    },
    "rack2":{
      "server121":{
        "volumes":[
          {"id":4, "size":12312},
          {"id":5, "size":12312},
          {"id":6, "size":12312}
        ],
        "limit":4
      },
      "server122":{
        "volumes":[],
        "limit":4
      },
      "server123":{
        "volumes":[
          {"id":2, "size":12312},
          {"id":3, "size":12312},
          {"id":4, "size":12312}
        ],
        "limit":5
      }
    }
  },
  "dc2":{
  },
  "dc3":{
    "rack2":{
      "server321":{
        "volumes":[
          {"id":1, "size":12312},
          {"id":3, "size":12312},
          {"id":5, "size":12312}
        ],
        "limit":4
      }
    }
  }
}
`

func setup(topologyLayout string) *Topology {
	var data interface{}
	err := json.Unmarshal([]byte(topologyLayout), &data)
	if err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("data:", data)

	//need to connect all nodes first before server adding volumes
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
	mTopology := data.(map[string]interface{})
	for dcKey, dcValue := range mTopology {
		dc := NewDataCenter(dcKey)
		dcMap := dcValue.(map[string]interface{})
		topo.LinkChildNode(dc)
		for rackKey, rackValue := range dcMap {
			dcRack := NewRack(rackKey)
			rackMap := rackValue.(map[string]interface{})
			dc.LinkChildNode(dcRack)
			for serverKey, serverValue := range rackMap {
				server := NewDataNode(serverKey)
				serverMap := serverValue.(map[string]interface{})
				if ip, ok := serverMap["ip"]; ok {
					server.Ip = ip.(string)
				}
				dcRack.LinkChildNode(server)
				for _, v := range serverMap["volumes"].([]interface{}) {
					m := v.(map[string]interface{})
					vi := storage.VolumeInfo{
						Id:      needle.VolumeId(int64(m["id"].(float64))),
						Size:    uint64(m["size"].(float64)),
						Version: needle.CurrentVersion,
					}
					if mVal, ok := m["collection"]; ok {
						vi.Collection = mVal.(string)
					}
					if mVal, ok := m["replication"]; ok {
						rp, _ := super_block.NewReplicaPlacementFromString(mVal.(string))
						vi.ReplicaPlacement = rp
					}
					if vi.ReplicaPlacement != nil {
						vl := topo.GetVolumeLayout(vi.Collection, vi.ReplicaPlacement, needle.EMPTY_TTL, types.HardDriveType)
						vl.RegisterVolume(&vi, server)
						vl.setVolumeWritable(vi.Id)
					}
					server.AddOrUpdateVolume(vi)
				}

				disk := server.getOrCreateDisk("")
				deltaDiskUsages := newDiskUsages()
				deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("")
				deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64))
				disk.UpAdjustDiskUsageDelta(deltaDiskUsages)

			}
		}
	}

	return topo
}

func TestFindEmptySlotsForOneVolume(t *testing.T) {
	topo := setup(topologyLayout)
	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("002")
	volumeGrowOption := &VolumeGrowOption{
		Collection:       "",
		ReplicaPlacement: rp,
		DataCenter:       "dc1",
		Rack:             "",
		DataNode:         "",
	}
	servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
	if err != nil {
		fmt.Println("finding empty slots error :", err)
		t.Fail()
	}
	for _, server := range servers {
		fmt.Println("assigned node :", server.Id())
	}
}

var topologyLayout2 = `
{
  "dc1":{
    "rack1":{
      "server111":{
        "volumes":[
          {"id":1, "size":12312},
          {"id":2, "size":12312},
          {"id":3, "size":12312}
        ],
        "limit":300
      },
      "server112":{
        "volumes":[
          {"id":4, "size":12312},
          {"id":5, "size":12312},
          {"id":6, "size":12312}
        ],
        "limit":300
      },
      "server113":{
        "volumes":[],
        "limit":300
      },
      "server114":{
        "volumes":[],
        "limit":300
      },
      "server115":{
        "volumes":[],
        "limit":300
      },
      "server116":{
        "volumes":[],
        "limit":300
      }
    },
    "rack2":{
      "server121":{
        "volumes":[
          {"id":4, "size":12312},
          {"id":5, "size":12312},
          {"id":6, "size":12312}
        ],
        "limit":300
      },
      "server122":{
        "volumes":[],
        "limit":300
      },
      "server123":{
        "volumes":[
          {"id":2, "size":12312},
          {"id":3, "size":12312},
          {"id":4, "size":12312}
        ],
        "limit":300
      },
      "server124":{
        "volumes":[],
        "limit":300
      },
      "server125":{
        "volumes":[],
        "limit":300
      },
      "server126":{
        "volumes":[],
        "limit":300
      }
    },
    "rack3":{
      "server131":{
        "volumes":[],
        "limit":300
      },
      "server132":{
        "volumes":[],
        "limit":300
      },
      "server133":{
        "volumes":[],
        "limit":300
      },
      "server134":{
        "volumes":[],
        "limit":300
      },
      "server135":{
        "volumes":[],
        "limit":300
      },
      "server136":{
        "volumes":[],
        "limit":300
      }
    }
  }
}
`

func TestReplication011(t *testing.T) {
	topo := setup(topologyLayout2)
	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("011")
	volumeGrowOption := &VolumeGrowOption{
		Collection:       "MAIL",
		ReplicaPlacement: rp,
		DataCenter:       "dc1",
		Rack:             "",
		DataNode:         "",
	}
	servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
	if err != nil {
		fmt.Println("finding empty slots error :", err)
		t.Fail()
	}
	for _, server := range servers {
		fmt.Println("assigned node :", server.Id())
	}
}

var topologyLayout3 = `
{
  "dc1":{
    "rack1":{
      "server111":{
        "volumes":[],
        "limit":2000
      }
    }
  },
  "dc2":{
    "rack2":{
      "server222":{
        "volumes":[],
        "limit":2000
      }
    }
  },
  "dc3":{
    "rack3":{
      "server333":{
        "volumes":[],
        "limit":1000
      }
    }
  },
  "dc4":{
    "rack4":{
      "server444":{
        "volumes":[],
        "limit":1000
      }
    }
  },
  "dc5":{
    "rack5":{
      "server555":{
        "volumes":[],
        "limit":500
      }
    }
  },
  "dc6":{
    "rack6":{
      "server666":{
        "volumes":[],
        "limit":500
      }
    }
  }
}
`

func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
	topo := setup(topologyLayout3)
	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("100")
	volumeGrowOption := &VolumeGrowOption{
		Collection:       "Weight",
		ReplicaPlacement: rp,
		DataCenter:       "",
		Rack:             "",
		DataNode:         "",
	}

	distribution := map[NodeId]int{}
	// assign 1000 volumes
	for i := 0; i < 1000; i++ {
		servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
		if err != nil {
			fmt.Println("finding empty slots error :", err)
			t.Fail()
		}
		for _, server := range servers {
			// fmt.Println("assigned node :", server.Id())
			if _, ok := distribution[server.id]; !ok {
				distribution[server.id] = 0
			}
			distribution[server.id] += 1
		}
	}

	for k, v := range distribution {
		fmt.Printf("%s : %d\n", k, v)
	}
}

var topologyLayout4 = `
{
  "dc1":{
    "rack1":{
      "serverdc111":{
		"ip": "127.0.0.1",
        "volumes":[
          {"id":1, "size":12312, "collection":"test", "replication":"001"},
          {"id":2, "size":12312, "collection":"test", "replication":"100"},
          {"id":4, "size":12312, "collection":"test", "replication":"100"},
          {"id":6, "size":12312, "collection":"test", "replication":"010"}
        ],
        "limit":100
      }
    }
  },
  "dc2":{
    "rack1":{
      "serverdc211":{
		"ip": "127.0.0.2",
        "volumes":[
          {"id":2, "size":12312, "collection":"test", "replication":"100"},
          {"id":3, "size":12312, "collection":"test", "replication":"010"},
          {"id":5, "size":12312, "collection":"test", "replication":"001"},
          {"id":6, "size":12312, "collection":"test", "replication":"010"}
		],
        "limit":100
      }
    }
  },
  "dc3":{
    "rack1":{
      "serverdc311":{
		"ip": "127.0.0.3",
        "volumes":[
          {"id":1, "size":12312, "collection":"test", "replication":"001"},
          {"id":3, "size":12312, "collection":"test", "replication":"010"},
          {"id":4, "size":12312, "collection":"test", "replication":"100"},
          {"id":5, "size":12312, "collection":"test", "replication":"001"}
		],
        "limit":100
      }
    }
  }
}
`

func TestPickForWrite(t *testing.T) {
	topo := setup(topologyLayout4)
	volumeGrowOption := &VolumeGrowOption{
		Collection: "test",
		DataCenter: "",
		Rack:       "",
		DataNode:   "",
	}
	VolumeGrowStrategy.Threshold = 0.9
	for _, rpStr := range []string{"001", "010", "100"} {
		rp, _ := super_block.NewReplicaPlacementFromString(rpStr)
		vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType)
		volumeGrowOption.ReplicaPlacement = rp
		for _, dc := range []string{"", "dc1", "dc2", "dc3", "dc0"} {
			volumeGrowOption.DataCenter = dc
			for _, r := range []string{""} {
				volumeGrowOption.Rack = r
				for _, dn := range []string{""} {
					if dc == "" && dn != "" {
						continue
					}
					volumeGrowOption.DataNode = dn
					fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl)
					if dc == "dc0" {
						if err == nil || count != 0 || !shouldGrow {
							fmt.Println(dc, r, dn, "pick for write should be with error")
							t.Fail()
						}
					} else if err != nil {
						fmt.Println(dc, r, dn, "pick for write error :", err)
						t.Fail()
					} else if count == 0 {
						fmt.Println(dc, r, dn, "pick for write count is zero")
						t.Fail()
					} else if len(fileId) == 0 {
						fmt.Println(dc, r, dn, "pick for write file id is empty")
						t.Fail()
					} else if shouldGrow {
						fmt.Println(dc, r, dn, "pick for write error : not should grow")
						t.Fail()
					}
				}
			}
		}
	}
}