Share via


Usar o Ray no Azure Databricks

Com o Ray 2.3.0 e superior, você pode criar clusters Ray e executar aplicativos Ray em clusters Apache Spark com o Azure Databricks. Para obter informações sobre como começar a usar o aprendizado de máquina no Ray, incluindo tutoriais e exemplos, consulte a documentação do Ray. Para obter mais informações sobre a integração do Ray e do Apache Spark, consulte a documentação da API do Ray on Spark.

Requisitos

  • Databricks Runtime 12.2 LTS ML e superior.
  • O modo de acesso ao cluster do Databricks Runtime deve ser o modo "Atribuído" ou o modo "Sem isolamento compartilhado".

Instalar o Ray

Use o seguinte comando para instalar o Ray. A [default] extensão é exigida pelo componente do painel Ray.

%pip install ray[default]>=2.3.0

Criar um cluster Ray específico do usuário em um cluster Databricks

Para criar um cluster Ray, use a API ray.util.spark.setup_ray_cluster .

Em qualquer bloco de anotações Databricks anexado a um cluster Databricks, você pode executar o seguinte comando:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_worker_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

A ray.util.spark.setup_ray_cluster API cria um cluster Ray no Spark. Internamente, ele cria um trabalho em segundo plano do Spark. Cada tarefa Spark no trabalho cria um nó de trabalhador Ray e o nó principal Ray é criado no driver. O argumento num_worker_nodes representa o número de nós de trabalho Ray a serem criados. Para especificar o número de núcleos de CPU ou GPU atribuídos a cada nó de trabalho do Ray, defina o argumento num_cpus_worker_node (valor padrão: 1) ou num_gpus_worker_node (valor padrão: 0).

Depois que um cluster Ray é criado, você pode executar qualquer código de aplicativo Ray diretamente em seu notebook. Clique em Abrir painel do Ray Cluster em uma nova guia para exibir o painel do Ray para o cluster.

Gorjeta

Se você estiver usando um cluster de usuário único do Azure Databricks, poderá definir num_worker_nodes para ray.util.spark.MAX_NUM_WORKER_NODES usar todos os recursos disponíveis para seu cluster Ray.

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

Defina o argumento collect_log_to_path para especificar o caminho de destino onde você deseja coletar os logs de cluster do Ray. A coleta de logs é executada depois que o cluster Ray é desligado. O Databricks recomenda que você defina um caminho começando com /dbfs/ para que os logs sejam preservados mesmo se você encerrar o cluster do Spark. Caso contrário, seus logs não serão recuperáveis, pois o armazenamento local no cluster será excluído quando o cluster for desligado.

Nota

"Para que seu aplicativo Ray use automaticamente o cluster Ray que foi criado, ligue ray.util.spark.setup_ray_cluster para definir a RAY_ADDRESS variável de ambiente para o endereço do cluster Ray." Você pode especificar um endereço de cluster alternativo usando o address argumento da API ray.init .

Executar um aplicativo Ray

Depois que o cluster Ray tiver sido criado, você poderá executar qualquer código de aplicativo Ray em um bloco de anotações do Azure Databricks.

Importante

O Databricks recomenda que você instale todas as bibliotecas necessárias para seu aplicativo para %pip install <your-library-dependency> garantir que elas estejam disponíveis para seu cluster Ray e aplicativo de acordo. A especificação de dependências na chamada da função Ray init instala as dependências em um local inacessível aos nós de trabalho do Spark, o que resulta em incompatibilidades de versão e erros de importação.

Por exemplo, você pode executar um aplicativo Ray simples em um bloco de anotações do Azure Databricks da seguinte maneira:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Criar um cluster Ray no modo de dimensionamento automático

No Ray 2.8.0 e superior, os clusters Ray começaram no Databricks suportando a integração com o dimensionamento automático do Databricks. Consulte Dimensionamento automático do cluster Databricks.

Com o Ray 2.8.0 e superior, você pode criar um cluster Ray em um cluster Databricks que suporta escalonamento para cima ou para baixo de acordo com cargas de trabalho. Essa integração de dimensionamento automático aciona o dimensionamento automático do cluster Databricks internamente dentro do ambiente Databricks.

Para habilitar o dimensionamento automático, execute o seguinte comando:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

Se o dimensionamento automático estiver habilitado, num_worker_nodes indica o número máximo de nós de trabalho do Ray. O número mínimo padrão de nós de trabalho do Ray é 0. Essa configuração padrão significa que, quando o cluster Ray está ocioso, ele é reduzido para zero nós de trabalho Ray. Isso pode não ser ideal para uma resposta rápida em todos os cenários, mas quando habilitado, pode reduzir muito os custos.

No modo de dimensionamento automático, num_worker_nodes não pode ser definido como ray.util.spark.MAX_NUM_WORKER_NODES.

Os argumentos a seguir configuram a velocidade de upscaling e downscaling:

  • autoscale_upscaling_speed representa o número de nós que podem estar pendentes como um múltiplo do número atual de nós. Quanto maior o valor, mais agressivo é o upscaling. Por exemplo, se isso for definido como 1.0, o cluster pode crescer em tamanho no máximo 100% a qualquer momento.
  • autoscale_idle_timeout_minutes Representa o número de minutos que precisam passar antes que um nó de trabalho ocioso seja removido pelo AutoScaler. Quanto menor o valor, mais agressivo é o downscaling.

Com o Ray 2.9.0 e superior, você também pode configurar autoscale_min_worker_nodes para evitar que o cluster Ray seja reduzido para zero trabalhadores quando o cluster Ray estiver ocioso.

Conecte-se ao cluster Ray remoto usando o cliente Ray

No Ray 2.9.3, crie um cluster Ray chamando a setup_ray_cluster API. No mesmo bloco de anotações, chame a ray.init() API para se conectar a esse cluster Ray.

Para um cluster Ray que não está no modo global, obtenha a cadeia de conexão remota com o seguinte código:

Para obter a cadeia de conexão remota usando o seguinte:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Conecte-se ao cluster remoto usando esta cadeia de conexão remota:

import ray
ray.init(remote_conn_str)

O cliente Ray não suporta a API do conjunto de dados Ray definida no ray.data módulo. Como solução alternativa, você pode encapsular seu código que chama a API do conjunto de dados Ray dentro de uma tarefa remota do Ray, conforme mostrado no código a seguir:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Carregar dados de um DataFrame do Spark

Para carregar um Spark DataFrame como um Ray Dataset, primeiro, você deve salvar o Spark DataFrame em volumes UC ou Databricks Filesystem (preterido) como formato Parquet. Para controlar o acesso ao sistema de arquivos Databricks com segurança, o Databricks recomenda que você monte o armazenamento de objetos em nuvem no DBFS. Em seguida, você pode criar uma ray.data.Dataset instância a partir do caminho Spark DataFrame salvo usando o seguinte método auxiliar:

import ray
import os
from urllib.parse import urlparse

def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Carregar dados de uma tabela do Unity Catalog através do armazém SQL do Databricks

Para o Ray 2.8.0 e superior, você pode chamar a ray.data.read_databricks_tables API para carregar dados de uma tabela do Databricks Unity Catalog.

Primeiro, você precisa definir a variável de ambiente para seu token de acesso ao DATABRICKS_TOKEN armazém do Databricks. Se você não estiver executando seu programa no Databricks Runtime, defina a DATABRICKS_HOST variável de ambiente para a URL do espaço de trabalho Databricks, conforme mostrado a seguir:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Em seguida, chame ray.data.read_databricks_tables() para ler a partir do armazém SQL Databricks.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Configurar recursos usados pelo nó principal Ray

Por padrão, para a configuração do Ray on Spark, o Databricks restringe os recursos alocados ao nó principal do Ray para:

  • 0 núcleos de CPU
  • 0 GPUs
  • 128 MB de memória de pilha
  • 128 MB de memória de armazenamento de objetos

Isso ocorre porque o nó principal Ray é geralmente usado para coordenação global, não para executar tarefas Ray. Os recursos do nó do driver do Spark são compartilhados com vários usuários, portanto, a configuração padrão salva recursos no lado do driver do Spark.

Com o Ray 2.8.0 e superior, você pode configurar os recursos usados pelo nó principal do Ray. Use os seguintes argumentos na setup_ray_cluster API:

  • num_cpus_head_node: configuração de núcleos de CPU usados pelo nó principal Ray
  • num_gpus_head_node: GPU de configuração usada pelo nó principal Ray
  • object_store_memory_head_node: definindo o tamanho da memória de armazenamento de objetos pelo nó principal do Ray

Apoio a agrupamentos heterogéneos

Para execuções de treinamento mais eficientes e econômicas, você pode criar um cluster Ray on Spark e definir configurações diferentes entre o nó principal Ray e os nós de trabalho Ray. No entanto, todos os nós de trabalho do Ray devem ter a mesma configuração. Os clusters Databricks não suportam totalmente clusters heterogêneos, mas você pode criar um cluster Databricks com diferentes tipos de instância de driver e trabalho definindo uma política de cluster.

Por exemplo:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Ajuste a configuração do cluster Ray

A configuração recomendada para cada nó de trabalho Ray é:

  • Mínimo de 4 núcleos de CPU por nó de trabalho Ray.
  • Memória de heap mínima de 10 GB para cada nó de trabalho Ray.

Ao chamar ray.util.spark.setup_ray_cluster, o Databricks recomenda definir num_cpus_worker_node como um valor >= 4.

Consulte Alocação de memória para nós de trabalho Ray para obter detalhes sobre como ajustar a memória de heap para cada nó de trabalhador Ray.

Alocação de memória para nós de trabalho Ray

Cada nó de trabalho Ray usa dois tipos de memória: memória de pilha e memória de armazenamento de objetos. O tamanho da memória alocada para cada tipo é determinado conforme descrito abaixo.

A memória total alocada para cada nó de trabalho Ray é:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES é o número máximo de nós de trabalho do Ray que podem ser iniciados no nó de trabalho do Spark. Isto é determinado pelo argumento num_cpus_worker_node ou num_gpus_worker_node.

Se você não definir o argumento object_store_memory_per_node, o tamanho da memória de pilha e o tamanho da memória de armazenamento de objeto alocados para cada nó de trabalho do Ray são:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

Se você definir o argumento object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

Além disso, o tamanho da memória de armazenamento de objetos por nó de trabalho Ray é limitado pela memória compartilhada do sistema operacional. O valor máximo é:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY é o tamanho do /dev/shm disco configurado para o nó de trabalho do Spark.

Melhores práticas

Como definir o número da CPU / GPU para cada nó de trabalho Ray?

O Databricks recomenda definir num_cpus_worker_node o número de núcleos de CPU por nó de trabalho do Spark e num_gpus_worker_node definir o número de GPUs por nó de trabalho do Spark. Nessa configuração, cada nó de trabalho do Spark inicia um nó de trabalhador do Ray que utiliza totalmente os recursos do nó de trabalhador do Spark.

Configuração de cluster GPU

O cluster Ray é executado sobre um cluster Databricks Spark. Um cenário comum é usar um trabalho do Spark e o Spark UDF para fazer tarefas simples de pré-processamento de dados que não precisam de recursos da GPU e, em seguida, usar o Ray para executar tarefas complicadas de aprendizado de máquina que se beneficiam das GPUs. Nesse caso, o Databricks recomenda definir o parâmetro spark.task.resource.gpu.amount de configuração de nível de cluster do Spark como , para 0que todas as transformações do Spark DataFrame e execuções do Spark UDF não usem recursos da GPU.

Os benefícios dessa configuração são os seguintes:

  • Isso aumenta o paralelismo de trabalho do Spark, porque o tipo de instância de GPU geralmente tem muito mais núcleos de CPU do que dispositivos de GPU.
  • Se o cluster do Spark for compartilhado com vários usuários, essa configuração impedirá que os trabalhos do Spark compitam pelos recursos da GPU com cargas de trabalho Ray em execução simultânea.

Desative a transformers integração mlflow do treinador se usá-lo em tarefas Ray

A transformers integração do treinador MLflow está ativada por padrão. Se você usar o trem Ray para treiná-lo, a tarefa Ray falhará porque a credencial de serviço Databricks MLflow não está configurada para tarefas Ray.

Para evitar esse problema, defina a DISABLE_MLFLOW_INTEGRATION variável de ambiente como 'TRUE' na configuração do cluster databricks. Para obter informações sobre como fazer login no MLflow em suas tarefas do Ray trainer, consulte a seção "Usando o MLflow em tarefas do Ray" para obter detalhes.

Endereço Ray função remota pickling erro

Para executar tarefas Ray, Ray usa pickle para serializar a função de tarefa. Se a decapagem falhar, determine a(s) linha(s) no código onde a falha ocorre. Muitas vezes, mover import comandos para a função de tarefa aborda erros comuns de decapagem. Por exemplo, datasets.load_dataset é uma função amplamente utilizada que passa a ser corrigida dentro do Databricks Runtime, potencialmente tornando uma importação externa inviável. Para corrigir esse problema, você pode atualizar seu código da seguinte forma:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Desative o monitor de memória Ray se a tarefa Ray for inesperadamente morta com erro OOM

No Ray 2.9.3, o monitor de memória Ray tem problemas conhecidos que fazem com que as tarefas Ray sejam erroneamente mortas.

Para resolver o problema, desative o monitor de memória Ray definindo a variável RAY_memory_monitor_refresh_ms de ambiente como 0 na configuração do cluster Databricks.

Configuração de recursos de memória para cargas de trabalho híbridas Spark e Ray

Se você executar cargas de trabalho híbridas do Spark e do Ray em um cluster Databricks, o Databricks recomenda que você reduza a memória do executor do Spark para um valor pequeno, como a configuração spark.executor.memory 4g na configuração do cluster Databricks. Isso se deve ao executor do Spark em execução dentro de um processo Java que dispara a coleta de lixo (GC) preguiçosamente. A pressão de memória para o cache do conjunto de dados Spark é bastante alta, causando uma redução na memória disponível que o Ray pode usar. Para evitar possíveis erros OOM, o Databricks recomenda que você reduza o valor configurado 'spark.executor.memory' para um valor menor do que o padrão.

Configuração de recursos computacionais para cargas de trabalho híbridas Spark e Ray

Se você executar cargas de trabalho híbridas do Spark e do Ray em um cluster Databricks, defina os nós do cluster do Spark como escalonáveis automaticamente, os nós de trabalho do Ray como escalonáveis automaticamente ou ambos com o dimensionamento automático habilitado.

Por exemplo, se você tiver um número fixo de nós de trabalho em um cluster Databricks, considere habilitar o dimensionamento automático do Ray-on-Spark, para que, quando não houver nenhuma carga de trabalho do Ray em execução, o cluster do Ray diminua. Como resultado, os recursos de cluster ociosos são liberados para que o trabalho do Spark possa usá-los.

Quando o trabalho do Spark é concluído e o trabalho do Ray é iniciado, ele aciona o cluster do Ray-on-Spark para aumentar a escala para atender às demandas de processamento.

Você também pode tornar o cluster Databricks e o cluster Ray-on-spark autoescaláveis. Especificamente, você pode configurar os nós autoescaláveis do cluster Databricks para um máximo de 10 nós e os nós de trabalho do Ray-on-Spark para um máximo de 4 nós (com um nó de trabalhador do Ray por trabalhador do Spark), deixando o Spark livre para alocar até 6 nós para tarefas do Spark. Isso significa que as cargas de trabalho do Ray podem usar no máximo 4 recursos de nós ao mesmo tempo, enquanto o trabalho do Spark pode alocar no máximo 6 nós no valor de recursos.

Aplicação da função de transformação a lotes de dados

Ao processar dados em lotes, o Databricks recomenda que você use a API Ray Data com map_batches função. Essa abordagem pode ser mais eficiente e escalável, especialmente para grandes conjuntos de dados ou ao executar cálculos complexos que se beneficiam do processamento em lote. Qualquer DataFrame do Spark pode ser convertido em dados Ray usando a ray.data.from_spark API e pode ser gravado na tabela UC databricks usando a API ray.data.write_databricks_table.

Usando MLflow em tarefas Ray

Para usar o MLflow em tarefas do Ray, configure o seguinte:

  • Credenciais do Databricks MLflow em tarefas Ray
  • O MLflow é executado no lado do driver Spark que passa os valores gerados run_id para as tarefas do Ray.

O código a seguir é um exemplo:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Usando bibliotecas python com escopo de notebook ou bibliotecas python cluster em tarefas Ray

Atualmente, o Ray tem um problema conhecido de que as tarefas do Ray não podem usar bibliotecas Python com escopo de notebook ou bibliotecas Python cluster. Para resolver essa limitação, execute o seguinte comando em seu bloco de anotações antes de iniciar um cluster Ray-on-Spark:

%pip install ray==<The Ray version you want to use> --force-reinstall

e, em seguida, execute o seguinte comando no seu bloco de anotações para reiniciar o kernel python:

dbutils.library.restartPython()

Habilite rastreamentos de pilha e gráficos de chama na página Ray Dashboard Actors

Na página Ray Dashboard Actors , você pode visualizar traços de pilha e gráficos de chama para atores Ray ativos.

Para visualizar essas informações, instale py-spy antes de iniciar o cluster Ray:

%pip install py-spy

Desligar um cluster Ray

Para desligar um cluster Ray em execução no Azure Databricks, chame a API ray.utils.spark.shutdown_ray_cluster .

Nota

Os clusters de raios também são desligados quando:

  • Você desanexa seu bloco de anotações interativo do cluster do Azure Databricks.
  • Seu trabalho do Azure Databricks é concluído.
  • Seu cluster do Azure Databricks é reiniciado ou encerrado.
  • Não há atividade para o tempo ocioso especificado.

Bloco de notas de exemplo

O bloco de anotações a seguir demonstra como criar um cluster Ray e executar um aplicativo Ray no Databricks.

Ray no notebook inicial Spark

Obter o bloco de notas

Limitações

  • Não há suporte para clusters compartilhados multiusuário do Azure Databricks (modo de isolamento habilitado).
  • Ao usar %pip para instalar pacotes, o cluster Ray será desligado. Certifique-se de iniciar o Ray depois de concluir a instalação de todas as suas bibliotecas com %pip.
  • Usar integrações que substituem a configuração de pode fazer com que o cluster Ray se torne instável e pode travar o contexto Ray ray.util.spark.setup_ray_cluster . Por exemplo, usar o pacote e a xgboost_ray configuração RayParams com um ator ou cpus_per_actor configuração que exceda a configuração do cluster Ray pode travar silenciosamente o cluster Ray.