
JBagnato asked:

PySpark UPSERT using Delta Lake never ends in Synapse (related to logging/telemetry in a private Microsoft package)

Hello, I have a problem using the Delta Lake format on Azure Synapse Analytics when doing an upsert; I hope you can help me.

These are the steps:

First I write 4,000,000 records to a new folder on Data Lake Gen2 using

 df.write.partitionBy("year").mode("overwrite").format("delta").save(fileName)

and it takes about 45 minutes.

Then, in another session, I try to upsert the same 4,000,000 records using the example code:

 deltaTable.alias("dimension") \
                 .merge(df_after_etl.alias("updates"), updateCondition) \
                 .whenMatchedUpdate(set = updateMap ) \
                 .whenNotMatchedInsertAll() \
                 .execute()

It was running for more than 40 hours and never ended (nor threw an error).
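
For reference, updateCondition and updateMap are not shown above; a minimal sketch of how such a condition and update map can be built (the column names id, year, value and updated_at are placeholders for illustration only, not the real schema):

 from delta.tables import DeltaTable

 # Illustrative sketch only: the real condition and map depend on the table schema.
 deltaTable = DeltaTable.forPath(spark, fileName)

 # Match target and source rows on the key columns ("year" is the partition column).
 updateCondition = "dimension.id = updates.id AND dimension.year = updates.year"

 # Target column -> expression evaluated against the "updates" alias.
 updateMap = {
     "value": "updates.value",
     "updated_at": "updates.updated_at",
 }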

Here I attach images showing the situation and the DAG.
This is the big picture (issue01.jpg).
The job has 24 stages; as you can see, it shows 100% but never ends (issue03.jpg).
The overview of the 24 stages (issue04.jpg).
The problem is in task 99 (issue05.jpg).
Its DAG (issue06.jpg) shows that every step is related to the line SynapseLoggingShim.scala:86.
Looking at the detail (issue07.jpg), you can see that this is related to a Microsoft telemetry package:

 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:86)
 com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:72)
 org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:201)
 org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:107)
 org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:92)
 org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:201)
 org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:253)
 io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:223)
 org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:103)
 org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:89)
 io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:120)
 io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:204)
 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:498)
 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 py4j.Gateway.invoke(Gateway.java:282)

Do you know if there is something I can do to avoid this so I can use the upsert/merge pattern?

Could this be a bug in Microsoft's telemetry package?


Thanks in advance!




Tags: azure-synapse-analytics, azure-data-lake-storage, dotnet-ml-big-data
issue01.jpg (76.6 KiB)
issue03.jpg (34.0 KiB)
issue04.jpg (115.1 KiB)
issue05.jpg (59.1 KiB)
issue06.jpg (86.2 KiB)
issue07.jpg (125.9 KiB)

Hello @JBagnato,
Thanks for the ask and for using the Microsoft Q&A platform.
At this time, we are reaching out to the internal team to get some help on this. We will update you once we hear back from them.
Thanks
Himanshu

JBagnato replied to HimanshuSinha-MSFT:

Hello Himanshu, I hope you can help me because I am still blocked on that "SynapseLoggingShim.scala" file.
I did some more testing with fewer rows (100,000 instead of 4 million) but it still got stuck.
I also tested with more executors, but it still stays in that state for hours (issue09.jpg).



Thank you

issue09.jpg (62.1 KiB)

Hello @JBagnato,
We just heard back from the internal team: the issue is not related to the telemetry code, but to the actual Delta merge operation.
The Spark driver is just waiting for the task to finish, but the task was lost due to an executor failure or some other issue.
If you look into the Spark event log (Spark history UI -> download) and the Spark driver log (and executor logs if possible), we may find more info.
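
As a side note, the Spark monitoring REST API can show per-executor status without downloading the whole event log. A minimal sketch (the history-server URL and application ID are placeholders, and the exact endpoint reachable from the Synapse Spark history server link may differ):

 import requests

 # Placeholders: point these at your Spark history server and application ID.
 base_url = "http://<history-server>:18080/api/v1"
 app_id = "<application-id>"

 # "allexecutors" lists both active and dead executors for the application.
 executors = requests.get(f"{base_url}/applications/{app_id}/allexecutors").json()

 for ex in executors:
     # Dead executors or executors with failed tasks point to the lost-task scenario above.
     print(ex["id"], "active:", ex["isActive"], "failedTasks:", ex["failedTasks"])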
Thanks
Himanshu


0 Answers