question

Cataster-7485 avatar image
0 Votes"
Cataster-7485 asked Cataster-7485 answered

How to generate/retrieve file in datalake using message queue?

I have an Azure function QueueTrigger1 that executes a function executeTemplateProcess to upload a tsv file on Google Drive and update a Jira ticket.

I need to create a Message Queue to generate a tsv file on datalake, run a python code, and then retrieve the tsv file (location) from the datalake and add it to the queue.

I have a basic foundation for the queue today, but i am not sure how to generate the file on the datalake and retrieve its location. We need to pass the file into the python code as input, thats why I am thinking we would need the file location on the datalake to be enqueued, but I am not sure how to perform this.

This is the namespaces for both the QueueTrigger1 and the executeTemplateProcess()

 namespace DI
 {
     public class DIProcess
     {
         public static void executeTemplateProcess(string jiraKey, string jiraIssueType, string jiraSummary, Component component, string jiraDescription)
         {
             if (rowCount > 0)
             {   //python code would run somewhere here following queue process before jira code executes below
                 string dfileId = CopyTemplate(component.FileId, sheetName);
    
                 // stop process if copy template not sucessfull
                 if (string.IsNullOrEmpty(dfileId))
                     return;
    
                 jira.AddComment("Google File copied.");
    
                 // Update JIRA with the web link
                 webLink = $"https://docs.google.com/spreadsheets/d/{dfileId}";
                 jira.AddWebLink(webLink, sheetName);
                 jira.AddComment("Jira weblink added.");
             }
             else
             {
                 jira.UpdateStatus("Abandoned");
                 jira.AddComment("Jira status updated to Abandoned.");
             }
         }
     }
 }   
    
 namespace companyxyzjira.QueueTrigger1
 {
     public static class JiraQueueTrigger
     {
         [FunctionName("QueueTrigger1")]
         public static void Run([QueueTrigger("companyxyz-jira-dev-am", Connection = "storageaccountcompanyxyzji42f6_STORAGE")]string myQueueItem
             , ILogger log, ExecutionContext context)
         {
             dynamic jira;
             string jiraKey;
             string jiraIssueType;
             string jiraSummary;
             string jiraDescription;
             string[] jiraComponentNames;
             Component jiraComponent;
    
             log.LogInformation("Queue trigger function processing");
    
             jira = JsonConvert.DeserializeObject(myQueueItem);
    
             jiraKey = jira.issue.key;
             jiraIssueType = jira.issue.fields.issuetype.name;
             jiraSummary = jira.issue.fields.summary;
             jiraDescription = jira.issue.fields.description;
    
             try
             {
                 DIProcess.executeTemplateProcess(jiraKey, jiraIssueType, jiraSummary, jiraComponent, jiraDescription);
             }
             catch (System.Exception e)
             {
                 log.LogError(e.ToString());
                 log.LogError(e.Message);
                 log.LogError(e.StackTrace);
             }
         }
     }
 }

I suppose this is my line of thinking but I am not sure how to communicate with the datalake...

[FunctionName("HttpTriggerCSharp")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)]
HttpRequest req, [Queue("companyxyz-jira-dev-pm-mapping-done")] ICollector<string> QueueItem, ILogger log)
{
log.LogInformation("HTTP trigger function processed a request.");

 string name = req.Query["name"];
    
 string requestBody = String.Empty;
 using (StreamReader streamReader =  new  StreamReader(req.Body))
 {
     requestBody = await streamReader.ReadToEndAsync();
     QueueItem.Add(requestBody); //i think?
 }
 dynamic data = JsonConvert.DeserializeObject(requestBody);
 name = name ?? data?.name;
    
 return name != null
     ? (ActionResult)new OkObjectResult($"{name}")
     : new BadRequestObjectResult("Please pass a name on the query string or in the request body"); }

datalake snapshot with input/output files (uploaded manually but thats what we want to automate from now on so we need to generate/retrieve these artifacts from/to message queue as described above)

datalake








dotnet-csharpazure-functions
image.png (54.4 KiB)
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

PramodValavala-MSFT avatar image
0 Votes"
PramodValavala-MSFT answered PramodValavala-MSFT commented

@Cataster-7485 You can use the Blob Input Binding directly with ADLSv2 thanks to multi-protocol access. For ADLSv1, you would have to use the ADLSv1 C# SDK directly.


· 2
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

@PramodValavala-MSFT
Thank you pramod, I checked out ADSLV2 docs but didnt find code examples. Could you provide me how to utilize ADSL2 with message queue in the context of the code I have in my post?
Thanks!


0 Votes 0 ·

@Cataster-7485 ADLSv2 can be accessed with any HDFS library and supports many blob storage features. This allows for blob storage bindings to be used as is. Check this issue for more details.


0 Votes 0 ·
Cataster-7485 avatar image
0 Votes"
Cataster-7485 answered PramodValavala-MSFT commented

@PramodValavala-MSFT for some reason, its not allowing me to post comment, so im posting as placeholder answer here to continue our conversation

Is this all thats needed to generate/retrieve tsv files on Datalake?

 {
   "bindings": [
     {
       "queueName": "myqueue-items",
       "connection": "MyStorageConnectionAppSetting",
       "name": "myQueueItem",
       "type": "queueTrigger",
       "direction": "in"
     },
     {
       "name": "myInputBlob",
       "type": "blob",
       "path": "samples-workitems/{queueTrigger}",
       "connection": "MyStorageConnectionAppSetting",
       "direction": "in"
     },
     {
       "name": "myOutputBlob",
       "type": "blob",
       "path": "samples-workitems/{queueTrigger}-Copy",
       "connection": "MyStorageConnectionAppSetting",
       "direction": "out"
     }
   ],
   "disabled": false
 }
    
 public static void Run(string myQueueItem, string myInputBlob, out string myOutputBlob, ILogger log)
 {
        
     myOutputBlob = myInputBlob;
 }


· 1
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

@Cataster-7485 Yes. For ADLSv2, that should do the trick with the path updated accordingly. You could also leverage an Event Grid Trigger if required.

0 Votes 0 ·
Cataster-7485 avatar image
0 Votes"
Cataster-7485 answered

@PramodValavala-MSFT
sry to get back late; ive tried out Azure.Storage.Blobs and Azure.Storage.Files.DataLake and they both allow file management on the datalake...
why would i for example choose to code in ASDL2 vs in Azure.Storage.Blobs?
Are they just a multitude of options available? is one better than the other?

The other thing i dont understand in ASDL2 is how can GetDirectoryClient and CreateDirectory lead to the same result? both lines below create a directory!

 DataLakeDirectoryClient dir1 = filesystem.GetDirectoryClient("sample-file");
 dir1.Create();
 DataLakeDirectoryClient Dir2 = filesystem.CreateDirectory("sample-directory");
 Dir2.Create();

81705-image.png

Why would i chose then one over the other?



image.png (8.5 KiB)
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.