Export API
OpenCost provides a set of APIs that work together to provide a flexible and extensible way to export data of any type. OpenCost leverages these APIs in the Diagnostics and pipeline exporters, which target the Storage API. The following documentation reviews the various components that make up the Export API.
Exporter[T]
An OpenCost Exporter is defined as an object capable of exporting some data T per unit of time. This contract is defined in Go by the following generic interface:
// Exporter[T] is a generic interface for exporting T data per unit of time.
type Exporter[TimeUnit any, T any] interface {
    Export(time TimeUnit, data *T) error
}
The TimeUnit in the above case can generally be any type, but OpenCost time units can be as simple as a time.Time instance (for an instant) or a time.Duration instance for data over a time period (a rate). By extension of the Exporter interface, two discrete aliases were defined to more clearly identify the "type" of Exporter being used:
- The EventExporter[T] interface is an alias for Exporter[time.Time, T]. We classify "events" as data exported at a specific point in time.
- The ComputeExporter[T] interface is an alias for Exporter[opencost.Window, T]. We classify "compute" data as data exported over a discrete start and end time.
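Conceptually, the two aliases can be pictured as the following sketch. The exact definitions in the exporter package may differ slightly (for example, they may be true type aliases rather than embedded interfaces), so treat this as illustrative only:

// EventExporter[T] exports a T captured at a specific point in time.
type EventExporter[T any] interface {
    Exporter[time.Time, T]
}

// ComputeExporter[T] exports a T computed over a discrete window of time.
type ComputeExporter[T any] interface {
    Exporter[opencost.Window, T]
}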
By the name alone, you could make a few reasonable assumptions about what an Exporter is. An Exporter is a component that takes some data and "exports" it to some other system. This could be a database, a file, or even a message queue. The key point is that the Exporter is responsible for taking the data and sending it somewhere else. Within OpenCost specifically, the Exporter accepts cost data generated by a model and exports that data to a persistent storage system.
Considering all of the possible ways we may want to store and organize such data, we created a set of components that can help facilitate the "export" process.
Components
These components are designed to be leveraged by the Exporter implementations to assist in the export process.
- Storage: The Storage component is responsible for persisting the data, taking the encoded bytes and writing them to the appropriate storage backend (e.g. disk or an in-memory file system).
- StoragePathFormatter: The storage path formatter is responsible for generating a unique path for the data being exported.
// StoragePathFormatter is an interface used to format storage paths for exporting data types.
type StoragePathFormatter[T any] interface {
    // RootDir returns the root directory for the storage path.
    RootDir() string

    // ToFullPath returns the full path to a file name within the storage
    // directory leveraging a prefix and an incoming T type (generally a daterange or timestamp).
    ToFullPath(prefix string, in T, fileExt string) string
}
- Encoder: The encoder is responsible for encoding the data type T into a binary representation that can be used by the Storage component.
// Encoder[T] is a generic interface for encoding an instance of a T type into a byte slice.
type Encoder[T any] interface {
    Encode(*T) ([]byte, error)

    // FileExt returns the file extension for the encoded data. This can be used by a pathing strategy
    // to append the file extension when exporting the data. Returning an empty string will typically
    // omit the file extension completely.
    FileExt() string
}
- Validator: The validator is responsible for validating the data type T before it is exported. This is useful for ensuring that the data being exported is valid and can be stored in the storage system. A short sketch of custom Encoder and Validator implementations follows the interface definition below.
// Validator is an implementation of an object capable of validating a T instance prior to
// insertion into a store.
type ExportValidator[T any] interface {
    // Validate determines whether or not the given data can be legally
    // added to the store.
    Validate(window opencost.Window, data *T) error

    // IsOverwrite determines whether or not the provided data can be used
    // to overwrite existing data in the storage.
    IsOverwrite(data *T) bool
}
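To make the Encoder and Validator contracts concrete, here is a minimal sketch of custom implementations for a hypothetical Note payload. The Note, csvEncoder, and nonEmptyValidator names are illustrative only and are not part of the OpenCost API:

package example

import (
    "bytes"
    "encoding/csv"
    "fmt"

    "github.com/opencost/opencost/core/pkg/opencost"
)

// Note is a hypothetical payload type used only for illustration.
type Note struct {
    Text string `json:"text"`
}

// csvEncoder encodes a Note as a single CSV row; it satisfies Encoder[Note].
type csvEncoder struct{}

// Encode converts the Note into its CSV byte representation.
func (csvEncoder) Encode(n *Note) ([]byte, error) {
    if n == nil {
        return nil, fmt.Errorf("cannot encode a nil Note")
    }

    var buf bytes.Buffer
    w := csv.NewWriter(&buf)
    if err := w.Write([]string{n.Text}); err != nil {
        return nil, err
    }
    w.Flush()

    return buf.Bytes(), w.Error()
}

// FileExt tells the pathing strategy which file extension to append to exported files.
func (csvEncoder) FileExt() string {
    return "csv"
}

// nonEmptyValidator rejects empty payloads and never permits overwrites;
// it satisfies ExportValidator[Note].
type nonEmptyValidator struct{}

// Validate refuses to export a nil or empty Note for the given window.
func (nonEmptyValidator) Validate(window opencost.Window, n *Note) error {
    if n == nil || n.Text == "" {
        return fmt.Errorf("refusing to export an empty note for window %v", window)
    }
    return nil
}

// IsOverwrite reports whether the data may replace existing data in storage.
func (nonEmptyValidator) IsOverwrite(n *Note) bool {
    return false
}

Because the Exporter implementations below accept these components as interfaces, any of them can be swapped out (for example, replacing the CSV encoder with the built-in JSON encoder) without changing the rest of the export pipeline.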
Exporter Implementations
Similar to the specific aliases provided for the Exporter interface, OpenCost provides:
- EventStorageExporter[T], an implementation of the EventExporter[T] interface that exports "events" (instant data).
- ComputeStorageExporter[T], an implementation of the ComputeExporter[T] interface that exports "compute" data (data over a time period).
These two out-of-the-box implementations are designed to be used with the Storage component to export data to a persistent storage system.
EventStorageExporter[T]
A new EventStorageExporter[T] can be created using the github.com/opencost/opencost/core/pkg/exporter package and the NewEventStorageExporter[T] function:
// NewEventStorageExporter creates a new StorageExporter instance, which is responsible for exporting data to a storage backend.
// It uses a pathing strategy to determine the storage location, an encoder to convert the data to binary format, and
// a storage backend to write the data.
func NewEventStorageExporter[T any](
    paths pathing.StoragePathFormatter[time.Time],
    encoder Encoder[T],
    storage storage.Storage,
) EventExporter[T]
This storage exporter writes data to the provided storage backend using the given pathing strategy and encoder.
ComputeStorageExporter[T]
A new ComputeStorageExporter[T] can be created using the github.com/opencost/opencost/core/pkg/exporter package and the NewComputeStorageExporter[T] function:
// NewComputeStorageExporter creates a new ComputeStorageExporter instance, which is responsible for exporting
// data for a specific window to a storage backend. It uses a pathing strategy to determine the storage location,
// an encoder to convert the data to binary format, and a validator to check the data before export. The pipeline
// name and resolution are also provided to help identify the data being exported.
func NewComputeStorageExporter[T any](
    paths pathing.StoragePathFormatter[opencost.Window],
    encoder Encoder[T],
    storage storage.Storage,
    validator validator.ExportValidator[T],
) ComputeExporter[T]
This storage exporter writes data to the provided storage backend using the given pathing strategy and encoder; the validator parameter is used to validate the data before it is exported.
These implementations are designed to help define parts of a larger system, but having a flexible (and testable) design is important to providing different "flavors" of storing data. The following is a basic example of an EventExporter[T] implementation that is designed to store JSON-encoded Message objects to an in-memory file system.
package main

import (
    "time"

    "github.com/opencost/opencost/core/pkg/exporter"
    "github.com/opencost/opencost/core/pkg/exporter/pathing"
    "github.com/opencost/opencost/core/pkg/storage"
)

// constants we'll use in our example for our cluster identifier and event name
const (
    ClusterId        = "cluster-01"
    MessageEventName = "message"
)

// Our 'T' type is a simple struct that contains a message string
type Message struct {
    Message string `json:"message"`
}

func main() {
    // create the storage.Storage implementation, an in-memory file system
    store := storage.NewMemoryStorage()

    // create a pathing.StoragePathFormatter implementation specifically for exporting "events"
    pathFormatter, err := pathing.NewEventStoragePathFormatter("federated", ClusterId, MessageEventName)
    if err != nil {
        panic("failed to create path formatter: " + err.Error())
    }

    // create a JSON encoder for our Message type
    encoder := exporter.NewJSONEncoder[Message]()

    // finally, create the EventStorageExporter implementation for our Message type
    export := exporter.NewEventStorageExporter(pathFormatter, encoder, store)

    at := time.Now().UTC()
    if err := export.Export(at, &Message{
        Message: "Testing 1, 2, 3...",
    }); err != nil {
        panic("failed to export message: " + err.Error())
    }
}
The file contents exported to the in-memory file system at the path federated/cluster-01/message/20250519222700.json would look like this:
{
    "message": "Testing 1, 2, 3..."
}
Export Sources
The OpenCost exporter package provides two generic interfaces for data sources: ExportSource and ComputeSource. These interfaces define contracts for components that provide data for export or computation operations.
ExportSource[T]
The ExportSource[T] interface provides a factory-style contract for creating new instances of type T for exporting.
type ExportSource[T any] interface {
    Make(timestamp time.Time) *T
    Name() string
}
Methods
- Make(timestamp time.Time) *T: Creates a new instance of type T at the specified timestamp. This is typically used to generate data that represents a point-in-time snapshot.
- Name() string: Returns the name of the export source. Used for identification and logging.
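As an illustration, the following is a minimal sketch of an ExportSource[T] implementation. The Heartbeat payload and HeartbeatSource type are hypothetical, used only to show the shape of the contract:

package example

import "time"

// Heartbeat is a hypothetical event payload that records when it was produced.
type Heartbeat struct {
    Timestamp time.Time `json:"timestamp"`
}

// HeartbeatSource produces a Heartbeat for every export tick;
// it satisfies ExportSource[Heartbeat].
type HeartbeatSource struct{}

// Make creates a point-in-time Heartbeat for the given timestamp.
func (HeartbeatSource) Make(timestamp time.Time) *Heartbeat {
    return &Heartbeat{Timestamp: timestamp}
}

// Name identifies the source for logging.
func (HeartbeatSource) Name() string {
    return "heartbeat-source"
}

As shown in the DiagnosticSource example later in this document, a source can also return nil from Make to skip an export cycle.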
ComputeSource[T]
The ComputeSource[T] interface defines a data source that can compute instances of type T over a time range with a specified resolution. This contract is useful for sources that leverage a queryable backend, or other potentially fallible data sources.
type ComputeSource[T any] interface {
    CanCompute(start, end time.Time) bool
    Compute(start, end time.Time, resolution time.Duration) (*T, error)
    Name() string
}
Methods
- CanCompute(start, end time.Time) bool: Determines whether the source can effectively compute data for the given time range. This allows the system to check capability before attempting computation that might fail.
- Compute(start, end time.Time, resolution time.Duration) (*T, error): Computes and returns data of type T for the specified time range with the given resolution. Returns an error if computation fails.
- Name() string: Returns the name of the compute source. Used for identification and logging.
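As a rough sketch, a ComputeSource[T] backed by a fallible query layer might look like the following. The UsageReport type and the queryTotalCost function field are hypothetical stand-ins for a real queryable backend:

package example

import (
    "fmt"
    "time"
)

// UsageReport is a hypothetical payload computed over a time range.
type UsageReport struct {
    Start     time.Time `json:"start"`
    End       time.Time `json:"end"`
    TotalCost float64   `json:"totalCost"`
}

// UsageSource computes UsageReport instances from a fallible query backend;
// it satisfies ComputeSource[UsageReport].
type UsageSource struct {
    // queryTotalCost stands in for a real queryable backend (e.g. a Prometheus client).
    queryTotalCost func(start, end time.Time, resolution time.Duration) (float64, error)
}

// CanCompute reports whether the requested range is well formed and entirely in
// the past, i.e. the backend should have complete data for it.
func (s *UsageSource) CanCompute(start, end time.Time) bool {
    return start.Before(end) && end.Before(time.Now())
}

// Compute queries the backend and assembles a UsageReport for the range.
func (s *UsageSource) Compute(start, end time.Time, resolution time.Duration) (*UsageReport, error) {
    total, err := s.queryTotalCost(start, end, resolution)
    if err != nil {
        return nil, fmt.Errorf("querying total cost: %w", err)
    }

    return &UsageReport{Start: start, End: end, TotalCost: total}, nil
}

// Name identifies the compute source for logging.
func (s *UsageSource) Name() string {
    return "usage-source"
}

Separating CanCompute from Compute lets a controller cheaply skip windows the backend cannot serve yet instead of attempting a query that is likely to fail.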
Usage
In the Controller documentation below, you will see how we can compose the Exporter[T] and ExportSource[T] in a controller that can generically export data from a generic source to a generic storage backend.
Export Controllers
OpenCost's exporter package implements two primary export controller patterns:
- EventExportController[T]: For exporting point-in-time event data
- ComputeExportController[T]: For exporting computed data over time windows
These controllers compose data sources with exporters to create scheduled export processes that run on configurable intervals.
Controller Interfaces
All controllers implement the ExportController interface:
type ExportController interface {
    Name() string
    Start(interval time.Duration) bool
    Stop()
}
EventExportController[T]
The EventExportController[T] handles time-stamped event exporting for data that represents a specific moment in time.
Architecture
┌─────────────────┐ ┌────────────────┐ ┌─────────────────┐
│ ExportSource │────▶│ Event T │────▶│ EventExporter │
└─────────────────┘ └────────────────┘ └─────────────────┘
Make() Export()
Components
- ExportSource[T]: Creates instances of type T for a specific timestamp
- EventExporter[T]: Exports instances of type T to a destination
Lifecycle
- Controller starts a background loop running at the specified interval
- At each interval:
  - Gets current time (truncated to second)
  - Calls source.Make(time) to create a new event instance
  - Calls exporter.Export(time, event) to export the event
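Putting the pieces together, the following sketch composes the in-memory Message example from earlier with a hypothetical MessageSource to form a running EventExportController[Message]. The MessageSource type and the one-minute interval are illustrative choices, not OpenCost defaults:

package main

import (
    "time"

    "github.com/opencost/opencost/core/pkg/exporter"
    "github.com/opencost/opencost/core/pkg/exporter/pathing"
    "github.com/opencost/opencost/core/pkg/storage"
)

// Message is the same payload type used in the in-memory example above.
type Message struct {
    Message string `json:"message"`
}

// MessageSource is a hypothetical ExportSource[Message] that produces a fresh
// Message for every export tick.
type MessageSource struct{}

func (MessageSource) Make(timestamp time.Time) *Message {
    return &Message{Message: "tick at " + timestamp.Format(time.RFC3339)}
}

func (MessageSource) Name() string {
    return "message-source"
}

func main() {
    // reuse the in-memory storage, pathing, and JSON encoder from the example above
    store := storage.NewMemoryStorage()
    pathFormatter, err := pathing.NewEventStoragePathFormatter("federated", "cluster-01", "message")
    if err != nil {
        panic("failed to create path formatter: " + err.Error())
    }
    export := exporter.NewEventStorageExporter(pathFormatter, exporter.NewJSONEncoder[Message](), store)

    // compose the source and exporter into an event export controller
    ctrl := exporter.NewEventExportController(MessageSource{}, export)

    // export a new Message every minute
    ctrl.Start(time.Minute)

    // let the background loop run for a while, then stop it
    time.Sleep(5 * time.Minute)
    ctrl.Stop()
}

At each one-minute interval the controller calls MessageSource.Make with the current time and hands the result to the storage exporter, writing a new JSON file into the in-memory file system.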
ComputeExportController[T]
The ComputeExportController[T] handles exporting data that is computed over time windows, such as aggregated metrics or cost data.
Architecture
┌─────────────────┐ ┌────────────────┐ ┌──────────────────┐
│ ComputeSource │────▶│ Computed T │────▶│ ComputeExporter │
└─────────────────┘ └────────────────┘ └──────────────────┘
Compute() Export()
Components
- ComputeSource[T]: Computes instances of type T for specific time windows
- ComputeExporter[T]: Exports computed instances to a destination
Key Parameters
- resolution: The time window size for exports (e.g., hourly, daily)
- sourceResolution: The granularity of data used for computation (e.g., per-minute data points). Note that some sources don't allow customizable resolutions in queries.
Lifecycle
- Controller starts a background loop running at the specified interval
- At each interval:
  - Determines which time windows need exporting (previous and current)
  - For each window:
    - Checks if the source can compute data for the window
    - Calls source.Compute(start, end, sourceResolution) to generate data
    - Calls exporter.Export(window, data) to export the data
ComputeExportControllerGroup
For managing multiple compute controllers with different resolutions, the ComputeExportControllerGroup provides a way to group and manage them together:
// Create controllers for different resolutions
hourlyController := NewComputeExportController(source, exporter, time.Hour, time.Minute)
dailyController := NewComputeExportController(source, exporter, 24*time.Hour, time.Hour)
// Group them together
group := NewComputeExportControllerGroup(hourlyController, dailyController)
// Start all controllers with the same interval
group.Start(10 * time.Minute)
Summary
The OpenCost exporter framework provides a flexible system for scheduled data exports using strongly typed generic controllers that compose data sources with exporters. This approach allows for clean separation of concerns:
- Sources focus on data generation or computation
- Exporters handle the details of storing or transmitting data
- Controllers manage the scheduling and coordination
By understanding and leveraging this system, you can easily add new data exports to the OpenCost platform with minimal boilerplate.
Example
OpenCost uses the full set of Export API components in a diagnostic exporter built around the DiagnosticService. Let's walk through a simple example of how to:
- Create an ExportSource[T] using an existing service (DiagnosticService)
- Build an EventStorageExporter[T] using:
  - A StoragePathFormatter that will generate a unique path for the data being exported
  - An Encoder that will encode the data type T into a binary representation (let's use JSON for this example)
  - A disk-based Storage implementation that will write the data to disk
- Create an EventExportController[T] that will export the data to disk every 10 seconds
Creating the Export Source
Our ExportSource[T] will be a simple wrapper around the DiagnosticService that creates a new DiagnosticsRunReport instance for each export.
package exporter

import (
    "context"
    "time"

    "github.com/opencost/opencost/core/pkg/diagnostics"
)

// DiagnosticSource is an `export.ExportSource` implementation that provides the basic data for a `DiagnosticsRunReport` payload.
type DiagnosticSource struct {
    diagnosticService diagnostics.DiagnosticService
}

// NewDiagnosticSource creates a new `DiagnosticSource` instance. It accepts the `DiagnosticService` implementation
// that will be used to retrieve the diagnostic results.
func NewDiagnosticSource(diagnosticService diagnostics.DiagnosticService) *DiagnosticSource {
    return &DiagnosticSource{
        diagnosticService: diagnosticService,
    }
}

// Make creates a new `DiagnosticsRunReport` instance with the provided current time.
func (ds *DiagnosticSource) Make(t time.Time) *diagnostics.DiagnosticsRunReport {
    ctx := context.Background()

    // returning nil will prevent export -- skip for 0 registered diagnostics
    if ds.diagnosticService.Total() == 0 {
        return nil
    }

    return &diagnostics.DiagnosticsRunReport{
        StartTime: t,
        Results:   ds.diagnosticService.Run(ctx),
    }
}

func (ds *DiagnosticSource) Name() string {
    return diagnostics.DiagnosticsEventName + "-source"
}
Creating the EventExporter[T]
Next, we will create the EventExporter[T] that will export the data to disk. We will use a StoragePathFormatter that generates a unique path for the data being exported, a JSON encoder to encode the data, and a disk-based storage implementation to write the data to disk. Leveraging all of the OpenCost components discussed above, this involves very little code. Note that any of the components used to create the EventExporter[T] can be swapped out for another implementation.
package exporter

import (
    "github.com/opencost/opencost/core/pkg/diagnostics"
    "github.com/opencost/opencost/core/pkg/exporter"
    "github.com/opencost/opencost/core/pkg/exporter/pathing"
    "github.com/opencost/opencost/core/pkg/log"
    "github.com/opencost/opencost/core/pkg/storage"
)

// NewDiagnosticsEncoder returns a JSON encoder used to encode DiagnosticsRunReport events.
func NewDiagnosticsEncoder() exporter.Encoder[diagnostics.DiagnosticsRunReport] {
    return exporter.NewJSONEncoder[diagnostics.DiagnosticsRunReport]()
}

// NewDiagnosticExporter creates a new `StorageExporter[DiagnosticsRunReport]` instance for exporting diagnostic run events.
func NewDiagnosticExporter(
    clusterId string,
    applicationName string,
    storage storage.Storage,
) exporter.EventExporter[diagnostics.DiagnosticsRunReport] {
    // The first parameter is left empty as the "root" directory - it is unused, so we pass empty string.
    pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, diagnostics.DiagnosticsEventName, applicationName)
    if err != nil {
        log.Errorf("failed to create pathing formatter: %v", err)
        return nil
    }

    return exporter.NewEventStorageExporter(
        pathing,
        NewDiagnosticsEncoder(),
        storage,
    )
}
Compose the ExportController
Now, leveraging the EventExporter[T] and ExportSource[T], we can create an EventExportController[T] that will export the data.
package exporter

import (
    "github.com/opencost/opencost/core/pkg/diagnostics"
    "github.com/opencost/opencost/core/pkg/exporter"
    "github.com/opencost/opencost/core/pkg/storage"
)

// NewDiagnosticsExportController creates a new EventExportController for DiagnosticsRunReport events.
func NewDiagnosticsExportController(
    clusterId string,
    applicationName string,
    store storage.Storage,
    service diagnostics.DiagnosticService,
) *exporter.EventExportController[diagnostics.DiagnosticsRunReport] {
    return exporter.NewEventExportController(
        NewDiagnosticSource(service),
        NewDiagnosticExporter(clusterId, applicationName, store),
    )
}
Create and Start the Export Controller
Lastly, let's create an ExportController instance for the DiagnosticsRunReport and start it.
package main

import (
    "context"
    "math/rand"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/opencost/opencost/core/pkg/diagnostics"
    diagexporter "github.com/opencost/opencost/core/pkg/diagnostics/exporter"
    "github.com/opencost/opencost/core/pkg/storage"
)

const (
    ClusterID                     = "MyClusterID"
    ApplicationName               = "MyApplication"
    ExportFrequency time.Duration = 10 * time.Second
)

// entry point for finops-agent
func main() {
    diag := diagnostics.NewDiagnosticService()

    // register a faux diagnostic runner that will flip a coin
    diag.Register(
        "coin-flip",
        "A randomized diagnostic result",
        "random",
        func(ctx context.Context) (map[string]any, error) {
            result := "tails"
            if rand.Intn(2) == 1 {
                result = "heads"
            }

            return map[string]any{
                "result": result,
            }, nil
        },
    )

    // create a file storage located in the /tmp directory (this will ultimately produce
    // /tmp/<cluster-id>/diagnostics/<application-name>/<time>.json files)
    fileStorage := storage.NewFileStorage("/tmp")

    // diagnostics exporter running every 10 seconds
    diagnosticsExporter := diagexporter.NewDiagnosticsExportController(ClusterID, ApplicationName, fileStorage, diag)
    diagnosticsExporter.Start(ExportFrequency)

    WaitForSignal()
}

// WaitForSignal waits for a termination signal (SIGINT or SIGTERM) and then exits the program.
func WaitForSignal() {
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

    done := make(chan struct{}, 1)
    go func() {
        defer close(done)
        <-signalChan
    }()

    <-done
}