Databricks OutOfMemory error on code that previously worked without issue

Shane McGarry 0 Reputation points
2024-04-26T22:23:15.07+00:00

I have a notebook in Azure Databricks that does some transformations on a bronze tier table and inserts the transformed data into a silver tier table. This notebook is used to do an initial load of the data from our existing system into our new datalake. The section at issue converts a list of columns from a bronze table and inserts them as rows into a silver table. I've been running this code for weeks without issue: same code, same data, same cluster configuration (no changes). Suddenly, I am now consistently receiving an OutOfMemory error when I execute this particular section of the notebook. The data is not large, and I'm only inserting a dataframe with 6 columns.

I've tried numerous approaches (both code and server configuration) to fix the issue with no luck. Hoping someone can help me out. The original source code is below.


dfCallData = fetchCallSummaryData(useArchive=False) \
                .join(_callMap, on=(col('UniqueCallID') == _callMap.systemCallId) & (_callMap.systemType != lit('NOVA')), how='inner') \
                .select(col('UniqueCallId'), col('tenantId'), col('agentId'), col('channelType'), col('callerPhone'), col('callerName'), col('terminationType'), 
                        col('callDuration'), col('transferDuration'), col('callOutcome'), col('transferReason'), col('endEvent'), col('lastStepName'), col('reportDate'), 
                        col('callInitiatedOn'), col('lastSystemUpdateOn'), col('isFirstTimeCaller'), col('isAfterHours'), col('finalStatus'), col('clk.isArchived'), 
                        col('clk.archivedOn'), col('Conversation'), col('QueueDuration'), col('AgentDuration'), col('TransferTalkDuration'), col('WhisperDuration'), 
                        col('BillingRatePerIVRMinute'), col('BillingRatePerTransferMinute'), col('BillingRatePerCall'), col('BillingGracePeriod'), col('CustomerData'), 
                        col('CustomerDataFlattened'), col('RunMode'), col('SurveyResults'), col('RedactedConversation'), col('Disposition'), col('Col1'), col('Col2'), 
                        col('Col3'), col('Col4'), col('Col5'), col('Col6'), col('Col7'), col('Col8'), col('Pub1'), col('Pub2'), col('Pub3'), col('Pub4'), col('Pub5'), 
                        col('Pub6'), col('Pub7'), col('Pub8'), col('PubBreadCrumbs'), col('PubFlattened'), col('PubLastBreadCrumb'), col('PubIntent'), 
                        col('PubAuthenticated'), col('Transcript'), col('clk.callId'))

pub_cols = ['Pub1', 'Pub2', 'Pub3', 'Pub4', 'Pub5', 'Pub6', 'Pub7', 'Pub8']

# Original method signature 
def processMappedCallData(columns_to_convert:list):
    dfNewMap = spark.read.table('silver.appdata.call_data_map') \
                         .where(col('mapType') == 'columns') \
                         .alias('dm')

    dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
                              .alias('cd')

    dfData = None
    for c in columns_to_convert:
        df = dfCallDataSub.join(dfNewMap, (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == c), 'inner') \
                          .where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
                          .withColumn('callDataId', lit(None)) \
                          .withColumn('callDataType', lit('columns:mapped')) \
                          .select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),  
                                  col(f'cd.{c}').alias('dataValue'))

    dfData = dfData.union(df) if dfData is not None else df

    return dfData

dfPubCols = processMappedCallData(pub_cols) 
_pipeline.execute_call_data_pipeline(dfPubCols, callDataType='columns')

# Upsert 
def execute_call_data_pipeline(self, dfMappedData:DataFrame, callDataType='columns:mapped'):
        dtCallData = DeltaTable.forName(self._spark, f'{self.get_catalog()}.{self.get_schema()}.call_data')
        dtCallData.alias('old').merge(
                source=dfMappedData.alias('new'),
                condition=expr('old.callDataId = new.callDataId')
        ).whenMatchedUpdate(set=
            {
                'callId': col('new.callId') if 'callId' in dfMappedData.columns else col('old.callId'),
                'callDataMapId': col('new.callDataMapId') if 'callDataMapId' in dfMappedData.columns else col('old.callDataMapId'),
                'callDataType': col('new.callDataType') if 'callDataType' in dfMappedData.columns else col('old.callDataType'),
                'legacyColumn': col('new.legacyColumn') if 'legacyColumn' in dfMappedData.columns else col('old.legacyColumn'),
                'dataValue': col('new.dataValue') if 'dataValue' in dfMappedData.columns else col('old.dataValue'),
                'isEncrypted': col('new.isEncrypted') if 'isEncrypted' in dfMappedData.columns else col('old.isEncrypted'),
                'silverUpdateOn': lit(datetime.now(timezone.utc).timestamp())
            }
        ).whenNotMatchedInsert(values=
            {
                'callId': col('new.callId'),
                'callDataMapId': col('new.callDataMapId') if 'callDataMapId' in dfMappedData.columns else lit(None),
                'callDataType': col('new.callDataType') if 'callDataType' in dfMappedData.columns else lit(callDataType),
                'legacyColumn': col('new.legacyColumn') if 'legacyColumn' in dfMappedData.columns else lit(None),
                'dataValue': col('new.dataValue'),
                'isEncrypted': col('new.isEncrypted') if 'isEncrypted' in dfMappedData.columns else lit(False),
                'silverCreateOn': lit(datetime.now(timezone.utc).timestamp())
            }
        ).execute()

As one example of the changes I made, here is a change to the processMappedCallData() method to break the data into multiple upsert calls in smaller chunks of ~300M rows rather than one large dataframe of ~2.4B rows. Both the original and this one failed. The error is always the same: java.lang.OutOfMemoryError: Java heap space

def processMappedCallData(columns_to_convert:list):
    dfNewMap = spark.read.table('silver.appdata.call_data_map') \
                         .where(col('mapType') == 'columns') \
                         .alias('dm')

    dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
                              .alias('cd')

    dfData = None
    for c in columns_to_convert:
        df = dfCallDataSub.join(dfNewMap, (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == c), 'inner') \
                          .where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
                          .withColumn('callDataId', lit(None)) \
                          .withColumn('callDataType', lit('columns:mapped')) \
                          .select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),  
                                  col(f'cd.{c}').alias('dataValue'))
        _pipeline.execute_call_data_pipeline(df, callDataType='columns')
Azure Databricks
Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.
1,942 questions
{count} votes

1 answer

Sort by: Most helpful
  1. PRADEEPCHEEKATLA-MSFT 77,901 Reputation points Microsoft Employee
    2024-04-29T02:35:57.8+00:00

    @Shane McGarry - Thanks for the question and using MS Q&A platform.

    The java.lang.OutOfMemoryError error typically occurs when the JVM heap space is exhausted.

    Based on the information you provided, it seems that the OutOfMemory error is caused by the large size of the dataframe that you are trying to insert into the silver tier table. One possible solution is to increase the memory allocation for your cluster. You can do this by going to the Azure Databricks workspace, selecting the cluster that you are using, and then increasing the values for the "Driver Memory" and "Executor Memory" settings.

    Another possible solution is to optimize your code to reduce the memory usage. For example, you can try breaking up the dataframe into smaller chunks and inserting them into the silver tier table one at a time. You can also try using more efficient data structures and algorithms to reduce the memory footprint of your code.

    You mentioned that you have tried numerous approaches to fix the issue with no luck. Can you please provide more information about the approaches you have tried so far?

    Also, can you please provide more information about the error message you are receiving? It would be helpful if you could provide the full error message.

    Looking forward for your response.

    0 comments No comments