Watermark delay keeps increasing and events are not processed in Stream Analytics

Audrius
2023-08-08T08:38:14.5+00:00

I have IoT package data coming in from Event Hub to Stream Analytics, which then outputs it to Delta Lake.

Before switching to Delta Lake, I had the output as CSV and it worked fine: no watermark delays, and resource utilization was stable and low.

After switching to the Delta Lake (preview) output type, data stopped being processed and the watermark delay kept increasing. All the sharp drops in the graph are manual restarts of the Stream Analytics job, because that is when it actually processes data; afterwards the watermark delay starts rising again. After a restart it has no problem processing the past day's data almost instantly, so I don't think it's related to streaming units.
[Image: graph of the watermark delay metric]

Other metrics show no indication of a problem (or at least I cannot see one): there are no errors, CPU utilization is low, and there are no early, late, or out-of-order events.

I'm using 1 SU (V1 pricing SKU, lowest option), but I did try scaling up to 3 SUs and the result was the same. Scaling is not enabled.

Environment: standard (multi-tenant)

Compatibility level: 1.2

Error policy: Retry

Input:

Event Hub, JSON format, no compression.

Data usually comes in batches of up to 1k events (Event Hub still receives each event individually). The data is also not real time: timestamps can be from a few days ago.

Query:

    WITH NewInput AS
    (
    SELECT
        udf.transformInput([eventhub-input]) AS UpdatedJson
    FROM
        [eventhub-input]
    )

    SELECT
        UpdatedJson.*
    INTO
        [output-deltalake-type1]
    FROM
        NewInput
    WHERE UpdatedJson.DocumentType = 'type1'

    SELECT
        UpdatedJson.*
    INTO
        [output-deltalake-type2]
    FROM
        NewInput
    WHERE UpdatedJson.DocumentType = 'type2'

    SELECT
        UpdatedJson.*
    INTO
        [output-deltalake-type3]
    FROM
        NewInput
    WHERE UpdatedJson.DocumentType = 'type3'

All 3 types have different schemas, so they are output to different paths within the same storage account container.

Job simulation says that this job is parallel.

udf.transformInput removes some fields from the input and makes some other small updates.
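To give an idea of the shape of such a UDF, here is a minimal sketch in plain JavaScript. It is not the actual function: the removed field names (`rawPayload`, `debugInfo`) and the `DocumentType` normalization are hypothetical and only illustrate "remove some fields, make small updates".

```javascript
// Hypothetical sketch of a transformInput-style UDF.
// Field names and the normalization rule are illustrative assumptions only.
function transformInput(record) {
    // Copy the record so the original input object is not mutated.
    var updated = {};
    for (var key in record) {
        if (Object.prototype.hasOwnProperty.call(record, key)) {
            updated[key] = record[key];
        }
    }

    // Remove fields that are not needed downstream (hypothetical names).
    delete updated.rawPayload;
    delete updated.debugInfo;

    // Example of a small update: normalize the document type (hypothetical rule).
    if (typeof updated.DocumentType === "string") {
        updated.DocumentType = updated.DocumentType.trim().toLowerCase();
    }

    return updated;
}
```

The query then filters on `UpdatedJson.DocumentType`, so whatever the real function does, it has to leave that field in place.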

Output:

All output configuration is identical except for the Delta table path.

Type: Blob storage/ADLS Gen2

Serialization format: Delta Lake (preview)

Delta table path: data/{type1-3}

Partition column: userId

Minimum rows: 10000

Maximum time: 1 hour
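As I understand the documented batching settings, a batch is written once it reaches the minimum row count or once the maximum time has elapsed, whichever comes first. The sketch below models that rule in plain JavaScript with the values above. `shouldFlush`, `MIN_ROWS`, and `MAX_WAIT_MS` are hypothetical names, and this is only a model of the setting, not how Stream Analytics implements it internally.

```javascript
// Model of the output batching window, using the values configured above.
// Names are hypothetical; this is not Stream Analytics code.
var MIN_ROWS = 10000;             // "Minimum rows" setting
var MAX_WAIT_MS = 60 * 60 * 1000; // "Maximum time" setting: 1 hour in milliseconds

// Returns true when a batch would be written under this model:
// either enough rows are buffered, or the batch has waited long enough.
function shouldFlush(bufferedRows, msSinceBatchStart) {
    return bufferedRows >= MIN_ROWS || msSinceBatchStart >= MAX_WAIT_MS;
}

// A 1k-event burst after 5 minutes does not trigger a write under this model.
console.log(shouldFlush(1000, 5 * 60 * 1000));
// Reaching the row threshold or the one-hour limit does.
console.log(shouldFlush(10000, 0));
console.log(shouldFlush(1000, MAX_WAIT_MS));
```

If that reading is right, input arriving in bursts of up to 1k events would sit below the row threshold, and a batch could wait up to the full hour before being written.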

Am I missing something in my configuration? Any help would be appreciated.

Azure Stream Analytics