You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
111 lines
2.9 KiB
111 lines
2.9 KiB
package iceberg
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
)
|
|
|
|
// Accepted values for the "resource_group_by" setting. Each one names the
// table attribute(s) used to partition tables into resource groups.
const (
	// resourceGroupNone turns resource grouping off.
	resourceGroupNone = "none"
	// resourceGroupBucket groups tables by their bucket name.
	resourceGroupBucket = "bucket"
	// resourceGroupNamespace groups tables by their namespace.
	resourceGroupNamespace = "namespace"
	// resourceGroupBucketNamespace groups tables by the bucket/namespace pair.
	resourceGroupBucketNamespace = "bucket_namespace"
)
|
|
|
|
// resourceGroupConfig carries the resource-group settings used when selecting
// tables.
type resourceGroupConfig struct {
	// GroupBy selects the grouping dimension: "none", "bucket", "namespace"
	// or "bucket_namespace". An empty string is treated like "none".
	GroupBy string

	// MaxTablesPerGroup caps how many tables are taken from a single group.
	// Zero means no per-group cap.
	MaxTablesPerGroup int64
}
|
|
|
|
func readResourceGroupConfig(values map[string]*plugin_pb.ConfigValue) (resourceGroupConfig, error) {
|
|
groupBy := strings.TrimSpace(strings.ToLower(readStringConfig(values, "resource_group_by", "")))
|
|
if groupBy == "" {
|
|
groupBy = resourceGroupNone
|
|
}
|
|
|
|
switch groupBy {
|
|
case resourceGroupNone, resourceGroupBucket, resourceGroupNamespace, resourceGroupBucketNamespace:
|
|
default:
|
|
return resourceGroupConfig{}, fmt.Errorf("invalid resource_group_by %q (valid: none, bucket, namespace, bucket_namespace)", groupBy)
|
|
}
|
|
|
|
maxTablesPerGroup := readInt64Config(values, "max_tables_per_resource_group", 0)
|
|
if maxTablesPerGroup < 0 {
|
|
return resourceGroupConfig{}, fmt.Errorf("max_tables_per_resource_group must be >= 0, got %d", maxTablesPerGroup)
|
|
}
|
|
if groupBy == resourceGroupNone && maxTablesPerGroup > 0 {
|
|
return resourceGroupConfig{}, fmt.Errorf("max_tables_per_resource_group requires resource_group_by to be set")
|
|
}
|
|
|
|
return resourceGroupConfig{
|
|
GroupBy: groupBy,
|
|
MaxTablesPerGroup: maxTablesPerGroup,
|
|
}, nil
|
|
}
|
|
|
|
func (c resourceGroupConfig) enabled() bool {
|
|
return c.GroupBy != "" && c.GroupBy != resourceGroupNone
|
|
}
|
|
|
|
func resourceGroupKey(info tableInfo, groupBy string) string {
|
|
switch groupBy {
|
|
case resourceGroupBucket:
|
|
return info.BucketName
|
|
case resourceGroupNamespace:
|
|
return info.Namespace
|
|
case resourceGroupBucketNamespace:
|
|
return info.BucketName + "/" + info.Namespace
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func selectTablesByResourceGroup(tables []tableInfo, cfg resourceGroupConfig, maxResults int) ([]tableInfo, bool) {
|
|
if !cfg.enabled() {
|
|
if maxResults > 0 && len(tables) > maxResults {
|
|
return tables[:maxResults], true
|
|
}
|
|
return tables, false
|
|
}
|
|
|
|
grouped := make(map[string][]tableInfo)
|
|
groupOrder := make([]string, 0)
|
|
for _, table := range tables {
|
|
key := resourceGroupKey(table, cfg.GroupBy)
|
|
if _, ok := grouped[key]; !ok {
|
|
groupOrder = append(groupOrder, key)
|
|
}
|
|
grouped[key] = append(grouped[key], table)
|
|
}
|
|
|
|
selected := make([]tableInfo, 0, len(tables))
|
|
selectedPerGroup := make(map[string]int64)
|
|
for {
|
|
progress := false
|
|
for _, key := range groupOrder {
|
|
if maxResults > 0 && len(selected) >= maxResults {
|
|
return selected, len(selected) < len(tables)
|
|
}
|
|
if cfg.MaxTablesPerGroup > 0 && selectedPerGroup[key] >= cfg.MaxTablesPerGroup {
|
|
continue
|
|
}
|
|
|
|
queue := grouped[key]
|
|
if len(queue) == 0 {
|
|
continue
|
|
}
|
|
|
|
selected = append(selected, queue[0])
|
|
grouped[key] = queue[1:]
|
|
selectedPerGroup[key]++
|
|
progress = true
|
|
}
|
|
if !progress {
|
|
break
|
|
}
|
|
}
|
|
|
|
return selected, len(selected) < len(tables)
|
|
}
|