The following script shows how to write directly to an Azure Data Explorer (ADX) table using PowerShell. For resiliency, the preferred ingestion path for ADX remains a data connection against an Event Hub (Kafka), but there are situations where ad-hoc writes are needed.
Prerequisites
Streaming ingestion must first be enabled at the cluster level of ADX. This can be done through the portal under Settings -> Configurations.
Second, streaming ingestion must be enabled on each individual table by running the following KQL management command in the Query pane against the relevant database.
.alter table Syslog policy streamingingestion '{"IsEnabled": true}'
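If you would rather confirm the table policy from PowerShell than from the portal, the same REST API exposes a management endpoint at /v1/rest/mgmt. The following is a minimal sketch, not a definitive implementation: the function names New-KustoMgmtBody and Get-StreamingIngestionPolicy are hypothetical helpers invented for this illustration, it assumes a bearer token obtained the same way as in the script further down, and it assumes the application has sufficient rights to read the table policy.

```powershell
# Sketch only: these helper names are hypothetical, not part of any ADX module.

function New-KustoMgmtBody {
    param(
        [Parameter(Mandatory)] [string] $DatabaseName,
        [Parameter(Mandatory)] [string] $Command
    )
    # The management endpoint takes a JSON body naming the database ("db")
    # and the management command text ("csl").
    [ordered]@{ db = $DatabaseName; csl = $Command } | ConvertTo-Json -Compress
}

function Get-StreamingIngestionPolicy {
    param(
        [Parameter(Mandatory)] [string] $ClusterUrl,
        [Parameter(Mandatory)] [string] $DatabaseName,
        [Parameter(Mandatory)] [string] $TableName,
        [Parameter(Mandatory)] [string] $Token   # assumed: bearer token for the cluster
    )
    $headers = @{ 'Authorization' = "Bearer $Token" }
    $body = New-KustoMgmtBody -DatabaseName $DatabaseName `
        -Command ".show table $TableName policy streamingingestion"

    # Returns the raw response object; inspect it to see whether the policy is enabled.
    Invoke-RestMethod -Uri "$ClusterUrl/v1/rest/mgmt" -Method Post -Headers $headers `
        -ContentType 'application/json; charset=utf-8' -Body $body
}
```

Once a token is available, a call such as Get-StreamingIngestionPolicy -ClusterUrl $clusterUrl -DatabaseName $databaseName -TableName $tableName -Token $token should return the policy currently set on the table.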
You will also need to ensure that the application (service principal) you use is granted the Ingestor role on either the database or the table. The Database Ingestor role can be added through the portal on the database under Overview -> Permissions -> Add.
This example passes a single record to a Syslog table within ADX. The variables will need to be altered to match your own ADX deployment.
Of interest, ADX uses JSON Lines for record ingestion: each record is one complete JSON object on its own line.
# Variables
$clusterUrl = "https://securitylogs.australiaeast.kusto.windows.net"
$databaseName = "mydb"
$tableName = "Syslog"
$appId = "e3bd024a-b849-4dd2-8ce6-af88b4323ddf"
$secret = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
$tenantId = "08a3b042-84bd-4cc7-81eb-7d2879fb1d61"
$resource = "https://kusto.kusto.windows.net"
$mappingName = "" # Add your mapping name if needed
# Get Access Token
$body = @{
    grant_type    = "client_credentials"
    client_id     = $appId
    client_secret = $secret
    resource      = $resource
}
$response = Invoke-RestMethod -Method Post -Uri "https://login.microsoftonline.com/$tenantId/oauth2/token" -ContentType "application/x-www-form-urlencoded" -Body $body
$token = $response.access_token
# Current UTC time in ISO 8601 round-trip format, used for both timestamp fields
$currentTime = Get-Date ([datetime]::UtcNow) -Format O
# JSON record to insert (newline-delimited JSON format)
$jsonRecords = @"
{"TimeGenerated":"$currentTime","EventTime":"$currentTime","Facility":"local0","HostName":"host1","ProcessID":1234,"ProcessName":"process1","SeverityLevel":"info","SourceSystem":"Linux","SyslogMessage":"Test syslog message","TenantId":"tenant1","MG":"mg1","HostIP":"192.168.1.1","Computer":"computer1","CollectorHostName":"collector1","Type":"Syslog"}
"@
# Set headers
$headers = @{
    'Content-Type'    = 'application/json'
    'Authorization'   = "Bearer $token"
    'Host'            = $clusterUrl.Split('/')[2]
    'Accept-Encoding' = 'gzip, deflate'
}
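# Ingestion URI with optional mapping name
$ingestionUri = "$($clusterUrl)/v1/rest/ingest/$($databaseName)/$($tableName)?streamFormat=Json"
if ($mappingName) {
    # Escape the mapping name so spaces or special characters do not break the query string
    $ingestionUri += "&mappingName=$([uri]::EscapeDataString($mappingName))"
}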
Write-Output "Ingestion URI: $ingestionUri"
try {
    $response = Invoke-RestMethod -Uri $ingestionUri -Method Post -Headers $headers -Body $jsonRecords -Verbose
    Write-Output $response
} catch {
    Write-Error "Failed to ingest data: $_"
}
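The script above hand-writes a single JSON line inside a here-string. When there are several records, it can be easier to build the JSON Lines body from PowerShell objects. The following is a minimal sketch under stated assumptions: the two sample records and the $records and $jsonLines variable names are made up for illustration, and only a subset of the Syslog columns is shown.

```powershell
# Hypothetical sample records; property names mirror a subset of the Syslog columns used above.
$records = @(
    [ordered]@{ HostName = 'host1'; ProcessID = 1234; SyslogMessage = 'First test message' },
    [ordered]@{ HostName = 'host2'; ProcessID = 5678; SyslogMessage = 'Second test message' }
)

# Serialise each record to one compact line (-Compress removes pretty-printing),
# then join the lines with a newline to form a JSON Lines payload.
$jsonLines = ($records | ForEach-Object { $_ | ConvertTo-Json -Compress }) -join "`n"

Write-Output $jsonLines
```

The resulting $jsonLines string could then be passed as the -Body argument in place of $jsonRecords. Using ConvertTo-Json also handles quoting and escaping of message text, which is easy to get wrong when building the line by hand.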