Quickstart: Gerencie os dados com o Conector AZure Cosmos DB Spark 3 OLTP para SQL API

APLICA-SE A: SQL API

Este tutorial é um guia de início rápido para mostrar como usar cosmos DB Spark Connector para ler ou escrever para Cosmos DB. Cosmos DB Spark Connector é baseado em Spark 3.1.x.

Ao longo deste tutorial rápido, contamos com O Azure Databricks Runtime 8.0 com Spark 3.1.1 e um Jupyter Notebook para mostrar como usar o Cosmos DB Spark Connector.

Você também pode usar qualquer outra oferta de faísca Spark 3.1.1, também você deve ser capaz de usar qualquer idioma suportado por Spark (PySpark, Scala, Java, etc.), ou qualquer interface Spark que você esteja familiarizado (Jupyter Notebook, Livy, etc.).

Pré-requisitos

O SLF4J só é necessário se planeia utilizar o registo, também descarrega uma ligação SLF4J, que ligará a API SLF4J com a implementação de registo da sua escolha. Consulte o manual do utilizador SLF4J para obter mais informações.

Instale o Conector cosmos DB Spark, no seu cluster de faíscas azure-cosmos-spark_3-1_2-12-4.3.1.jar

O guia de arranque baseia-se no PySpark, no entanto, também pode utilizar a versão scala equivalente, e pode executar o seguinte código de corte num portátil PySpark Azure Databricks.

Criar bases de dados e contentores

Primeiro, deite as credenciais de conta da Cosmos DB e o nome da base de dados cosmos DB e o nome do contentor.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

Em seguida, pode utilizar o novo Catálogo API para criar uma Base de Dados e Contentor Cosmos DB através da Spark.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

Ao criar recipientes com a API do Catálogo, pode definir o caminho chave de produção e partição para o recipiente a ser criado.

Para mais informações, consulte a documentação completa da API do Catálogo.

Ingerir dados

O nome da fonte de dados é cosmos.oltp , e o exemplo a seguir mostra como pode escrever um dataframe de memória composto por dois itens para Cosmos DB:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

Note que id é um campo obrigatório para Cosmos DB.

Para obter mais informações relacionadas com a ingestão de dados, consulte a documentação completa de configuração de escrita.

Consultar dados

Utilizando a mesma cosmos.oltp fonte de dados, podemos consultar dados e usar filter para empurrar os filtros:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

Para obter mais informações relacionadas com os dados de consulta, consulte a documentação completa de configuração de consulta.

Inferência do esquema

Ao consultar dados, o Conector de Faíscas pode inferir o esquema com base na amostragem de itens existentes, definindo spark.cosmos.read.inferSchema.enabled para true .

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()

Em alternativa, pode passar o esquema personalizado que pretende ser usado para ler os dados:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Se não for especificado nenhum esquema personalizado e se a inferência do esquema for desativada, os dados resultantes serão retornados ao conteúdo bruto do Json dos itens:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Para obter mais informações relacionadas com a inferência do esquema, consulte a documentação completa de configuração da inferência do esquema.

Referência de configuração

Configuração genérica

Nome da propriedade Config Predefinição Description
spark.cosmos.accountEndpoint Nenhum Cosmos DB Conta Endpoint Uri
spark.cosmos.accountKey Nenhuma Chave de conta Cosmos DB
spark.cosmos.database Nenhuma Nome da base de dados cosmos DB
spark.cosmos.container Nenhuma Nome do recipiente Cosmos DB

Afinação extra

Nome da propriedade Config Predefinição Description
spark.cosmos.useGatewayMode false Utilize o modo gateway para as operações do cliente
spark.cosmos.read.forceEventualConsistency true Faz o cliente usar eventual consistência para operações de leitura em vez de usar a consistência do nível de conta padrão
spark.cosmos.applicationName Nenhuma Nome da aplicação
spark.cosmos.preferredRegionsList Nenhuma Lista de regiões preferenciais para ser usada para uma conta cosmos DB de várias regiões. Trata-se de um valor separado por vírgula (por exemplo, [East US, West US] ou ) desde que as East US, West US regiões preferenciais sejam utilizadas como sugestão. Você deve usar um cluster de faíscas collocada com a sua conta Cosmos DB e passar a região do cluster de faíscas como região preferida. Consulte a lista das regiões de Azure aqui. Também pode usar spark.cosmos.preferredRegions como pseudónimo
spark.cosmos.diagnostics Nenhuma Pode ser usado para permitir diagnósticos mais verbosos. Atualmente, a única opção suportada é definir esta propriedade para simple - o que resultará em registos extras sendo emitidos como INFO registos nos registos do Driver e do Executor.

Escreva config

Nome da propriedade Config Predefinição Description
spark.cosmos.write.strategy ItemOverwrite Cosmos DB Itens escrevem Estratégia: ItemOverwrite (usando upsert), ItemAppend (usando criar, ignorar itens pré-existentes que são, Conflitos), ItemDelete (eliminar todos os documentos), ItemDeleteIfNotModified (excluir todos os documentos para os quais o etag não mudou)
spark.cosmos.write.maxRetryCount 10 Cosmos DB Escrever Tentativas de Retriptação de Max em falhas retripáveis (por exemplo, erro de ligação)
spark.cosmos.write.point.maxConcurrency Nenhuma Cosmos DB Item Escrever Max concurrency. Se não for especificado, será determinado com base no tamanho VM do executor de faíscas
spark.cosmos.write.bulk.maxPendingOperations Nenhuma Item Cosmos DB Escreva o modo máximo pendente de operações pendentes. Define um limite de operações a granel sendo processadas simultaneamente. Se não for especificado, será determinado com base no tamanho VM do executor de faíscas. Se o volume de dados for grande para a produção prevista no contentor de destino, esta definição pode ser ajustada seguindo a estimativa de 1000 x Cores
spark.cosmos.write.bulk.enabled true Artigo de Cosmitologia cosmos DB Ativado

Consulta config

Nome da propriedade Config Predefinição Description
spark.cosmos.read.customQuery Nenhum Quando for fornecida, a consulta personalizada será processada contra o ponto final do Cosmos em vez de gerar dinamicamente a consulta através de um impulso predicado para baixo. Normalmente é recomendado confiar no impulso predicado de Spark porque isso permitirá gerar o conjunto de filtros mais eficiente com base no plano de consulta. Mas há um par de predicados como agregados (contagem, grupo por, avg, soma etc.) que ainda não podem ser empurrados para baixo (pelo menos em Spark 3.1) - por isso a consulta personalizada é uma regressão para permitir que sejam empurrados para a consulta enviada para Cosmos. Se especificado, com inferência de esquema ativado, a consulta personalizada também será usada para inferir o esquema.
spark.cosmos.read.maxItemCount 1000 Substitui o número máximo de documentos que podem ser devolvidos para uma única consulta ou alterar o pedido de alimentação. O valor predefinido é 1000 - considere aumentar isto apenas para tamanhos médios de documento inferior a 1 KB ou quando a projeção reduz significativamente o número de propriedades selecionadas em consultas (como quando seleciona apenas "ID" de documentos, etc.).

Schema inferência config

Ao fazer operações de leitura, os utilizadores podem especificar um esquema personalizado ou permitir que o conector o infera. A inferência do esquema é ativada por defeito.

Nome da propriedade Config Predefinição Description
spark.cosmos.read.inferSchema.enabled true Quando a inferência do esquema for desativada e o utilizador não estiver a fornecer um esquema, o json cru será devolvido.
spark.cosmos.read.inferSchema.query SELECT * FROM r Quando a inferência do esquema estiver ativada, usada como consulta personalizada para inferir. Por exemplo, se armazenar várias entidades com esquemas diferentes dentro de um recipiente e quiser garantir que a inferência apenas olha para determinados tipos de documentos ou pretende projetar apenas colunas específicas.
spark.cosmos.read.inferSchema.samplingSize 1000 Tamanho de amostragem a utilizar ao inferir esquema e não utilizar uma consulta.
spark.cosmos.read.inferSchema.includeSystemProperties false Quando a inferência do esquema estiver ativada, se o esquema resultante incluirá todas as propriedades do sistema Cosmos DB.
spark.cosmos.read.inferSchema.includeTimestamp false Quando a inferência do esquema estiver ativada, se o esquema resultante incluirá o calendário do documento ( _ts ). Não é necessário se spark.cosmos.read.inferSchema.includeSystemProperties estiver ativado, uma vez que já incluirá todas as propriedades do sistema.
spark.cosmos.read.inferSchema.forceNullableProperties true Quando a inferência do esquema estiver ativada, se o esquema resultante tornará todas as colunas anuladas. Por padrão, todas as colunas (exceto as propriedades do sistema cosmos) serão tratadas como anuladas, mesmo que todas as linhas dentro do conjunto de amostras tenham valores não nulos. Quando desativadas, as colunas inferidas são tratadas como nulas ou não, dependendo se qualquer registo no conjunto de amostras tem valores nulos dentro de uma coluna.

Config de serialização

Usado para influenciar o comportamento de serialização/desserialização json

Nome da propriedade Config Predefinição Description
spark.cosmos.serialization.inclusionMode Always Determina se os valores nulos/predefinidos serão serializados para json ou se as propriedades com valor nulo/predefinido serão ignoradas. O comportamento segue as mesmas ideias que o JsonInclude de Jackson.Incluir. Always significa que as propriedades json são criadas mesmo para valores nulos e predefinidos. NonNull significa que não serão criadas propriedades json para valores nulos explícitos. NonEmpty significa que as propriedades json não serão criadas para valores de cordas vazios ou matrizes/mpas vazias. NonDefault significa que as propriedades json serão ignoradas não só para nulos/vazios, mas também quando o valor é idêntico ao valor padrão 0 para propriedades numéricas, por exemplo.

Alterar a configuração do feed (apenas para Spark-Streaming utilizando cosmos.oltp.changeFeed a origem de dados, que é apenas para leitura)

Nome da propriedade Config Predefinição Description
spark.cosmos.changeFeed.startFrom Beginning ChangeFeed Start a partir de configurações Now (, Beginning ou um determinado ponto no tempo (UTC) por exemplo 2020-02-10T14:15:03 ) - o valor padrão é Beginning . Se o config de escrita contiver um checkpointLocation e quaisquer pontos de verificação existentes, o fluxo é sempre continuado independente das spark.cosmos.changeFeed.startFrom definições - é necessário alterar checkpointLocation ou apagar pontos de verificação para reiniciar o fluxo se for essa a intenção.
spark.cosmos.changeFeed.mode Incremental Modo ChangeFeed Incremental (ou FullFidelity ) - NOTA: está em estado experimental neste FullFidelity momento. Requer que a subscrição/conta tenha sido ativada para a pré-visualização privada e que se saiba que as alterações de rutura ocorrerão FullFidelity (esquema dos documentos devolvidos). Recomenda-se a utilização FullFidelity apenas para cenários de não produção neste momento.
spark.cosmos.changeFeed.itemCountPerTriggerHint Nenhuma Número máximo aproximado de itens lidos a partir de alteração de alimentação para cada micro-lote/gatilho

Configuração de conversão Json

Nome da propriedade Config Predefinição Description
spark.cosmos.read.schemaConversionMode Relaxed O comportamento de conversão do esquema Relaxed (, Strict . . Ao ler documentos json, se um documento contiver um atributo que não mapear para o tipo de esquema, o utilizador pode decidir se deve usar um null valor (Relaxado) ou uma exceção (Estrita).

Estratégia de partição config

Nome da propriedade Config Predefinição Description
spark.cosmos.read.partitioning.strategy Default A estratégia de partição utilizada (Padrão, Personalizado, Restritivo ou Agressivo)
spark.cosmos.partitioning.targetedCount Nenhuma O conde de partição alvo. Este parâmetro é opcional e ignorado a menos que a estratégia==Costume seja usada. Neste caso, o Conector de Faíscas não calculará dinamicamente o número de divisórias, mas manter-se-ão com este valor.

Config de controlo de produção

Nome da propriedade Config Predefinição Description
spark.cosmos.throughputControl.enabled false Se o controlo de produção está ativado
spark.cosmos.throughputControl.name Nenhuma Nome do grupo de controlo de produção
spark.cosmos.throughputControl.targetThroughput Nenhuma Produção do alvo do grupo de controlo de produção
spark.cosmos.throughputControl.targetThroughputThreshold Nenhuma Limiar de produção do grupo de controlo de produção
spark.cosmos.throughputControl.globalControl.database Nenhuma Base de dados, que será usada para o controlo global de produção
spark.cosmos.throughputControl.globalControl.container Nenhuma Contentor, que será usado para o controlo global de produção
spark.cosmos.throughputControl.globalControl.renewIntervalInMS 5s Quantas vezes o cliente vai atualizar o uso de produção de si mesmo
spark.cosmos.throughputControl.globalControl.expireIntervalInMS 11s A rapidez com que um cliente offline será detetado

Passos seguintes