question

AjayNarvekar-0081 asked KranthiPakala-MSFT commented

Error "Stream #default not found" in an Azure Data Factory data flow

I'm getting the following error from whatever component I use after a specific join component in the data flow of Data Factory:

{"StatusCode":"DFExecutorUserError","Message":"Job failed due to reason: at Select 'Select1'(Line 97/Col 25): Stream #default not found","Details":"at Select 'Select1'(Line 97/Col 25): Stream #default not found"}

This is one of the run IDs I got this error at:

d2f77396-c7a7-451d-910a-a3c2a669808f

azure-data-factory


Can you post the DSL script for the dataflow? (find it next to the code button)


1 Answer

AjayNarvekar-0081 answered KranthiPakala-MSFT commented

@Kiran-MSFT

I figured out why it was failing and worked around it to get unblocked. The conditional split component before the component that was throwing the aforementioned error did not have a default output stream specified. Once I specified one, the error was gone. However, why is it necessary to specify a default output stream if we don't want to handle the rows that fail all of the conditions in the conditional split component? And if it is necessary, why is it an optional field that can be removed? I come from an SSIS background, and in the SSIS equivalent of conditional split in the data flow, the default output is mandatory and cannot be removed from the component UI.

I have another conditional split further down the data flow; however, I didn't see the same error at that one despite not specifying a default output stream in it either. I assume this is because no row reaching that component fails all of its conditions, so nothing would ever be routed to the default output stream.
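To illustrate what I mean, here is a minimal Python sketch (illustrative only, not how ADF actually implements it, and all names are made up) of how a conditional split with disjoint: false routes rows: each row lands in the first stream whose condition it matches, unmatched rows go to the default stream if one exists, and are otherwise silently dropped.

```python
def conditional_split(rows, conditions, has_default):
    """Route each row to the FIRST matching stream (disjoint: false semantics).

    conditions: list of (stream_name, predicate) pairs.
    Returns a dict mapping stream name -> list of rows.
    """
    streams = {name: [] for name, _ in conditions}
    if has_default:
        streams["Default"] = []
    for row in rows:
        for name, predicate in conditions:
            if predicate(row):
                streams[name].append(row)
                break
        else:  # no condition matched
            if has_default:
                streams["Default"].append(row)
            # without a default stream, the row is silently dropped

    return streams


# Hypothetical rows shaped loosely like the flow above (Type1 / Type2 split)
rows = [
    {"DELETE_FLAG": "y", "DELETED": False, "NAME_CHANGED": False},  # -> Type1
    {"DELETE_FLAG": "n", "DELETED": False, "NAME_CHANGED": True},   # -> Type2
    {"DELETE_FLAG": "n", "DELETED": False, "NAME_CHANGED": False},  # matches nothing
]
conditions = [
    ("Type1", lambda r: r["DELETE_FLAG"] == "y" and not r["DELETED"]),
    ("Type2", lambda r: r["NAME_CHANGED"]),
]

with_default = conditional_split(rows, conditions, has_default=True)
without_default = conditional_split(rows, conditions, has_default=False)
print(len(with_default["Default"]))                    # 1: unmatched row is kept
print(sum(len(s) for s in without_default.values()))   # 2: unmatched row dropped
```

With a default stream the third row survives; without one it quietly disappears, which matches the behavior I hit.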

Per your request, here is the DSL script for your reference:

 source(output(
         DEPARTMENT_ID as decimal(19,0),
         DEPARTMENT_NAME as string,
         DEPARTMENT_DESC as string,
         DELETE_FLAG as string,
         CREATE_DATE as timestamp,
         UPDATE_VERSION as binary
     ),
     allowSchemaDrift: true,
     validateSchema: false,
     isolationLevel: 'READ_COMMITTED',
     format: 'table') ~> departmentDataFromSource
 source(output(
         ITEM_URN as integer,
         ITEM_BIZ_URN as integer,
         PKEY as integer,
         DEPARTMENT_NAME as string,
         DESCRIPTION as string,
         DELETED as boolean,
         EFFECTIVE_FROM as timestamp,
         EFFECTIVE_TO as timestamp
     ),
     allowSchemaDrift: true,
     validateSchema: false,
     isolationLevel: 'READ_COMMITTED',
     format: 'table') ~> departmentDataFromSink
 source(output(
         DEPARTMENT_ID as decimal(19,0),
         DEPARTMENT_NAME as string,
         DEPARTMENT_DESC as string,
         DELETE_FLAG as string,
         CREATE_DATE as timestamp,
         UPDATE_VERSION as binary
     ),
     allowSchemaDrift: true,
     validateSchema: false,
     isolationLevel: 'READ_COMMITTED',
     format: 'table') ~> departmentDataFromSource2
 departmentDataFromSink split(EFFECTIVE_TO == toTimestamp('2050-12-31 23:59:59.000'),
     disjoint: false) ~> CurrentActiveRecords@(CurrentActiveRecords, Default)
 DerivedColumn2, CurrentActiveRecords@CurrentActiveRecords exists(DEPARTMENT_ID_INT == PKEY,
     negate:true,
     broadcast: 'auto')~> NewDepartments
 DerivedColumn1, CurrentActiveRecords@CurrentActiveRecords join(DEPARTMENT_ID_INT == PKEY,
     joinType:'inner',
     broadcast: 'auto')~> JoinWithPreimportedDimensions
 Select1 split(DELETE_FLAG == 'y' && DELETED == false(),
     SOURCE_DEPARTMENT_NAME != SINK_DEPARTMENT_NAME,
     disjoint: false) ~> SCDType1Or2@(Type1, Type2)
 NewDepartments derive(PKEY = toInteger(DEPARTMENT_ID),
         DELETED = iif(DELETE_FLAG == 'y' || DELETE_FLAG == 'Y', true(), false()),
         EFFECTIVE_FROM = CREATE_DATE,
         EFFECTIVE_TO = toTimestamp('2050-12-31 23:59:59.000', 'yyyy-MM-dd HH:mm:ss.SSS', 'UTC')) ~> DerivedColumns
 Type1ToBeUpdated alterRow(updateIf(DEPARTMENT_ID==PKEY&&DELETED!=iif(DELETE_FLAG=='y'||DELETE_FLAG=='Y',true(),false()))) ~> AlterRow
 SCDType1Or2@Type1 select(mapColumn(
         DEPARTMENT_ID,
         DELETE_FLAG,
         PKEY,
         DELETED,
         ITEM_URN
     ),
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true) ~> Type1ToBeUpdated
 AlterRow derive(EFFECTIVE_TO = currentTimestamp(),
         PKEY = toInteger(DEPARTMENT_ID)) ~> EffectiveTo
 SCDType1Or2@Type2 select(mapColumn(
         DEPARTMENT_ID,
         DEPARTMENT_NAME = SOURCE_DEPARTMENT_NAME,
         DEPARTMENT_DESC,
         DELETE_FLAG,
         ITEM_BIZ_URN
     ),
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true) ~> ToBeInserted
 SCDType1Or2@Type2 select(mapColumn(
         ITEM_URN,
         PKEY
     ),
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true) ~> ToBeUpdated
 ToBeInserted derive(EFFECTIVE_FROM = currentTimestamp(),
         EFFECTIVE_TO = toTimestamp('2050-12-31 23:59:59.000', 'yyyy-MM-dd HH:mm:ss.SSS', 'UTC'),
         PKEY = toInteger(DEPARTMENT_ID),
         DELETED = iif(DELETE_FLAG == 'y' || DELETE_FLAG == 'Y', true(), false())) ~> DerivedColumnToBeInserted
 ToBeUpdated derive(EFFECTIVE_TO = currentTimestamp()) ~> DerivedColumnToBeUpdated
 DerivedColumnToBeUpdated alterRow(updateIf(true())) ~> AlterRow1
 JoinWithPreimportedDimensions select(mapColumn(
         DEPARTMENT_ID,
         SOURCE_DEPARTMENT_NAME = departmentDataFromSource@DEPARTMENT_NAME,
         DEPARTMENT_DESC,
         DELETE_FLAG,
         CREATE_DATE,
         UPDATE_VERSION,
         ITEM_URN,
         ITEM_BIZ_URN,
         PKEY,
         SINK_DEPARTMENT_NAME = CurrentActiveRecords@CurrentActiveRecords@DEPARTMENT_NAME,
         DESCRIPTION,
         DELETED,
         EFFECTIVE_FROM,
         EFFECTIVE_TO
     ),
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true) ~> Select1
 departmentDataFromSource derive(DEPARTMENT_ID_INT = toInteger(DEPARTMENT_ID)) ~> DerivedColumn1
 departmentDataFromSource2 derive(DEPARTMENT_ID_INT = toInteger(DEPARTMENT_ID)) ~> DerivedColumn2
 DerivedColumns sink(allowSchemaDrift: true,
     validateSchema: false,
     input(
         ITEM_URN as integer,
         ITEM_BIZ_URN as integer,
         PKEY as integer,
         DEPARTMENT_NAME as string,
         DESCRIPTION as string,
         DELETED as boolean,
         EFFECTIVE_FROM as timestamp,
         EFFECTIVE_TO as timestamp
     ),
     deletable:false,
     insertable:true,
     updateable:false,
     upsertable:false,
     format: 'table',
     skipDuplicateMapInputs: true,
     errorHandlingOption: 'stopOnFirstError',
     mapColumn(
         PKEY,
         DEPARTMENT_NAME,
         DESCRIPTION = DEPARTMENT_DESC,
         DELETED,
         EFFECTIVE_FROM,
         EFFECTIVE_TO
     )) ~> InsertNewDimensions
 EffectiveTo sink(allowSchemaDrift: true,
     validateSchema: false,
     input(
         ITEM_URN as integer,
         ITEM_BIZ_URN as integer,
         PKEY as integer,
         DEPARTMENT_NAME as string,
         DESCRIPTION as string,
         DELETED as boolean,
         EFFECTIVE_FROM as timestamp,
         EFFECTIVE_TO as timestamp
     ),
     deletable:false,
     insertable:false,
     updateable:true,
     upsertable:false,
     keys:['PKEY','ITEM_URN'],
     skipKeyWrites:true,
     format: 'table',
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true,
     errorHandlingOption: 'stopOnFirstError',
     mapColumn(
         DELETED,
         EFFECTIVE_TO,
         PKEY,
         ITEM_URN
     )) ~> DeletedDepartments
 DerivedColumnToBeInserted sink(allowSchemaDrift: true,
     validateSchema: false,
     input(
         ITEM_URN as integer,
         ITEM_BIZ_URN as integer,
         PKEY as integer,
         DEPARTMENT_NAME as string,
         DESCRIPTION as string,
         DELETED as boolean,
         EFFECTIVE_FROM as timestamp,
         EFFECTIVE_TO as timestamp
     ),
     deletable:false,
     insertable:true,
     updateable:false,
     upsertable:false,
     format: 'table',
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true,
     errorHandlingOption: 'stopOnFirstError',
     mapColumn(
         ITEM_BIZ_URN,
         PKEY,
         DEPARTMENT_NAME,
         DESCRIPTION = DEPARTMENT_DESC,
         DELETED,
         EFFECTIVE_FROM,
         EFFECTIVE_TO
     )) ~> UpdatedDepartmentsType2Insertion
 AlterRow1 sink(allowSchemaDrift: true,
     validateSchema: false,
     input(
         ITEM_URN as integer,
         ITEM_BIZ_URN as integer,
         PKEY as integer,
         DEPARTMENT_NAME as string,
         DESCRIPTION as string,
         DELETED as boolean,
         EFFECTIVE_FROM as timestamp,
         EFFECTIVE_TO as timestamp
     ),
     deletable:false,
     insertable:false,
     updateable:true,
     upsertable:false,
     keys:['PKEY','ITEM_URN'],
     skipKeyWrites:true,
     format: 'table',
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true,
     errorHandlingOption: 'stopOnFirstError',
     mapColumn(
         ITEM_URN,
         PKEY,
         EFFECTIVE_TO
     )) ~> UpdatedDepartmentsType2Update

If you add an extra stream, that will be the default. You do not need to give it a condition:

  Select1 split(DELETE_FLAG == 'y' && DELETED == false(),
      SOURCE_DEPARTMENT_NAME != SINK_DEPARTMENT_NAME,
      disjoint: false) ~> SCDType1Or2@(Type1, Type2, EverythingElse)

Right, I understand what the default output stream is and that it doesn't take a condition. My point was that if it is mandatory to specify a default stream, it should be a mandatory field in the conditional split component UI. Because it's an optional field that can be removed, it creates confusion. In my case I don't need to process rows that fail all of the criteria specified in the conditional split component, so I did not add a default output stream. It took another day to figure out that the cause of this error is that the default stream must be specified whenever there can be rows that don't satisfy any criteria.

I'm unblocked here. However, I would like to propose this as a candidate for an enhancement: make the default output stream in the conditional split component a mandatory field. Let me know if it was made optional purposefully.


Split may drop rows if the default is not specified. This behavior is as designed. However, we can add a UI feature to let the user know of this behavior.
