Share via


Azure Synapse Link'te Apache Spark 2 kullanarak Azure Cosmos DB ile etkileşim kurma

Not

Spark 3 kullanarak Azure Cosmos DB için Azure Synapse Bağlantısı için Spark 3'te Azure Cosmos DB için Bağlantı Azure Synapse bu makaleye bakın

Bu makalede Synapse Apache Spark 2 kullanarak Azure Cosmos DB ile etkileşim kurmayı öğreneceksiniz. Scala, Python, SparkSQL ve C# desteğiyle Synapse Apache Spark, Azure Cosmos DB için Azure Synapse Link'te analiz, veri mühendisliği, veri bilimi ve veri araştırma senaryolarının merkezinde yer alır.

Azure Cosmos DB ile etkileşim kurarken aşağıdaki özellikler desteklenir:

  • Synapse Apache Spark, azure cosmos DB kapsayıcılarınızda bulunan ve Azure Synapse Bağlantısı ile etkinleştirilen verileri işlem iş yüklerinizin performansını etkilemeden neredeyse gerçek zamanlı olarak analiz etmenizi sağlar. Spark'tan Azure Cosmos DB analiz deposunu sorgulamak için aşağıdaki iki seçenek sağlanır:
    • Spark DataFrame'e yükleme
    • Spark tablosu oluşturma
  • Synapse Apache Spark, Azure Cosmos DB'ye veri almanızı da sağlar. Verilerin her zaman işlem deposu aracılığıyla Azure Cosmos DB kapsayıcılarına alındığını unutmayın. Synapse Link etkinleştirildiğinde yeni eklemeler, güncelleştirmeler ve silmeler analiz deposuna otomatik olarak eşitlenir.
  • Synapse Apache Spark ayrıca hem kaynak hem de havuz olarak Azure Cosmos DB ile Spark yapılandırılmış akışını destekler.

Aşağıdaki bölümlerde yukarıdaki özelliklerin söz diziminde size yol gösterir. Ayrıca Azure Synapse Analytics için Apache Spark ile Azure Cosmos DB'yi sorgulama hakkındaki Learn modülüne de göz atabilirsiniz. Azure Synapse Analytics çalışma alanında hareketler, kullanmaya başlamak için kullanıma hazır kolay bir deneyim sağlamak üzere tasarlanmıştır. Synapse çalışma alanının Veri sekmesinde bir Azure Cosmos DB kapsayıcısını sağ tıklattığınızda hareketler görünür. Hareketler sayesinde hızlı bir şekilde kod oluşturabilir ve ihtiyaçlarınıza göre düzenleyebilirsiniz. Hareketler ayrıca tek tıklamayla verileri keşfetmek için de idealdir.

Önemli

Analitik şemada veri yükleme işlemlerinde beklenmeyen davranışa yol açabilecek bazı kısıtlamaları bilmeniz gerekir. Örneğin, analitik şemada işlem şemasından yalnızca ilk 1000 özellik kullanılabilir, boşluklu özellikler kullanılamaz vb. Beklenmeyen sonuçlarla karşılaşıyorsanız daha fazla ayrıntı için analiz deposu şema kısıtlamalarını denetleyin.

Azure Cosmos DB analiz deposunu sorgulama

Azure Cosmos DB analiz deposunu sorgulama, Spark DataFrame'e yükleme ve Spark tablosu oluşturma gibi iki olası seçenek hakkında bilgi edinmeden önce, ihtiyaçlarınıza uygun seçeneği belirleyebilmeniz için deneyim farklarını keşfetmeye değer.

Deneyim farkı, Azure Cosmos DB kapsayıcısında temel alınan veri değişikliklerinin Spark'ta gerçekleştirilen analize otomatik olarak yansıtılıp yansıtılmayacağıdır. Bir Kapsayıcının analiz deposunda Spark DataFrame kaydedildiğinde veya Spark tablosu oluşturulduğunda, analiz deposundaki verilerin geçerli anlık görüntüsüyle ilgili meta veriler sonraki analizin etkili bir şekilde gönderilmelerini sağlamak için Spark'a getirilir. Spark gecikmeli bir değerlendirme ilkesi izlediğinden, Spark DataFrame'de bir eylem çağrılmadığı veya Spark tablosuna yönelik bir SparkSQL sorgusu yürütülmediği sürece, temel kapsayıcının analiz deposundan gerçek verilerin getirilmediğini unutmayın.

Spark DataFrame'e yükleme durumunda, getirilen meta veriler Spark oturumunun kullanım ömrü boyunca önbelleğe alınır ve dolasıyla DataFrame'de bundan sonra çağrılan eylemler DataFrame'i oluşturma sırasındaki analiz deposu anlık görüntüsü üzerinde gerçekleştirilir.

Öte yandan Spark tablosu oluşturma durumunda, analiz deposu durumunun meta verileri Spark'ta önbelleğe alınmaz ve Spark tablosunda yürütülen her SparkSQL sorgusu için yeniden yüklenir.

Bu nedenle, Spark analizinizin analiz deposunun sabit bir anlık görüntüsü üzerinde mi yoksa analiz deposunun en son anlık görüntüsü üzerinde mi gerçekleştirilmesini istediğinize bağlı olarak, sırasıyla Spark DataFrame'i yüklemeyi veya Spark tablosu oluşturmayı seçebilirsiniz.

Analiz sorgularınızda sık kullanılan filtreler varsa, daha iyi sorgu performansı için bu alanları temel alarak bölümleme seçeneğiniz vardır. Analiz deposunda bölümlemeye neden olmak için bir Azure Synapse Spark not defterinden bölümleme işini düzenli aralıklarla yürütebilirsiniz. Bu bölümlenmiş depo, Azure Synapse çalışma alanınıza bağlı ADLS 2. Nesil birincil depolama hesabına işaret eder. Daha fazla bilgi edinmek için özel bölümlemeye giriş ve özel bölümlememakalelerini yapılandırma konularına bakın.

Not

MongoDB hesapları için Azure Cosmos DB'yi sorgulamak için analiz deposundaki tam uygunluk şeması gösterimi ve kullanılacak genişletilmiş özellik adları hakkında daha fazla bilgi edinin.

Not

Aşağıdaki komutlarda yer alan tüm options komutların büyük/küçük harfe duyarlı olduğunu lütfen unutmayın. Örneğin, bir hata döndürürken gateway komutunu kullanmanız Gateway gerekir.

Spark DataFrame'e yükleme

Bu örnekte, Azure Cosmos DB analiz deposuna işaret eden bir Spark DataFrame oluşturacaksınız. Daha sonra DataFrame'de Spark eylemlerini çağırarak ek analiz gerçekleştirebilirsiniz. Bu işlem işlem depoyu etkilemez.

Python'da söz dizimi aşağıdaki gibi olabilir:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Scala'daki eşdeğer söz dizimi aşağıdaki gibidir:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Spark tablosu oluşturma

Bu örnekte, Azure Cosmos DB analiz deposunu gösteren bir Spark tablosu oluşturacaksınız. Daha sonra, SparkSQL sorgularını tabloya çağırarak ek analiz gerçekleştirebilirsiniz. Bu işlem işlem depoyu etkilemez veya veri taşımaya neden olmaz. Bu Spark tablosunu silmeye karar verirseniz, temel alınan Azure Cosmos DB kapsayıcısı ve ilgili analiz deposu etkilenmez.

Bu senaryo, Spark tablolarını üçüncü taraf araçlar aracılığıyla yeniden kullanmak ve çalışma zamanı için temel alınan verilere erişilebilirlik sağlamak için kullanışlıdır.

Spark tablosu oluşturmak için söz dizimi aşağıdaki gibidir:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Not

Temel alınan Azure Cosmos DB kapsayıcısının şemasının zaman içinde değiştiği senaryolarınız varsa; güncelleştirilmiş şemanın Spark tablosundaki sorgulara otomatik olarak yansıtılmasını istiyorsanız, Spark tablosu seçeneklerinde seçeneğini true olarak ayarlayarak spark.cosmos.autoSchemaMerge bunu elde edebilirsiniz.

Azure Cosmos DB kapsayıcısına Spark DataFrame yazma

Bu örnekte, Bir Azure Cosmos DB kapsayıcısına Spark DataFrame yazacaksınız. Bu işlem işlem iş yüklerinin performansını etkiler ve Azure Cosmos DB kapsayıcısı veya paylaşılan veritabanında sağlanan istek birimlerini kullanır.

Python'da söz dizimi aşağıdaki gibi olabilir:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

Scala'daki eşdeğer söz dizimi aşağıdaki gibidir:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Kapsayıcıdan akış DataFrame yükleme

Bu harekette, bir kapsayıcıdaki verileri bir veri çerçevesine yüklemek için Spark Stream özelliğini kullanacaksınız. Veriler, çalışma alanına bağladığınız birincil data lake hesabında (ve dosya sisteminde) depolanır.

Not

Synapse Apache Spark'ta dış kitaplıklara başvurmak istiyorsanız burada daha fazla bilgi edinebilirsiniz. Örneğin, Spark DataFrame'i MongoDB için Azure Cosmos DB kapsayıcısına almak istiyorsanız Spark için MongoDB bağlayıcısını kullanabilirsiniz.

Azure Cosmos DB kapsayıcısından akış DataFrame yükleme

Bu örnekte, Azure Cosmos DB'deki değişiklik akışı işlevini kullanarak Bir Azure Cosmos DB kapsayıcısından Spark akış Veri Çerçevesine veri yüklemek için Spark'ın yapılandırılmış akış özelliğini kullanacaksınız. Spark tarafından kullanılan denetim noktası verileri, çalışma alanına bağladığınız birincil data lake hesabında (ve dosya sisteminde) depolanır.

/localReadCheckpointFolder klasörü oluşturulmadıysa (aşağıdaki örnekte), otomatik olarak oluşturulur. Bu işlem işlem iş yüklerinin performansını etkiler ve Azure Cosmos DB kapsayıcısı veya paylaşılan veritabanında sağlanan İstek Birimlerini kullanır.

Python'da söz dizimi aşağıdaki gibi olabilir:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

Scala'daki eşdeğer söz dizimi aşağıdaki gibidir:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Akış DataFrame'i Azure Cosmos DB kapsayıcısına yazma

Bu örnekte Azure Cosmos DB kapsayıcısına bir akış DataFrame yazacaksınız. Bu işlem işlem iş yüklerinin performansını etkiler ve Azure Cosmos DB kapsayıcısı veya paylaşılan veritabanında sağlanan İstek Birimlerini kullanır. /localWriteCheckpointFolder klasörü oluşturulmadıysa (aşağıdaki örnekte), otomatik olarak oluşturulur.

Python'da söz dizimi aşağıdaki gibi olabilir:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

Scala'daki eşdeğer söz dizimi aşağıdaki gibidir:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Sonraki adımlar