mirror of https://github.com/grafana/grafana
UnifiedSearch: Use ResourceIndex from dashboards apiserver (v0alpha1 only) (#96939)
parent 104f795156
commit f6ccf976e5
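This commit replaces the experimental search.grafana.app builder and the rest.Connecter-based SearchConnector with a SearchHandler that answers dashboard search from the unified-storage ResourceIndex. A minimal sketch of calling the new namespaced endpoint; the host, namespace, and bearer token are assumptions for illustration, not part of the commit:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Namespaced route registered by SearchHandler.GetAPIRoutes below (path "search").
	url := "http://localhost:3000/apis/dashboard.grafana.app/v0alpha1/namespaces/default/search?query=cpu"
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer <token>") // hypothetical credential
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer func() { _ = resp.Body.Close() }()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // a v0alpha1.SearchResults JSON document
}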
@@ -0,0 +1,96 @@
package v0alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	common "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type SearchResults struct {
	metav1.TypeMeta `json:",inline"`

	// Where the query started from
	Offset int64 `json:"offset,omitempty"`

	// The number of matching results
	TotalHits int64 `json:"totalHits"`

	// The dashboard body (unstructured for now)
	Hits []DashboardHit `json:"hits"`

	// Cost of running the query
	QueryCost float64 `json:"queryCost,omitempty"`

	// Max score
	MaxScore float64 `json:"maxScore,omitempty"`

	// How are the results sorted
	SortBy *SortBy `json:"sortBy,omitempty"`

	// Facet results
	Facets map[string]FacetResult `json:"facets,omitempty"`
}

type SortBy struct {
	Field      string `json:"field"`
	Descending bool   `json:"desc,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type SortableFields struct {
	metav1.TypeMeta `json:",inline"`

	// Sortable fields (depends on backend support)
	Fields []SortableField `json:"fields"`
}

type SortableField struct {
	Field   string `json:"string,omitempty"`
	Display string `json:"display,omitempty"`
	Type    string `json:"type,omitempty"` // string or number
}

// Dashboard or folder hit
// +enum
type HitKind string

// HitKind values
const (
	HitTypeDash   HitKind = "Dashboard"
	HitTypeFolder HitKind = "Folder"
)

type DashboardHit struct {
	// Dashboard or folder
	Kind HitKind `json:"kind"`
	// The k8s "name" (eg, grafana UID)
	Name string `json:"name"`
	// The display name
	Title string `json:"title"`
	// Filter tags
	Tags []string `json:"tags,omitempty"`
	// The UID/name for the folder
	Folder string `json:"folder,omitempty"`
	// Stick untyped extra fields in this object (including the sort value)
	Field *common.Unstructured `json:"field,omitempty"`
	// Explain the score (if possible)
	Explain *common.Unstructured `json:"explain,omitempty"`
	// When using "real" search, this is the score
	Score float64 `json:"score,omitempty"`
}

type FacetResult struct {
	Field string `json:"field,omitempty"`
	// The distinct terms
	Total int64 `json:"total,omitempty"`
	// The number of documents that do *not* have this field
	Missing int64 `json:"missing,omitempty"`
	// Term facets
	Terms []TermFacet `json:"terms,omitempty"`
}

type TermFacet struct {
	Term  string `json:"term,omitempty"`
	Count int64  `json:"count,omitempty"`
}
@@ -1 +1,7 @@
API rule violation: list_type_missing,github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1,DashboardHit,Tags
API rule violation: list_type_missing,github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1,FacetResult,Terms
API rule violation: list_type_missing,github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1,LibraryPanelStatus,Warnings
API rule violation: list_type_missing,github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1,SearchResults,Hits
API rule violation: list_type_missing,github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1,SortableFields,Fields
API rule violation: names_match,github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1,SortBy,Descending
API rule violation: names_match,github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1,SortableField,Field
@@ -1,122 +1,332 @@
 package dashboard

 import (
-	"context"
 	"encoding/json"
 	"net/http"
 	"net/url"
 	"strconv"
 	"strings"

-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apiserver/pkg/registry/rest"
+	"go.opentelemetry.io/otel/trace"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kube-openapi/pkg/common"
+	"k8s.io/kube-openapi/pkg/spec3"
+	"k8s.io/kube-openapi/pkg/validation/spec"

 	"github.com/grafana/grafana/pkg/apimachinery/identity"
+	dashboardv0alpha1 "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
 	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/services/apiserver/builder"
 	"github.com/grafana/grafana/pkg/storage/unified/resource"
+	"github.com/grafana/grafana/pkg/util/errhttp"
 )

-// The DTO returns everything the UI needs in a single request
-type SearchConnector struct {
-	newFunc func() runtime.Object
-	client  resource.ResourceIndexClient
-	log     log.Logger
+type SearchHandler struct {
+	log    log.Logger
+	client resource.ResourceIndexClient
+	tracer trace.Tracer
 }

-func NewSearchConnector(
-	client resource.ResourceIndexClient,
-	newFunc func() runtime.Object,
-) (rest.Storage, error) {
-	v := &SearchConnector{
-		client:  client,
-		newFunc: newFunc,
-		log:     log.New("grafana-apiserver.dashboards.search"),
+func NewSearchHandler(client resource.ResourceIndexClient, tracer trace.Tracer) *SearchHandler {
+	return &SearchHandler{
+		client: client,
+		log:    log.New("grafana-apiserver.dashboards.search"),
+		tracer: tracer,
 	}
-	return v, nil
 }

-var (
-	_ rest.Connecter            = (*SearchConnector)(nil)
-	_ rest.StorageMetadata      = (*SearchConnector)(nil)
-	_ rest.Scoper               = (*SearchConnector)(nil)
-	_ rest.SingularNameProvider = (*SearchConnector)(nil)
-)
+func (s *SearchHandler) GetAPIRoutes(defs map[string]common.OpenAPIDefinition) *builder.APIRoutes {
+	searchResults := defs["github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1.SearchResults"].Schema
+	sortableFields := defs["github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1.SortableFields"].Schema

-func (s *SearchConnector) New() runtime.Object {
-	return s.newFunc()
-}
+	return &builder.APIRoutes{
+		Namespace: []builder.APIRouteHandler{
+			{
+				Path: "search",
+				Spec: &spec3.PathProps{
+					Get: &spec3.Operation{
+						OperationProps: spec3.OperationProps{
+							Tags:        []string{"Search"},
+							Description: "Dashboard search",
+							Parameters: []*spec3.Parameter{
+								{
+									ParameterProps: spec3.ParameterProps{
+										Name:        "namespace",
+										In:          "path",
+										Required:    true,
+										Example:     "default",
+										Description: "workspace",
+										Schema:      spec.StringProperty(),
+									},
+								},
+								{
+									ParameterProps: spec3.ParameterProps{
+										Name:        "query",
+										In:          "query",
+										Description: "user query string",
+										Required:    false,
+										Schema:      spec.StringProperty(),
+									},
+								},
+								{
+									ParameterProps: spec3.ParameterProps{
+										Name:        "folder",
+										In:          "query",
+										Description: "search/list within a folder (not recursive)",
+										Required:    false,
+										Schema:      spec.StringProperty(),
+									},
+								},
+								{
+									ParameterProps: spec3.ParameterProps{
+										Name:        "sort",
+										In:          "query",
+										Description: "sortable field",
+										Example:     "", // not sorted
+										Examples: map[string]*spec3.Example{
+											"": {
+												ExampleProps: spec3.ExampleProps{
+													Summary: "default sorting",
+													Value:   "",
+												},
+											},
+											"title": {
+												ExampleProps: spec3.ExampleProps{
+													Summary: "title ascending",
+													Value:   "title",
+												},
+											},
+											"-title": {
+												ExampleProps: spec3.ExampleProps{
+													Summary: "title descending",
+													Value:   "-title",
+												},
+											},
+										},
+										Required: false,
+										Schema:   spec.StringProperty(),
+									},
+								},
+							},
+							Responses: &spec3.Responses{
+								ResponsesProps: spec3.ResponsesProps{
+									StatusCodeResponses: map[int]*spec3.Response{
+										200: {
+											ResponseProps: spec3.ResponseProps{
+												Content: map[string]*spec3.MediaType{
+													"application/json": {
+														MediaTypeProps: spec3.MediaTypeProps{
+															Schema: &searchResults,
+														},
+													},
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+				Handler: s.DoSearch,
+			},
+			{
+				Path: "search/sortable",
+				Spec: &spec3.PathProps{
+					Get: &spec3.Operation{
+						OperationProps: spec3.OperationProps{
+							Tags:        []string{"Search"},
+							Description: "Get sortable fields",
+							Parameters: []*spec3.Parameter{
+								{
+									ParameterProps: spec3.ParameterProps{
+										Name:        "namespace",
+										In:          "path",
+										Required:    true,
+										Example:     "default",
+										Description: "workspace",
+										Schema:      spec.StringProperty(),
+									},
+								},
+							},
+							Responses: &spec3.Responses{
+								ResponsesProps: spec3.ResponsesProps{
+									StatusCodeResponses: map[int]*spec3.Response{
+										200: {
+											ResponseProps: spec3.ResponseProps{
+												Content: map[string]*spec3.MediaType{
+													"application/json": {
+														MediaTypeProps: spec3.MediaTypeProps{
+															Schema: &sortableFields,
+														},
+													},
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+				Handler: s.DoSortable,
+			},
+		},
+	}
+}

-func (s *SearchConnector) Destroy() {
-}
+func (s *SearchHandler) DoSortable(w http.ResponseWriter, r *http.Request) {
+	sortable := &dashboardv0alpha1.SortableFields{
+		TypeMeta: v1.TypeMeta{
+			APIVersion: dashboardv0alpha1.APIVERSION,
+			Kind:       "SortableFields",
+		},
+		Fields: []dashboardv0alpha1.SortableField{
+			{Field: "title", Display: "Title (A-Z)", Type: "string"},
+			{Field: "-title", Display: "Title (Z-A)", Type: "string"},
+		},
+	}
+	s.write(w, sortable)
+}

-func (s *SearchConnector) NamespaceScoped() bool {
-	return true // namespace == org
-}
+func (s *SearchHandler) DoSearch(w http.ResponseWriter, r *http.Request) {
+	ctx, span := s.tracer.Start(r.Context(), "dashboard.search")
+	defer span.End()

-func (s *SearchConnector) GetSingularName() string {
-	return "Search"
-}
+	user, err := identity.GetRequester(ctx)
+	if err != nil {
+		errhttp.Write(ctx, err, w)
+		return
+	}

-func (s *SearchConnector) ConnectMethods() []string {
-	return []string{"GET"}
-}
+	queryParams, err := url.ParseQuery(r.URL.RawQuery)
+	if err != nil {
+		errhttp.Write(ctx, err, w)
+		return
+	}

-func (s *SearchConnector) NewConnectOptions() (runtime.Object, bool, string) {
-	return nil, false, ""
-}
+	// get limit and offset from query params
+	limit := 50
+	offset := 0
+	if queryParams.Has("limit") {
+		limit, _ = strconv.Atoi(queryParams.Get("limit"))
+	}
+	if queryParams.Has("offset") {
+		offset, _ = strconv.Atoi(queryParams.Get("offset"))
+	}

-func (s *SearchConnector) ProducesMIMETypes(verb string) []string {
-	return nil
-}
+	searchRequest := &resource.ResourceSearchRequest{
+		Options: &resource.ListOptions{
+			Key: &resource.ResourceKey{
+				Namespace: user.GetNamespace(),
+				Group:     dashboardv0alpha1.GROUP,
+				Resource:  "dashboards",
+			},
+		},
+		Query:  queryParams.Get("query"),
+		Limit:  int64(limit),
+		Offset: int64(offset),
+		Fields: []string{
+			"title",
+			"folder",
+			"tags",
+		},
+	}

-func (s *SearchConnector) ProducesObject(verb string) interface{} {
-	return s.newFunc()
-}
+	// Add the folder constraint. Note this does not do recursive search
+	folder := queryParams.Get("folder")
+	if folder != "" {
+		searchRequest.Options.Fields = []*resource.Requirement{{
+			Key:      "folder",
+			Operator: "=",
+			Values:   []string{folder},
+		}}
+	}

-func (s *SearchConnector) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
-	user, err := identity.GetRequester(ctx)
-	if err != nil {
-		return nil, err
-	}
+	// Add sorting
+	if queryParams.Has("sort") {
+		for _, sort := range queryParams["sort"] {
+			s := &resource.ResourceSearchRequest_Sort{Field: sort}
+			if strings.HasPrefix(sort, "-") {
+				s.Desc = true
+				s.Field = s.Field[1:]
+			}
+			searchRequest.SortBy = append(searchRequest.SortBy, s)
+		}
+	}

-	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		queryParams, err := url.ParseQuery(r.URL.RawQuery)
-		if err != nil {
-			responder.Error(err)
-			return
-		}
+	// Also query folders
+	if searchRequest.Query != "" {
+		searchRequest.Federated = []*resource.ResourceKey{{
+			Namespace: searchRequest.Options.Key.Namespace,
+			Group:     "folder.grafana.app",
+			Resource:  "folders",
+		}}
+	}

+	// The facet term fields
+	facets, ok := queryParams["facet"]
+	if ok {
+		searchRequest.Facet = make(map[string]*resource.ResourceSearchRequest_Facet)
+		for _, v := range facets {
+			searchRequest.Facet[v] = &resource.ResourceSearchRequest_Facet{
+				Field: v,
+				Limit: 50,
+			}
+		}
+	}

+	// Run the query
+	result, err := s.client.Search(ctx, searchRequest)
+	if err != nil {
+		errhttp.Write(ctx, err, w)
+		return
+	}

-		// get limit and offset from query params
-		limit := 0
-		offset := 0
-		if queryParams.Has("limit") {
-			limit, _ = strconv.Atoi(queryParams.Get("limit"))
-		}
-		if queryParams.Has("offset") {
-			offset, _ = strconv.Atoi(queryParams.Get("offset"))
-		}
+	sr := &dashboardv0alpha1.SearchResults{
+		Offset:    searchRequest.Offset,
+		TotalHits: result.TotalHits,
+		QueryCost: result.QueryCost,
+		MaxScore:  result.MaxScore,
+		Hits:      make([]dashboardv0alpha1.DashboardHit, len(result.Results.Rows)),
+	}
+	for i, row := range result.Results.Rows {
+		hit := &dashboardv0alpha1.DashboardHit{
+			Kind:   dashboardv0alpha1.HitTypeDash,
+			Name:   row.Key.Name,
+			Title:  string(row.Cells[0]),
+			Folder: string(row.Cells[1]),
+		}
+		if row.Cells[2] != nil {
+			_ = json.Unmarshal(row.Cells[2], &hit.Tags)
+		}
+		sr.Hits[i] = *hit
+	}

-		searchRequest := &resource.SearchRequest{
-			Tenant:    user.GetNamespace(), //<< not necessary it is in the namespace (and user context)
-			Kind:      strings.Split(queryParams.Get("kind"), ","),
-			QueryType: queryParams.Get("queryType"),
-			Query:     queryParams.Get("query"),
-			Limit:     int64(limit),
-			Offset:    int64(offset),
-		}
+	// Add facet results
+	if result.Facet != nil {
+		sr.Facets = make(map[string]dashboardv0alpha1.FacetResult)
+		for k, v := range result.Facet {
+			sr.Facets[k] = dashboardv0alpha1.FacetResult{
+				Field:   v.Field,
+				Total:   v.Total,
+				Missing: v.Missing,
+				Terms:   make([]dashboardv0alpha1.TermFacet, len(v.Terms)),
+			}
+			for j, t := range v.Terms {
+				sr.Facets[k].Terms[j] = dashboardv0alpha1.TermFacet{
+					Term:  t.Term,
+					Count: t.Count,
+				}
+			}
+		}
+	}

-		// TODO... actually query
-		result, err := s.client.Search(r.Context(), searchRequest)
-		if err != nil {
-			responder.Error(err)
-			return
-		}
+	s.write(w, sr)
+}

-		jj, err := json.Marshal(result)
-		if err != nil {
-			responder.Error(err)
-			return
-		}
-		_, _ = w.Write(jj)
-	}), nil
-}
+func (s *SearchHandler) write(w http.ResponseWriter, obj any) {
+	w.Header().Set("Content-Type", "application/json")
+	_ = json.NewEncoder(w).Encode(obj)
+}
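A short sketch of building a request that exercises the parameters DoSearch parses above: repeated sort values use a leading "-" for descending, and each facet value asks for term counts on that field. The host and namespace are assumptions:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	q := url.Values{}
	q.Set("query", "cpu")    // free-text query; also federates the search to folders
	q.Set("folder", "infra") // non-recursive folder constraint
	q.Add("sort", "-title")  // "-" prefix becomes Desc=true with Field="title"
	q.Add("facet", "tags")   // term facet over the tags field (limit 50)
	q.Set("limit", "25")     // default is 50 when omitted
	q.Set("offset", "0")

	base := "http://localhost:3000/apis/dashboard.grafana.app/v0alpha1/namespaces/default/search"
	fmt.Println(base + "?" + q.Encode())
}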
@@ -1,150 +0,0 @@
package search

import (
	"encoding/json"
	"net/http"
	"net/url"
	"strconv"
	"strings"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apiserver/pkg/authorization/authorizer"
	genericapiserver "k8s.io/apiserver/pkg/server"
	common "k8s.io/kube-openapi/pkg/common"
	"k8s.io/kube-openapi/pkg/spec3"

	"github.com/grafana/grafana/pkg/api/response"
	request2 "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
	"github.com/grafana/grafana/pkg/setting"

	"github.com/grafana/grafana/pkg/services/apiserver/builder"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
)

var _ builder.APIGroupBuilder = (*SearchAPIBuilder)(nil)

type SearchAPIBuilder struct {
	unified    resource.ResourceClient
	namespacer request2.NamespaceMapper
}

func NewSearchAPIBuilder(
	unified resource.ResourceClient,
	cfg *setting.Cfg,
) (*SearchAPIBuilder, error) {
	return &SearchAPIBuilder{
		unified:    unified,
		namespacer: request2.GetNamespaceMapper(cfg),
	}, nil
}

func RegisterAPIService(
	features featuremgmt.FeatureToggles,
	apiregistration builder.APIRegistrar,
	unified resource.ResourceClient,
	cfg *setting.Cfg,
) (*SearchAPIBuilder, error) {
	if !(features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) || features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs)) {
		return nil, nil
	}
	builder, err := NewSearchAPIBuilder(unified, cfg)
	apiregistration.RegisterAPI(builder)
	return builder, err
}

func (b *SearchAPIBuilder) GetGroupVersion() schema.GroupVersion {
	return schema.GroupVersion{Group: "search.grafana.app", Version: "v0alpha1"}
}

func (b *SearchAPIBuilder) InstallSchema(scheme *runtime.Scheme) error {
	return nil
}

func (b *SearchAPIBuilder) GetOpenAPIDefinitions() common.GetOpenAPIDefinitions {
	return func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
		return map[string]common.OpenAPIDefinition{}
	}
}

func (b *SearchAPIBuilder) GetAPIRoutes() *builder.APIRoutes {
	return &builder.APIRoutes{
		Namespace: []builder.APIRouteHandler{
			{
				Path: "search",
				Spec: &spec3.PathProps{
					Get: &spec3.Operation{
						OperationProps: spec3.OperationProps{
							Tags:        []string{"Search"},
							Summary:     "Search",
							Description: "Search for resources",
						},
					},
				},
				Handler: func(w http.ResponseWriter, r *http.Request) {
					// get tenant
					orgId, err := request2.OrgIDForList(r.Context())
					if err != nil {
						response.Error(500, "failed to get orgId", err)
					}
					tenant := b.namespacer(orgId)

					queryParams, err := url.ParseQuery(r.URL.RawQuery)
					if err != nil {
						response.Error(500, "failed to parse query params", err)
					}

					// get limit and offset from query params
					limit := 0
					offset := 0
					if queryParams.Has("limit") {
						limit, _ = strconv.Atoi(queryParams.Get("limit"))
					}
					if queryParams.Has("offset") {
						offset, _ = strconv.Atoi(queryParams.Get("offset"))
					}

					searchRequest := &resource.SearchRequest{
						Tenant:    tenant,
						Kind:      strings.Split(queryParams.Get("kind"), ","),
						QueryType: queryParams.Get("queryType"),
						Query:     queryParams.Get("query"),
						Limit:     int64(limit),
						Offset:    int64(offset),
					}

					res, err := b.unified.Search(r.Context(), searchRequest)
					if err != nil {
						response.Error(500, "search request failed", err)
					}

					// TODO need a nicer way of handling this
					// the [][]byte response already contains the marshalled JSON, so we don't need to re-encode it
					rawMessages := make([]json.RawMessage, len(res.GetItems()))
					for i, item := range res.GetItems() {
						rawMessages[i] = item.Value
					}

					w.Header().Set("Content-Type", "application/json")
					if err := json.NewEncoder(w).Encode(rawMessages); err != nil {
						response.Error(500, "failed to json encode raw response", err)
					}
				},
			},
		},
	}
}

func (b *SearchAPIBuilder) GetAuthorizer() authorizer.Authorizer {
	return nil
}

func (b *SearchAPIBuilder) PostProcessOpenAPI(oas *spec3.OpenAPI) (*spec3.OpenAPI, error) {
	return oas, nil
}

func (b *SearchAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver.APIGroupInfo, _ builder.APIGroupOptions) error {
	apiGroupInfo.PrioritizedVersions = []schema.GroupVersion{b.GetGroupVersion()}
	return nil
}
@@ -1,73 +0,0 @@
package unifiedSearch

import (
	"encoding/json"
	"errors"
	"io"
	"net/http"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data"

	"github.com/grafana/grafana/pkg/api/response"
	"github.com/grafana/grafana/pkg/api/routing"
	"github.com/grafana/grafana/pkg/middleware"
	contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
)

type SearchHTTPService interface {
	RegisterHTTPRoutes(storageRoute routing.RouteRegister)
}

type searchHTTPService struct {
	search SearchService
}

func ProvideSearchHTTPService(search SearchService) SearchHTTPService {
	return &searchHTTPService{search: search}
}

func (s *searchHTTPService) RegisterHTTPRoutes(storageRoute routing.RouteRegister) {
	storageRoute.Post("/", middleware.ReqSignedIn, routing.Wrap(s.doQuery))
}

func (s *searchHTTPService) doQuery(c *contextmodel.ReqContext) response.Response {
	searchReadinessCheckResp := s.search.IsReady(c.Req.Context(), c.SignedInUser.GetOrgID())
	if !searchReadinessCheckResp.IsReady {
		return response.JSON(http.StatusOK, &backend.DataResponse{
			Frames: []*data.Frame{{
				Name: "Loading",
			}},
			Error: nil,
		})
	}

	body, err := io.ReadAll(c.Req.Body)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "error reading bytes", err)
	}

	query := &Query{}
	err = json.Unmarshal(body, query)
	if err != nil {
		return response.Error(http.StatusBadRequest, "error parsing body", err)
	}

	resp := s.search.doQuery(c.Req.Context(), c.SignedInUser, c.SignedInUser.GetOrgID(), *query)

	if resp.Error != nil {
		return response.Error(http.StatusInternalServerError, "error handling search request", resp.Error)
	}

	if len(resp.Frames) == 0 {
		msg := "invalid search response"
		return response.Error(http.StatusInternalServerError, msg, errors.New(msg))
	}

	bytes, err := resp.MarshalJSON()
	if err != nil {
		return response.Error(http.StatusInternalServerError, "error marshalling response", err)
	}

	return response.JSON(http.StatusOK, bytes)
}
@@ -1,328 +0,0 @@
package unifiedSearch

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana/pkg/infra/db"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/grafana/grafana/pkg/registry"
	"github.com/grafana/grafana/pkg/services/accesscontrol"
	"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/folder"
	"github.com/grafana/grafana/pkg/services/org"
	"github.com/grafana/grafana/pkg/services/store"
	"github.com/grafana/grafana/pkg/services/user"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
)

type StandardSearchService struct {
	registry.BackgroundService
	cfg            *setting.Cfg
	sql            db.DB
	ac             accesscontrol.Service
	orgService     org.Service
	userService    user.Service
	logger         log.Logger
	reIndexCh      chan struct{}
	features       featuremgmt.FeatureToggles
	resourceClient resource.ResourceClient
}

func (s *StandardSearchService) IsReady(ctx context.Context, orgId int64) IsSearchReadyResponse {
	return IsSearchReadyResponse{IsReady: true}
}

func ProvideService(cfg *setting.Cfg, sql db.DB, entityEventStore store.EntityEventsService,
	ac accesscontrol.Service, tracer tracing.Tracer, features featuremgmt.FeatureToggles, orgService org.Service,
	userService user.Service, folderStore folder.Store, resourceClient resource.ResourceClient) SearchService {
	logger := log.New("searchV3")
	s := &StandardSearchService{
		cfg:            cfg,
		sql:            sql,
		ac:             ac,
		logger:         logger,
		reIndexCh:      make(chan struct{}, 1),
		orgService:     orgService,
		userService:    userService,
		features:       features,
		resourceClient: resourceClient,
	}
	return s
}

func (s *StandardSearchService) IsDisabled() bool {
	return !s.features.IsEnabledGlobally(featuremgmt.FlagPanelTitleSearch)
}

func (s *StandardSearchService) Run(ctx context.Context) error {
	// TODO: implement this? ( copied from pkg/services/searchV2/service.go )
	// orgQuery := &org.SearchOrgsQuery{}
	// result, err := s.orgService.Search(ctx, orgQuery)
	// if err != nil {
	// 	return fmt.Errorf("can't get org list: %w", err)
	// }
	// orgIDs := make([]int64, 0, len(result))
	// for _, org := range result {
	// 	orgIDs = append(orgIDs, org.ID)
	// }
	// TODO: do we need to initialize the bleve index again ( should be initialized on startup )?
	// return s.dashboardIndex.run(ctx, orgIDs, s.reIndexCh)
	return nil
}

func (s *StandardSearchService) TriggerReIndex() {
	select {
	case s.reIndexCh <- struct{}{}:
	default:
		// channel is full => re-index will happen soon anyway.
	}
}

func (s *StandardSearchService) getUser(ctx context.Context, backendUser *backend.User, orgId int64) (*user.SignedInUser, error) {
	// TODO: get user & user's permissions from the request context
	var usr *user.SignedInUser
	if s.cfg.AnonymousEnabled && backendUser.Email == "" && backendUser.Login == "" {
		getOrg := org.GetOrgByNameQuery{Name: s.cfg.AnonymousOrgName}
		orga, err := s.orgService.GetByName(ctx, &getOrg)
		if err != nil {
			s.logger.Error("Anonymous access organization error.", "org_name", s.cfg.AnonymousOrgName, "error", err)
			return nil, err
		}

		usr = &user.SignedInUser{
			OrgID:       orga.ID,
			OrgName:     orga.Name,
			OrgRole:     org.RoleType(s.cfg.AnonymousOrgRole),
			IsAnonymous: true,
		}
	} else {
		getSignedInUserQuery := &user.GetSignedInUserQuery{
			Login: backendUser.Login,
			Email: backendUser.Email,
			OrgID: orgId,
		}
		var err error
		usr, err = s.userService.GetSignedInUser(ctx, getSignedInUserQuery)
		if err != nil {
			s.logger.Error("Error while retrieving user", "error", err, "email", backendUser.Email, "login", getSignedInUserQuery.Login)
			return nil, errors.New("auth error")
		}

		if usr == nil {
			s.logger.Error("No user found", "email", backendUser.Email)
			return nil, errors.New("auth error")
		}
	}

	if usr.Permissions == nil {
		usr.Permissions = make(map[int64]map[string][]string)
	}

	if _, ok := usr.Permissions[orgId]; ok {
		// permissions as part of the `s.sql.GetSignedInUser` query - return early
		return usr, nil
	}

	// TODO: ensure this is cached
	permissions, err := s.ac.GetUserPermissions(ctx, usr,
		accesscontrol.Options{ReloadCache: false})
	if err != nil {
		s.logger.Error("Failed to retrieve user permissions", "error", err, "email", backendUser.Email)
		return nil, errors.New("auth error")
	}

	usr.Permissions[orgId] = accesscontrol.GroupScopesByActionContext(ctx, permissions)
	return usr, nil
}

func (s *StandardSearchService) DoQuery(ctx context.Context, user *backend.User, orgID int64, q Query) *backend.DataResponse {
	signedInUser, err := s.getUser(ctx, user, orgID)
	if err != nil {
		return &backend.DataResponse{Error: err}
	}

	return s.doQuery(ctx, signedInUser, orgID, q)
}

func (s *StandardSearchService) doQuery(ctx context.Context, signedInUser *user.SignedInUser, orgID int64, q Query) *backend.DataResponse {
	return s.doSearchQuery(ctx, q, s.cfg.AppSubURL, orgID)
}

func (s *StandardSearchService) doSearchQuery(ctx context.Context, qry Query, _ string, orgID int64) *backend.DataResponse {
	response := &backend.DataResponse{}

	// will use stack id for cloud and org id for on-prem
	tenantId := request.GetNamespaceMapper(s.cfg)(orgID)

	req := newSearchRequest(tenantId, qry)
	res, err := s.resourceClient.Search(ctx, req)
	if err != nil {
		return s.error(err, "Failed to search resources", response)
	}

	frame, err := loadSearchResponse(res, s)
	if err != nil {
		return s.error(err, "Failed to load search response", response)
	}

	response.Frames = append(response.Frames, frame)

	if len(res.Groups) > 0 {
		tagsFrame := loadTagsResponse(res)
		response.Frames = append(response.Frames, tagsFrame)
	}

	return response
}

func (s *StandardSearchService) error(err error, message string, response *backend.DataResponse) *backend.DataResponse {
	s.logger.Error(message, "error", err)
	response.Error = err
	return response
}

func loadSearchResponse(res *resource.SearchResponse, s *StandardSearchService) (*data.Frame, error) {
	frame := newSearchFrame(res)
	for _, r := range res.Items {
		doc, err := getDoc(r.Value)
		if err != nil {
			s.logger.Error("Failed to parse doc", "error", err)
			return nil, err
		}
		kind := strings.ToLower(doc.Kind)
		link := dashboardPageItemLink(doc, s.cfg.AppSubURL)
		frame.AppendRow(kind, doc.UID, doc.Spec.Title, link, doc.Spec.Tags, doc.FolderID)
	}
	return frame, nil
}

func loadTagsResponse(res *resource.SearchResponse) *data.Frame {
	tagsFrame := newTagsFrame()
	for _, grp := range res.Groups {
		tagsFrame.AppendRow(grp.Name, grp.Count)
	}
	return tagsFrame
}

func newSearchFrame(res *resource.SearchResponse) *data.Frame {
	fUID := newField("uid", data.FieldTypeString)
	fKind := newField("kind", data.FieldTypeString)
	fName := newField("name", data.FieldTypeString)
	fLocation := newField("location", data.FieldTypeString)
	fTags := newField("tags", data.FieldTypeNullableJSON)
	fURL := newField("url", data.FieldTypeString)
	fURL.Config = &data.FieldConfig{
		Links: []data.DataLink{
			{Title: "link", URL: "${__value.text}"},
		},
	}

	frame := data.NewFrame("Query results", fKind, fUID, fName, fURL, fTags, fLocation)

	frame.SetMeta(&data.FrameMeta{
		Type: "search-results",
		Custom: &customMeta{
			Count: uint64(len(res.Items)),
		},
	})
	return frame
}

func newTagsFrame() *data.Frame {
	fTag := newField("tag", data.FieldTypeString)
	fCount := newField("count", data.FieldTypeInt64)
	return data.NewFrame("tags", fTag, fCount)
}

func dashboardPageItemLink(doc *DashboardListDoc, subURL string) string {
	if doc.FolderID == "" {
		return fmt.Sprintf("%s/d/%s/%s", subURL, doc.Name, doc.Namespace)
	}
	return fmt.Sprintf("%s/dashboards/f/%s/%s", subURL, doc.Name, doc.Namespace)
}

type customMeta struct {
	Count    uint64  `json:"count"`
	MaxScore float64 `json:"max_score,omitempty"`
	SortBy   string  `json:"sortBy,omitempty"`
}

type DashboardListDoc struct {
	UID       string    `json:"Uid"`
	Group     string    `json:"Group"`
	Namespace string    `json:"Namespace"`
	Kind      string    `json:"Kind"`
	Name      string    `json:"Name"`
	CreatedAt time.Time `json:"CreatedAt"`
	CreatedBy string    `json:"CreatedBy"`
	UpdatedAt time.Time `json:"UpdatedAt"`
	UpdatedBy string    `json:"UpdatedBy"`
	FolderID  string    `json:"FolderId"`
	Spec      struct {
		Title string           `json:"title"`
		Tags  *json.RawMessage `json:"tags"`
	} `json:"Spec"`
}

func getDoc(data []byte) (*DashboardListDoc, error) {
	res := &DashboardListDoc{}
	err := json.Unmarshal(data, res)
	if err != nil {
		return nil, err
	}
	return res, nil
}

func newSearchRequest(tenant string, qry Query) *resource.SearchRequest {
	groupBy := make([]*resource.GroupBy, len(qry.Facet))
	for _, g := range qry.Facet {
		groupBy = append(groupBy, &resource.GroupBy{Name: g.Field, Limit: int64(g.Limit)})
	}

	return &resource.SearchRequest{
		Tenant:  tenant,
		Query:   qry.Query,
		Limit:   int64(qry.Limit),
		Offset:  int64(qry.From),
		Kind:    qry.Kind,
		SortBy:  []string{sortField(qry.Sort)},
		GroupBy: groupBy,
		Filters: qry.Tags,
	}
}

const (
	sortSuffix = "_sort"
	descending = "-"
)

func sortField(sort string) string {
	sf := strings.TrimSuffix(sort, sortSuffix)
	if !strings.HasPrefix(sf, descending) {
		return dashboardListFieldMapping[sf]
	}
	sf = strings.TrimPrefix(sf, descending)
	sf = dashboardListFieldMapping[sf]
	return descending + sf
}

// mapping of dashboard list fields to search doc fields
var dashboardListFieldMapping = map[string]string{
	"name": "title",
}

func newField(name string, typ data.FieldType) *data.Field {
	f := data.NewFieldFromFieldType(typ, 0)
	f.Name = name
	return f
}
@@ -1,49 +0,0 @@
package unifiedSearch

import (
	"context"

	"github.com/grafana/grafana-plugin-sdk-go/backend"

	"github.com/grafana/grafana/pkg/registry"
	"github.com/grafana/grafana/pkg/services/user"
)

type FacetField struct {
	Field string `json:"field"`
	Limit int    `json:"limit,omitempty"` // explicit page size
}

type Query struct {
	Query              string       `json:"query"`
	Location           string       `json:"location,omitempty"` // parent folder ID
	Sort               string       `json:"sort,omitempty"`     // field ASC/DESC
	Datasource         string       `json:"ds_uid,omitempty"`   // "datasource" collides with the JSON value at the same level :()
	DatasourceType     string       `json:"ds_type,omitempty"`
	Tags               []string     `json:"tags,omitempty"`
	Kind               []string     `json:"kind,omitempty"`
	PanelType          string       `json:"panel_type,omitempty"`
	UIDs               []string     `json:"uid,omitempty"`
	Explain            bool         `json:"explain,omitempty"`            // adds details on why document matched
	WithAllowedActions bool         `json:"withAllowedActions,omitempty"` // adds allowed actions per entity
	Facet              []FacetField `json:"facet,omitempty"`
	SkipLocation       bool         `json:"skipLocation,omitempty"`
	HasPreview         string       `json:"hasPreview,omitempty"` // the light|dark theme
	Limit              int          `json:"limit,omitempty"`      // explicit page size
	From               int          `json:"from,omitempty"`       // for paging
}

type IsSearchReadyResponse struct {
	IsReady bool
	Reason  string // initial-indexing-ongoing, org-indexing-ongoing
}

type SearchService interface {
	registry.CanBeDisabled
	registry.BackgroundService
	DoQuery(ctx context.Context, user *backend.User, orgId int64, query Query) *backend.DataResponse
	doQuery(ctx context.Context, user *user.SignedInUser, orgId int64, query Query) *backend.DataResponse
	IsReady(ctx context.Context, orgId int64) IsSearchReadyResponse
	// RegisterDashboardIndexExtender(ext DashboardIndexExtender)
	TriggerReIndex()
}
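The removed HTTP service (earlier hunk) decoded a POSTed JSON body into this Query type. A small sketch of that legacy payload, using a local stand-in struct with the same JSON tags so the example stays self-contained:

package main

import (
	"encoding/json"
	"fmt"
)

// legacyQuery mirrors a few fields of the removed Query type above.
type legacyQuery struct {
	Query string   `json:"query"`
	Sort  string   `json:"sort,omitempty"`
	Kind  []string `json:"kind,omitempty"`
	Tags  []string `json:"tags,omitempty"`
	Limit int      `json:"limit,omitempty"`
	From  int      `json:"from,omitempty"`
}

func main() {
	body := []byte(`{"query":"cpu","kind":["dashboard"],"sort":"name_sort","limit":50}`)
	var q legacyQuery
	if err := json.Unmarshal(body, &q); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", q) // the "_sort" suffix was stripped by sortField above
}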
@@ -1,578 +0,0 @@
package resource

import (
	"context"
	golog "log"
	"path/filepath"
	reflect "reflect"
	"strings"
	"sync"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/search"
	"github.com/google/uuid"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/sync/errgroup"
)

const tracingPrexfixIndex = "unified_storage.index."
const specFieldPrefix = "Spec."
const descendingPrefix = "-"

type Shard struct {
	index bleve.Index
	path  string
	batch *bleve.Batch
}

type Opts struct {
	Workers   int    // This controls how many goroutines are used to index objects
	BatchSize int    // This is the batch size for how many objects to add to the index at once
	ListLimit int    // This is how big the List page size is. If the response size is too large, the number of items will be limited by the server.
	IndexDir  string // The directory where the indexes for each tenant are stored
}

type Index struct {
	shardMutex sync.RWMutex
	shards     map[string]*Shard
	opts       Opts
	s          *server
	log        log.Logger
	tracer     tracing.Tracer
}

func NewIndex(s *server, opts Opts, tracer tracing.Tracer) *Index {
	return &Index{
		shardMutex: sync.RWMutex{},
		s:          s,
		opts:       opts,
		shards:     make(map[string]*Shard),
		log:        log.New("unifiedstorage.search.index"),
		tracer:     tracer,
	}
}

// IndexBatches goes through all the shards and indexes their batches if they are large enough
func (i *Index) IndexBatches(ctx context.Context, maxSize int, tenants []string) error {
	_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"IndexBatches")
	defer span.End()

	group := errgroup.Group{}
	group.SetLimit(i.opts.Workers)
	totalBatchesIndexed := 0

	for _, tenant := range tenants {
		shard, err := i.getShard(tenant)
		if err != nil {
			return err
		}
		// Index the batch if it is large enough
		if shard.batch.Size() >= maxSize {
			totalBatchesIndexed++
			group.Go(func() error {
				i.log.Debug("indexing batch for shard", "tenant", tenant, "size", shard.batch.Size())
				err = shard.index.Batch(shard.batch)
				if err != nil {
					return err
				}
				shard.batch.Reset()
				return nil
			})
		}
	}

	err := group.Wait()
	if err != nil {
		return err
	}

	span.AddEvent("batches indexed", trace.WithAttributes(attribute.Int("batches_indexed", totalBatchesIndexed)))

	return nil
}

// AddToBatches adds resources to their respective shard's batch
// returns a list of tenants that have changes
func (i *Index) AddToBatches(ctx context.Context, list *ListResponse) ([]string, error) {
	_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"AddToBatches")
	defer span.End()

	tenantsWithChanges := make(map[string]bool)
	for _, obj := range list.Items {
		// Transform the raw resource into a more generic indexable resource
		res, err := NewIndexedResource(obj.Value)
		if err != nil {
			return nil, err
		}

		shard, err := i.getShard(res.Namespace)
		if err != nil {
			return nil, err
		}
		i.log.Debug("indexing resource in batch", "batch_count", len(list.Items), "kind", res.Kind, "tenant", res.Namespace)

		err = shard.batch.Index(res.Uid, res)
		if err != nil {
			return nil, err
		}

		if _, ok := tenantsWithChanges[res.Namespace]; !ok {
			tenantsWithChanges[res.Namespace] = true
		}
	}

	tenants := make([]string, 0, len(tenantsWithChanges))
	for tenant := range tenantsWithChanges {
		tenants = append(tenants, tenant)
	}

	return tenants, nil
}

func (i *Index) Init(ctx context.Context) error {
	logger := i.log.FromContext(ctx)
	ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Init")
	defer span.End()

	start := time.Now().Unix()
	group := errgroup.Group{}
	group.SetLimit(i.opts.Workers)

	totalObjects := 0
	// Get all tenants currently in Unified Storage
	tenants, err := i.s.backend.Namespaces(ctx)
	if err != nil {
		return err
	}
	for _, tenant := range tenants {
		group.Go(func() error {
			logger.Info("initializing index for tenant", "tenant", tenant)
			objs, err := i.InitForTenant(ctx, tenant)
			if err != nil {
				return err
			}
			totalObjects += objs
			return nil
		})
	}

	err = group.Wait()
	if err != nil {
		return err
	}

	// index all remaining batches for all tenants
	logger.Info("indexing remaining batches", "shards", len(i.shards))
	err = i.IndexBatches(ctx, 1, i.allTenants())
	if err != nil {
		return err
	}

	end := time.Now().Unix()
	totalDocCount := getTotalDocCount(i)
	logger.Info("Initial indexing finished", "seconds", float64(end-start), "objs_fetched", totalObjects, "objs_indexed", totalDocCount)
	span.AddEvent(
		"indexing finished",
		trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalDocCount))),
		trace.WithAttributes(attribute.Int64("objects_fetched", int64(totalObjects))),
	)
	if IndexServerMetrics != nil {
		IndexServerMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start))
	}

	return nil
}

func (i *Index) InitForTenant(ctx context.Context, namespace string) (int, error) {
	ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"InitForTenant")
	defer span.End()
	logger := i.log.FromContext(ctx)

	resourceTypes := fetchResourceTypes()
	totalObjectsFetched := 0
	for _, rt := range resourceTypes {
		logger.Debug("indexing resource", "kind", rt.Kind, "list_limit", i.opts.ListLimit, "batch_size", i.opts.BatchSize, "workers", i.opts.Workers, "namespace", namespace)
		r := &ListRequest{Options: rt.ListOptions, Limit: int64(i.opts.ListLimit)}
		r.Options.Key.Namespace = namespace // scope the list to a tenant or this will take forever when US has 1M+ resources

		// Paginate through the list of resources and index each page
		for {
			logger.Debug("fetching resource list", "kind", rt.Kind, "namespace", namespace)
			list, err := i.s.List(ctx, r)
			if err != nil {
				return totalObjectsFetched, err
			}

			// Record the number of objects indexed for the kind
			IndexServerMetrics.IndexedKinds.WithLabelValues(rt.Kind).Add(float64(len(list.Items)))

			totalObjectsFetched += len(list.Items)

			logger.Debug("indexing batch", "kind", rt.Kind, "count", len(list.Items), "namespace", namespace)
			// add changes to batches for shards with changes in the List
			err = i.writeBatch(ctx, list)
			if err != nil {
				return totalObjectsFetched, err
			}

			if list.NextPageToken == "" {
				break
			}

			r.NextPageToken = list.NextPageToken
		}
	}

	span.AddEvent(
		"indexing finished for tenant",
		trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalObjectsFetched))),
		trace.WithAttributes(attribute.String("tenant", namespace)),
	)

	return totalObjectsFetched, nil
}

func (i *Index) writeBatch(ctx context.Context, list *ListResponse) error {
	tenants, err := i.AddToBatches(ctx, list)
	if err != nil {
		return err
	}

	// Index the batches for tenants with changes if the batch is large enough
	err = i.IndexBatches(ctx, i.opts.BatchSize, tenants)
	if err != nil {
		return err
	}
	return nil
}

func (i *Index) Index(ctx context.Context, data *Data) error {
	ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Index")
	defer span.End()
	logger := i.log.FromContext(ctx)

	// Transform the raw resource into a more generic indexable resource
	res, err := NewIndexedResource(data.Value.Value)
	if err != nil {
		return err
	}
	tenant := res.Namespace
	logger.Debug("indexing resource for tenant", "res", string(data.Value.Value), "tenant", tenant)

	// if the tenant's shard doesn't exist, it may not have been created during initial indexing
	_, ok := i.shards[tenant]
	if !ok {
		i.log.Info("tenant not found, initializing their index", "tenant", tenant)
		_, err = i.InitForTenant(ctx, tenant)
		if err != nil {
			return err
		}
	}

	shard, err := i.getShard(tenant)
	if err != nil {
		return err
	}
	err = shard.index.Index(res.Uid, res)
	if err != nil {
		return err
	}

	// record the kind of resource that was indexed
	IndexServerMetrics.IndexedKinds.WithLabelValues(res.Kind).Inc()

	// record latency from when event was created to when it was indexed
	latencySeconds := float64(time.Now().UnixMicro()-data.Value.ResourceVersion) / 1e6
	if latencySeconds > 5 {
		logger.Warn("high index latency", "latency", latencySeconds)
	}
	if IndexServerMetrics != nil {
		IndexServerMetrics.IndexLatency.WithLabelValues(data.Key.Resource).Observe(latencySeconds)
	}

	return nil
}

func (i *Index) Delete(ctx context.Context, uid string, key *ResourceKey) error {
	_, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Delete")
	defer span.End()

	shard, err := i.getShard(key.Namespace)
	if err != nil {
		return err
	}
	err = shard.index.Delete(uid)
	if err != nil {
		return err
	}

	IndexServerMetrics.IndexedKinds.WithLabelValues(key.Resource).Dec()

	return nil
}

func (i *Index) Search(ctx context.Context, request *SearchRequest) (*IndexResults, error) {
	ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Search")
	defer span.End()
	logger := i.log.FromContext(ctx)

	if request.Tenant == "" {
		request.Tenant = "default"
	}
	shard, err := i.getShard(request.Tenant)
	if err != nil {
		return nil, err
	}
	docCount, err := shard.index.DocCount()
	if err != nil {
		return nil, err
	}
	logger.Info("got index for tenant", "tenant", request.Tenant, "docCount", docCount)

	fields, _ := shard.index.Fields()
	logger.Debug("indexed fields", "fields", fields)

	// use 10 as a default limit for now
	if request.Limit <= 0 {
		request.Limit = 10
	}

	textQuery := bleve.NewQueryStringQuery(request.Query)
	query := bleve.NewConjunctionQuery(textQuery)

	if len(request.Kind) > 0 {
		// apply OR condition filter for each kind ( dashboard, folder, etc )
		orQuery := bleve.NewDisjunctionQuery()
		for _, term := range request.Kind {
			termQuery := bleve.NewTermQuery(term)
			orQuery.AddQuery(termQuery)
		}
		query.AddQuery(orQuery)
	}

	if len(request.Filters) > 0 {
		orQuery := bleve.NewDisjunctionQuery()
		for _, filter := range request.Filters {
			matchQuery := bleve.NewMatchQuery(filter)
			orQuery.AddQuery(matchQuery)
		}
		query.AddQuery(orQuery)
	}

	req := bleve.NewSearchRequest(query)
	if len(request.SortBy) > 0 {
		sorting := getSortFields(request)
		req.SortBy(sorting)
	}

	for _, group := range request.GroupBy {
		facet := bleve.NewFacetRequest(specFieldPrefix+group.Name, int(group.Limit))
		req.AddFacet(group.Name+"_facet", facet)
	}

	req.From = int(request.Offset)
	req.Size = int(request.Limit)

	req.Fields = []string{"*"} // return all indexed fields in search results

	logger.Info("searching index", "query", request.Query, "tenant", request.Tenant)
	res, err := shard.index.Search(req)
	if err != nil {
		return nil, err
	}
	hits := res.Hits

	logger.Info("got search results", "hits", hits)

	results := make([]IndexedResource, len(hits))
	for resKey, hit := range hits {
		ir := IndexedResource{}.FromSearchHit(hit)
		results[resKey] = ir
	}

	groups := []*Group{}
	for _, group := range request.GroupBy {
		groupByFacet := res.Facets[group.Name+"_facet"]
		terms := getTermFacets(groupByFacet.Terms)
		for _, term := range terms {
			groups = append(groups, &Group{Name: term.Term, Count: int64(term.Count)})
		}
	}

	return &IndexResults{Values: results, Groups: groups}, nil
}

// Count returns the total doc count
func (i *Index) Count() (int, error) {
	total := 0
	for _, shard := range i.shards {
		count, err := shard.index.DocCount()
		if err != nil {
			i.log.Error("failed to get doc count", "error", err)
		}
		total += int(count)
	}
	return total, nil
}

// allTenants returns a list of all tenants in the index
func (i *Index) allTenants() []string {
	tenants := make([]string, 0, len(i.shards))
	for tenant := range i.shards {
		tenants = append(tenants, tenant)
	}
	return tenants
}

func (i *Index) getShard(tenant string) (*Shard, error) {
	i.shardMutex.Lock()
	defer i.shardMutex.Unlock()

	shard, ok := i.shards[tenant]
	if ok {
		return shard, nil
	}

	index, path, err := i.createIndex()
	if err != nil {
		return &Shard{}, err
	}

	shard = &Shard{
		index: index,
		path:  path,
		batch: index.NewBatch(),
	}
	i.shards[tenant] = shard

	return shard, nil
}

func (i *Index) createIndex() (bleve.Index, string, error) {
	if i.opts.IndexDir == "" {
		return createInMemoryIndex()
	}
	return createFileIndex(i.opts.IndexDir)
}

var mappings = createIndexMappings()

// less memory intensive alternative for larger indexes with less tenants (on-prem)
func createFileIndex(path string) (bleve.Index, string, error) {
	indexPath := filepath.Join(path, uuid.New().String())
	index, err := bleve.New(indexPath, mappings)
	if err != nil {
		golog.Fatalf("Failed to create index: %v", err)
	}
	return index, indexPath, err
}

// faster indexing when there are many tenants with smaller batches (cloud)
func createInMemoryIndex() (bleve.Index, string, error) {
	index, err := bleve.NewMemOnly(mappings)
	return index, "", err
}

type IndexerListOptions struct {
	*ListOptions
	Kind string
}

// TODO - fetch from api
// Folders need to be indexed first as dashboards depend on them to be indexed already.
func fetchResourceTypes() []*IndexerListOptions {
	return []*IndexerListOptions{
		{
			ListOptions: &ListOptions{
				Key: &ResourceKey{
					Group:    "folder.grafana.app",
					Resource: "folders",
				},
			},
			Kind: "Folder",
		},
		{
			ListOptions: &ListOptions{
				Key: &ResourceKey{
					Group:    "playlist.grafana.app",
					Resource: "playlists",
				},
			},
			Kind: "Playlist",
		},
		{
			ListOptions: &ListOptions{
				Key: &ResourceKey{
					Group:    "dashboard.grafana.app",
					Resource: "dashboards",
				},
			},
			Kind: "Dashboard",
		},
	}
}

func getSortFields(request *SearchRequest) []string {
	sorting := make([]string, 0, len(request.SortBy))
	for _, sort := range request.SortBy {
		if IsSpecField(sort) {
			descending := strings.HasPrefix(sort, descendingPrefix)
			sort = strings.TrimPrefix(sort, descendingPrefix)
			sortOrder := ""
			if descending {
				sortOrder = descendingPrefix
			}
			sorting = append(sorting, sortOrder+specFieldPrefix+sort)
			continue
		}
		sorting = append(sorting, sort)
	}
	return sorting
}

func getTermFacets(f *search.TermFacets) []*search.TermFacet {
	e := reflect.ValueOf(f).Elem()
	if e.Kind() != reflect.Struct {
		return []*search.TermFacet{}
	}
	// workaround - this field is private, so we need to use reflection to access it
	// TODO - fork bleve and create a pr to make this field accessible
	v := e.FieldByName("termLookup")
	if v.Kind() != reflect.Map {
		return []*search.TermFacet{}
	}

	terms := []*search.TermFacet{}
	termsRange := v.MapRange()
	for termsRange.Next() {
		value := termsRange.Value()
		// facet value is *search.TermFacet
		if value.Kind() == reflect.Pointer {
			val := value.Elem()
			if val.Kind() == reflect.Struct {
				group := newTerm(val)
				terms = append(terms, group)
			}
		}
	}
	return terms
}

func newTerm(val reflect.Value) *search.TermFacet {
	term := &search.TermFacet{}

	for i := 0; i < val.NumField(); i++ {
		field := val.Field(i)
		if field.Kind() == reflect.String {
			term.Term = field.String()
		}
		if field.Kind() == reflect.Int {
			term.Count = int(field.Int())
		}
	}
	return term
}
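The deleted Index.Search composes its bleve query as a conjunction of the free-text query with OR-groups of term and match filters. A self-contained sketch of the same composition against an in-memory index; the documents and terms below are made up:

package main

import (
	"fmt"

	"github.com/blevesearch/bleve/v2"
)

func main() {
	idx, err := bleve.NewMemOnly(bleve.NewIndexMapping())
	if err != nil {
		panic(err)
	}
	_ = idx.Index("a", map[string]any{"Kind": "Dashboard", "Title": "CPU usage"})
	_ = idx.Index("b", map[string]any{"Kind": "Folder", "Title": "CPU dashboards"})

	// AND the query-string query with an OR of kind terms, as Index.Search did.
	text := bleve.NewQueryStringQuery("cpu")
	kinds := bleve.NewDisjunctionQuery()
	kinds.AddQuery(bleve.NewTermQuery("dashboard")) // default analyzer lower-cases indexed text
	query := bleve.NewConjunctionQuery(text, kinds)

	req := bleve.NewSearchRequest(query)
	req.Fields = []string{"*"} // return all stored fields, as in the deleted code
	res, err := idx.Search(req)
	if err != nil {
		panic(err)
	}
	fmt.Println(res.Total) // expect 1: only the Dashboard doc matches both clauses
}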

@ -1,253 +0,0 @@
package resource

import (
	"strings"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/mapping"
	"github.com/blevesearch/bleve/v2/search"
	"github.com/grafana/grafana/pkg/apimachinery/utils"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

type IndexedResource struct {
	Uid       string
	Group     string
	Namespace string
	Kind      string
	Name      string
	Title     string
	CreatedAt string
	CreatedBy string
	UpdatedAt string
	UpdatedBy string
	FolderId  string
	Spec      map[string]any
}

type IndexResults struct {
	Values []IndexedResource
	Groups []*Group
}

func (ir IndexedResource) FromSearchHit(hit *search.DocumentMatch) IndexedResource {
	ir.Uid = fieldValue("Uid", hit)
	ir.Kind = fieldValue("Kind", hit)
	ir.Name = fieldValue("Name", hit)
	ir.Namespace = fieldValue("Namespace", hit)
	ir.Group = fieldValue("Group", hit)
	ir.CreatedAt = fieldValue("CreatedAt", hit)
	ir.CreatedBy = fieldValue("CreatedBy", hit)
	ir.UpdatedAt = fieldValue("UpdatedAt", hit)
	ir.UpdatedBy = fieldValue("UpdatedBy", hit)
	ir.Title = fieldValue("Title", hit)

	// add indexed spec fields to search results
	specResult := map[string]any{}
	for k, v := range hit.Fields {
		if strings.HasPrefix(k, "Spec.") {
			specKey := strings.TrimPrefix(k, "Spec.")
			specResult[specKey] = v
		}
	}
	// assign once, after the loop, rather than on every iteration
	ir.Spec = specResult

	return ir
}

func fieldValue(field string, hit *search.DocumentMatch) string {
	// guard the type assertion: non-string fields (numbers, dates) would
	// otherwise panic here
	if val, ok := hit.Fields[field]; ok {
		if s, ok := val.(string); ok {
			return s
		}
	}
	return ""
}

// NewIndexedResource creates a new IndexedResource from a raw resource.
// rawResource is the raw JSON for the resource from unified storage.
func NewIndexedResource(rawResource []byte) (*IndexedResource, error) {
	k8sObj := unstructured.Unstructured{}
	err := k8sObj.UnmarshalJSON(rawResource)
	if err != nil {
		return nil, err
	}

	meta, err := utils.MetaAccessor(&k8sObj)
	if err != nil {
		return nil, err
	}

	ir := &IndexedResource{}
	ir.Uid = string(meta.GetUID())
	ir.Name = meta.GetName()
	ir.Title = meta.FindTitle("")
	ir.Namespace = meta.GetNamespace()
	ir.Group = meta.GetGroupVersionKind().Group
	ir.Kind = meta.GetGroupVersionKind().Kind
	ir.CreatedAt = meta.GetCreationTimestamp().Time.Format("2006-01-02T15:04:05Z")
	ir.CreatedBy = meta.GetCreatedBy()
	updatedAt, err := meta.GetUpdatedTimestamp()
	if err != nil {
		return nil, err
	}
	if updatedAt != nil {
		ir.UpdatedAt = updatedAt.Format("2006-01-02T15:04:05Z")
	} else {
		ir.UpdatedAt = ir.CreatedAt
	}
	ir.UpdatedBy = meta.GetUpdatedBy()
	spec, err := meta.GetSpec()
	if err != nil {
		return nil, err
	}
	specValues, ok := spec.(map[string]any)
	if ok {
		ir.Spec = specValues
	}

	return ir, nil
}
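
// Hedged example of the conversion: the document below is a hypothetical
// minimal playlist shaped like the fixtures used by the tests in this change.
//
//	raw := []byte(`{
//		"kind": "Playlist",
//		"apiVersion": "playlist.grafana.app/v0alpha1",
//		"metadata": {
//			"name": "playlist dogs",
//			"namespace": "default",
//			"uid": "abc-123",
//			"creationTimestamp": "2024-01-01T12:00:00Z"
//		},
//		"spec": {"interval": "5m", "title": "dogs"}
//	}`)
//	ir, err := NewIndexedResource(raw)
//	// on success: ir.Kind == "Playlist", ir.Namespace == "default",
//	// ir.Spec["title"] == "dogs", and ir.UpdatedAt falls back to ir.CreatedAt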

func createIndexMappings() *mapping.IndexMappingImpl {
	// Create the index mapping
	indexMapping := bleve.NewIndexMapping()
	// Documents are routed to an individual per-kind mapping via the Kind field
	indexMapping.TypeField = "Kind"

	// for all kinds, create their index mappings
	for k := range getSpecObjectMappings() {
		objMapping := createIndexMappingForKind(k)
		indexMapping.AddDocumentMapping(k, objMapping)
	}

	return indexMapping
}

func createIndexMappingForKind(resourceKind string) *mapping.DocumentMapping {
	// create mappings for top level fields
	baseFields := map[string]*mapping.FieldMapping{
		"Uid":       bleve.NewTextFieldMapping(),
		"Group":     bleve.NewTextFieldMapping(),
		"Namespace": bleve.NewTextFieldMapping(),
		"Kind":      bleve.NewTextFieldMapping(),
		"Name":      bleve.NewTextFieldMapping(),
		"Title":     bleve.NewTextFieldMapping(),
		"CreatedAt": bleve.NewDateTimeFieldMapping(),
		"CreatedBy": bleve.NewTextFieldMapping(),
		"UpdatedAt": bleve.NewDateTimeFieldMapping(),
		"UpdatedBy": bleve.NewTextFieldMapping(),
		"FolderId":  bleve.NewTextFieldMapping(),
	}

	// Spec is different for all resources, so we need to generate the spec mapping based on the kind
	specMapping := createSpecObjectMapping(resourceKind)

	objectMapping := bleve.NewDocumentMapping()
	objectMapping.Dynamic = false // only map fields that we have explicitly defined

	// map spec
	objectMapping.AddSubDocumentMapping("Spec", specMapping)

	// map top level fields
	for k, v := range baseFields {
		objectMapping.AddFieldMappingsAt(k, v)
	}

	return objectMapping
}

type SpecFieldMapping struct {
	Field string
	Type  string
}

// Right now we are hardcoding which spec fields to index for each kind.
// In the future, which fields to index will be defined on the resources themselves by their owners.
func getSpecObjectMappings() map[string][]SpecFieldMapping {
	return specMappings
}

// Generate the spec field mapping for a given kind
func createSpecObjectMapping(kind string) *mapping.DocumentMapping {
	specMapping := bleve.NewDocumentMapping()
	specMapping.Dynamic = false

	// get the fields to index for the kind
	mappings := getSpecObjectMappings()[kind]

	for _, m := range mappings {
		fieldName := m.Field
		fieldType := m.Type

		// Create a field mapping based on the field type
		switch fieldType {
		case "string", "string[]":
			specMapping.AddFieldMappingsAt(fieldName, bleve.NewTextFieldMapping())
		case "int", "int64", "float64":
			specMapping.AddFieldMappingsAt(fieldName, bleve.NewNumericFieldMapping())
		case "bool":
			specMapping.AddFieldMappingsAt(fieldName, bleve.NewBooleanFieldMapping())
		case "time":
			specMapping.AddFieldMappingsAt(fieldName, bleve.NewDateTimeFieldMapping())
		default:
			// TODO: support indexing arrays and nested fields.
			// Only top-level string, int, and bool fields within spec are indexed for now.
		}
	}

	return specMapping
}
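
// Sketch of how these mappings surface at query time: with the "Playlist"
// entries below, Spec.title and Spec.interval become searchable, so a query
// string like the one in the integration test resolves against this mapping.
// idx is assumed to be a bleve.Index built from createIndexMappings().
//
//	q := bleve.NewQueryStringQuery(`Spec.title:dogs`)
//	req := bleve.NewSearchRequest(q)
//	res, err := idx.Search(req)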

func IsSpecField(field string) bool {
	field = strings.TrimPrefix(field, "-")
	_, ok := specFields[field]
	return ok
}

var specFields = mapSpecFields()

func mapSpecFields() map[string]bool {
	fields := map[string]bool{}
	for _, mappings := range specMappings {
		for _, m := range mappings {
			fields[m.Field] = true
		}
	}
	return fields
}

var specMappings = map[string][]SpecFieldMapping{
	"Playlist": {
		{Field: "interval", Type: "string"},
		{Field: "title", Type: "string"},
	},
	"Folder": {
		{Field: "title", Type: "string"},
		{Field: "description", Type: "string"},
	},
	"Dashboard": {
		{Field: "title", Type: "string"},
		{Field: "description", Type: "string"},
		{Field: "tags", Type: "string[]"},
	},
}
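
// Extending the table is additive. A hypothetical "Alert" kind that indexes a
// summary string and a paused flag would be registered like this (illustrative
// only, not part of this change):
//
//	"Alert": {
//		{Field: "summary", Type: "string"},
//		{Field: "paused", Type: "bool"},
//	},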

@ -1,126 +0,0 @@
package resource

import (
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/grafana/dskit/instrument"
	"github.com/prometheus/client_golang/prometheus"
)

var (
	onceIndex          sync.Once
	IndexServerMetrics *IndexMetrics
)

type IndexMetrics struct {
	IndexDir    string
	IndexServer *IndexServer

	// metrics
	IndexLatency      *prometheus.HistogramVec
	IndexSize         prometheus.Gauge
	IndexedDocs       prometheus.Gauge
	IndexedKinds      *prometheus.GaugeVec
	IndexCreationTime *prometheus.HistogramVec
}

var IndexCreationBuckets = []float64{1, 5, 10, 25, 50, 75, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000}

func NewIndexMetrics(indexDir string, indexServer *IndexServer) *IndexMetrics {
	onceIndex.Do(func() {
		IndexServerMetrics = &IndexMetrics{
			IndexDir:    indexDir,
			IndexServer: indexServer,
			IndexLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
				Namespace:                       "index_server",
				Name:                            "index_latency_seconds",
				Help:                            "Time (in seconds) until index is updated with new event",
				Buckets:                         instrument.DefBuckets,
				NativeHistogramBucketFactor:     1.1, // enable native histograms
				NativeHistogramMaxBucketNumber:  160,
				NativeHistogramMinResetDuration: time.Hour,
			}, []string{"resource"}),
			IndexSize: prometheus.NewGauge(prometheus.GaugeOpts{
				Namespace: "index_server",
				Name:      "index_size",
				Help:      "Size of the index in bytes",
			}),
			IndexedDocs: prometheus.NewGauge(prometheus.GaugeOpts{
				Namespace: "index_server",
				Name:      "indexed_docs",
				Help:      "Total number of indexed documents",
			}),
			IndexedKinds: prometheus.NewGaugeVec(prometheus.GaugeOpts{
				Namespace: "index_server",
				Name:      "indexed_kinds",
				Help:      "Number of indexed documents by kind",
			}, []string{"kind"}),
			IndexCreationTime: prometheus.NewHistogramVec(prometheus.HistogramOpts{
				Namespace:                       "index_server",
				Name:                            "index_creation_time_seconds",
				Help:                            "Time (in seconds) it takes until index is created",
				Buckets:                         IndexCreationBuckets,
				NativeHistogramBucketFactor:     1.1, // enable native histograms
				NativeHistogramMaxBucketNumber:  160,
				NativeHistogramMinResetDuration: time.Hour,
			}, []string{}),
		}
	})

	return IndexServerMetrics
}
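
// Because of onceIndex, NewIndexMetrics behaves as a singleton: every call
// after the first returns the collector built by the first call. A hedged
// registration sketch, where a repeat registration is the expected failure:
//
//	m := NewIndexMetrics(cfg.IndexPath, indexServer)
//	if err := prometheus.Register(m); err != nil {
//		var are prometheus.AlreadyRegisteredError
//		if !errors.As(err, &are) {
//			// a genuinely new registration failure, not a duplicate
//		}
//	}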

func (s *IndexMetrics) Collect(ch chan<- prometheus.Metric) {
	s.IndexLatency.Collect(ch)
	s.IndexCreationTime.Collect(ch)
	s.IndexedKinds.Collect(ch)

	// collect index size
	totalSize, err := getTotalIndexSize(s.IndexDir)
	if err == nil {
		s.IndexSize.Set(float64(totalSize))
		s.IndexSize.Collect(ch)
	}

	// collect index docs
	s.IndexedDocs.Set(getTotalDocCount(s.IndexServer.index))
	s.IndexedDocs.Collect(ch)
}

func (s *IndexMetrics) Describe(ch chan<- *prometheus.Desc) {
	s.IndexLatency.Describe(ch)
	s.IndexSize.Describe(ch)
	s.IndexedDocs.Describe(ch)
	s.IndexedKinds.Describe(ch)
	s.IndexCreationTime.Describe(ch)
}

// getTotalDocCount returns the total number of documents in the index
func getTotalDocCount(index *Index) float64 {
	count, _ := index.Count()
	return float64(count)
}

// getTotalIndexSize returns the total size of the index directory when using a file-based index
func getTotalIndexSize(dir string) (int64, error) {
	var totalSize int64

	err := filepath.WalkDir(dir, func(path string, info os.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			fileInfo, err := info.Info()
			if err != nil {
				return err
			}
			totalSize += fileInfo.Size()
		}
		return nil
	})

	return totalSize, err
}
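
// Hedged usage outside the scrape path (the directory here is an illustrative
// assumption, not a Grafana default):
//
//	if size, err := getTotalIndexSize("/var/lib/grafana/unified-index"); err == nil {
//		fmt.Printf("index occupies %d bytes on disk\n", size)
//	}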

@ -1,254 +0,0 @@
package resource

import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"

	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc"
)

type IndexServer struct {
	ResourceServer
	s      *server
	index  *Index
	ws     *indexWatchServer
	log    *slog.Logger
	cfg    *setting.Cfg
	tracer tracing.Tracer
}

const tracingPrefixIndexServer = "unified_storage.index_server."

func (is *IndexServer) Search(ctx context.Context, req *SearchRequest) (*SearchResponse, error) {
	ctx, span := is.tracer.Start(ctx, tracingPrefixIndexServer+"Search")
	defer span.End()

	results, err := is.index.Search(ctx, req)
	if err != nil {
		return nil, err
	}
	res := &SearchResponse{}
	for _, r := range results.Values {
		resJsonBytes, err := json.Marshal(r)
		if err != nil {
			return nil, err
		}
		res.Items = append(res.Items, &ResourceWrapper{Value: resJsonBytes})
	}
	// groups apply to the whole response, so set them once rather than per item
	res.Groups = results.Groups
	return res, nil
}
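
// Hedged consumer sketch: items come back as JSON-encoded IndexedResource
// values, mirroring how the integration test at the end of this change
// decodes them.
//
//	res, err := is.Search(ctx, &SearchRequest{Tenant: "default", Query: "*", Limit: 10})
//	if err == nil {
//		for _, item := range res.Items {
//			var ir IndexedResource
//			if jsonErr := json.Unmarshal(item.Value, &ir); jsonErr == nil {
//				fmt.Println(ir.Kind, ir.Name)
//			}
//		}
//	}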

func (is *IndexServer) History(ctx context.Context, req *HistoryRequest) (*HistoryResponse, error) {
	return nil, nil
}

func (is *IndexServer) Origin(ctx context.Context, req *OriginRequest) (*OriginResponse, error) {
	return nil, nil
}

// Load the index
func (is *IndexServer) Load(ctx context.Context) error {
	ctx, span := is.tracer.Start(ctx, tracingPrefixIndexServer+"Load")
	defer span.End()

	opts := Opts{
		Workers:   is.cfg.IndexWorkers,
		BatchSize: is.cfg.IndexMaxBatchSize,
		ListLimit: is.cfg.IndexListLimit,
		IndexDir:  is.cfg.IndexPath,
	}
	is.index = NewIndex(is.s, opts, is.tracer)
	err := is.index.Init(ctx)
	if err != nil {
		return err
	}
	return nil
}

// Watch resources for changes and update the index
func (is *IndexServer) Watch(ctx context.Context) error {
	rtList := fetchResourceTypes()
	for _, rt := range rtList {
		wr := &WatchRequest{
			Options: rt.ListOptions,
		}

		go func() {
			for {
				// blocking call; restart the watch whenever it ends
				err := is.s.Watch(wr, is.ws)
				if err != nil {
					is.log.Error("Error watching resource", "error", err)
				}
				is.log.Debug("Resource watch ended. Restarting watch")
			}
		}()
	}
	return nil
}

// Init sets the resource server on the index server so the index server can
// call back into the resource server.
// TODO: a chicken-and-egg problem - the index server needs the resource server,
// but the resource server is created with the index server.
func (is *IndexServer) Init(ctx context.Context, rs *server) error {
	is.s = rs
	is.ws = &indexWatchServer{
		is:      is,
		context: ctx,
	}
	return nil
}

func NewResourceIndexServer(cfg *setting.Cfg, tracer tracing.Tracer) ResourceIndexServer {
	logger := slog.Default().With("logger", "index-server")

	indexServer := &IndexServer{
		log:    logger,
		cfg:    cfg,
		tracer: tracer,
	}

	err := prometheus.Register(NewIndexMetrics(cfg.IndexPath, indexServer))
	if err != nil {
		logger.Warn("Failed to register index metrics", "error", err)
	}

	return indexServer
}

type ResourceIndexer interface {
	Index(ctx context.Context) (*Index, error)
}

type indexWatchServer struct {
	grpc.ServerStream
	context context.Context
	is      *IndexServer
}

func (f *indexWatchServer) Send(we *WatchEvent) error {
	if we.Type == WatchEvent_ADDED {
		return f.Add(we)
	}

	if we.Type == WatchEvent_DELETED {
		return f.Delete(we)
	}

	if we.Type == WatchEvent_MODIFIED {
		return f.Update(we)
	}

	return nil
}

func (f *indexWatchServer) RecvMsg(m interface{}) error {
	return nil
}

func (f *indexWatchServer) SendMsg(m interface{}) error {
	return errors.New("not implemented")
}

func (f *indexWatchServer) Context() context.Context {
	if f.context == nil {
		f.context = context.Background()
	}
	return f.context
}

func (f *indexWatchServer) Index() *Index {
	return f.is.index
}

func (f *indexWatchServer) Add(we *WatchEvent) error {
	data, err := getData(we.Resource)
	if err != nil {
		return err
	}
	err = f.Index().Index(f.context, data)
	if err != nil {
		return err
	}
	return nil
}

func (f *indexWatchServer) Delete(we *WatchEvent) error {
	rs, err := resource(we)
	if err != nil {
		return err
	}
	data, err := getData(rs)
	if err != nil {
		return err
	}
	err = f.Index().Delete(f.context, data.Uid, data.Key)
	if err != nil {
		return err
	}
	return nil
}

func (f *indexWatchServer) Update(we *WatchEvent) error {
	rs, err := resource(we)
	if err != nil {
		return err
	}
	data, err := getData(rs)
	if err != nil {
		return err
	}
	// an update is a delete of the stale document followed by a re-index
	err = f.Index().Delete(f.context, data.Uid, data.Key)
	if err != nil {
		return err
	}
	err = f.Index().Index(f.context, data)
	if err != nil {
		return err
	}
	return nil
}

type Data struct {
	Key   *ResourceKey
	Value *ResourceWrapper
	Uid   string
}

func getData(wr *WatchEvent_Resource) (*Data, error) {
	r, err := NewIndexedResource(wr.Value)
	if err != nil {
		return nil, err
	}

	key := &ResourceKey{
		Group:     r.Group,
		Resource:  r.Kind, // We use Kind as resource key since watch events don't have a resource name on them
		Namespace: r.Namespace,
		Name:      r.Name,
	}

	value := &ResourceWrapper{
		ResourceVersion: wr.Version,
		Value:           wr.Value,
	}
	return &Data{Key: key, Value: value, Uid: r.Uid}, nil
}

func resource(we *WatchEvent) (*WatchEvent_Resource, error) {
	rs := we.Resource
	if rs == nil || len(rs.Value) == 0 {
		// for updates/deletes the payload lives on Previous
		rs = we.Previous
	}
	if rs == nil || len(rs.Value) == 0 {
		return nil, errors.New("resource not found")
	}
	return rs, nil
}

@ -1,281 +0,0 @@
package resource

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"golang.org/x/exp/rand"
)

const testTenant = "default"

var testContext = context.Background()

func TestIndexDashboard(t *testing.T) {
	data := readTestData(t, "dashboard-resource.json")
	list := &ListResponse{Items: []*ResourceWrapper{{Value: data}}}
	index := newTestIndex(t, 1)

	err := index.writeBatch(testContext, list)
	require.NoError(t, err)

	assertCountEquals(t, index, 1)
	require.Equal(t, 1, len(index.allTenants()))
	assertSearchCountEquals(t, index, "*", nil, nil, 1)
}

func TestIndexFolder(t *testing.T) {
	data := readTestData(t, "folder-resource.json")
	list := &ListResponse{Items: []*ResourceWrapper{{Value: data}}}
	index := newTestIndex(t, 1)

	err := index.writeBatch(testContext, list)
	require.NoError(t, err)

	assertCountEquals(t, index, 1)
	assertSearchCountEquals(t, index, "*", nil, nil, 1)
}

func TestSearchFolder(t *testing.T) {
	dashboard := readTestData(t, "dashboard-resource.json")
	folder := readTestData(t, "folder-resource.json")
	list := &ListResponse{Items: []*ResourceWrapper{{Value: dashboard}, {Value: folder}}}
	index := newTestIndex(t, 1)

	err := index.writeBatch(testContext, list)
	require.NoError(t, err)

	assertCountEquals(t, index, 2)
	assertSearchCountEquals(t, index, "*", []string{"folder"}, nil, 1)
}

func TestSearchDashboardsAndFoldersOnly(t *testing.T) {
	dashboard := readTestData(t, "dashboard-resource.json")
	folder := readTestData(t, "folder-resource.json")
	playlist := readTestData(t, "playlist-resource.json")
	list := &ListResponse{Items: []*ResourceWrapper{{Value: dashboard}, {Value: folder}, {Value: playlist}}}
	index := newTestIndex(t, 1)

	err := index.writeBatch(testContext, list)
	require.NoError(t, err)

	assertCountEquals(t, index, 3)
	assertSearchCountEquals(t, index, "*", []string{"dashboard", "folder"}, nil, 2)
}

func TestLookupNames(t *testing.T) {
	records := 1000
	folders, ids := simulateFolders(records)
	list := &ListResponse{Items: []*ResourceWrapper{}}
	for _, f := range folders {
		list.Items = append(list.Items, &ResourceWrapper{Value: []byte(f)})
	}
	index := newTestIndex(t, 1)

	err := index.writeBatch(testContext, list)
	require.NoError(t, err)

	assertCountEquals(t, index, records)
	query := ""
	chunk := ids[:100] // query for n folders by id
	for _, id := range chunk {
		query += `"` + id + `" `
	}
	assertSearchCountEquals(t, index, query, nil, nil, int64(len(chunk)))
}

func TestIndexDashboardWithTags(t *testing.T) {
	dashboard := readTestData(t, "dashboard-resource.json")
	data := readTestData(t, "dashboard-tagged-resource.json")
	data2 := readTestData(t, "dashboard-tagged-resource2.json")
	list := &ListResponse{Items: []*ResourceWrapper{{Value: dashboard}, {Value: data}, {Value: data2}}}
	index := newTestIndex(t, 2)

	err := index.writeBatch(testContext, list)
	require.NoError(t, err)

	assertCountEquals(t, index, 3)
	assertSearchCountEquals(t, index, "*", nil, []string{"tag1"}, 2)
	assertSearchCountEquals(t, index, "*", nil, []string{"tag4"}, 1)
	assertSearchGroupCountEquals(t, index, "*", "tags", nil, 4)
	assertSearchGroupCountEquals(t, index, "*", "tags", []string{"tag4"}, 3)
}

func TestSort(t *testing.T) {
	dashboard := readTestData(t, "dashboard-resource.json")
	folder := readTestData(t, "folder-resource.json")
	playlist := readTestData(t, "playlist-resource.json")
	list := &ListResponse{Items: []*ResourceWrapper{{Value: dashboard}, {Value: folder}, {Value: playlist}}}
	index := newTestIndex(t, 1)

	err := index.writeBatch(testContext, list)
	require.NoError(t, err)

	assertCountEquals(t, index, 3)

	req := &SearchRequest{Query: "*", Tenant: testTenant, Limit: 4, Offset: 0, Kind: []string{"dashboard", "folder"}, SortBy: []string{"title"}}
	results, err := index.Search(testContext, req)
	require.NoError(t, err)

	val := results.Values[0]
	assert.Equal(t, "dashboard-a", val.Spec["title"])

	req = &SearchRequest{Query: "*", Tenant: testTenant, Limit: 4, Offset: 0, Kind: []string{"dashboard", "folder"}, SortBy: []string{"-title"}}
	results, err = index.Search(testContext, req)
	require.NoError(t, err)

	val = results.Values[0]
	assert.NotEqual(t, "dashboard-a", val.Spec["title"])
}

func TestIndexBatch(t *testing.T) {
	index := newTestIndex(t, 1000)

	startAll := time.Now()
	ns := namespaces()
	// simulate 10 List calls
	for i := 0; i < 10; i++ {
		list := &ListResponse{Items: loadTestItems(strconv.Itoa(i), ns)}
		start := time.Now()
		_, err := index.AddToBatches(testContext, list)
		require.NoError(t, err)
		elapsed := time.Since(start)
		fmt.Println("Time elapsed:", elapsed)
	}

	// index all batches for each shard/tenant
	err := index.IndexBatches(testContext, 1, ns)
	require.NoError(t, err)

	elapsed := time.Since(startAll)
	fmt.Println("Total Time elapsed:", elapsed)

	assert.Equal(t, len(ns), len(index.shards))
	assertCountEquals(t, index, 100000)
}

func loadTestItems(uid string, tenants []string) []*ResourceWrapper {
	resource := `{
		"kind": "<kind>",
		"title": "test",
		"metadata": {
			"uid": "<uid>",
			"name": "test",
			"namespace": "<ns>"
		},
		"spec": {
			"title": "test",
			"description": "test",
			"interval": "5m"
		}
	}`

	items := []*ResourceWrapper{}
	for i := 0; i < 10000; i++ {
		res := strings.Replace(resource, "<uid>", strconv.Itoa(i)+uid, 1)
		// shuffle kinds
		kind := kinds[rand.Intn(len(kinds))]
		res = strings.Replace(res, "<kind>", kind, 1)
		// shuffle namespaces
		ns := tenants[rand.Intn(len(tenants))]
		res = strings.Replace(res, "<ns>", ns, 1)
		items = append(items, &ResourceWrapper{Value: []byte(res)})
	}
	return items
}

var kinds = []string{
	"playlist",
	"folder",
}

// simulate many tenants (cloud)
func namespaces() []string {
	ns := []string{}
	for i := 0; i < 1000; i++ {
		ns = append(ns, "tenant"+strconv.Itoa(i))
	}
	return ns
}

func newTestIndex(t *testing.T, batchSize int) *Index {
	tracingCfg := tracing.NewEmptyTracingConfig()
	trace, err := tracing.ProvideService(tracingCfg)
	require.NoError(t, err)

	return &Index{
		tracer: trace,
		shards: make(map[string]*Shard),
		log:    log.New("unifiedstorage.search.index"),
		opts: Opts{
			ListLimit: 5000,
			Workers:   10,
			BatchSize: batchSize,
		},
	}
}

func assertCountEquals(t *testing.T, index *Index, expected int) {
	total, err := index.Count()
	require.NoError(t, err)
	assert.Equal(t, expected, total)
}

func assertSearchCountEquals(t *testing.T, index *Index, search string, kind []string, filters []string, expected int64) {
	req := &SearchRequest{Query: search, Tenant: testTenant, Limit: expected + 1, Offset: 0, Kind: kind, Filters: filters}
	start := time.Now()
	results, err := index.Search(testContext, req)
	require.NoError(t, err)
	elapsed := time.Since(start)
	fmt.Println("Search time:", elapsed)
	assert.Equal(t, expected, int64(len(results.Values)))
}

func assertSearchGroupCountEquals(t *testing.T, index *Index, search string, group string, filters []string, expected int64) {
	groupBy := []*GroupBy{{Name: group, Limit: 100}}
	req := &SearchRequest{Query: search, Tenant: testTenant, Limit: 1, Offset: 0, GroupBy: groupBy, Filters: filters}
	results, err := index.Search(testContext, req)
	require.NoError(t, err)
	assert.Equal(t, expected, int64(len(results.Groups)))
}

func readTestData(t *testing.T, name string) []byte {
	// We can ignore the gosec G304 because this is only for tests
	// nolint:gosec
	data, err := os.ReadFile("./testdata/" + name)
	require.NoError(t, err)
	return data
}

func simulateFolders(size int) ([]string, []string) {
	folders := []string{}
	ids := []string{}
	for i := 0; i < size; i++ {
		id := "folder-" + strconv.Itoa(i)
		folder := `{
			"kind": "Folder",
			"title": "test",
			"metadata": {
				"uid": "` + id + `",
				"name": "` + id + `",
				"namespace": "default"
			},
			"spec": {
				"title": "test",
				"description": "test"
			}
		}`
		folders = append(folders, folder)
		ids = append(ids, id)
	}
	return folders, ids
}

File diff suppressed because it is too large

@ -1,126 +0,0 @@
package test

import (
	"encoding/json"
	"testing"
	"time"

	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/sql"
	"github.com/grafana/grafana/pkg/util/testutil"
	"github.com/stretchr/testify/require"
	"golang.org/x/net/context"
)

// addResource is a helper to create a resource in unified storage
func addResource(t *testing.T, ctx context.Context, backend sql.Backend, resourceName string, data string) {
	ir, err := resource.NewIndexedResource([]byte(data))
	require.NoError(t, err)
	_, err = backend.WriteEvent(ctx, resource.WriteEvent{
		Type:  resource.WatchEvent_ADDED,
		Value: []byte(data),
		Key: &resource.ResourceKey{
			Namespace: ir.Namespace,
			Group:     ir.Group,
			Resource:  resourceName,
			Name:      ir.Name,
		},
	})
	require.NoError(t, err)
}

func TestIntegrationIndexerSearch(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}

	ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
	cfg := setting.NewCfg()
	cfg.IndexWorkers = 1
	cfg.IndexMaxBatchSize = 100
	cfg.IndexListLimit = 100
	backend, server := newServer(t, cfg)

	playlist1 := `{
		"kind": "Playlist",
		"apiVersion": "playlist.grafana.app/v0alpha1",
		"metadata": {
			"name": "playlist dogs",
			"namespace": "tenant1",
			"uid": "1fe028dc-81bb-4268-a3ff-20899ff0a16f",
			"resourceVersion": "1",
			"creationTimestamp": "2024-01-01T12:00:00Z"
		},
		"spec": {
			"interval": "5m",
			"title": "dogs"
		}
	}`
	playlist2 := `{
		"kind": "Playlist",
		"apiVersion": "playlist.grafana.app/v0alpha1",
		"metadata": {
			"name": "playlist cats",
			"namespace": "tenant1",
			"uid": "1fe028dc-81bb-4268-a3ff-20899ff0a16f123",
			"resourceVersion": "2",
			"creationTimestamp": "2024-01-02T12:00:00Z"
		},
		"spec": {
			"interval": "5m",
			"title": "cats"
		}
	}`

	// add playlist1 and playlist2 to unified storage
	addResource(t, ctx, backend, "playlists", playlist1)
	addResource(t, ctx, backend, "playlists", playlist2)

	// initialize and build the search index
	indexer, ok := server.(resource.ResourceIndexer)
	if !ok {
		t.Fatal("server does not implement ResourceIndexer")
	}
	_, err := indexer.Index(ctx)
	require.NoError(t, err)

	// run search tests against the index
	t.Run("can search for all resources", func(t *testing.T) {
		res, err := server.Search(ctx, &resource.SearchRequest{
			Tenant: "tenant1",
			Query:  "*",
			Limit:  10,
			Offset: 0,
		})
		require.NoError(t, err)
		require.Len(t, res.Items, 2)
	})

	t.Run("can search for resources by title", func(t *testing.T) {
		res, err := server.Search(ctx, &resource.SearchRequest{
			Tenant: "tenant1",
			Query:  "Spec.title:dogs",
			Limit:  10,
			Offset: 0,
		})
		require.NoError(t, err)
		require.Len(t, res.Items, 1)
	})

	t.Run("can filter resources by created time", func(t *testing.T) {
		res, err := server.Search(ctx, &resource.SearchRequest{
			Tenant: "tenant1",
			Query:  "CreatedAt:>=\"2024-01-02\"",
			Limit:  10,
			Offset: 0,
		})
		require.NoError(t, err)

		require.Len(t, res.Items, 1)
		ir := resource.IndexedResource{}
		err = json.Unmarshal(res.Items[0].Value, &ir)
		require.NoError(t, err)
		require.Equal(t, "playlist cats", ir.Name)
	})
}