Problem Statement

Stream Analytics jobs are a powerful means of analyzing incoming data and distributing it to various target storages or services in Azure and other locations. You can only update a job's definition while the job is stopped. However, starting and stopping a Stream Analytics job can take several minutes, depending on the query's complexity and the input and output sources.

When it comes to updating the query within an automated process like CI/CD pipelines, waiting for the job to stop, updating it, and then restarting the service can present a significant challenge.

In a recent project, I devised several PowerShell routines to streamline this task.

Step 1: Stopping a Stream Analytics job
Step 2: Updating the job
Step 3: Restarting the job

Stopping a Stream Analytics Job

[CmdletBinding()]
param (

    [Parameter(Mandatory=$true)]
    [string]$streamAnalyticsJobName,
    
    [Parameter(Mandatory=$true)]
    [string]$resourceGroup
)

# Stop Job
Write-Host "Stop Stream Analytics Job"
Write-Host "- streamAnalyticsJobName: $streamAnalyticsJobName"
Write-Host "- resourceGroup         : $resourceGroup"
Write-Host "- We wait max 5 minutes for job to stop"

$isStopping = $false
$waitSeconds = 5
$counter = 0

# Poll for up to 5 minutes until the job has stopped
do {
  $counter++
  $seconds = $counter * $waitSeconds
  
  # Count jobs with exactly this name; 0 means the job does not exist yet
  $count = az stream-analytics job list --resource-group $resourceGroup --query "length([?name=='$streamAnalyticsJobName'])" --output tsv
  if ([int]$count -eq 0) {
    # Job not found, it is new
    Write-Host "- Job not found in list. We will create new one."
    break
  }

  # Current Status
  $resultRaw = az stream-analytics job show --job-name $streamAnalyticsJobName --resource-group $resourceGroup

  if ($? -eq $false) {
    # Job not found, it is new
    Write-Host "- Job not found. We will create new one."
    break
  }

  $result = $resultRaw | ConvertFrom-Json
  if ($null -eq $result) {
    # Job not found, it is new
    Write-Host "- Job not found. We will create new one."
    break
  } 

  # Job already exists, get job state

  $jobstate = $result.Jobstate
  Write-Host "- Current Jobstate: $jobstate"

  if ($jobstate -eq 'Stopped' -or $jobstate -eq 'Created' -or $jobstate -eq 'Failed') {
    break
  }

  # Only send stop command once
  if ($isStopping -eq $true) {
    Write-Host "- Job is already stopping, waiting for it to stop"
  } else {
    $isStopping = $true
    Write-Host "- Job is not stopped, stopping"
    az stream-analytics job stop --job-name $streamAnalyticsJobName --resource-group $resourceGroup
  }

  Write-Host "- Still stopping ($seconds seconds passed)"
  Start-Sleep -Seconds $waitSeconds

} while ($seconds -lt 300)

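# Report a timeout only if the job never reached a stopped state
if ($seconds -ge 300 -and $jobstate -notin 'Stopped', 'Created', 'Failed') {
  Write-Error "- Job did not stop after 5 minutes"
  return
}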

This script asks the specified job to stop and then polls its state every 5 seconds for up to 300 seconds (5 minutes). If the job has still not stopped after that time, the script reports an error, because stopping the job has likely failed.
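
The wait-with-timeout pattern described above can also be factored into a small reusable helper. The following is only a sketch: `Wait-Until` and its parameters are hypothetical names that do not appear in the original scripts. It measures elapsed time with a stopwatch instead of counting iterations, so time spent inside the `az` calls counts toward the limit.

```powershell
# Hypothetical helper (not part of the original scripts): polls a condition
# until it returns $true or the timeout elapses.
function Wait-Until {
    [CmdletBinding()]
    param (
        [Parameter(Mandatory = $true)]
        [scriptblock]$Condition,

        [int]$TimeoutSeconds = 300,

        [int]$PollSeconds = 5
    )

    $stopwatch = [System.Diagnostics.Stopwatch]::StartNew()
    while ($true) {
        # Check first, so an already satisfied condition returns immediately
        if (& $Condition) {
            return $true
        }
        # Give up once the time limit has been reached
        if ($stopwatch.Elapsed.TotalSeconds -ge $TimeoutSeconds) {
            return $false
        }
        Start-Sleep -Seconds $PollSeconds
    }
}

# Usage sketch; 'my-asa-job' and 'my-rg' are placeholder names:
# $stopped = Wait-Until -TimeoutSeconds 300 -PollSeconds 5 -Condition {
#     $job = az stream-analytics job show --job-name 'my-asa-job' --resource-group 'my-rg' | ConvertFrom-Json
#     $job.jobState -in 'Stopped', 'Created', 'Failed'
# }
```

The helper returns `$true` as soon as the condition holds and `$false` on timeout, so the caller decides how to report the failure.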

Restarting the job

[CmdletBinding()]
param (
    [Parameter(Mandatory=$true)]
    [Alias('name')]
    [string]$streamAnalyticsJobName,

    [Parameter(Mandatory=$true)]
    [Alias('rg')]
    [string]$resourceGroup
)

Write-Host "Start Stream Analytics Job"
Write-Verbose "- streamAnalyticsJobName: $streamAnalyticsJobName"
Write-Verbose "- resourceGroup         : $resourceGroup"

az stream-analytics job start --job-name $streamAnalyticsJobName --resource-group $resourceGroup --no-wait

$counter = 0
$maxRetries = 60 # 60 * 10 seconds = 600 seconds (10 minutes)

$waitTime = $maxRetries * 10
Write-Host "- Job should start within 120 seconds, we try max $waitTime seconds"

do {
    $counter++

    # Current Status
    $result = az stream-analytics job show --job-name $streamAnalyticsJobName --only-show-errors --resource-group $resourceGroup | ConvertFrom-Json

    if ($null -eq $result) {
        # Job not found, so there is nothing to start
        Write-Error "- Job not found, we have a problem"
        break
    }

    # Job exists, read its state
    $jobstate = $result.Jobstate
    $seconds = $counter * 10
    Write-Host "- Waiting - Current Jobstate: $jobstate ($seconds seconds passed)"

    if ($jobstate -eq 'Started') {
        Write-Host "- Job started successfully"
        break
    }

    if ($jobstate -eq 'Running') {
        Write-Host "- Job is running and started successfully"
        break
    }

    if ($jobstate -eq 'Failed') {
        Write-Error "- Job failed to start"
        break
    }

    Start-Sleep -Seconds 10

} while ($counter -lt $maxRetries)

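# Anything other than a started or running job counts as a failure
# (timeout, failed start, or job not found)
if ($jobstate -ne 'Started' -and $jobstate -ne 'Running') {
    Write-Error "- Job did not reach a running state (last state: $jobstate, limit: $waitTime seconds)"
    return $false
}

return $true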
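
The state checks inside the loop can also be expressed as one small function, which makes the three possible outcomes explicit. This is a sketch only: `Get-StartOutcome` and its return values are hypothetical, and the state names are simply the ones the script above already checks for.

```powershell
# Hypothetical helper (not part of the original scripts): maps a job state
# to one of three outcomes for the start loop.
function Get-StartOutcome {
    param (
        [string]$JobState
    )

    switch ($JobState) {
        # States the script above treats as a successful start
        { $_ -in 'Started', 'Running' } { return 'Succeeded' }
        # The job reported a failure, so waiting longer will not help
        'Failed' { return 'Failed' }
        # Any other state: keep polling
        default { return 'Pending' }
    }
}
```

The loop would then keep polling while the outcome is `Pending` and stop on either of the other two values.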

Stopping and Starting the Job in a Pipeline

The job is updated as part of an Azure DevOps pipeline:

parameters:
  - name: deploymentName
    type: string
  
  # ...

jobs:
- deployment: ${{ parameters.deploymentName }}
  displayName: ${{ parameters.deploymentTitle }}
  environment: ${{ parameters.environmentName }}
  workspace:
    clean: all
  strategy: 
    runOnce:
      deploy:
        steps:
        - download: current
          displayName: Download Artifacts

        - task: AzureCLI@2
          displayName: Stop ASA Job
          inputs:
            azureSubscription: ${{ parameters.azConnectionName }}
            scriptType: pscore
            scriptPath: $(Pipeline.Workspace)/Stop-StreamAnalyticsJob.ps1
            scriptArguments: >
              -streamAnalyticsJobName ${{ parameters.streamAnalyticsJobName }}
              -resourceGroup ${{ parameters.resourceGroup}}              

        # ... update 
        
        - task: AzureCLI@2
          displayName: Restart ASA Job
          inputs:
            azureSubscription: ${{ parameters.azConnectionName }}
            scriptType: pscore
            scriptPath: $(Pipeline.Workspace)/Start-StreamAnalyticsJob.ps1
            scriptArguments: >
              -streamAnalyticsJobName ${{ parameters.streamAnalyticsJobName }} 
              -resourceGroup ${{ parameters.resourceGroup}}              

Conclusion

With this approach, a pipeline can automatically wait for an Azure Stream Analytics job to stop, update it, and then wait for it to start again.