diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index cd3f1c3966b..7c806dc5af7 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -39,6 +39,12 @@ #include "azure_msiauth.h" #include "azure_kusto_store.h" +/** + * Retrieve an OAuth2 access token using Managed Service Identity (MSI). + * + * @param ctx Plugin's context containing the OAuth2 configuration. + * @return int 0 on success, -1 on failure. + */ static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx) { char *token; @@ -46,13 +52,19 @@ static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx) /* Retrieve access token */ token = flb_azure_msiauth_token_get(ctx->o); if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); + flb_plg_error(ctx->ins, "error retrieving oauth2 access token (MSI access token is NULL)"); return -1; } return 0; } +/** + * Retrieve an OAuth2 access token using workload identity federation. + * + * @param ctx Plugin's context containing workload identity configuration. + * @return int 0 on success, -1 on failure. + */ static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx) { int ret; @@ -60,7 +72,8 @@ static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx) ret = flb_azure_workload_identity_token_get(ctx->o, ctx->workload_identity_token_file, ctx->client_id, - ctx->tenant_id); + ctx->tenant_id, + ctx->kusto_scope); if (ret == -1) { flb_plg_error(ctx->ins, "error retrieving workload identity token"); return -1; @@ -70,6 +83,15 @@ static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx) return 0; } +/** + * Retrieve an OAuth2 access token using service principal credentials. + * + * Constructs the OAuth2 payload with client credentials and requests + * an access token from the configured OAuth2 endpoint. + * + * @param ctx Plugin's context containing client credentials and OAuth2 config. + * @return int 0 on success, -1 on failure. + */ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx) { int ret; @@ -83,7 +105,9 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx) return -1; } - ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39); + ret = flb_oauth2_payload_append(ctx->o, "scope", 5, + ctx->kusto_scope, + flb_sds_len(ctx->kusto_scope)); if (ret == -1) { flb_plg_error(ctx->ins, "error appending oauth2 params"); return -1; @@ -100,11 +124,13 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx) flb_plg_error(ctx->ins, "error appending oauth2 params"); return -1; } - /* Retrieve access token */ char *token = flb_oauth2_token_get(ctx->o); if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); + flb_plg_error(ctx->ins, "error retrieving oauth2 access token - " + "check Fluent Bit logs for '[oauth2]' errors " + "(common causes: connection failure to '%s', invalid credentials, " + "or malformed response)", ctx->oauth_url ? ctx->oauth_url : "unknown"); return -1; } @@ -112,13 +138,22 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx) return 0; } +/** + * Obtain the current Azure Kusto bearer token as a formatted string. + * + * Acquires the token mutex, refreshes the token if expired based on the + * configured authentication type, and returns a copy of the token string + * in the format " ". + * + * @param ctx Plugin's context. + * @return flb_sds_t The bearer token string, or NULL on error. + */ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) { int ret = 0; flb_sds_t output = NULL; if (pthread_mutex_lock(&ctx->token_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); return NULL; } @@ -144,6 +179,9 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) flb_sds_len(ctx->o->access_token) + 2); if (!output) { flb_plg_error(ctx->ins, "error creating token buffer"); + if (pthread_mutex_unlock(&ctx->token_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + } return NULL; } flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type, @@ -411,6 +449,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con struct mk_list *tmp; struct mk_list *head; struct mk_list *f_head; + struct mk_list *f_tmp; struct flb_fstore_file *fsf; struct flb_fstore_stream *fs_stream; flb_sds_t payload = NULL; @@ -428,10 +467,15 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con continue; } - mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { + mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); chunk = fsf->data; + /* Skip files with no associated chunk data (may happen during shutdown) */ + if (chunk == NULL) { + continue; + } + /* Locked chunks are being processed, skip */ if (chunk->locked == FLB_TRUE) { continue; @@ -893,6 +937,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi if (ret == -1) { flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s", ctx->store_dir); + flb_azure_kusto_conf_destroy(ctx); return -1; } ctx->has_old_buffers = azure_kusto_store_has_data(ctx); @@ -900,14 +945,20 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi /* validate 'total_file_size' */ if (ctx->file_size <= 0) { flb_plg_error(ctx->ins, "Failed to parse upload_file_size"); + azure_kusto_store_exit(ctx); + flb_azure_kusto_conf_destroy(ctx); return -1; } if (ctx->file_size < 1000000) { flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB"); + azure_kusto_store_exit(ctx); + flb_azure_kusto_conf_destroy(ctx); return -1; } if (ctx->file_size > MAX_FILE_SIZE) { flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE); + azure_kusto_store_exit(ctx); + flb_azure_kusto_conf_destroy(ctx); return -1; } @@ -934,15 +985,22 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi * Create upstream context for Kusto Ingestion endpoint */ ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls); + if (!ctx->u) { + flb_plg_error(ctx->ins, "upstream creation failed"); + if (ctx->buffering_enabled == FLB_TRUE) { + azure_kusto_store_exit(ctx); + } + pthread_mutex_destroy(&ctx->resources_mutex); + pthread_mutex_destroy(&ctx->token_mutex); + pthread_mutex_destroy(&ctx->blob_mutex); + flb_azure_kusto_conf_destroy(ctx); + return -1; + } if (ctx->buffering_enabled == FLB_TRUE){ flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC); ctx->u->base.net.io_timeout = ctx->io_timeout; ctx->has_old_buffers = azure_kusto_store_has_data(ctx); } - if (!ctx->u) { - flb_plg_error(ctx->ins, "upstream creation failed"); - return -1; - } flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base)); @@ -951,6 +1009,14 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH); if (!ctx->o) { flb_plg_error(ctx->ins, "cannot create oauth2 context"); + if (ctx->buffering_enabled == FLB_TRUE) { + azure_kusto_store_exit(ctx); + } + flb_upstream_destroy(ctx->u); + pthread_mutex_destroy(&ctx->resources_mutex); + pthread_mutex_destroy(&ctx->token_mutex); + pthread_mutex_destroy(&ctx->blob_mutex); + flb_azure_kusto_conf_destroy(ctx); return -1; } flb_output_upstream_set(ctx->u, ins); @@ -1114,6 +1180,19 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int return 0; } +/** + * Buffer a data chunk into the file storage for later ingestion. + * + * Writes the formatted chunk data to the upload file via the store layer. + * + * @param out_context Plugin's context (cast to struct flb_azure_kusto). + * @param upload_file Target file handle for buffered storage. + * @param chunk Data chunk to buffer. + * @param chunk_size Size of the data chunk. + * @param tag Fluent Bit tag associated with the chunk. + * @param tag_len Length of the tag string. + * @return int 0 on success, -1 on failure. + */ static int buffer_chunk(void *out_context, struct azure_kusto_file *upload_file, flb_sds_t chunk, int chunk_size, flb_sds_t tag, size_t tag_len) @@ -1533,6 +1612,27 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, auth_type_str), "Set the authentication type: 'service_principal', 'managed_identity', or 'workload_identity'. " "For managed_identity, use 'system' as client_id for system-assigned identity, or specify the managed identity's client ID"}, + {FLB_CONFIG_MAP_STR, "cloud_name", "AzureCloud", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, cloud_name), + "Set the Azure cloud environment. Supported values: " + "'AzureCloud' (default), 'AzureChinaCloud', 'AzureUSGovernmentCloud'. " + "For private clouds (USSEC, USNAT, BLEU, etc.), set " + "cloud_login_host, cloud_kusto_scope, and cloud_kusto_resource instead"}, + {FLB_CONFIG_MAP_STR, "cloud_login_host", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, custom_login_host), + "Custom OAuth login host for private/sovereign clouds " + "(e.g. login.microsoftonline.eaglex.ic.gov). When set, cloud_kusto_scope " + "and cloud_kusto_resource must also be provided"}, + {FLB_CONFIG_MAP_STR, "cloud_kusto_scope", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, custom_kusto_scope), + "Custom Kusto OAuth scope for private/sovereign clouds " + "(e.g. https://help.kusto.core.eaglex.ic.gov/.default). When set, " + "cloud_login_host and cloud_kusto_resource must also be provided"}, + {FLB_CONFIG_MAP_STR, "cloud_kusto_resource", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, custom_kusto_resource), + "Custom Kusto IMDS resource URL for private/sovereign clouds " + "(e.g. https://api.kusto.core.eaglex.ic.gov/). When set, cloud_login_host " + "and cloud_kusto_scope must also be provided"}, {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint), "Set the Kusto cluster's ingestion endpoint URL (e.g. " diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 5da2d9b5593..d306f2f1073 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -43,12 +43,31 @@ typedef enum { FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY /* Workload Identity */ } flb_azure_kusto_auth_type; -/* Kusto streaming inserts oauth scope */ -#define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default" +/* Azure cloud environment types */ +typedef enum { + FLB_AZURE_CLOUD_PUBLIC = 0, /* AzureCloud (default) */ + FLB_AZURE_CLOUD_CHINA, /* AzureChinaCloud */ + FLB_AZURE_CLOUD_US_GOVERNMENT /* AzureUSGovernmentCloud */ +} flb_azure_cloud_type; -/* MSAL authorization URL */ +/* MSAL authorization URL template: %s = login host, %s = tenant_id */ #define FLB_MSAL_AUTH_URL_TEMPLATE \ - "https://login.microsoftonline.com/%s/oauth2/v2.0/token" + "https://%s/%s/oauth2/v2.0/token" + +/* Cloud-specific login hosts */ +#define FLB_AZURE_LOGIN_HOST_PUBLIC "login.microsoftonline.com" +#define FLB_AZURE_LOGIN_HOST_CHINA "login.chinacloudapi.cn" +#define FLB_AZURE_LOGIN_HOST_US_GOVERNMENT "login.microsoftonline.us" + +/* Cloud-specific Kusto scopes */ +#define FLB_AZURE_KUSTO_SCOPE_PUBLIC "https://help.kusto.windows.net/.default" +#define FLB_AZURE_KUSTO_SCOPE_CHINA "https://help.kusto.chinacloudapi.cn/.default" +#define FLB_AZURE_KUSTO_SCOPE_US_GOVERNMENT "https://help.kusto.usgovcloudapi.net/.default" + +/* Cloud-specific Kusto IMDS resources */ +#define FLB_AZURE_KUSTO_RESOURCE_PUBLIC "https://api.kusto.windows.net/" +#define FLB_AZURE_KUSTO_RESOURCE_CHINA "https://api.kusto.chinacloudapi.cn/" +#define FLB_AZURE_KUSTO_RESOURCE_US_GOVERNMENT "https://api.kusto.usgovcloudapi.net/" #define FLB_AZURE_KUSTO_MGMT_URI_PATH "/v1/rest/mgmt" #define FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE "{\"csl\":\"%s\", \"db\": \"NetDefaultDB\"}" @@ -74,7 +93,6 @@ typedef enum { #define FLB_AZURE_IMDS_ENDPOINT "/metadata/identity/oauth2/token" #define FLB_AZURE_IMDS_API_VERSION "2018-02-01" -#define FLB_AZURE_IMDS_RESOURCE "https://api.kusto.windows.net/" struct flb_azure_kusto_resources { @@ -84,6 +102,15 @@ struct flb_azure_kusto_resources { /* used to reload resouces after some time */ uint64_t load_time; + + /* flag to prevent concurrent coroutines from reloading simultaneously */ + int loading_in_progress; + + /* Old resources pending cleanup - deferred destruction to avoid use-after-free + * when other threads may still be using them during high-volume operations */ + struct flb_upstream_ha *old_blob_ha; + struct flb_upstream_ha *old_queue_ha; + flb_sds_t old_identity_token; }; struct flb_azure_kusto { @@ -105,6 +132,18 @@ struct flb_azure_kusto { char *auth_type_str; char *workload_identity_token_file; + /* Cloud environment */ + int cloud_type; + char *cloud_name; + flb_sds_t kusto_scope; + flb_sds_t kusto_resource; + flb_sds_t login_host; + + /* Custom cloud overrides (for private/sovereign clouds like USSEC, USNAT, BLEU) */ + flb_sds_t custom_login_host; + flb_sds_t custom_kusto_scope; + flb_sds_t custom_kusto_resource; + /* compress payload */ int compression_enabled; diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 23db8fac647..c7787e4e0d2 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -146,6 +146,22 @@ static int flb_azure_kusto_resources_clear(struct flb_azure_kusto_resources *res resources->identity_token = NULL; } + /* Also clean up any old resources pending destruction */ + if (resources->old_blob_ha) { + flb_upstream_ha_destroy(resources->old_blob_ha); + resources->old_blob_ha = NULL; + } + + if (resources->old_queue_ha) { + flb_upstream_ha_destroy(resources->old_queue_ha); + resources->old_queue_ha = NULL; + } + + if (resources->old_identity_token) { + flb_sds_destroy(resources->old_identity_token); + resources->old_identity_token = NULL; + } + resources->load_time = 0; return 0; @@ -548,14 +564,48 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, flb_plg_debug(ctx->ins, "difference is %" PRIu64, now - ctx->resources->load_time); flb_plg_debug(ctx->ins, "effective ingestion resource interval is %d", ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer); + /* Acquire the mutex for the staleness check to prevent concurrent coroutines + * from both deciding to reload and rotate resources simultaneously. + * Without this, two flushes can race: the second rotation destroys old_blob_ha + * which is still being used by the first flush's in-flight blob upload, + * causing a use-after-free SIGSEGV in cmt_gauge_inc. + */ + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking resources mutex for staleness check"); + return -1; + } + /* check if we have all resources and they are not stale */ if (ctx->resources->blob_ha && ctx->resources->queue_ha && ctx->resources->identity_token && now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer) { flb_plg_debug(ctx->ins, "resources are already loaded and are not stale"); + pthread_mutex_unlock(&ctx->resources_mutex); ret = 0; } + else if (ctx->resources->loading_in_progress) { + /* + * Another coroutine is already loading resources. Return error to + * trigger FLB_RETRY so this flush will be retried once the resources + * are available. This prevents concurrent rotations that caused + * use-after-free SIGSEGV. + */ + flb_plg_info(ctx->ins, "ingestion resources loading already in progress by another coroutine, will retry"); + pthread_mutex_unlock(&ctx->resources_mutex); + /* Return -1 directly without going through cleanup, since cleanup + * would reset load_time and loading_in_progress which belong to + * the coroutine that is actually doing the loading. + */ + return -1; + } else { + /* + * Mark loading in progress (while holding mutex) to prevent other + * concurrent coroutines from also starting a reload. + */ + ctx->resources->loading_in_progress = FLB_TRUE; + /* Release the mutex before making network calls (which may yield the coroutine) */ + pthread_mutex_unlock(&ctx->resources_mutex); flb_plg_info(ctx->ins, "loading kusto ingestion resources and refresh interval is %d", ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer); response = execute_ingest_csl_command(ctx, ".get ingestion resources"); @@ -598,16 +648,57 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, parse_ingestion_identity_token(ctx, response); if (identity_token) { + /* + Deferred cleanup: destroy resources from two refresh cycles ago, + then move current resources to 'old' before assigning new ones. + This avoids use-after-free when other threads may still be using + the current resources during high-volume operations. + + With a 1-hour refresh interval, the race condition requires an + ingest operation to take >1 hour (the deferred cleanup grace period). + This is extremely unlikely under normal conditions (and hence a lock based + mechanism is avoided for performance). + */ + if (ctx->resources->old_blob_ha) { + flb_upstream_ha_destroy(ctx->resources->old_blob_ha); + flb_plg_debug(ctx->ins, "clearing up old blob HA"); + } + if (ctx->resources->old_queue_ha) { + flb_upstream_ha_destroy(ctx->resources->old_queue_ha); + flb_plg_debug(ctx->ins, "clearing up old queue HA"); + } + if (ctx->resources->old_identity_token) { + flb_sds_destroy(ctx->resources->old_identity_token); + flb_plg_debug(ctx->ins, "clearing up old identity token"); + } + + /* Move current to old */ + ctx->resources->old_blob_ha = ctx->resources->blob_ha; + ctx->resources->old_queue_ha = ctx->resources->queue_ha; + ctx->resources->old_identity_token = ctx->resources->identity_token; + + /* Assign new resources */ ctx->resources->blob_ha = blob_ha; ctx->resources->queue_ha = queue_ha; ctx->resources->identity_token = identity_token; ctx->resources->load_time = now; + flb_plg_info(ctx->ins, "ingestion resources rotated successfully, " + "previous resources moved to deferred cleanup"); + + /* Clear the loading flag on success */ + ctx->resources->loading_in_progress = FLB_FALSE; ret = 0; } else { flb_plg_error(ctx->ins, "error parsing ingestion identity token"); + /* + * Must unlock the mutex before goto cleanup to avoid + * deadlock (cleanup also acquires the mutex to reset + * load_time). This was a pre-existing bug. + */ + pthread_mutex_unlock(&ctx->resources_mutex); ret = -1; goto cleanup; } @@ -665,6 +756,16 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, cleanup: if (ret == -1) { + /* + * Reset load_time to 0 so the next call will retry the reload. + * We set load_time = now at the start to prevent concurrent reloads, + * but if the reload failed, we need to allow future retries. + */ + if (pthread_mutex_lock(&ctx->resources_mutex) == 0) { + ctx->resources->load_time = 0; + ctx->resources->loading_in_progress = FLB_FALSE; + pthread_mutex_unlock(&ctx->resources_mutex); + } if (queue_ha) { flb_upstream_ha_destroy(queue_ha); } @@ -700,6 +801,115 @@ static int flb_azure_kusto_resources_destroy(struct flb_azure_kusto_resources *r return 0; } +/** + * Resolves cloud-specific endpoints based on the cloud_name configuration. + * Sets kusto_scope, kusto_resource, and login_host in the context. + * + * @param ctx Pointer to the plugin's context + * @return int 0 on success, -1 on failure + */ +static int azure_kusto_resolve_cloud_endpoints(struct flb_azure_kusto *ctx) +{ + const char *scope = NULL; + const char *resource = NULL; + const char *login_host = NULL; + int has_custom_login_host; + int has_custom_scope; + int has_custom_resource; + + /* Check if custom overrides are provided */ + has_custom_login_host = (ctx->custom_login_host && flb_sds_len(ctx->custom_login_host) > 0); + has_custom_scope = (ctx->custom_kusto_scope && flb_sds_len(ctx->custom_kusto_scope) > 0); + has_custom_resource = (ctx->custom_kusto_resource && flb_sds_len(ctx->custom_kusto_resource) > 0); + + /* If all three custom properties are provided, use them directly */ + if (has_custom_login_host && has_custom_scope && has_custom_resource) { + ctx->login_host = flb_sds_create(ctx->custom_login_host); + if (!ctx->login_host) { + flb_errno(); + return -1; + } + + ctx->kusto_scope = flb_sds_create(ctx->custom_kusto_scope); + if (!ctx->kusto_scope) { + flb_errno(); + return -1; + } + + ctx->kusto_resource = flb_sds_create(ctx->custom_kusto_resource); + if (!ctx->kusto_resource) { + flb_errno(); + return -1; + } + + flb_plg_info(ctx->ins, + "using custom cloud endpoints: login_host='%s', scope='%s', resource='%s'", + ctx->login_host, ctx->kusto_scope, ctx->kusto_resource); + return 0; + } + + /* If some but not all custom properties are set, error out */ + if (has_custom_login_host || has_custom_scope || has_custom_resource) { + flb_plg_error(ctx->ins, + "When using custom cloud endpoints, all three properties must be set: " + "cloud_login_host, cloud_kusto_scope, cloud_kusto_resource"); + return -1; + } + + /* Resolve from well-known cloud names */ + if (!ctx->cloud_name || strcasecmp(ctx->cloud_name, "AzureCloud") == 0) { + ctx->cloud_type = FLB_AZURE_CLOUD_PUBLIC; + scope = FLB_AZURE_KUSTO_SCOPE_PUBLIC; + resource = FLB_AZURE_KUSTO_RESOURCE_PUBLIC; + login_host = FLB_AZURE_LOGIN_HOST_PUBLIC; + } + else if (strcasecmp(ctx->cloud_name, "AzureChinaCloud") == 0) { + ctx->cloud_type = FLB_AZURE_CLOUD_CHINA; + scope = FLB_AZURE_KUSTO_SCOPE_CHINA; + resource = FLB_AZURE_KUSTO_RESOURCE_CHINA; + login_host = FLB_AZURE_LOGIN_HOST_CHINA; + } + else if (strcasecmp(ctx->cloud_name, "AzureUSGovernmentCloud") == 0) { + ctx->cloud_type = FLB_AZURE_CLOUD_US_GOVERNMENT; + scope = FLB_AZURE_KUSTO_SCOPE_US_GOVERNMENT; + resource = FLB_AZURE_KUSTO_RESOURCE_US_GOVERNMENT; + login_host = FLB_AZURE_LOGIN_HOST_US_GOVERNMENT; + } + else { + flb_plg_error(ctx->ins, + "Unknown cloud_name '%s'. Use a well-known cloud name " + "('AzureCloud', 'AzureChinaCloud', 'AzureUSGovernmentCloud') " + "or specify custom endpoints via " + "cloud_login_host, cloud_kusto_scope, and cloud_kusto_resource", + ctx->cloud_name); + return -1; + } + + ctx->kusto_scope = flb_sds_create(scope); + if (!ctx->kusto_scope) { + flb_errno(); + return -1; + } + + ctx->kusto_resource = flb_sds_create(resource); + if (!ctx->kusto_resource) { + flb_errno(); + return -1; + } + + ctx->login_host = flb_sds_create(login_host); + if (!ctx->login_host) { + flb_errno(); + return -1; + } + + flb_plg_info(ctx->ins, "cloud environment='%s', login_host='%s', scope='%s'", + ctx->cloud_name ? ctx->cloud_name : "AzureCloud", + ctx->login_host, ctx->kusto_scope); + + return 0; +} + struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *ins, struct flb_config *config) { @@ -722,6 +932,14 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } + /* Resolve cloud-specific endpoints */ + ret = azure_kusto_resolve_cloud_endpoints(ctx); + if (ret == -1) { + flb_plg_error(ins, "failed to resolve cloud endpoints"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + /* Auth method validation and setup */ if (strcasecmp(ctx->auth_type_str, "service_principal") == 0) { ctx->auth_type = FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL; @@ -802,30 +1020,35 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * /* MSI auth */ /* Construct the URL template with or without client_id for managed identity */ if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM) { - ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1); + ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 + + flb_sds_len(ctx->kusto_resource)); if (!ctx->oauth_url) { flb_errno(); flb_azure_kusto_conf_destroy(ctx); return NULL; } flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), - FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", ""); + FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", "", + ctx->kusto_resource); } else { /* User-assigned managed identity */ ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 + sizeof("&client_id=") - 1 + - flb_sds_len(ctx->client_id)); + flb_sds_len(ctx->client_id) + + flb_sds_len(ctx->kusto_resource)); if (!ctx->oauth_url) { flb_errno(); flb_azure_kusto_conf_destroy(ctx); return NULL; } flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), - FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->client_id); + FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", + ctx->client_id, ctx->kusto_resource); } } else { /* Standard OAuth2 for service principal or workload identity */ ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 + + flb_sds_len(ctx->login_host) + flb_sds_len(ctx->tenant_id)); if (!ctx->oauth_url) { flb_errno(); @@ -833,7 +1056,8 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), - FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id); + FLB_MSAL_AUTH_URL_TEMPLATE, ctx->login_host, + ctx->tenant_id); } ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources)); @@ -857,6 +1081,21 @@ int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx) flb_plg_info(ctx->ins, "before exiting the plugin kusto conf destroy called"); + if (ctx->kusto_scope) { + flb_sds_destroy(ctx->kusto_scope); + ctx->kusto_scope = NULL; + } + + if (ctx->kusto_resource) { + flb_sds_destroy(ctx->kusto_resource); + ctx->kusto_resource = NULL; + } + + if (ctx->login_host) { + flb_sds_destroy(ctx->login_host); + ctx->login_host = NULL; + } + if (ctx->oauth_url) { flb_sds_destroy(ctx->oauth_url); ctx->oauth_url = NULL; diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 0a187a119aa..c3ef112a42d 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -188,13 +188,14 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t goto cleanup; } + if (uri) { - flb_plg_debug(ctx->ins, "azure_kusto: before calling azure storage api :: value of set io_timeout is %d", u_conn->net->io_timeout); flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri); c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0, NULL, 0); if (c) { + flb_http_buffer_size(c, 0); flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); flb_http_add_header(c, "Content-Type", 12, "application/json", 16); flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9); @@ -451,6 +452,7 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t flb_sds_len(payload), NULL, 0, NULL, 0); if (c) { + flb_http_buffer_size(c, 0); flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); flb_http_add_header(c, "Content-Type", 12, "application/atom+xml", 20); diff --git a/plugins/out_azure_kusto/azure_msiauth.c b/plugins/out_azure_kusto/azure_msiauth.c index 884f850dd58..5e5064e565f 100644 --- a/plugins/out_azure_kusto/azure_msiauth.c +++ b/plugins/out_azure_kusto/azure_msiauth.c @@ -62,6 +62,9 @@ char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx) return NULL; } + /* Allow response buffer to grow as needed */ + flb_http_buffer_size(c, 0); + /* Append HTTP Header */ flb_http_add_header(c, "Metadata", 8, "true", 4); @@ -135,7 +138,9 @@ static flb_sds_t read_token_from_file(const char *token_file) return token; } -int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, const char *client_id, const char *tenant_id) +int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, + const char *client_id, const char *tenant_id, + const char *scope) { int ret; size_t b_sent; @@ -158,7 +163,7 @@ int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *to return -1; } - flb_info("[azure workload identity] after read token from file %s", federated_token); + flb_debug("[azure workload identity] federated token read successfully from file"); /* Build the form data for token exchange *before* creating the client */ body = flb_sds_create_size(4096); @@ -169,23 +174,48 @@ int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *to } body = flb_sds_cat(body, "client_id=", 10); + if (!body) { + goto body_error; + } body = flb_sds_cat(body, client_id, strlen(client_id)); - /* Use the correct grant_type and length for workload identity */ + if (!body) { + goto body_error; + } body = flb_sds_cat(body, "&grant_type=client_credentials", 30); + if (!body) { + goto body_error; + } body = flb_sds_cat(body, "&client_assertion_type=urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 77); + if (!body) { + goto body_error; + } body = flb_sds_cat(body, "&client_assertion=", 18); + if (!body) { + goto body_error; + } body = flb_sds_cat(body, federated_token, flb_sds_len(federated_token)); - /* Use the correct scope and length for Kusto */ - body = flb_sds_cat(body, "&scope=https://help.kusto.windows.net/.default", 46); - if (!body) { - /* This check might be redundant if flb_sds_cat handles errors, but safe */ - flb_error("[azure workload identity] failed to build request body"); - flb_sds_destroy(federated_token); - return -1; + goto body_error; + } + /* Use the cloud-specific scope for Kusto */ + body = flb_sds_cat(body, "&scope=", 7); + if (!body) { + goto body_error; + } + body = flb_sds_cat(body, scope, strlen(scope)); + if (!body) { + goto body_error; } /* Get upstream connection to Azure AD token endpoint */ + goto body_ok; + +body_error: + flb_error("[azure workload identity] failed to build request body (OOM)"); + flb_sds_destroy(federated_token); + return -1; + +body_ok: u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { flb_error("[azure workload identity] could not get an upstream connection"); @@ -206,6 +236,9 @@ int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *to return -1; } + /* Allow response buffer to grow as needed */ + flb_http_buffer_size(c, 0); + /* Prepare token exchange request headers */ flb_http_add_header(c, "Content-Type", 12, "application/x-www-form-urlencoded", 33); @@ -213,8 +246,8 @@ int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *to /* c->body_buf = body; */ /* c->body_len = flb_sds_len(body); */ - /* Add a debug log to verify the body content just before sending */ - flb_debug("[azure workload identity] Sending request body (len=%zu): %s", flb_sds_len(body), body); + /* Log only the body length, not the content (body contains sensitive credentials) */ + flb_debug("[azure workload identity] sending token exchange request (body_len=%zu)", flb_sds_len(body)); /* Issue request */ ret = flb_http_do(c, &b_sent); @@ -269,4 +302,4 @@ int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *to /* body already destroyed */ return -1; -} +} \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_msiauth.h b/plugins/out_azure_kusto/azure_msiauth.h index b36e0700a7d..af5233ce167 100644 --- a/plugins/out_azure_kusto/azure_msiauth.h +++ b/plugins/out_azure_kusto/azure_msiauth.h @@ -19,10 +19,12 @@ #include -/* MSAL authorization URL */ +/* MSAL authorization URL template: %s%s = optional client_id param, %s = resource URL */ #define FLB_AZURE_MSIAUTH_URL_TEMPLATE \ - "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2021-02-01%s%s&resource=https://api.kusto.windows.net" + "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2021-02-01%s%s&resource=%s" char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx); -int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, const char *client_id, const char *tenant_id); +int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, + const char *client_id, const char *tenant_id, + const char *scope);