Share via


Usar o Ray no Azure Databricks

Com o Ray 2.3.0 e versões superiores, você pode criar clusters Ray e executar aplicativos Ray em clusters do Apache Spark com o Azure Databricks. Para obter informações sobre como começar a usar o aprendizado de máquina no Ray, incluindo tutoriais e exemplos, consulte a Documentação do Ray. Para obter mais informações sobre a integração do Ray e do Apache Spark, consulte a Documentação do API do Ray no Spark.

Requisitos

  • Databricks Runtime 12.2 LTS ML e superior.
  • O modo de acesso de cluster do Databricks Runtime precisa ser o modo “Atribuído” ou o “Nenhum isolamento compartilhado”.

Instalar o Ray

Use o comando a seguir para instalar o Ray. A extensão [default] é exigida pelo componente do painel do Ray.

%pip install ray[default]>=2.3.0

Criar um cluster Ray específico a um usuário em um cluster do Databricks

Para criar um cluster do Ray, use a API ray.util.spark.setup_ray_cluster.

Em qualquer notebook do Databricks anexado a um cluster do Databricks, você pode executar o seguinte comando:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_worker_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

A API ray.util.spark.setup_ray_cluster cria um cluster do Ray no Spark. Internamente, ele cria um trabalho do Spark em segundo plano. Cada tarefa do Spark no trabalho cria um nó de trabalho do Ray e o nó principal do Ray é criado no driver. O argumento num_worker_nodes representa o número de nós de trabalho do Ray a serem criados. Para especificar o número de núcleos de CPU ou GPU atribuídos a cada nó de trabalho Ray, defina o argumento num_cpus_worker_node (valor padrão: 1) ou num_gpus_worker_node (valor padrão: 0).

Após a criação de um cluster do Ray, você poderá executar qualquer código do aplicativo do Ray diretamente no seu notebook. Clique em Abrir painel do cluster do Ray em uma nova guia para exibir o painel do Ray para o cluster.

Dica

Se você estiver usando um cluster de usuário único do Azure Databricks, poderá definir num_worker_nodes como ray.util.spark.MAX_NUM_WORKER_NODES para usar todos os recursos disponíveis para o cluster do Ray.

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

Configure o argumento collect_log_to_path para especificar o caminho de destino onde você deseja coletar os logs do cluster Ray. A coleção de logs é executada depois que o cluster Ray é desligado. O Databricks recomenda que você defina um caminho começando com /dbfs/ para que os logs sejam preservados mesmo se você terminar o cluster Spark. Caso contrário, seus logs não poderão ser recuperados, pois o armazenamento local no cluster é excluído quando o cluster é desligado.

Observação

“Para que seu aplicativo Ray use automaticamente o cluster do Ray criado, chame ray.util.spark.setup_ray_cluster para definir a variável de ambiente RAY_ADDRESS para o endereço do cluster do Ray.” Você pode especificar um endereço de cluster alternativo usando o argumento address da API ray.init.

Executar um aplicativo Ray

Depois que o cluster Ray for criado, você poderá executar qualquer código do aplicativo Ray em um notebook do Azure Databricks.

Importante

O Databricks recomenda que você instale as bibliotecas necessárias para seu aplicativo com %pip install <your-library-dependency> para garantir que elas estejam disponíveis para seu cluster e aplicativo Ray adequadamente. Especificar dependências na chamada de função de inicialização Ray instala as dependências em um local inacessível para os nós de trabalho do Spark, o que resulta em incompatibilidades de versão e erros de importação.

Por exemplo, você pode executar um aplicativo Ray simples em um notebook do Azure Databricks da seguinte maneira:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Criar um cluster do Ray no modo de dimensionamento automático

No Ray 2.8.0 e superior, os clusters do Ray iniciados no Databricks dão suporte à integração com o dimensionamento automático do Databricks. Confira Dimensionamento automático do cluster do Databricks.

Com o Ray 2.8.0 e superior, você pode criar um cluster do Ray em um cluster do Databricks que dá suporte a escalonamento ou redução vertical, de acordo com as cargas de trabalho. Essa integração do dimensionamento automático dispara o dimensionamento automático do cluster do Databricks internamente, dentro do ambiente do Databricks.

Para habilitar o dimensionamento automático, execute o seguinte comando:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

Se o dimensionamento automático estiver habilitado, num_worker_nodes indicará o número máximo de nós de trabalho do Ray. O número mínimo padrão de nós do trabalhador Ray é 0. Essa configuração padrão significa que, quando o cluster Ray está ocioso, ele reduz verticalmente para zero nós de trabalho do Ray. Isso pode não ser ideal para capacidade de resposta rápida em todos os cenários, mas quando habilitado, pode reduzir consideravelmente os custos.

No modo de dimensionamento automático, num_worker_nodes não pode ser definido como ray.util.spark.MAX_NUM_WORKER_NODES.

Os seguintes argumentos configuram a velocidade de escalonamento e redução verticais:

  • autoscale_upscaling_speed representa o número de nós que podem estar pendentes como um múltiplo do número atual de nós. Quanto maior o valor, mais agressivo o escalonamento vertical. Por exemplo, se isso for definido como 1,0, o cluster poderá aumentar de tamanho em no máximo 100% em qualquer determinado momento.
  • autoscale_idle_timeout_minutes representa o número de minutos que precisam transcorrer antes que um nó de trabalho ocioso seja removido pelo dimensionador automático. Quanto maior o valor, mais agressiva a redução vertical.

Com o Ray 2.9.0 e superior, você também pode definir autoscale_min_worker_nodes para impedir que o cluster do Ray reduza verticalmente para zero trabalhos quando o cluster do Ray estiver ocioso.

Conectar-se ao cluster do Ray remoto usando o cliente Ray

No Ray 2.9.3, crie um cluster Ray chamando a API setup_ray_cluster. No mesmo notebook, chame a API ray.init() para se conectar a esse cluster Ray.

Para um cluster Ray que não está no modo global, obtenha a cadeia de conexão remota com o seguinte código:

Para obter a cadeia de conexão remota, use o seguinte:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Conecte-se ao cluster remoto usando essa cadeia de conexão remota:

import ray
ray.init(remote_conn_str)

O cliente Ray não dá suporte à API do conjunto de dados do Ray definida no módulo ray.data. Como solução alternativa, você pode encapsular seu código que chama a API do conjunto de dados do Ray dentro de uma tarefa Ray remota, conforme mostrado no seguinte código:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Carregar dados de um DataFrame do Spark

Para carregar um Spark DataFrame como um Ray Dataset, primeiro você deve salvar o Spark DataFrame em volumes UC ou Databricks Filesystem (obsoleto) como formato Parquet. Para controlar o acesso ao Sistema de Arquivos do Databricks com segurança, o Databricks recomenda que você monte o armazenamento de objetos de nuvem no DBFS. Em seguida, você pode criar uma instância ray.data.Dataset do caminho do DataFrame do Spark salvo usando o seguinte método auxiliar:

import ray
import os
from urllib.parse import urlparse

def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Carregar dados de uma tabela do Catálogo do Unity por meio do SQL warehouse do Databricks

Para o Ray 2.8.0 e superior, você pode chamar a API ray.data.read_databricks_tables para carregar dados de uma tabela do Catálogo do Unity do Databricks.

Primeiro, você precisa definir a variável de ambiente DATABRICKS_TOKEN para o token de acesso do warehouse do Databricks. Se você não estiver executando seu programa no Databricks Runtime, defina a variável de ambiente DATABRICKS_HOST para a URL do espaço de trabalho do Databricks, conforme mostrado a seguir:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Em seguida, chame ray.data.read_databricks_tables() para ler do SQL warehouse do Databricks.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Configurar recursos usados pelo nó de cabeçalho do Ray

Por padrão, para a configuração do Ray no Spark, o Databricks restringe os recursos alocados no nó de cabeçalho do Ray para:

  • 0 núcleos de CPU
  • 0 GPUs
  • 128 MB de memória de heap
  • 128 MB de memória do repositório de objetos

Isso ocorre porque o nó principal do Ray geralmente é usado para coordenação global, não para executar tarefas do Ray. Os recursos do nó de driver do Spark são compartilhados com vários usuários, portanto, a configuração padrão salva recursos no lado do driver spark.

Com o Ray 2.8.0 e superior, você pode configurar os recursos usados pelo nó de cabeçalho do Ray. Adicione os seguintes argumentos à API setup_ray_cluster:

  • num_cpus_head_node: definindo núcleos de CPU usados pelo nó de cabeçalho do Ray
  • num_gpus_head_node: definindo a GPU usada pelo nó de cabeçalho do Ray
  • object_store_memory_head_node: definindo o tamanho da memória do repositório de objetos pelo nó de cabeçalho do Ray

Suporte para clusters heterogêneos

Para execuções de treinamento mais eficientes e econômicas, você pode criar um cluster do Ray no Spark e definir configurações diferentes entre o nó principal do Ray e os nós de trabalho do Ray. No entanto, todos os nós de trabalho do Ray precisam ter a mesma configuração. Os clusters do Databricks não dão suporte total a clusters heterogêneos, mas você pode criar um cluster do Databricks com diferentes tipos de instância de trabalho e de driver definindo uma política de cluster.

Por exemplo:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Ajustar a configuração do cluster Ray

A configuração recomendada para cada nó de trabalho do Ray é:

  • Mínimo de 4 núcleos de CPU por nó de trabalho do Ray.
  • Memória mínima de 10 GB de heap para cada nó de trabalho do Ray.

Ao chamar ray.util.spark.setup_ray_cluster, o Databricks recomenda definir num_cpus_worker_node como um valor >= 4.

Consulte Alocação de memória para nós de trabalho do Ray para obter detalhes sobre como ajustar a memória do heap para cada nó de trabalho do Ray.

Alocação de memória para nós de trabalho do Ray

Cada nó de trabalho do Ray usa dois tipos de memória: memória de heap e memória do repositório de objetos. O tamanho da memória alocada para cada tipo é determinado conforme descrito abaixo.

A memória total alocada para cada nó de trabalho do Ray é:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES é o número máximo de nós de trabalho do Ray que podem ser iniciados no nó de trabalho do Spark. Isso é determinado pelo argumento num_cpus_worker_node ou num_gpus_worker_node.

Se você não definir o argumento object_store_memory_per_node, o tamanho da memória do heap e o tamanho da memória do repositório de objetos alocados para cada nó de trabalho do Ray serão:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

Se você definir o argumento object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

Além disso, o tamanho da memória do repositório de objetos por nó de trabalho do Ray é limitado pela memória compartilhada do sistema operacional. O valor máximo é:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY é o tamanho do disco /dev/shm configurado para o nó de trabalho do Spark.

Práticas recomendadas

Como definir o número de CPU / GPU para cada nó de trabalho Ray?

A Databricks recomenda definir num_cpus_worker_node como o número de núcleos de CPU por nó de trabalho do Spark e definir num_gpus_worker_node como o número de GPUs por nó de trabalho do Spark. Nessa configuração, cada nó de trabalho Spark inicia um nó de trabalho Ray que utiliza totalmente os recursos do nó de trabalho Spark.

Configuração do cluster GPU

O cluster do Ray é executado sobre um cluster do Spark do Databricks. Um cenário comum é usar um trabalho Spark e Spark UDF para realizar tarefas simples de pré-processamento de dados que não precisam de recursos de GPU e, em seguida, usar Ray para executar tarefas complicadas de aprendizado de máquina que se beneficiam de GPUs. Nesse caso, o Databricks recomenda definir o parâmetro spark.task.resource.gpu.amount de configuração de nível de cluster do Spark como 0, para que nenhuma das transformações do DataFrame do Spark e nenhuma das execuções do UDF do Spark usem recursos de GPU.

Os benefícios dessa configuração são os seguintes:

  • Ela aumenta o paralelismo de trabalho do Spark, pois o tipo de instância de GPU geralmente tem muito mais núcleos de CPU do que dispositivos GPU.
  • Se o cluster do Spark for compartilhado com vários usuários, essa configuração impedirá que trabalhos do Spark concorram por recursos de GPU com cargas de trabalho do Ray em execução simultânea.

Desabilite a integração transformers trainer mlflow se estiver usando-o em tarefas Ray

A integração transformers trainer MLflow está ativada por padrão. Se você usar o Ray train para treiná-lo, a tarefa Ray falhará porque a credencial de serviço Databricks MLflow não está configurada para tarefas Ray.

Para evitar esse problema, defina a variável de ambiente DISABLE_MLFLOW_INTEGRATION como “TRUE” na configuração do cluster do databricks. Para obter informações sobre como fazer login no MLflow em suas tarefas do Ray Trainer, consulte a seção “Usando o MLflow em tarefas do Ray” para obter detalhes.

Erro de decapagem da função remota do Address Ray

Para executar tarefas Ray, Ray usa pickle para serializar a função da tarefa. Se a decapagem falhar, determine as linhas do seu código onde a falha ocorre. Frequentemente, mover comandos import para a função de tarefa resolve erros comuns de decapagem. Por exemplo, datasets.load_dataset é uma função amplamente usada que foi corrigida no Databricks Runtime, potencialmente tornando uma importação externa impossível de ser selecionada. Para corrigir esse problema, você pode atualizar seu código assim:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Desabilite o monitor de memória Ray se a tarefa Ray for interrompida inesperadamente com erro OOM

No Ray 2.9.3, o monitor de memória Ray tem problemas conhecidos que fazem com que as tarefas do Ray sejam encerradas erroneamente.

Para resolver o problema, desabilite o monitor de memória Ray definindo a variável de ambiente RAY_memory_monitor_refresh_ms como 0 na configuração do cluster do Databricks.

Configuração de recursos de memória para cargas de trabalho híbridas Spark e Ray

Se executar cargas de trabalho híbridas Spark e Ray num cluster Databricks, a Databricks recomenda que reduza a memória do executor Spark para um valor pequeno, como a definição spark.executor.memory 4g na configuração do cluster Databricks. Isso se deve ao fato de o executor Spark ser executado em um processo Java que aciona a coleta de lixo (GC) preguiçosamente. A pressão de memória para o cache do conjunto de dados Spark é bastante alta, causando uma redução na memória disponível que Ray pode usar. Para evitar potenciais erros de OOM, a Databricks recomenda que reduza o valor configurado “spark.executor.memory” para um valor inferior ao padrão.

Configuração de recursos de computação para cargas de trabalho híbridas Spark e Ray

Se você executar cargas de trabalho híbridas Spark e Ray em um cluster Databricks, defina os nós do cluster Spark como autoescaláveis, os nós de trabalho Ray como autoescaláveis ou ambos com escalonamento automático habilitado.

Por exemplo, se você tiver um número fixo de nós de trabalho em um cluster do Databricks, considere habilitar o escalonamento automático do Ray-on-Spark, para que quando não houver nenhuma carga de trabalho do Ray em execução, o cluster do Ray seja reduzido. Como resultado, os recursos do cluster ocioso são liberados para que o trabalho do Spark possa usá-los.

Quando o trabalho do Spark é concluído e o trabalho do Ray é iniciado, ele aciona o cluster Ray-on-Spark para aumentar a escala para atender às demandas de processamento.

Você também pode tornar o cluster Databricks e o cluster Ray-on-spark autoescaláveis. Especificamente, você pode configurar os nós autoescaláveis do cluster Databricks para um máximo de 10 nós e os nós de trabalho Ray-on-Spark para um máximo de 4 nós (com um nó de trabalho Ray por trabalhador spark), deixando o Spark livre para alocar para 6 nós para tarefas Spark. Isso significa que as cargas de trabalho Ray podem usar no máximo 4 nós de recursos ao mesmo tempo, enquanto o trabalho Spark pode alocar no máximo 6 nós em recursos.

Aplicando função de transformação a lotes de dados

Ao processar dados em lotes, a Databricks recomenda que você use a API Ray Data com a função map_batches. Essa abordagem pode ser mais eficiente e escalável, especialmente para grandes conjuntos de dados ou ao realizar cálculos complexos que beneficiam do processamento em lote. Qualquer Spark DataFrame pode ser convertido em dados Ray usando a API ray.data.from_spark e pode ser gravado na tabela UC de databricks usando a API ray.data.write_databricks_table.

Usando MLflow em tarefas Ray

Para usar o MLflow em tarefas Ray, configure o seguinte:

  • Credenciais do Databricks MLflow em tarefas Ray
  • O MLflow é executado no lado do driver Spark que passa os valores run_id gerados para tarefas Ray.

O código a seguir é um exemplo:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Usando bibliotecas python com escopo de notebook ou bibliotecas python de cluster em tarefas Ray

Atualmente, Ray tem um problema conhecido de que as tarefas Ray não podem usar bibliotecas Python com escopo de notebook ou bibliotecas Python de cluster. Para resolver essa limitação, execute o seguinte comando em seu notebook antes de iniciar um cluster Ray-on-Spark:

%pip install ray==<The Ray version you want to use> --force-reinstall

e execute o seguinte comando em seu notebook para reiniciar o kernel python:

dbutils.library.restartPython()

Habilitar rastreamentos de pilha e gráficos de chama na página Atores do Painel do Ray

Na página Atores do Painel do Ray, você pode exibir rastreamentos de pilha e gráficos de chama para atores do Ray ativos.

Para visualizar essas informações, instale py-spy antes de iniciar o cluster Ray:

%pip install py-spy

Desligar um cluster Ray

Para encerrar um cluster Ray em execução no Azure Databricks, chame a API ray.utils.spark.shutdown_ray_cluster.

Observação

Os clusters Ray também são desligados quando:

  • Desanexe seu notebook interativo do cluster do Azure Databricks.
  • Seu trabalho do Azure Databricks é concluído.
  • O cluster do Azure Databricks é reiniciado ou encerrado.
  • Não há nenhuma atividade para o tempo ocioso especificado.

Caderno de exemplo

O notebook a seguir demonstra como criar um cluster Ray e executar um aplicativo Ray no Databricks.

Notebook inicial do Ray no Spark

Obter notebook

Limitações

  • Não há suporte para clusters compartilhados do Azure Databricks de vários usuários (modo de isolamento habilitado).
  • Ao usar %pip para instalar pacotes, o cluster Ray será desligado. Certifique-se de iniciar o Ray depois de concluir a instalação de todas as suas bibliotecas com %pip.
  • O uso de integrações que substituem a configuração de ray.util.spark.setup_ray_cluster pode fazer com que o cluster Ray se torne instável e possa falhar no contexto do Ray. Por exemplo, usar o pacote xgboost_ray e a configuração RayParams com um ator ou configuração cpus_per_actor acima da configuração do cluster Ray pode falhar silenciosamente no cluster Ray.