Want to filter source table data on a date >= "dd-mm-yyyy" condition for one of the source tables in an ADF pipeline

Amar Agnihotri 891 Reputation points
2022-04-22T09:45:52.577+00:00

Hello, I have a folder in an S3 bucket that contains JSON files. I have created an ADF pipeline to copy those JSON files to blob storage.

I created source and sink datasets of JSON type. I also added the folder path to the user policy in AWS. Please see the screenshots below:

This is my folder inside S3 bucket containing JSON files

196002-image.png

This is my source data set in adf

196071-image.png

This is my sink data set in adf -

196022-image.png

You can see here that I have added the AWS resource path to the policy attached to the AWS user, as shown:

196043-image.png

It's a simple copy pipeline, as shown:

196052-image.png
But I am getting this error:

196033-image.png

195909-image.png

It works fine when I change both the source and sink dataset types to Binary. See this:

195910-image.png

Please tell me whether I am making a mistake in the dataset when I create it as JSON.

Please check where I am going wrong.

Azure Data Factory
An Azure service for ingesting, preparing, and transforming data at scale.

Accepted answer
  1. Nandan Hegde 29,891 Reputation points MVP
    2022-04-22T10:57:54.657+00:00

    Hey,
    You can use the steps below:

    1) Define a variable.
    2) Using the lookup file, get the table names that drive the ForEach iterations.
    3) Within the ForEach, use a Set Variable activity and set the variable to 01-01-2019 if the table is the one that needs to be filtered; otherwise set it to 01-01-1900.
    4) Within the copy activity, use the logic below:
    select * from tablename where date> variablename

    {
        "name": "pipeline1",
        "properties": {
            "activities": [
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Set variable2",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@split(pipeline().parameters.lookup,',' )",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Set variable1",
                                "type": "SetVariable",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "delta",
                                    "value": {
                                        "value": "@if(equals(item(),'abc'), '2022-01-01','1900-01-01')",
                                        "type": "Expression"
                                    }
                                }
                            },
                            {
                                "name": "Wait1",
                                "type": "Wait",
                                "dependsOn": [
                                    {
                                        "activity": "Set variable1",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "userProperties": [],
                                "typeProperties": {
                                    "waitTimeInSeconds": 1
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Set variable2",
                    "type": "SetVariable",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "test",
                        "value": {
                            "value": "@split(pipeline().parameters.lookup,',' )",
                            "type": "Expression"
                        }
                    }
                }
            ],
            "parameters": {
                "lookup": {
                    "type": "string",
                    "defaultValue": "abc,def"
                }
            },
            "variables": {
                "delta": {
                    "type": "String"
                },
                "test": {
                    "type": "Array"
                }
            },
            "annotations": []
        }
    }
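The four steps above can be traced outside ADF with a small Python sketch. It is only an illustration of the logic, not part of the pipeline: the table names `abc` and `def`, the column name `date`, and the two cutoff values are the placeholders used in the JSON above, and the helper names `cutoff_for` and `source_query` are made up for this example. One caveat worth keeping in mind: with this approach every table gets a `where` clause, so rows whose `date` is NULL would be excluded even for the unfiltered tables.

```python
# Sketch of steps 1-4: pick a cutoff date per table, then build the source query.
# All names below are illustrative placeholders, not ADF APIs.

FILTERED_TABLE = "abc"          # the one table that needs the date filter
FILTER_CUTOFF = "2022-01-01"    # cutoff used for the filtered table
DEFAULT_CUTOFF = "1900-01-01"   # assumed to be older than any real row


def cutoff_for(table: str) -> str:
    """Mirror of @if(equals(item(),'abc'), '2022-01-01','1900-01-01')."""
    return FILTER_CUTOFF if table == FILTERED_TABLE else DEFAULT_CUTOFF


def source_query(table: str) -> str:
    """Query the copy activity would run for one ForEach iteration."""
    return f"select * from {table} where date > '{cutoff_for(table)}'"


if __name__ == "__main__":
    # Same comma split as the pipeline's `lookup` parameter.
    for table in "abc,def".split(","):
        print(source_query(table))
```

In the pipeline itself, the equivalent of `cutoff_for` is the `delta` variable, which the copy activity's source query would read with `@variables('delta')`.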
    

    Updated version (works for any table):

        {
            "name": "pipeline1",
            "properties": {
                "activities": [
                    {
                        "name": "ForEach1",
                        "type": "ForEach",
                        "dependsOn": [
                            {
                                "activity": "Set variable2",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ],
                        "userProperties": [],
                        "typeProperties": {
                            "items": {
                                "value": "@split(pipeline().parameters.lookup,',' )",
                                "type": "Expression"
                            },
                            "isSequential": true,
                            "activities": [
                                {
                                    "name": "Set variable1",
                                    "type": "SetVariable",
                                    "dependsOn": [],
                                    "userProperties": [],
                                    "typeProperties": {
                                        "variableName": "delta",
                                        "value": {
                                            "value": "@if(equals(item(),'abc'), concat('Select * from ',item(),' where date>=''01-01-2019'''),concat('Select * from ',item()))",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                {
                                    "name": "Wait1",
                                    "type": "Wait",
                                    "dependsOn": [
                                        {
                                            "activity": "Set variable1",
                                            "dependencyConditions": [
                                                "Succeeded"
                                            ]
                                        }
                                    ],
                                    "userProperties": [],
                                    "typeProperties": {
                                        "waitTimeInSeconds": 1
                                    }
                                }
                            ]
                        }
                    },
                    {
                        "name": "Set variable2",
                        "type": "SetVariable",
                        "dependsOn": [],
                        "userProperties": [],
                        "typeProperties": {
                            "variableName": "test",
                            "value": {
                                "value": "@split(pipeline().parameters.lookup,',' )",
                                "type": "Expression"
                            }
                        }
                    }
                ],
                "parameters": {
                    "lookup": {
                        "type": "string",
                        "defaultValue": "abc,def"
                    }
                },
                "variables": {
                    "delta": {
                        "type": "String"
                    },
                    "test": {
                        "type": "Array"
                    }
                },
                "annotations": []
            }
        }
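To see what string the `delta` variable ends up holding in this second version, the expression can be mirrored in Python. This is a hedged sketch: `build_query` and its parameters are hypothetical names for illustration, and the table name `abc` and the date `01-01-2019` are taken from the JSON above. The point it shows is that the doubled `''` inside the ADF expression is an escaped single quote, so the generated SQL carries ordinary quotes around the date, and that unfiltered tables get a plain `Select *` with no `where` clause at all.

```python
def build_query(table: str,
                filtered_table: str = "abc",
                cutoff: str = "01-01-2019") -> str:
    """Mirror of the @if(equals(item(),'abc'), concat(...), concat(...)) expression."""
    base = "Select * from " + table
    if table == filtered_table:
        # The doubled '' in the ADF expression becomes one single quote here,
        # so the date literal is quoted normally in the final SQL text.
        return base + " where date>='" + cutoff + "'"
    # Every other table is copied in full, with no where clause.
    return base


if __name__ == "__main__":
    for table in "abc,def".split(","):
        print(build_query(table))
```

Because the whole statement is built per iteration, the copy activity's source query can then simply be `@variables('delta')`.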
    

3 additional answers

  1. Amar Agnihotri 891 Reputation points
    2022-04-22T11:15:45.843+00:00

    @Nandan Hegde Thanks. This is my actual pipeline:

    195535-image.png

    195519-image.png

    195560-image.png

    Please tell me:
    1- Where should I define the variable, and should it be of type String or another type?
    2- After defining the variable, should I put a Set Variable activity inside the ForEach loop like this:

    195506-image.png

    If step 2 is correct, where should I run this query?

    select * from tablename where date> variablename

    Please clarify all these steps.


  2. Amar Agnihotri 891 Reputation points
    2022-04-22T12:11:52.647+00:00

    @Nandan Hegde ,

    I tried it this way.

    Variable declaration:

    195567-image.png

    Inside the ForEach loop:

    195612-image.png

    Inside the copy activity I put this query in the copy behaviour setting:

    195526-image.png

    195575-image.png

    But it has no effect; all the data of the task table is still being copied.

    I want this to work as it does in MySQL Workbench, where you can see that only data from Jan 2019 onwards is shown:

    195537-image.png


  3. Amar Agnihotri 891 Reputation points
    2022-04-25T10:15:59.617+00:00

    Hi @Nandan Hegde ,
    That query is working now, as you can see from my changes:
    196059-image.png

    Now I am facing another issue. As you can see, the SQL db has 310 tables in total and I am copying only a few of them using the lookup file. Right now, just for testing, my lookup file contains only two table names, as shown:

    196133-image.png

    I want to filter only the task table because it is very large. But since I passed this query to the source, it runs for the other table (billing_action) as well. What it does is save that table under its own name, "billing_action", in blob, but the data inside is the same as that of the task table. In other words, for both tables it copies the data of the task table. You can see that both tables have the same size in blob after copying:

    196060-image.png

    How can I get rid of this problem? I have to copy a total of 50 tables from MySQL to blob, out of the 310 tables in the db. I have created a lookup file for the 50 tables, but of those 50 I want to filter only the task table on the date condition. The remaining 49 tables need to be copied as they are in the MySQL db. I hope you understand this new issue.
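One likely explanation for the symptom described here, offered as an assumption because the source query is only visible in the screenshot, is that the query names the `task` table directly, so every ForEach iteration reads `task` while the sink file name still follows the current item. The Python sketch below reproduces that effect with made-up in-memory data and then shows the per-table alternative that the accepted answer's second pipeline implements. `TABLES`, `run`, `copy_all`, and the two query builders are hypothetical names for illustration only.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, str]
Query = Tuple[str, Optional[str]]  # (table to read, optional cutoff date)

# Made-up stand-ins for the MySQL tables; dates are ISO strings so that
# plain string comparison orders them correctly.
TABLES: Dict[str, List[Row]] = {
    "task": [
        {"id": "1", "date": "2018-06-01"},
        {"id": "2", "date": "2019-03-01"},
    ],
    "billing_action": [{"id": "10", "date": "2017-01-01"}],
}


def run(query: Query) -> List[Row]:
    """Read a table, applying the date filter only when a cutoff is given."""
    table, cutoff = query
    rows = TABLES[table]
    if cutoff is None:
        return list(rows)
    return [row for row in rows if row["date"] >= cutoff]


def hard_coded_query(item: str) -> Query:
    """Ignores the current item: every iteration reads the task table."""
    return ("task", "2019-01-01")


def per_table_query(item: str) -> Query:
    """Filters only task; every other table is read in full."""
    return (item, "2019-01-01") if item == "task" else (item, None)


def copy_all(items: List[str],
             make_query: Callable[[str], Query]) -> Dict[str, List[Row]]:
    """One output per item, named after the item, like the sink files in blob."""
    return {item: run(make_query(item)) for item in items}


if __name__ == "__main__":
    names = ["task", "billing_action"]
    print(copy_all(names, hard_coded_query))  # both outputs hold task rows
    print(copy_all(names, per_table_query))   # each output holds its own rows
```

With the per-table builder, the lookup file can list all 50 tables and only `task` receives the date condition.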