I'm convinced that security teams within every major organisation will be running Azure Data Explorer (ADX) clusters in the near future. In August 2024 the Australian Signals Directorate incorporated M-21-31 logging expectations as the benchmark for all government agencies and enterprises of national interest. To comply with M-21-31, security teams need to be ingesting terabytes or petabytes of log data daily. A columnar database is the only current technology that can address this challenge, and Azure Data Explorer's tight integration with Microsoft Sentinel makes it the obvious choice for our emerging Big Data needs.
Azure Data Explorer is an evolution in Microsoft's Security landscape, providing Big Data storage capability previously only available with Splunk. Sentinel is changing too, with normalization and the Advanced Security Information Model (ASIM) being an essential step toward Security becoming a 'Big Data' analysis capability.
Being able to run the same hunting queries in Sentinel against large data stores in Azure Data Explorer is one of the reasons ADX is well suited as a companion to Sentinel. Now that ASIM has entered Public Preview, we need to ensure that we can use ASIM queries against our enormous ADX data store.
ASIM Functions and Parsers
All of the ASIM functions and parsers are available in the Azure-Sentinel GitHub repository.
Library functions are found here: https://github.com/Azure/Azure-Sentinel/tree/master/ASIM/lib/functions
Parsers are found here: https://github.com/Azure/Azure-Sentinel/tree/master/Parsers/ASimAuditEvent/Parsers
There is one dependency function, _GetWatchlist, that will need to be created on ADX:
.create-or-alter function with (skipvalidation=true) _GetWatchlist(
    ['watchlistAlias']:string,
    ['keys']:dynamic=dynamic([])
)
{
    let function = (watchlists:string, keys:dynamic = dynamic([])) {
        Watchlist
        | where TimeGenerated < now()
        | where _DTItemType == 'watchlist-item'
        | where WatchlistAlias in (watchlists)
        | where array_length(keys) == 0 or SearchKey in (keys)
        | summarize hint.shufflekey=_DTItemId arg_max(_DTTimestamp, _DTItemStatus, LastUpdatedTimeUTC, SearchKey, WatchlistItem) by _DTItemId
        | where _DTItemStatus != 'Delete'
        | project-away _DTTimestamp, _DTItemStatus
    };
    function(watchlistAlias, keys)
}
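Because the function is created with skipvalidation=true, ADX should accept it before a Watchlist table exists, but it can only return rows once one does. As a minimal sketch, assuming you replicate Sentinel watchlist data into ADX, the table below declares only the columns that _GetWatchlist references. The column types and the 'HighValueAssets' alias are my assumptions for illustration, not the full Sentinel schema.

```kql
// Hypothetical minimal Watchlist table: only the columns _GetWatchlist references.
// Column types are assumptions; align them with your exported Sentinel data.

.create-merge table Watchlist (
    TimeGenerated: datetime,
    _DTItemId: string,
    _DTItemType: string,
    _DTItemStatus: string,
    _DTTimestamp: datetime,
    LastUpdatedTimeUTC: datetime,
    SearchKey: string,
    WatchlistAlias: string,
    WatchlistItem: dynamic
)

// Run separately from the command above.
// 'HighValueAssets' is a made-up watchlist alias.

_GetWatchlist('HighValueAssets')
| project SearchKey, WatchlistItem
```

The second statement is an ordinary query, so it needs to be executed on its own rather than in the same request as the management command.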
KQL Conversion Script
There are some very minor differences between the KQL engine of Log Analytics and that of Azure Data Explorer.
The following PowerShell script can be used to convert the Log Analytics parsers into a variant that can be used with Azure Data Explorer. It converts each of the parsers and also creates a "complete" KQL script that combines all the YAML-based parsers in the directory.
There are two parameters to set when using the script.
$DirectoryPath is the local directory containing the YAML files you want converted.
$CreateDefaultFunctions is a switch that will also create ADX functions with the default names used by Microsoft Sentinel. This must be set to true for the library functions, as some of the parsers default to using the built-in functions and will fail if they don't exist.
# Requires PowerShell module 'powershell-yaml'
# Install using: Install-Module powershell-yaml -Force
param (
[Parameter(Mandatory=$false)]
[string]$DirectoryPath="C:\<path to yaml parser directory>\ASIM\Parsers\ASimWebSession\Parsers",
[Parameter(Mandatory=$false)]
[switch]$CreateDefaultFunctions=$true
)
function Convert-YamlToAdxFunction {
    param (
        [Parameter(Mandatory=$true)]
        [string]$YamlContent,
        [Parameter(Mandatory=$false)]
        [switch]$IsDefaultFunction=$false
    )
    # Convert YAML to PowerShell object
    $yamlObj = ConvertFrom-Yaml $YamlContent
    # Initialize variables
    $functionName = if ($IsDefaultFunction -and $yamlObj.EquivalentBuiltInFunction) {
        $yamlObj.EquivalentBuiltInFunction
    } elseif ($yamlObj.FunctionName) {
        $yamlObj.FunctionName
    } elseif ($yamlObj.ParserName) {
        $yamlObj.ParserName
    } else {
        throw "No function or parser name found in YAML"
    }
    $query = if ($yamlObj.FunctionQuery) {
        $yamlObj.FunctionQuery.Trim()
    } elseif ($yamlObj.ParserQuery) {
        $yamlObj.ParserQuery.Trim()
    } else {
        throw "No query found in YAML"
    }
    # Build parameters string if they exist
    $paramsString = ""
    if ($yamlObj.FunctionParams -or $yamlObj.ParserParams) {
        $params = if ($yamlObj.FunctionParams) { $yamlObj.FunctionParams } else { $yamlObj.ParserParams }
        $paramsList = foreach ($param in $params) {
            # Handle special case for table type
            $paramType = if ($param.Type -match "table:\(\*\)") {
                "(*)" # ADX syntax for table type
            } else {
                $param.Type
            }
            # Handle special cases for default values
            $defaultValue = if ($null -ne $param.Default) {
                $defVal = $param.Default.ToString()
                if ($defVal -eq "*") {
                    "='*'" # Wrap asterisk in quotes
                }
                elseif ($defVal -match "^datetime\(.*\)$") {
                    "=$defVal" # Don't wrap datetime functions
                }
                elseif ($defVal -match "^dynamic\(.*\)$") {
                    "=$defVal" # Don't wrap dynamic functions
                }
                elseif ($defVal -match "^int\(.*\)$") {
                    "=$defVal" # Don't wrap int functions
                }
                elseif ($defVal -match "^(true|false|True|False)$") {
                    "=" + $defVal.ToLower() # Convert boolean to lowercase
                }
                elseif ($defVal -match "^\d+$") {
                    "=$defVal" # Don't wrap numeric values
                }
                elseif ($defVal -match "^null$") {
                    "=$defVal" # Don't wrap null
                }
                else {
                    "='$defVal'" # Wrap other string values in quotes
                }
            } else {
                ""
            }
            # Wrap parameter name in square brackets
            " ['$($param.Name)']:$paramType$defaultValue"
        }
        $paramsString = "(" + ($paramsList -join ",`n") + ")"
    }
    # Build function documentation
    # (here-string content and terminators stay flush left so the output is unchanged)
    $documentation = @"
//
// Function Name: $functionName
// Description: $($yamlObj.Description)
// Version: $($yamlObj.Version)
// Last Updated: $($yamlObj.LastUpdated)
//
"@
    # Generate the ADX function creation command with skipvalidation
    $adxFunction = @"
$documentation
.create-or-alter function with (skipvalidation=true) $functionName$paramsString
{
$query
}
"@
    return $adxFunction
}
try {
    # Ensure directory exists
    if (-not (Test-Path -Path $DirectoryPath -PathType Container)) {
        throw "Directory not found: $DirectoryPath"
    }
    # Get all YAML files in the directory
    $yamlFiles = Get-ChildItem -Path $DirectoryPath -Filter "*.yaml"
    if ($yamlFiles.Count -eq 0) {
        Write-Warning "No YAML files found in directory: $DirectoryPath"
        exit 0
    }
    # Create array to store all functions
    $allFunctions = @()
    # Process each YAML file
    foreach ($yamlFile in $yamlFiles) {
        Write-Host "Processing: $($yamlFile.Name)"
        try {
            # Read and convert the YAML file
            $yamlContent = Get-Content -Path $yamlFile.FullName -Raw
            $yamlObj = ConvertFrom-Yaml $yamlContent
            # Create the primary function
            $adxFunction = Convert-YamlToAdxFunction -YamlContent $yamlContent
            # Save individual KQL file
            $outputPath = [System.IO.Path]::ChangeExtension($yamlFile.FullName, "kql")
            $adxFunction | Out-File -FilePath $outputPath -Encoding UTF8
            Write-Host "Created: $($yamlFile.BaseName).kql"
            # Add to combined functions array
            $allFunctions += $adxFunction
            # Check for EquivalentBuiltInFunction
            if ($CreateDefaultFunctions -and $yamlObj.EquivalentBuiltInFunction) {
                Write-Host "Creating equivalent built-in function: $($yamlObj.EquivalentBuiltInFunction)"
                $builtInFunction = Convert-YamlToAdxFunction -YamlContent $yamlContent -IsDefaultFunction
                # Save built-in function to separate file
                $builtInOutputPath = [System.IO.Path]::ChangeExtension($yamlFile.FullName, ".builtin.kql")
                $builtInFunction | Out-File -FilePath $builtInOutputPath -Encoding UTF8
                Write-Host "Created: $($yamlFile.BaseName).builtin.kql"
                # Add to combined functions array
                $allFunctions += "`n`n" # Add spacing between functions
                $allFunctions += $builtInFunction
            }
            $allFunctions += "`n`n" # Add spacing between files
        }
        catch {
            Write-Error "Error processing $($yamlFile.Name): $_"
            continue # Continue with next file even if one fails
        }
    }
    # Create combined output file
    if ($allFunctions.Count -gt 0) {
        $combinedOutputPath = Join-Path $DirectoryPath "_Complete.kql"
        $allFunctions | Out-File -FilePath $combinedOutputPath -Encoding UTF8
        Write-Host "`nCreated combined file: _Complete.kql"
        # The count is the number of YAML files found, not the number of functions emitted
        Write-Host "Total files processed: $($yamlFiles.Count)"
    }
} catch {
    Write-Error "Error: $_"
}
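To make the conversion concrete, this is roughly what the script would emit for a hypothetical parser YAML whose ParserName is vimExampleParser and whose parameters default to datetime(null), dynamic([]), '*' and False respectively. The parser name, metadata values, table and query body are placeholders I've made up. Only the shape of the signature follows from the script's rules above.

```kql
// Illustrative output only; vimExampleParser and ExampleTable are hypothetical.

//
// Function Name: vimExampleParser
// Description: Hypothetical example parser
// Version: 0.1.0
// Last Updated: Jan 1st 2025
//
.create-or-alter function with (skipvalidation=true) vimExampleParser( ['starttime']:datetime=datetime(null),
 ['url_has_any']:dynamic=dynamic([]),
 ['eventresult']:string='*',
 ['disabled']:bool=false)
{
ExampleTable
| where isnull(starttime) or TimeGenerated >= starttime
| where eventresult == '*' or EventResult == eventresult
}
```

Note how the asterisk default is quoted, the boolean is lower-cased, and the datetime(...) and dynamic(...) defaults pass through untouched.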
Once each of the parsers has been converted and imported into ADX, you'll be able to run ASIM hunting queries across the ADX data warehouse.
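As a hedged illustration, a hunting query against the converted parsers might look like the one below. It assumes the WebSession parsers were converted, that the unifying parser was created under the name imWebSession, and that it exposes starttime, endtime and eventresult filtering parameters. Check the generated _Complete.kql for the names and parameters actually present in your cluster.

```kql
// Sketch: top sources of failed web sessions over the last day.
// imWebSession and its parameter names are assumptions based on the ASIM WebSession schema.
imWebSession(starttime=ago(1d), endtime=now(), eventresult='Failure')
| summarize FailedSessions = count() by SrcIpAddr
| top 20 by FailedSessions desc
```

Passing the time range as parameters, rather than filtering afterwards, lets the parser apply the filter before normalising each source.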
ASIM Impact with Azure Data Explorer Schema Planning
As security teams move toward working with terabytes or petabytes of data a day, data normalisation becomes critical.
- To effectively complement Microsoft Sentinel, adopting Microsoft's Azure Monitor table schemas for enterprise data is an absolute requirement. How to derive the schema syntax for ADX is described here.
- ASIM parsers also leverage a number of "standard columns" such as _ItemId and _ResourceId. The underscore has typically been seen as marking these columns as system reserved rather than core event data. If we want to use ASIM parsers, we need to incorporate standard columns into our table schemas.
- I've previously discussed the problem of inconsistency on Sentinel with the TimeGenerated field. As ASIM forces the use of underscored standard columns in ADX, we should probably also incorporate _TimeReceived as the field that records when ADX receives a record, and return to mapping TimeGenerated to the event time in our ingestion functions.
ASIM is a really big step toward Microsoft handling Big Data at scale with its security capability. Supporting data normalisation with Security Data Warehousing is extremely important and should be a core design factor in ADX deployments for security teams.
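The schema points above can be sketched as a raw landing table, a normalised table carrying the standard columns, and an update policy function that maps the timestamps. All table, function and source field names here (ExampleEvents_Raw, ExampleEvents, ExampleEvents_Expand, RawData.EventTime, RawData.ResourceId) are hypothetical, and using ingestion_time() for _TimeReceived assumes the ingestion time policy is enabled on the raw table.

```kql
// Hypothetical raw landing table.

.create-merge table ExampleEvents_Raw (RawData: dynamic)

// Hypothetical normalised table with underscored standard columns.

.create-merge table ExampleEvents (
    TimeGenerated: datetime,
    _TimeReceived: datetime,
    _ItemId: string,
    _ResourceId: string,
    Type: string,
    EventData: dynamic
)

// Ingestion function: TimeGenerated carries the event time,
// _TimeReceived records when ADX received the record.

.create-or-alter function ExampleEvents_Expand() {
    ExampleEvents_Raw
    | project
        TimeGenerated = todatetime(RawData.EventTime),
        _TimeReceived = ingestion_time(),
        _ItemId = tostring(new_guid()),
        _ResourceId = tostring(RawData.ResourceId),
        Type = 'ExampleEvents',
        EventData = RawData
}

// Attach the function as an update policy on the normalised table.

.alter table ExampleEvents policy update
@'[{"IsEnabled": true, "Source": "ExampleEvents_Raw", "Query": "ExampleEvents_Expand()", "IsTransactional": true}]'
```

Each management command is run on its own. Keeping both timestamps means queries can filter on event time while ingestion lag stays measurable as the difference between the two columns.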