PySpark UPSERT using Delta Lake never ends in Synapse (related to logging/telemetry in a private Microsoft package)
Hello, I have a problem using the Delta Lake format on Azure Synapse Analytics when doing an upsert. I hope you can help me.
These are the steps:
First I write 4,000,000 records to a new folder on Data Lake Gen2 using
df.write.partitionBy("year").mode("overwrite").format("delta").save(fileName)
and it takes about 45 minutes.
Then, in another session, I try to write the same 4,000,000 records with an UPSERT, using the example code:
deltaTable.alias("dimension") \
    .merge(df_after_etl.alias("updates"), updateCondition) \
    .whenMatchedUpdate(set=updateMap) \
    .whenNotMatchedInsertAll() \
    .execute()
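For reference, a minimal sketch of how updateCondition and updateMap might be built for a merge like the one above. The column names (id, name, amount) and the two helper functions are hypothetical placeholders, not the real schema; only the aliases "dimension" and "updates" and the partition column "year" come from the code in this post. The sketch assumes that whenMatchedUpdate receives a dict mapping target column names to SQL expression strings.

```python
# Hypothetical helpers: build the merge condition and the update map
# from lists of column names. Column names below are placeholders.

def build_merge_condition(key_columns, target_alias="dimension", source_alias="updates"):
    """Return a SQL condition string that matches rows on every key column."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    return " AND ".join(
        f"{target_alias}.{col} = {source_alias}.{col}" for col in key_columns
    )


def build_update_map(columns, key_columns, source_alias="updates"):
    """Map every non-key column to the matching column of the source alias."""
    return {
        col: f"{source_alias}.{col}"
        for col in columns
        if col not in key_columns
    }


# Assumed example schema: "year" is the partition column used in the
# initial write; "id", "name" and "amount" are made up for illustration.
key_columns = ["year", "id"]
all_columns = ["year", "id", "name", "amount"]

updateCondition = build_merge_condition(key_columns)
updateMap = build_update_map(all_columns, key_columns)

print(updateCondition)
print(updateMap)
```

The resulting string and dict are then passed to merge(...) and whenMatchedUpdate(set=...) exactly as in the snippet above.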
It ran for more than 40 hours and never finished (nor threw an error).
Here I attach images showing the situation and the DAG:
This is the big picture:
The job, which has 24 stages (as you can see, it shows 100% but never ends):
The 24 stages overview:
The problem is in task 99:
And its DAG is:
As you can see, they are all related to the line SynapseLoggingShim.scala:86.
When I open the details:
So this appears to be related to a Microsoft telemetry package:
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:86)
com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:72)
org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:201)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:107)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:92)
org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:201)
org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:253)
io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:223)
org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:103)
org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:89)
io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:120)
io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:204)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
Do you know if there is anything I can do to avoid this and be able to use the upsert/merge pattern?
Could this be a bug in Microsoft's telemetry package?
Thanks in advance!