Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/metricbeat/module/openai/usage/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
state/openai/usage/state_*
4 changes: 3 additions & 1 deletion x-pack/metricbeat/module/openai/usage/persistcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ func (s *stateManager) SaveState(apiKey, dateStr string) error {
func (s *stateManager) hashKey(apiKey string) string {
// Check cache first to avoid recomputing hashes
if hashedKey, ok := s.hashCache.Load(apiKey); ok {
return hashedKey.(string)
if hashedKeyStr, ok := hashedKey.(string); ok {
return hashedKeyStr
}
}

// Generate SHA-256 hash and hex encode for safe filename usage
Expand Down
6 changes: 3 additions & 3 deletions x-pack/metricbeat/module/openai/usage/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ type BaseData struct {
type UsageResponse struct {
Object string `json:"object"`
Data []UsageData `json:"data"`
FtData []interface{} `json:"ft_data"`
FtData []any `json:"ft_data"`
DalleApiData []DalleData `json:"dalle_api_data"`
WhisperApiData []WhisperData `json:"whisper_api_data"`
TtsApiData []TtsData `json:"tts_api_data"`
AssistantCodeInterpreterData []interface{} `json:"assistant_code_interpreter_data"`
RetrievalStorageData []interface{} `json:"retrieval_storage_data"`
AssistantCodeInterpreterData []any `json:"assistant_code_interpreter_data"`
RetrievalStorageData []any `json:"retrieval_storage_data"`
}

type UsageData struct {
Expand Down
14 changes: 7 additions & 7 deletions x-pack/metricbeat/module/openai/usage/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// 2. Calculates start date using configured lookback days
// 3. Fetches usage data for each day in the range
// 4. Reports collected metrics through the mb.ReporterV2
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
func (m *MetricSet) Fetch(ctx context.Context, report mb.ReporterV2) error {
endDate := time.Now().UTC().Truncate(time.Hour * 24) // truncate to day as we only collect daily data

if !m.config.Collection.Realtime {
Expand All @@ -105,7 +105,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
startDate := endDate.AddDate(0, 0, -m.config.Collection.LookbackDays)

m.report = report
return m.fetchDateRange(startDate, endDate, m.httpClient)
return m.fetchDateRange(ctx, startDate, endDate, m.httpClient)
}

// fetchDateRange retrieves OpenAI API usage data for each configured API key within a date range.
Expand All @@ -116,8 +116,8 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// 3. Collects daily usage data
// 4. Updates state store with latest processed date
// 5. Handles errors per day without failing entire range
func (m *MetricSet) fetchDateRange(startDate, endDate time.Time, httpClient *RLHTTPClient) error {
g, ctx := errgroup.WithContext(context.TODO())
func (m *MetricSet) fetchDateRange(ctx context.Context, startDate, endDate time.Time, httpClient *RLHTTPClient) error {
g, ctx := errgroup.WithContext(ctx)

for i := range m.config.APIKeys {
apiKey := m.config.APIKeys[i]
Expand Down Expand Up @@ -332,7 +332,7 @@ func (m *MetricSet) processTTSData(data []TtsData) {
m.processEvents(events)
}

func (m *MetricSet) processFTData(data []interface{}) {
func (m *MetricSet) processFTData(data []any) {
events := make([]mb.Event, 0, len(data))
for _, ft := range data {
event := mb.Event{
Expand All @@ -347,7 +347,7 @@ func (m *MetricSet) processFTData(data []interface{}) {
m.processEvents(events)
}

func (m *MetricSet) processAssistantCodeInterpreterData(data []interface{}) {
func (m *MetricSet) processAssistantCodeInterpreterData(data []any) {
events := make([]mb.Event, 0, len(data))
for _, aci := range data {
event := mb.Event{
Expand All @@ -362,7 +362,7 @@ func (m *MetricSet) processAssistantCodeInterpreterData(data []interface{}) {
m.processEvents(events)
}

func (m *MetricSet) processRetrievalStorageData(data []interface{}) {
func (m *MetricSet) processRetrievalStorageData(data []any) {
events := make([]mb.Event, 0, len(data))
for _, rs := range data {
event := mb.Event{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func TestFetch(t *testing.T) {
},
}

f := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL+"/usage", apiKey))
events, errs := mbtest.ReportingFetchV2Error(f)
f := mbtest.NewReportingMetricSetV2WithContext(t, getConfig(server.URL+"/usage", apiKey))
events, errs := mbtest.ReportingFetchV2WithContext(f)

require.Empty(t, errs, "Expected no errors")
require.NotEmpty(t, events, "Expected events to be returned")
Expand All @@ -297,9 +297,9 @@ func TestData(t *testing.T) {
server := initServer(usagePath, apiKey)
defer server.Close()

f := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL+"/usage", apiKey))
f := mbtest.NewReportingMetricSetV2WithContext(t, getConfig(server.URL+"/usage", apiKey))

err := mbtest.WriteEventsReporterV2Error(f, t, "")
err := mbtest.WriteEventsReporterV2WithContext(f, t, "")
require.NoError(t, err, "Writing events should not return an error")
}

Expand Down