SQL to Spark SQL

Shambhu Rai 1,406 Reputation points

Hi Expert,

How can I convert a query from SQL Server to Spark SQL?


Azure SQL Database
Azure Blob Storage
Azure Blob Storage
An Azure service that stores unstructured data in the cloud as blobs.
2,436 questions
Azure Databricks
Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.
1,935 questions
SQL Server Analysis Services
SQL Server Analysis Services
A Microsoft online analytical data engine used in decision support and business analytics, providing the analytical data for business reports and client applications such as Power BI, Excel, Reporting Services reports, and other data visualization tools.
1,245 questions
Transact-SQL
A Microsoft extension to the ANSI SQL language that includes procedural programming, local variables, and various support functions.
4,553 questions
{count} votes

3 answers

Sort by: Oldest
  1. Alberto Morillo 32,891 Reputation points MVP

    There is a basic guide on how to do this; it also includes a simple Python script that converts a SQL statement to Spark code.

  2. Alberto Morillo 32,891 Reputation points MVP

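    The guide shares the following Python code to perform the conversion. Note that it depends on the third-party `moz_sql_parser` package and that it emits PySpark DataFrame API calls (`.filter`, `.groupBy`, `.agg`, `.select`, `.orderBy`, `.limit`) rather than Spark SQL text: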

    from moz_sql_parser import parse  
    from moz_sql_parser import format  
    import json  
    # Example SQL query to translate into PySpark DataFrame code.
    query = """
    SELECT product_id,
        Count(star_rating) as total_rating,
        Max(star_rating)   AS best_rating,
        Min(star_rating)   AS worst_rating
    FROM   tbl_books
    WHERE  verified_purchase = 'Y'
        AND review_date BETWEEN '1995-07-22' AND '2015-08-31'
        AND marketplace IN ( 'DE', 'US', 'UK', 'FR', 'JP' )
    GROUP  BY product_id
    ORDER  BY total_rating asc,product_id desc,best_rating
    LIMIT  10;
    """
    v_parse = parse(query)
    # JSON round-trip: leaves only plain dicts/lists/strings/numbers in v_json.
    v_json = json.loads(json.dumps(v_parse, indent=4))
    def fn_from(value):
        """Translate a parsed FROM clause into the head of a PySpark expression.

        value: the "from" entry of the parser output -- a table name (str), a
            single {"value": table, "name": alias} dict, or a list of tables
            (dicts or plain strings).

        Returns the table name(s), with ``.alias("<name>")`` appended to aliased
        tables; several tables are separated by commas (no trailing comma).
        Returns "" for any other input type.
        """
        def _render(item):
            # One table reference, aliased when the parser recorded a "name".
            if "name" in item:
                return item["value"] + '.alias("' + item["name"] + '")'
            return item["value"]

        if isinstance(value, str):
            # format() presumably renders "FROM <table>"; drop the 5-char "FROM " prefix.
            return format({"from": value})[5:]
        if isinstance(value, dict):
            return _render(value)
        if isinstance(value, list):
            parts = []
            for item in value:
                if isinstance(item, dict):
                    parts.append(_render(item))
                elif isinstance(item, str):
                    parts.append(item)
            return ",".join(parts)
        return ""
    def fn_select(value):
        """Translate a parsed SELECT list into arguments for DataFrame.select().

        value: the "select" entry of the parser output -- a bare column string
            (e.g. "*"), a single {"value": ..., "name": ...} dict, or a list of
            such items.

        Plain columns become quoted names and aliased columns become
        ``col("<expr>").alias("<name>")``. An aggregate expression (an item whose
        "value" is itself a dict) is referenced by its alias only, because the
        generated ``.agg()`` step is expected to have produced a column with that
        name (this only holds for grouped queries); unaliased aggregates are
        omitted. Returns the comma-separated arguments, or "" if there are none.
        """
        items = value if isinstance(value, list) else [value]
        columns = []
        for item in items:
            if isinstance(item, str):
                columns.append('"' + item + '"')
                continue
            if not isinstance(item, dict):
                continue
            expr = item.get("value")
            name = item.get("name")
            if isinstance(expr, dict):
                # Aggregate: after .agg() only its alias is addressable.
                if name is not None:
                    columns.append('"' + name + '"')
            elif name is not None:
                # A Python str has no .alias(); wrap in col() as fn_agg/fn_orderby do.
                columns.append('col("' + str(expr) + '").alias("' + name + '")')
            else:
                columns.append('"' + str(expr) + '"')
        return ",".join(columns)
    def fn_where(value):
        """Render a parsed WHERE condition back to SQL text, minus the keyword."""
        keyword = "WHERE "
        # format() presumably renders "WHERE <condition>"; strip that 6-char prefix.
        return format({"where": value})[len(keyword):]
    def fn_groupby(value):
        """Render a parsed GROUP BY entry back to SQL text, minus the keyword."""
        keyword = "GROUP BY "
        # format() presumably renders "GROUP BY <columns>"; strip that 9-char prefix.
        return format({"groupby": value})[len(keyword):]
    def fn_agg(query):
        """Build the argument list for DataFrame.agg() from a query's aggregates.

        query: SQL text (parsed with ``parse``) or, as a convenience, an
            already-parsed dict with a "select" entry.

        Every select item whose "value" is a dict such as {"count": "star_rating"}
        yields ``count(col("star_rating"))``, followed by ``.alias('<name>')``
        when the item has an alias. Plain columns are skipped. Returns the
        comma-separated expressions ("" when there are none).
        """
        v_parse = parse(query) if isinstance(query, str) else query
        select = v_parse.get("select", [])
        # A single select item may arrive as a dict or str rather than a list.
        if not isinstance(select, list):
            select = [select]
        aggregates = []
        for item in select:
            if not isinstance(item, dict) or not isinstance(item.get("value"), dict):
                continue
            for func, arg in item["value"].items():
                expr = func + '(col("' + str(arg) + '"))'
                if "name" in item:
                    expr += ".alias('" + item["name"] + "')"
                aggregates.append(expr)
        return ",".join(aggregates).replace("\n", "")
    def fn_orderby(query):
        """Build the argument list for DataFrame.orderBy() from a query's ORDER BY.

        query: SQL text (parsed with ``parse``) or an already-parsed dict with an
            "orderby" entry.

        Each sort key becomes ``col("<expr>").desc()`` when its "sort" is "desc"
        and ``col("<expr>").asc()`` otherwise (a missing "sort" means ascending).
        Raises KeyError when there is no "orderby" entry.
        """
        v_parse = parse(query) if isinstance(query, str) else query
        v_orderby = v_parse["orderby"]
        # A single sort key may come back as a dict rather than a one-element list.
        if isinstance(v_orderby, dict):
            v_orderby = [v_orderby]
        sort_keys = []
        for item in v_orderby:
            direction = "desc()" if item.get("sort", "asc") == "desc" else "asc()"
            sort_keys.append('col("' + str(item.get("value", "")) + '").' + direction)
        return ",".join(sort_keys)
    def fn_limit(query):
        """Return the LIMIT value of a query.

        query: SQL text (parsed with ``parse``) or an already-parsed dict with a
            "limit" entry.

        Raises KeyError when there is no "limit" entry.
        """
        v_parse = parse(query) if isinstance(query, str) else query
        return v_parse["limit"]
    def fn_genSQL(data):
        """Assemble PySpark DataFrame code for a parsed SQL query.

        data: the parser output for the query as a dict (e.g. ``v_json``).

        Each clause present in ``data`` is translated by its fn_* helper. The
        pieces are chained in a fixed order: source table, ``.filter``,
        ``.groupBy``, ``.agg``, ``.select``, ``.orderBy``, ``.limit``.

        NOTE: the aggregate, ORDER BY and LIMIT pieces are produced from the
        module-level ``query`` string (passed to fn_agg/fn_orderby/fn_limit), not
        from ``data``. ``data`` must therefore be the parse of that same query.

        Returns the generated code with one chained call per line.
        """
        v_fn_from = v_fn_where = v_fn_groupby = v_fn_agg = v_fn_select = v_fn_orderby = ""
        # None (not "") means "no LIMIT", so that a legitimate LIMIT 0 is still emitted.
        v_fn_limit = None

        if "from" in data:
            v_fn_from = fn_from(data["from"])
        if "where" in data:
            v_fn_where = fn_where(data["where"])
        if "groupby" in data:
            # Render each grouping column separately so every column becomes its
            # own quoted groupBy() argument instead of one "a, b" string.
            groupby = data["groupby"]
            group_items = groupby if isinstance(groupby, list) else [groupby]
            v_fn_groupby = '","'.join(fn_groupby(item) for item in group_items)
            # Aggregates are only emitted for grouped queries.
            v_fn_agg = fn_agg(query)
        if "select" in data:
            v_fn_select = fn_select(data["select"])
        if "orderby" in data:
            v_fn_orderby = fn_orderby(query)
        if "limit" in data:
            v_fn_limit = fn_limit(query)

        v_final_stmt = ""
        if v_fn_from:
            v_final_stmt += v_fn_from
        if v_fn_where:
            v_final_stmt += "\n.filter(\"" + v_fn_where + "\")"
        if v_fn_groupby:
            v_final_stmt += "\n.groupBy(\"" + v_fn_groupby + "\")"
        if v_fn_agg:
            v_final_stmt += "\n.agg(" + v_fn_agg + ")"
        if v_fn_select:
            v_final_stmt += "\n.select(" + v_fn_select + ")"
        if v_fn_orderby:
            v_final_stmt += "\n.orderBy(" + v_fn_orderby + ")"
        if v_fn_limit is not None:
            v_final_stmt += "\n.limit(" + str(v_fn_limit) + ")"
        return v_final_stmt
    # Show the PySpark code generated for the example query.
    print(fn_genSQL(v_json))

  3. Oury Ba-MSFT 16,241 Reputation points Microsoft Employee

    @Shambhu Rai Please try something like this:

    CREATE TABLE test (
        newdate STRING,  -- date kept as text; presumably 'dd-MM-yyyy' (see inserted values)
        status  STRING,
        value   STRING
    );

    INSERT INTO test VALUES
        ('01-07-2020', 'Newone', 'segmentone'),
        ('07-07-2020', 'Newone', 'segmenwo'),
        ('09-07-2020', 'Newtwo', 'segmenthee'),
        ('10-07-2020', 'Newtwo', 'segmenthee');

    -- Number the rows of `test` from newest to oldest and keep the first one.
    -- newdate holds text (assumed 'dd-MM-yyyy' -- confirm), so it is parsed with
    -- to_date before sorting; ordering the raw strings breaks across months/years.
    -- As written this returns only the row number. To get the newest row itself,
    -- also select newdate, status, value inside the CTE (and drop the t(x) list).
    WITH t(x) AS (
        SELECT row_number() OVER (ORDER BY to_date(newdate, 'dd-MM-yyyy') DESC) AS rownumber
        FROM test
    )
    SELECT x
    FROM t
    WHERE x = 1;

    Reference: https://spark.apache.org/docs/3.0.0/sql-ref-syntax-ddl-create-table-datasource.html
