PySpark UPSERT using Delta Lake never finishes in Synapse (related to logging/telemetry in a private Microsoft package)

Juan Bagnato
2022-02-14T22:54:30.453+00:00

Hello, I have a problem using the Delta Lake format on Azure Synapse Analytics when doing an upsert. I hope you can help me.

Here are the steps:

First, I write 4,000,000 records to a new folder in Data Lake Storage Gen2 using

df.write.partitionBy("year").mode("overwrite").format("delta").save(fileName)  

and it takes about 45 minutes.
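For context, `partitionBy("year")` by default writes one Hive-style subdirectory per distinct value (for example `year=2021/`) under the table path, next to the `_delta_log/` transaction log. The plain-Python sketch below only mimics that grouping on a few made-up rows. The `partition_dirs` helper and the `rows` data are hypothetical, and nothing here touches Spark or the storage account.

```python
from collections import defaultdict


def partition_dirs(rows, column):
    """Group rows by a partition column, keyed by a Hive-style directory name (hypothetical helper)."""
    groups = defaultdict(list)
    for row in rows:
        # Spark/Delta name each partition directory "<column>=<value>" by default
        groups[f"{column}={row[column]}"].append(row)
    return dict(groups)


# Hypothetical sample rows; the real table has about 4,000,000 of them.
rows = [
    {"id": 1, "year": 2020, "value": "a"},
    {"id": 2, "year": 2021, "value": "b"},
    {"id": 3, "year": 2021, "value": "c"},
]

for directory, members in sorted(partition_dirs(rows, "year").items()):
    print(directory, len(members))
```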

Then, in another session, I try to write the same 4,000,000 records with an UPSERT (merge), using the example code:

deltaTable.alias("dimension") \
    .merge(df_after_etl.alias("updates"), updateCondition) \
    .whenMatchedUpdate(set=updateMap) \
    .whenNotMatchedInsertAll() \
    .execute()
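The post does not show how `updateCondition` and `updateMap` were defined. Below is a minimal sketch of one way to build them as plain strings from column lists, assuming a hypothetical key column `id` and hypothetical value columns `name` and `amount`. The merge condition can be given as a SQL string, and `whenMatchedUpdate(set=...)` takes a dict mapping target column names to SQL expression strings. Adding the partition column `year` to the condition is one commonly suggested way to let Delta narrow the files it must scan for matches. Whether it helps in this case is an assumption, not something established here.

```python
def build_merge_args(key_cols, partition_cols, value_cols,
                     target="dimension", source="updates"):
    """Return (updateCondition, updateMap) for DeltaTable.merge (hypothetical helper)."""
    # Match on the business keys plus the partition columns, so the condition
    # also constrains which partitions can contain matching rows.
    match_cols = list(key_cols) + [c for c in partition_cols if c not in key_cols]
    update_condition = " AND ".join(
        f"{target}.{c} = {source}.{c}" for c in match_cols
    )
    # whenMatchedUpdate(set=...) expects {target_column: source_expression}
    update_map = {c: f"{source}.{c}" for c in value_cols}
    return update_condition, update_map


updateCondition, updateMap = build_merge_args(
    key_cols=["id"],                # hypothetical business key
    partition_cols=["year"],        # partition column used in the original write
    value_cols=["name", "amount"],  # hypothetical non-key columns
)
print(updateCondition)
print(updateMap)
```

These two values would then be passed to the same `merge(...)` chain shown above. Only include partition columns that never change for a given key. Otherwise, a row whose `year` changed would be inserted as a new row instead of updating the existing one.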

It ran for more than 40 hours and never finished, nor did it throw an error.

I attached screenshots of the situation and the DAG (not reproduced here):
The big picture.
The job, which has 24 stages; it shows 100% but never finishes.
An overview of the 24 stages.
Task 99, where the problem is.
The DAG of that task.
As you can see, they are all related to the line SynapseLoggingShim.scala:86.
The task details, shown below, indicate that this is related to a Microsoft telemetry package:

scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)  
com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:86)  
com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:72)  
org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:201)  
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:107)  
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:92)  
org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:201)  
org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:253)  
io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:223)  
org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:103)  
org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:89)  
io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:120)  
io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:204)  
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)  
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)  
java.lang.reflect.Method.invoke(Method.java:498)  
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)  
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)  
py4j.Gateway.invoke(Gateway.java:282)  

Is there something I can do to avoid this and still use the upsert/merge pattern?

Could this be a bug in Microsoft's telemetry package?

Thanks in advance!

Tags: Azure Data Lake Storage, .NET, Azure Synapse Analytics