SQL to Spark SQL

Shambhu Rai 1,406 Reputation points
2022-08-08T17:59:17.637+00:00

Hi Expert,

How do I convert a query from SQL Server to Spark SQL?

Regards

Azure SQL Database
Azure Blob Storage
Azure Blob Storage
An Azure service that stores unstructured data in the cloud as blobs.
2,415 questions
Azure Databricks
Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.
1,907 questions
SQL Server Analysis Services
SQL Server Analysis Services
A Microsoft online analytical data engine used in decision support and business analytics, providing the analytical data for business reports and client applications such as Power BI, Excel, Reporting Services reports, and other data visualization tools.
1,242 questions
Transact-SQL
Transact-SQL
A Microsoft extension to the ANSI SQL language that includes procedural programming, local variables, and various support functions.
4,547 questions
{count} votes

3 answers

Sort by: Most helpful
  1. Alberto Morillo 32,806 Reputation points MVP
    2022-08-08T20:03:29.367+00:00

    Here you can find a basic guide on how to do it. There you will also find basic Python code to convert a SQL statement to Spark SQL.


  2. Alberto Morillo 32,806 Reputation points MVP
    2022-08-08T20:14:44.667+00:00

    He shared the Python code to make the conversion:

    from moz_sql_parser import parse  
    from moz_sql_parser import format  
    import json  
      
    # Sample SQL statement used to demonstrate the converter. The text is kept
    # exactly as written because it is the runtime input handed to parse().
    query = """  
    SELECT product_id,  
        Count(star_rating) as total_rating,  
        Max(star_rating)   AS best_rating,  
        Min(star_rating)   AS worst_rating  
    FROM   tbl_books  
    WHERE  verified_purchase = 'Y'  
        AND review_date BETWEEN '1995-07-22' AND '2015-08-31'  
        AND marketplace IN ( 'DE', 'US', 'UK', 'FR', 'JP' )  
    GROUP  BY product_id  
    ORDER  BY total_rating asc,product_id desc,best_rating  
    LIMIT  10;  
    """  
      
    # Parse the SQL text with moz_sql_parser.parse.
    v_parse = parse(query)  
    # Round-trip the parse result through JSON text; presumably this normalises
    # the parser's output into plain dict/list objects -- TODO confirm.
    v_json = json.loads(json.dumps(v_parse,indent=4))  
      
      
    def fn_from(value):
        """Render the parsed ``from`` clause as the head of the DataFrame chain.

        Args:
            value: The ``from`` entry of the parsed statement. Three shapes are
                handled: a plain table-name string, a ``{"value": ..., "name": ...}``
                dict (``name`` being the optional alias), or a list mixing both.

        Returns:
            The table reference text. Aliased tables are rendered as
            ``table.alias("alias")``; several tables are comma-separated with
            no trailing comma. Any other input shape yields ``""``.
        """
        def table_ref(entry):
            # One table: either a bare name or a dict with an optional alias.
            if isinstance(entry, str):
                return entry
            ref = entry["value"]
            if "name" in entry:
                ref = ref + ".alias(\"" + entry["name"] + "\")"
            return ref

        if isinstance(value, str):
            # `format` is the name imported from moz_sql_parser at the top of
            # the script (it shadows the builtin). The first five characters
            # of its output are dropped -- presumably the leading "FROM ".
            return format({"from": value})[5:]
        if isinstance(value, dict):
            return table_ref(value)
        if isinstance(value, list):
            # Joining avoids the dangling comma a per-item "+ ','" leaves behind.
            # Entries that are neither dict nor str are ignored.
            return ",".join(
                table_ref(entry)
                for entry in value
                if isinstance(entry, (dict, str))
            )
        return ""
              
      
    def fn_select(value):
        """Render the parsed ``select`` clause as arguments for ``.select(...)``.

        Args:
            value: The ``select`` entry of the parsed statement: a column-name
                string, a single ``{"value": ..., "name": ...}`` dict, or a list
                of such dicts. An item's ``value`` is either a column name
                (str) or a dict describing a function call such as
                ``{"count": "star_rating"}``.

        Returns:
            A comma-separated list of double-quoted column names, without a
            trailing comma. Any other input shape yields ``""``.

            * A function-call item is referenced by its alias (``name``).
            * A function-call item with no alias falls back to the text
              ``func(arg)``. NOTE(review): this assumes that is the name Spark
              gives an unaliased aggregate column -- confirm.
            * A single plain column given as a dict keeps its alias via
              ``.alias("...")``. Plain columns inside a list are emitted by
              column name only.
        """
        def quoted(text):
            return "\"" + text + "\""

        def function_columns(item):
            # Column name(s) under which a function-call item can be selected.
            if "name" in item:
                return [quoted(item["name"])]
            return [
                quoted(func + "(" + str(arg) + ")")
                for func, arg in item["value"].items()
            ]

        if isinstance(value, str):
            return quoted(value)

        if isinstance(value, dict):
            if isinstance(value["value"], dict):
                return ",".join(function_columns(value))
            rendered = quoted(value["value"])
            if "name" in value:
                rendered = rendered + ".alias(\"" + value["name"] + "\")"
            # Return directly: there is no trailing comma to strip here, so
            # slicing off the last character would eat the closing ')' or '"'.
            return rendered

        if isinstance(value, list):
            columns = []
            for item in value:
                if not isinstance(item, dict):
                    continue  # non-dict list entries are ignored
                if isinstance(item["value"], dict):
                    columns.extend(function_columns(item))
                else:
                    columns.append(quoted(item["value"]))
            return ",".join(columns)

        return ""
      
    def fn_where(value):
        """Return the SQL text of the parsed ``where`` condition.

        The condition is turned back into SQL with ``format`` (the name
        imported from moz_sql_parser at the top of the script), and the first
        six characters of that text are dropped -- presumably the leading
        ``WHERE `` keyword.
        """
        where_sql = format({ "where": value })
        return where_sql[6:]
      
      
    def fn_groupby(value):
        """Return the SQL text of the parsed ``groupby`` column list.

        The clause is turned back into SQL with ``format`` (the name imported
        from moz_sql_parser at the top of the script), and the first nine
        characters of that text are dropped -- presumably the leading
        ``GROUP BY `` keyword.
        """
        groupby_sql = format({ "groupby": value })
        return groupby_sql[9:]
      
    def fn_agg(query):
        """Build the argument list for ``.agg(...)`` from a SQL statement.

        Args:
            query: SQL text; it is parsed here with ``parse``.

        Returns:
            A comma-separated string with one ``func(col("arg"))`` expression
            per function call found in the select list, each followed by
            ``.alias('name')`` when the select item has an alias. Returns ``""``
            when the select list contains no function calls.

        Notes:
            The parser returns a bare dict (or string) instead of a list when
            only one column is selected, so the select list is normalised to a
            list before iterating. Function arguments are stringified as-is;
            non-column arguments (e.g. nested calls) are not specially handled.
        """
        select_items = parse(query)["select"]
        if isinstance(select_items, (dict, str)):
            select_items = [select_items]

        expressions = []
        for item in select_items:
            if not isinstance(item, dict):
                continue  # plain string entries carry no function call
            call = item.get("value")
            if not isinstance(call, dict):
                continue  # plain column, not an aggregate
            for func, arg in call.items():
                expr = func + "(" + "col(\"" + str(arg) + "\")" + ")"
                if "name" in item:
                    # Only alias when the SQL gave one; indexing "name"
                    # unconditionally raises for unaliased aggregates.
                    expr = expr + ".alias('" + item["name"] + "')"
                expressions.append(expr)

        return ",".join(expressions).replace("\n", "")
      
      
    def fn_orderby(query):
        """Build the argument list for ``.orderBy(...)`` from a SQL statement.

        Args:
            query: SQL text containing an ORDER BY clause; it is parsed here
                with ``parse``.

        Returns:
            A comma-separated string of ``col("name").asc()`` or
            ``col("name").desc()`` expressions, one per sort column. A column
            with no explicit direction is treated as ascending.

        Raises:
            KeyError: If the parsed statement has no ``orderby`` entry.
        """
        sort_spec = parse(query)["orderby"]
        if isinstance(sort_spec, dict):
            # A single sort column is not wrapped in a list by the parser;
            # iterating the bare dict would walk its keys instead of its items.
            sort_spec = [sort_spec]

        rendered = []
        for entry in sort_spec:
            direction = "desc()" if entry.get("sort", "asc") == "desc" else "asc()"
            rendered.append("col(\"" + str(entry.get("value", "")) + "\")." + direction)
        return ",".join(rendered)
      
      
    def fn_limit(query):
        """Return the ``limit`` entry of the parsed form of ``query``.

        The SQL text is parsed with ``parse``; a statement without a LIMIT
        clause has no such entry and the lookup raises ``KeyError``.
        """
        return parse(query)["limit"]
      
      
    def fn_genSQL(data):
        """Assemble a DataFrame-style method chain from a parsed SQL statement.

        Args:
            data: Mapping produced by parsing the statement; the keys consulted
                are ``from``, ``where``, ``groupby``, ``select``, ``orderby``
                and ``limit``.

        Returns:
            A multi-line string: the table reference followed by one
            ``.filter`` / ``.groupBy`` / ``.agg`` / ``.select`` / ``.orderBy``
            / ``.limit`` call per clause present, each on its own line.

        Notes:
            The ``.agg``, ``.orderBy`` and ``.limit`` parts are computed by
            re-parsing the module-level ``query`` string rather than from
            ``data``, so ``data`` is expected to be the parse of that same
            query. Aggregates are only emitted when a ``groupby`` key exists.
        """
        v_fn_from = fn_from(data["from"]) if "from" in data else ""
        v_fn_where = fn_where(data["where"]) if "where" in data else ""
        v_fn_select = fn_select(data["select"]) if "select" in data else ""

        v_fn_groupby = v_fn_agg = ""
        if "groupby" in data:
            v_fn_groupby = fn_groupby(data["groupby"])
            v_fn_agg = fn_agg(query)

        v_fn_orderby = fn_orderby(query) if "orderby" in data else ""
        # None means "no LIMIT clause"; keeps a literal LIMIT 0 distinguishable.
        v_fn_limit = fn_limit(query) if "limit" in data else None

        v_final_stmt = ""
        if v_fn_from:
            v_final_stmt = v_final_stmt + v_fn_from
        if v_fn_where:
            v_final_stmt = v_final_stmt + "\n.filter(\"" + v_fn_where + "\")"
        if v_fn_groupby:
            v_final_stmt = v_final_stmt + "\n.groupBy(\"" + v_fn_groupby + "\")"
        if v_fn_agg:
            # The aggregate expressions are code, not a quoted string, so the
            # call is closed with a bare ')' (no stray '"' before it).
            v_final_stmt = v_final_stmt + "\n.agg(" + v_fn_agg + ")"
        if v_fn_select:
            v_final_stmt = v_final_stmt + "\n.select(" + v_fn_select + ")"
        if v_fn_orderby:
            v_final_stmt = v_final_stmt + "\n.orderBy(" + v_fn_orderby + ")"
        if v_fn_limit is not None and v_fn_limit != "":
            # Explicit check so that LIMIT 0 is not dropped by truthiness.
            v_final_stmt = v_final_stmt + "\n.limit(" + str(v_fn_limit) + ")"

        return v_final_stmt
          
      
    # Convert the parsed sample query and show the generated method chain.
    print(fn_genSQL(v_json))
    

  3. Oury Ba-MSFT 16,076 Reputation points Microsoft Employee
    2022-08-08T22:20:46.183+00:00

    @Shambhu Rai Please try something like this

    create table test (newdate string, status string, value string);

    insert into test values('01-07-2020','Newone','segmentone'),('07-07-2020','Newone','segmenwo'),('09-07-2020','Newtwo','segmenthee'),('10-07-2020','Newtwo','segmenthee');

    with t(x) as (select row_number() OVER(ORDER by newdate desc) as rownumber from test)
    select x from t where x ='1'

    Reference: https://spark.apache.org/docs/3.0.0/sql-ref-syntax-ddl-create-table-datasource.html
    https://spark.apache.org/docs/latest/sql-ref-syntax.html

    Regards,
    Oury