Browse Source

add http endpoint to get the size of a collection (#5910)

pull/5912/head
Riccardo Bertossa 4 months ago
committed by GitHub
parent
commit
6fe8639504
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      weed/server/master_server.go
  2. 55
      weed/server/master_server_handlers_admin.go
  3. 22
      weed/topology/collection.go

1
weed/server/master_server.go

@ -146,6 +146,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
r.HandleFunc("/collection/info", ms.guard.WhiteList(ms.collectionInfoHandler))
/* /*
r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))

55
weed/server/master_server_handlers_admin.go

@ -178,3 +178,58 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
} }
return volumeGrowOption, nil return volumeGrowOption, nil
} }
func (ms *MasterServer) collectionInfoHandler(w http.ResponseWriter, r *http.Request) {
//get collection from request
collectionName := r.FormValue("collection")
if collectionName == "" {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection is required"))
return
}
//output details of the volumes?
detail := r.FormValue("detail") == "true"
//collect collection info
collection, ok := ms.Topo.FindCollection(collectionName)
if !ok {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
return
}
volumeLayouts := collection.GetAllVolumeLayouts()
if detail {
//prepare the json response
all_stats := make([]map[string]interface{}, len(volumeLayouts))
for i, volumeLayout := range volumeLayouts {
volumeLayoutStats := volumeLayout.Stats()
m := make(map[string]interface{})
m["Version"] = util.Version()
m["Collection"] = collectionName
m["TotalSize"] = volumeLayoutStats.TotalSize
m["FileCount"] = volumeLayoutStats.FileCount
m["UsedSize"] = volumeLayoutStats.UsedSize
all_stats[i] = m
}
//write it
writeJsonQuiet(w, r, http.StatusOK, all_stats)
} else {
//prepare the json response
collectionStats := map[string]interface{}{
"Version": util.Version(),
"Collection": collectionName,
"TotalSize": uint64(0),
"FileCount": uint64(0),
"UsedSize": uint64(0),
"VolumeCount": uint64(0),
}
for _, volumeLayout := range volumeLayouts {
volumeLayoutStats := volumeLayout.Stats()
collectionStats["TotalSize"] = collectionStats["TotalSize"].(uint64) + volumeLayoutStats.TotalSize
collectionStats["FileCount"] = collectionStats["FileCount"].(uint64) + volumeLayoutStats.FileCount
collectionStats["UsedSize"] = collectionStats["UsedSize"].(uint64) + volumeLayoutStats.UsedSize
collectionStats["VolumeCount"] = collectionStats["VolumeCount"].(uint64) + 1
}
//write it
writeJsonQuiet(w, r, http.StatusOK, collectionStats)
}
}

22
weed/topology/collection.go

@ -44,6 +44,28 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, t
return vl.(*VolumeLayout) return vl.(*VolumeLayout)
} }
func (c *Collection) GetVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) (*VolumeLayout, bool) {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
}
if diskType != types.HardDriveType {
keyString += string(diskType)
}
vl, ok := c.storageType2VolumeLayout.Find(keyString)
return vl.(*VolumeLayout), ok
}
func (c *Collection) GetAllVolumeLayouts() []*VolumeLayout {
var vls []*VolumeLayout
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
vls = append(vls, vl.(*VolumeLayout))
}
}
return vls
}
func (c *Collection) DeleteVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) { func (c *Collection) DeleteVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) {
keyString := rp.String() keyString := rp.String()
if ttl != nil { if ttl != nil {

Loading…
Cancel
Save