
ernecastro asked · KranthiPakala-MSFT commented

Azure Databrick Delta Table slow for small (few records) operations

Hi all,

At work we are using Databricks (with Spark Streaming) + Azure Blob Storage + Delta tables to process a stream from our on-premise infrastructure, maintaining a near-real-time (just seconds/minutes behind) replica of our on-premise SQL Server databases.

These on-premise databases have frequent but small upserts (tens of records per minute), so every time Spark processes a micro-batch to upsert into the cloud Delta lake, we generally get no more than 100 records, and most of the time just 1 to 10 records per micro-batch.

What we have found is that each upsert into the Delta lake takes at least 5 s. If a micro-batch has 1 row, it takes 5 s; if it has 10 rows, 5 s; if it has 10k rows, also 5 s. Above 10k rows we start seeing higher times (for instance, a 100k-record micro-batch takes 15 s to be processed and upserted into the Delta lake).

My first question is whether a minimum time of 5 s per operation against a Delta table is normal to expect (our Delta tables are optimized according to Delta best practices). We also tested deleting everything and starting over, and we still saw 5 s per micro-batch when upserting a single record into a Delta table that contained only 10 records (one Parquet file, a few kB in size).

If the answer is no and we should expect lower times (I'd expect <1 s), what do you suggest we start looking at? We tried moving from Standard to Premium storage accounts and the result was the same.

Tags: azure-databricks, azure-data-lake-storage


KranthiPakala-MSFT answered

Hi @ernecastro,

Welcome to Microsoft Q&A forum and thanks for reaching out.

The short answer is no, it doesn't take a default of 5 seconds for every action. Could you please verify whether you have a trigger interval set? Or it might be a dashboard-refresh issue: are you refreshing your streaming update dashboard every 5 seconds? Per my conversation with the internal team, could you please set the trigger interval to 1 second and see how it behaves?

Related doc: Structured Streaming Programming Guide - Spark 3.1.2 Documentation (apache.org).
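For reference, the trigger interval is set on the `writeStream` side of the query. A minimal sketch of what the suggestion above would look like, assuming an existing `SparkSession`, a streaming DataFrame `sourceDF`, a user-defined batch function `upsertToDelta`, and a hypothetical checkpoint path:

```python
# Sketch only: `sourceDF`, `upsertToDelta`, and the checkpoint path are
# assumptions, not taken from the original post.
query = (
    sourceDF.writeStream
    .trigger(processingTime="1 second")   # fire a micro-batch every second
    .foreachBatch(upsertToDelta)          # run the merge for each micro-batch
    .option("checkpointLocation", "/mnt/checkpoints/replica")  # hypothetical
    .start()
)
```

Without an explicit trigger, Structured Streaming starts the next micro-batch as soon as the previous one completes.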

Hope this info helps. Do let us know how it goes.



Please don’t forget to Accept Answer and Up-Vote wherever the information provided helps you, this can be beneficial to other community members.




ernecastro answered · KranthiPakala-MSFT commented

Hi Kranthi,

How are you? Thanks for your response!

The trigger interval is not set (which defaults to starting each micro-batch as soon as the previous one completes).

To be specific, this is the code:



import datetime

from pyspark.sql import functions as fn
from pyspark.sql.window import Window
from pyspark.sql.avro.functions import from_avro
from delta.tables import DeltaTable

start_time = datetime.datetime.now()

# Keep only the latest event per primary key within the micro-batch.
windowDf = (Window
    .partitionBy(fn.col("eventIdPk"))
    .orderBy(fn.desc(fn.col("header.eventUpdDateTime"))))

filterValueDF = (sourceDF
    .withColumn('data', from_avro('data', dataSchema, fromAvroOptions))
    .withColumn("insdatetime", fn.from_unixtime(fn.col("data.InsDateTime") / 1000))
    .withColumn("year", fn.year(fn.col("insdatetime")))
    .withColumn("month", fn.month(fn.col("insdatetime")))
    .withColumn("day", fn.dayofmonth(fn.col("insdatetime")))
    .withColumn("eventIdPk", fn.col('header.eventPrimaryKey'))
    .select('header', 'data', 'year', 'month', 'day', 'eventIdPk')
)

filterValueDF = (
    filterValueDF.withColumn("ranking", fn.row_number().over(windowDf))
    .where("ranking = 1")
    .drop("ranking")
    .drop('eventIdPk')
)

# Collect the distinct (year, month, day) partitions touched by this batch,
# so the merge condition can prune the scan to just those partitions.
partitions = filterValueDF.select('year', 'month', 'day').distinct().collect()

partitionFilters = map(lambda x: f"(d.year={x['year']} and d.month={x['month']} and d.day={x['day']})", partitions)
partitionFilterString = " OR ".join(partitionFilters)
partitionFilterString = f"({partitionFilterString})"

dtTarget = DeltaTable.forPath(spark, storageLocation)

# Upsert: update matched keys, insert new ones. Note the leading space
# before "and" so the concatenated condition stays well-formed.
(dtTarget.alias("d").merge(filterValueDF.alias("src"),
        partitionFilterString +
        " and d.header.eventPrimaryKey = src.header.eventPrimaryKey"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

execution_time = (datetime.datetime.now() - start_time).total_seconds()



The 5 s figure is the `execution_time` variable. We print that value and get 5 s even if the source and target tables each hold only a few (fewer than 10) records, total just a couple of kB, and are optimized (meaning just one Parquet file).

So I don't consider this a trigger or refresh problem. For the record, the function above is called from a processBatch function inside the streaming query.
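As an aside, the partition-filter string that the code concatenates into the merge condition can be exercised in isolation with plain Python (no Spark needed). The sketch below uses hypothetical partition rows, standing in for the dicts that `.collect()` returns, and shows why a space is needed before the `and` that joins the filter to the key predicate:

```python
# Hypothetical partitions, shaped like the Row objects .collect() returns.
partitions = [
    {"year": 2021, "month": 9, "day": 1},
    {"year": 2021, "month": 9, "day": 2},
]

partition_filters = [
    f"(d.year={p['year']} and d.month={p['month']} and d.day={p['day']})"
    for p in partitions
]
partition_filter_string = f"({' OR '.join(partition_filters)})"

# Leading space before "and": without it the concatenation produces ")and",
# which at best is hard to read and at worst confuses the SQL parser.
merge_condition = (
    partition_filter_string
    + " and d.header.eventPrimaryKey = src.header.eventPrimaryKey"
)
print(merge_condition)
```

This makes it easy to eyeball the predicate that Delta will use for partition pruning before handing it to `merge`.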

Regards.
Ernesto.



Hi @ernecastro,

Thanks for getting back, and apologies for the delayed response. I did reach out to the internal team about this issue but haven't got any clue yet. For immediate assistance, I would recommend filing a support ticket if you have a support plan, so that a support engineer can take a deeper look into it. If you don't have a support plan, please let us know here so that we can work with you offline.

In case you file a support ticket, please do share the support ticket number.

Thank you


Hi @ernecastro,

We still have not heard back from you. Just wanted to check whether you are still facing the issue or need assistance on this. If you have already found a solution, would you please share it here with the community? Otherwise, let us know and we will continue to engage with you on the issue. In case you have already filed a support ticket, could you please share the ticket number?

Thanks
