AmarAgnihotri-6676 asked AmarAgnihotri-6676 commented

Filter one source table on a date >= "dd-mm-yyyy" condition in an ADF pipeline

Hello, I have a folder in an S3 bucket that contains JSON files. I have created an ADF pipeline to copy those JSON files to blob storage.

I created source and sink datasets of type JSON. I also added the folder path to the AWS user policy. Please see the screenshots below.

This is my folder inside the S3 bucket, containing the JSON files:

196002-image.png

This is my source dataset in ADF:

196071-image.png

This is my sink dataset in ADF:

196022-image.png

You can see here that I have added the AWS resource path to the policy attached to the AWS user, as shown:

196043-image.png

It's a simple copy pipeline, as shown:

196052-image.png
But I am getting this error:

196033-image.png

195909-image.png

It works fine when I change both the source and sink dataset types to Binary. See this:

195910-image.png

Please tell me whether I am making a mistake in the dataset when I create it as JSON.

Please check where I am going wrong.


azure-data-factory
NandanHegde-7720 answered NandanHegde-7720 edited

Hey,
You can use the steps below:

1) Define a variable.
2) Using the lookup file, get the table names to iterate over in the ForEach activity.
3) Within the ForEach, use a Set Variable activity to set the variable to 01-01-2019 if the table is the one that needs to be filtered; otherwise set it to 01-01-1900.
4) Within the Copy activity, use the logic below:
select * from tablename where date > variablename



 {
     "name": "pipeline1",
     "properties": {
         "activities": [
             {
                 "name": "ForEach1",
                 "type": "ForEach",
                 "dependsOn": [
                     {
                         "activity": "Set variable2",
                         "dependencyConditions": [
                             "Succeeded"
                         ]
                     }
                 ],
                 "userProperties": [],
                 "typeProperties": {
                     "items": {
                         "value": "@split(pipeline().parameters.lookup,',' )",
                         "type": "Expression"
                     },
                     "isSequential": true,
                     "activities": [
                         {
                             "name": "Set variable1",
                             "type": "SetVariable",
                             "dependsOn": [],
                             "userProperties": [],
                             "typeProperties": {
                                 "variableName": "delta",
                                 "value": {
                                     "value": "@if(equals(item(),'abc'), '2022-01-01','1900-01-01')",
                                     "type": "Expression"
                                 }
                             }
                         },
                         {
                             "name": "Wait1",
                             "type": "Wait",
                             "dependsOn": [
                                 {
                                     "activity": "Set variable1",
                                     "dependencyConditions": [
                                         "Succeeded"
                                     ]
                                 }
                             ],
                             "userProperties": [],
                             "typeProperties": {
                                 "waitTimeInSeconds": 1
                             }
                         }
                     ]
                 }
             },
             {
                 "name": "Set variable2",
                 "type": "SetVariable",
                 "dependsOn": [],
                 "userProperties": [],
                 "typeProperties": {
                     "variableName": "test",
                     "value": {
                         "value": "@split(pipeline().parameters.lookup,',' )",
                         "type": "Expression"
                     }
                 }
             }
         ],
         "parameters": {
             "lookup": {
                 "type": "string",
                 "defaultValue": "abc,def"
             }
         },
         "variables": {
             "delta": {
                 "type": "String"
             },
             "test": {
                 "type": "Array"
             }
         },
         "annotations": []
     }
 }





Latest version (works for any table):

     {
         "name": "pipeline1",
         "properties": {
             "activities": [
                 {
                     "name": "ForEach1",
                     "type": "ForEach",
                     "dependsOn": [
                         {
                             "activity": "Set variable2",
                             "dependencyConditions": [
                                 "Succeeded"
                             ]
                         }
                     ],
                     "userProperties": [],
                     "typeProperties": {
                         "items": {
                             "value": "@split(pipeline().parameters.lookup,',' )",
                             "type": "Expression"
                         },
                         "isSequential": true,
                         "activities": [
                             {
                                 "name": "Set variable1",
                                 "type": "SetVariable",
                                 "dependsOn": [],
                                 "userProperties": [],
                                 "typeProperties": {
                                     "variableName": "delta",
                                     "value": {
                                         "value": "@if(equals(item(),'abc'), concat('Select * from ',item(),' where date>=''01-01-2019'''),concat('Select * from ',item()))",
                                         "type": "Expression"
                                     }
                                 }
                             },
                             {
                                 "name": "Wait1",
                                 "type": "Wait",
                                 "dependsOn": [
                                     {
                                         "activity": "Set variable1",
                                         "dependencyConditions": [
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "userProperties": [],
                                 "typeProperties": {
                                     "waitTimeInSeconds": 1
                                 }
                             }
                         ]
                     }
                 },
                 {
                     "name": "Set variable2",
                     "type": "SetVariable",
                     "dependsOn": [],
                     "userProperties": [],
                     "typeProperties": {
                         "variableName": "test",
                         "value": {
                             "value": "@split(pipeline().parameters.lookup,',' )",
                             "type": "Expression"
                         }
                     }
                 }
             ],
             "parameters": {
                 "lookup": {
                     "type": "string",
                     "defaultValue": "abc,def"
                 }
             },
             "variables": {
                 "delta": {
                     "type": "String"
                 },
                 "test": {
                     "type": "Array"
                 }
             },
             "annotations": []
         }
     }
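
In both samples, Wait1 appears to stand in for the real work inside the loop. As a minimal sketch only, a Copy activity that takes its place and reads the query built by Set variable1 might look like the fragment below. It assumes the second sample, where `delta` holds the whole SELECT statement, and a MySQL source. The activity name, the dataset names `MySqlSourceDataset` and `BlobSinkDataset`, and the delimited-text sink are hypothetical placeholders, and the sink settings are abbreviated.

```json
{
    "name": "Copy table",
    "description": "Hypothetical sketch: replaces Wait1 and runs the query stored in the delta variable",
    "type": "Copy",
    "dependsOn": [
        {
            "activity": "Set variable1",
            "dependencyConditions": [
                "Succeeded"
            ]
        }
    ],
    "userProperties": [],
    "typeProperties": {
        "source": {
            "type": "MySqlSource",
            "query": {
                "value": "@variables('delta')",
                "type": "Expression"
            }
        },
        "sink": {
            "type": "DelimitedTextSink"
        }
    },
    "inputs": [
        {
            "referenceName": "MySqlSourceDataset",
            "type": "DatasetReference"
        }
    ],
    "outputs": [
        {
            "referenceName": "BlobSinkDataset",
            "type": "DatasetReference"
        }
    ]
}
```

Because the query is supplied on the source, only the table matched in the `@if` gets a WHERE clause, and every other table is copied in full. The sink dataset would still need its own way of naming the output per table, which is not shown here.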
AmarAgnihotri-6676 answered

@NandanHegde-7720 Thanks. Actually, this is my pipeline:

195535-image.png

195519-image.png

195560-image.png

Please tell me:
1- Where should I define the variable, and what type should it be (String?)
2- After defining the variable, should I add a Set Variable activity inside the ForEach loop, like this?

195506-image.png

If step 2 is correct, where should I run this query?

select * from tablename where date> variablename


Please clarify all of these steps.





AmarAgnihotri-6676 answered NandanHegde-7720 commented

@NandanHegde-7720,

I tried it this way.

Variable declaration:

195567-image.png

Inside the ForEach loop:

195612-image.png

Inside the Copy activity I put this query in the copy behaviour:

195526-image.png

195575-image.png

But it has no effect; all the data of the task table is still being copied.

I want this to work as it does in MySQL Workbench, where you can see that only data from January 2019 onwards is shown:

195537-image.png

195537-image.png




The expression seems to be the problem:


 Select * from task where  date > '@{variables('variablenm')}'

@NandanHegde-7720 Hi,

I made this change, but it is still not working:

195983-image.png

Also, please tell me whether it is fine to write the query here in the copy behaviour for the task table, as I am doing now:

195907-image.png

I cannot understand why my task table is not being filtered.



Hey,
Just for debugging purposes, can you please provide a screenshot of the query that is generated at run time, as shown in the logs:

196046-image.png

196063-image.png


The Select query will give us a better understanding.

Also, try executing this query as-is via SSMS and check whether it returns the expected output.

AmarAgnihotri-6676 answered AmarAgnihotri-6676 commented

Hi @NandanHegde-7720,
That query is working now, as you can see from my changes:
196059-image.png

Now I am facing another issue. The SQL database has 310 tables in total, and I am copying only a few of them using a lookup file. Right now, just for testing, my lookup file contains only two entries, as shown:

196133-image.png

I want to filter only the task table because it is very large. But since I passed this query to the source, it also runs for the other table (billing_action). The output is saved in blob under the table name "billing_action", but the data inside it is the same as that of the task table. In other words, the task table's data is copied for both tables. You can see that both tables have the same size in blob after copying:

196060-image.png

How can I get rid of this problem? I have to copy a total of 50 of the database's 310 tables from MySQL to blob. I have created a lookup file for the 50 tables, but of those 50 I want to filter only the task table on the date condition. The other 49 tables need to be copied as they are in the MySQL database. I hope this new issue is clear.



Hey,
I have added a sample pipeline to my original answer.

You can follow similar logic, setting the date variable based on the table name.
For all tables you can set it to 1900-01-01, except for the one you want to filter.


Hey @NandanHegde-7720,
I understand. You mean that for the other 49 tables I have to set the variable to 1900-01-01, and for the task table it should be 2019-01-01. I have these points in mind; please correct me if I am wrong.

1) I will declare the variable once with a default value of 1900-01-01, as shown:
196138-image.png

and will initialize it with 2019-01-01, as shown:

196154-image.png

Now, which query should be written in the source query pane? If I write this:

select * from task where date >= '@{variables('date')}'

then that one select query will run for every table in every iteration.
I have to run queries like these:

select * from booking
select * from billing_action
select * from task where date >= '2019-01-01'

I do not understand how to run a different query for each dynamic table name so that the date condition applies only to the task table. Also, the date column is absent from many other tables, so it doesn't make sense to apply such a condition to them.



Hey @AmarAgnihotri-6676,
Sorry to have missed this thread.

Note: I have updated my original answer with a new sample that uses the logic below:
@if(equals(item(),'abc'), concat('Select * from ',item(),' where date>=''01-01-2019'''),concat('Select * from ',item()))
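
The same branching could also sit directly in the Copy activity's source query, without going through a variable. The fragment below is a rough sketch under several assumptions: the filtered table is named `task`, the source is MySQL, and each ForEach item is a plain table-name string. If the loop iterates over Lookup output rows instead, `item()` would be an object and the column would need to be referenced explicitly. The date is written as `2019-01-01` on the assumption that the `date` column is a MySQL DATE, whose literals use the YYYY-MM-DD form.

```json
{
    "source": {
        "type": "MySqlSource",
        "query": {
            "value": "@if(equals(item(), 'task'), concat('select * from ', item(), ' where date >= ''2019-01-01'''), concat('select * from ', item()))",
            "type": "Expression"
        }
    }
}
```

Pipeline variables are shared across the whole pipeline, so setting one inside a ForEach is only safe when `isSequential` is true, as it is in the samples in the original answer. An inline expression like this one does not depend on that setting.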

