Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 111 additions & 11 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,41 @@
#include "azure_msiauth.h"
#include "azure_kusto_store.h"

/**
* Retrieve an OAuth2 access token using Managed Service Identity (MSI).
*
* @param ctx Plugin's context containing the OAuth2 configuration.
* @return int 0 on success, -1 on failure.
*/
static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx)
{
char *token;

/* Retrieve access token */
token = flb_azure_msiauth_token_get(ctx->o);
if (!token) {
flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
flb_plg_error(ctx->ins, "error retrieving oauth2 access token (MSI access token is NULL)");
return -1;
}

return 0;
}

/**
* Retrieve an OAuth2 access token using workload identity federation.
*
* @param ctx Plugin's context containing workload identity configuration.
* @return int 0 on success, -1 on failure.
*/
static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx)
{
int ret;

ret = flb_azure_workload_identity_token_get(ctx->o,
ctx->workload_identity_token_file,
ctx->client_id,
ctx->tenant_id);
ctx->tenant_id,
ctx->kusto_scope);
if (ret == -1) {
flb_plg_error(ctx->ins, "error retrieving workload identity token");
return -1;
Expand All @@ -70,6 +83,15 @@ static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx)
return 0;
}

/**
* Retrieve an OAuth2 access token using service principal credentials.
*
* Constructs the OAuth2 payload with client credentials and requests
* an access token from the configured OAuth2 endpoint.
*
* @param ctx Plugin's context containing client credentials and OAuth2 config.
* @return int 0 on success, -1 on failure.
*/
static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
{
int ret;
Expand All @@ -83,7 +105,9 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
return -1;
}

ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);
ret = flb_oauth2_payload_append(ctx->o, "scope", 5,
ctx->kusto_scope,
flb_sds_len(ctx->kusto_scope));
if (ret == -1) {
flb_plg_error(ctx->ins, "error appending oauth2 params");
return -1;
Expand All @@ -100,25 +124,36 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
flb_plg_error(ctx->ins, "error appending oauth2 params");
return -1;
}

/* Retrieve access token */
char *token = flb_oauth2_token_get(ctx->o);
if (!token) {
flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
flb_plg_error(ctx->ins, "error retrieving oauth2 access token - "
"check Fluent Bit logs for '[oauth2]' errors "
"(common causes: connection failure to '%s', invalid credentials, "
"or malformed response)", ctx->oauth_url ? ctx->oauth_url : "unknown");
return -1;
}

flb_plg_debug(ctx->ins, "OAuth2 token retrieval process completed successfully");
return 0;
}

/**
* Obtain the current Azure Kusto bearer token as a formatted string.
*
* Acquires the token mutex, refreshes the token if expired based on the
* configured authentication type, and returns a copy of the token string
* in the format "<token_type> <access_token>".
*
* @param ctx Plugin's context.
* @return flb_sds_t The bearer token string, or NULL on error.
*/
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
{
int ret = 0;
flb_sds_t output = NULL;

if (pthread_mutex_lock(&ctx->token_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return NULL;
}

Expand All @@ -144,6 +179,9 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
flb_sds_len(ctx->o->access_token) + 2);
if (!output) {
flb_plg_error(ctx->ins, "error creating token buffer");
if (pthread_mutex_unlock(&ctx->token_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
}
return NULL;
}
flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,
Expand Down Expand Up @@ -411,6 +449,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
struct mk_list *tmp;
struct mk_list *head;
struct mk_list *f_head;
struct mk_list *f_tmp;
struct flb_fstore_file *fsf;
struct flb_fstore_stream *fs_stream;
flb_sds_t payload = NULL;
Expand All @@ -428,10 +467,15 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
continue;
}

mk_list_foreach_safe(f_head, tmp, &fs_stream->files) {
mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) {
fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
chunk = fsf->data;

/* Skip files with no associated chunk data (may happen during shutdown) */
if (chunk == NULL) {
continue;
}

/* Locked chunks are being processed, skip */
if (chunk->locked == FLB_TRUE) {
continue;
Expand Down Expand Up @@ -893,21 +937,28 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s",
ctx->store_dir);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
ctx->has_old_buffers = azure_kusto_store_has_data(ctx);

/* validate 'total_file_size' */
if (ctx->file_size <= 0) {
flb_plg_error(ctx->ins, "Failed to parse upload_file_size");
azure_kusto_store_exit(ctx);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
if (ctx->file_size < 1000000) {
flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB");
azure_kusto_store_exit(ctx);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
if (ctx->file_size > MAX_FILE_SIZE) {
flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE);
azure_kusto_store_exit(ctx);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}

Expand All @@ -934,15 +985,22 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
* Create upstream context for Kusto Ingestion endpoint
*/
ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
if (!ctx->u) {
flb_plg_error(ctx->ins, "upstream creation failed");
if (ctx->buffering_enabled == FLB_TRUE) {
azure_kusto_store_exit(ctx);
}
pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
if (ctx->buffering_enabled == FLB_TRUE){
flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC);
ctx->u->base.net.io_timeout = ctx->io_timeout;
ctx->has_old_buffers = azure_kusto_store_has_data(ctx);
}
if (!ctx->u) {
flb_plg_error(ctx->ins, "upstream creation failed");
return -1;
}

flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));

Expand All @@ -951,6 +1009,14 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
if (!ctx->o) {
flb_plg_error(ctx->ins, "cannot create oauth2 context");
if (ctx->buffering_enabled == FLB_TRUE) {
azure_kusto_store_exit(ctx);
}
flb_upstream_destroy(ctx->u);
pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
flb_output_upstream_set(ctx->u, ins);
Expand Down Expand Up @@ -1114,6 +1180,19 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int
return 0;
}

/**
* Buffer a data chunk into the file storage for later ingestion.
*
* Writes the formatted chunk data to the upload file via the store layer.
*
* @param out_context Plugin's context (cast to struct flb_azure_kusto).
* @param upload_file Target file handle for buffered storage.
* @param chunk Data chunk to buffer.
* @param chunk_size Size of the data chunk.
* @param tag Fluent Bit tag associated with the chunk.
* @param tag_len Length of the tag string.
* @return int 0 on success, -1 on failure.
*/
static int buffer_chunk(void *out_context, struct azure_kusto_file *upload_file,
flb_sds_t chunk, int chunk_size,
flb_sds_t tag, size_t tag_len)
Expand Down Expand Up @@ -1533,6 +1612,27 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, auth_type_str),
"Set the authentication type: 'service_principal', 'managed_identity', or 'workload_identity'. "
"For managed_identity, use 'system' as client_id for system-assigned identity, or specify the managed identity's client ID"},
{FLB_CONFIG_MAP_STR, "cloud_name", "AzureCloud", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, cloud_name),
"Set the Azure cloud environment. Supported values: "
"'AzureCloud' (default), 'AzureChinaCloud', 'AzureUSGovernmentCloud'. "
"For private clouds (USSEC, USNAT, BLEU, etc.), set "
"cloud_login_host, cloud_kusto_scope, and cloud_kusto_resource instead"},
{FLB_CONFIG_MAP_STR, "cloud_login_host", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, custom_login_host),
"Custom OAuth login host for private/sovereign clouds "
"(e.g. login.microsoftonline.eaglex.ic.gov). When set, cloud_kusto_scope "
"and cloud_kusto_resource must also be provided"},
{FLB_CONFIG_MAP_STR, "cloud_kusto_scope", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, custom_kusto_scope),
"Custom Kusto OAuth scope for private/sovereign clouds "
"(e.g. https://help.kusto.core.eaglex.ic.gov/.default). When set, "
"cloud_login_host and cloud_kusto_resource must also be provided"},
{FLB_CONFIG_MAP_STR, "cloud_kusto_resource", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, custom_kusto_resource),
"Custom Kusto IMDS resource URL for private/sovereign clouds "
"(e.g. https://api.kusto.core.eaglex.ic.gov/). When set, cloud_login_host "
"and cloud_kusto_scope must also be provided"},
{FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint),
"Set the Kusto cluster's ingestion endpoint URL (e.g. "
Expand Down
49 changes: 44 additions & 5 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,31 @@ typedef enum {
FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY /* Workload Identity */
} flb_azure_kusto_auth_type;

/* Kusto streaming inserts oauth scope */
#define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default"
/* Azure cloud environment types */
typedef enum {
FLB_AZURE_CLOUD_PUBLIC = 0, /* AzureCloud (default) */
FLB_AZURE_CLOUD_CHINA, /* AzureChinaCloud */
FLB_AZURE_CLOUD_US_GOVERNMENT /* AzureUSGovernmentCloud */
} flb_azure_cloud_type;

/* MSAL authorization URL */
/* MSAL authorization URL template: %s = login host, %s = tenant_id */
#define FLB_MSAL_AUTH_URL_TEMPLATE \
"https://login.microsoftonline.com/%s/oauth2/v2.0/token"
"https://%s/%s/oauth2/v2.0/token"

/* Cloud-specific login hosts */
#define FLB_AZURE_LOGIN_HOST_PUBLIC "login.microsoftonline.com"
#define FLB_AZURE_LOGIN_HOST_CHINA "login.chinacloudapi.cn"
#define FLB_AZURE_LOGIN_HOST_US_GOVERNMENT "login.microsoftonline.us"

/* Cloud-specific Kusto scopes */
#define FLB_AZURE_KUSTO_SCOPE_PUBLIC "https://help.kusto.windows.net/.default"
#define FLB_AZURE_KUSTO_SCOPE_CHINA "https://help.kusto.chinacloudapi.cn/.default"
#define FLB_AZURE_KUSTO_SCOPE_US_GOVERNMENT "https://help.kusto.usgovcloudapi.net/.default"

/* Cloud-specific Kusto IMDS resources */
#define FLB_AZURE_KUSTO_RESOURCE_PUBLIC "https://api.kusto.windows.net/"
#define FLB_AZURE_KUSTO_RESOURCE_CHINA "https://api.kusto.chinacloudapi.cn/"
#define FLB_AZURE_KUSTO_RESOURCE_US_GOVERNMENT "https://api.kusto.usgovcloudapi.net/"

#define FLB_AZURE_KUSTO_MGMT_URI_PATH "/v1/rest/mgmt"
#define FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE "{\"csl\":\"%s\", \"db\": \"NetDefaultDB\"}"
Expand All @@ -74,7 +93,6 @@ typedef enum {

#define FLB_AZURE_IMDS_ENDPOINT "/metadata/identity/oauth2/token"
#define FLB_AZURE_IMDS_API_VERSION "2018-02-01"
#define FLB_AZURE_IMDS_RESOURCE "https://api.kusto.windows.net/"


struct flb_azure_kusto_resources {
Expand All @@ -84,6 +102,15 @@ struct flb_azure_kusto_resources {

/* used to reload resouces after some time */
uint64_t load_time;

/* flag to prevent concurrent coroutines from reloading simultaneously */
int loading_in_progress;

/* Old resources pending cleanup - deferred destruction to avoid use-after-free
* when other threads may still be using them during high-volume operations */
struct flb_upstream_ha *old_blob_ha;
struct flb_upstream_ha *old_queue_ha;
flb_sds_t old_identity_token;
};

struct flb_azure_kusto {
Expand All @@ -105,6 +132,18 @@ struct flb_azure_kusto {
char *auth_type_str;
char *workload_identity_token_file;

/* Cloud environment */
int cloud_type;
char *cloud_name;
flb_sds_t kusto_scope;
flb_sds_t kusto_resource;
flb_sds_t login_host;

/* Custom cloud overrides (for private/sovereign clouds like USSEC, USNAT, BLEU) */
flb_sds_t custom_login_host;
flb_sds_t custom_kusto_scope;
flb_sds_t custom_kusto_resource;

/* compress payload */
int compression_enabled;

Expand Down
Loading
Loading