Udostępnij za pośrednictwem


Importowanie modułów języka Python z folderów git lub plików obszaru roboczego

Kod języka Python można przechowywać w folderach Git usługi Databricks lub w plikach obszaru roboczego, a następnie importować ten kod języka Python do potoków delta Live Tables. Aby uzyskać więcej informacji na temat pracy z modułami w folderach Git lub plikach obszarów roboczych, zobacz Praca z modułami python i R.

Uwaga

Nie można zaimportować kodu źródłowego z notesu przechowywanego w folderze Git usługi Databricks lub pliku obszaru roboczego. Zamiast tego dodaj notes bezpośrednio podczas tworzenia lub edytowania potoku. Zobacz Tworzenie potoku.

Importowanie modułu języka Python do potoku delta live tables

W poniższym przykładzie pokazano importowanie zapytań zestawu danych jako modułów języka Python z plików obszaru roboczego. Mimo że w tym przykładzie opisano używanie plików obszaru roboczego do przechowywania kodu źródłowego potoku, można go użyć z kodem źródłowym przechowywanym w folderze Git.

Aby uruchomić ten przykład, wykonaj następujące kroki:

  1. Kliknij pozycję Ikona obszarów roboczychObszar roboczy na pasku bocznym obszaru roboczego usługi Azure Databricks, aby otworzyć przeglądarkę obszarów roboczych.

  2. Użyj przeglądarki obszaru roboczego, aby wybrać katalog dla modułów języka Python.

  3. Kliknij prawym przyciskiem myszy Menu Kebab kolumnę wybranego katalogu, a następnie kliknij pozycję Utwórz > plik.

  4. Wprowadź nazwę pliku, na przykład clickstream_raw_module.py. Zostanie otwarty edytor plików. Aby utworzyć moduł odczytu danych źródłowych do tabeli, wprowadź następujące polecenie w oknie edytora:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. Aby utworzyć moduł, który tworzy nową tabelę zawierającą przygotowane dane, utwórz nowy plik w tym samym katalogu, wprowadź nazwę pliku, na przykład clickstream_prepared_module.py, i wprowadź następujące polecenie w nowym oknie edytora:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. Następnie utwórz notes potoku. Przejdź do strony docelowej usługi Azure Databricks i wybierz pozycję Utwórz notes lub kliknij pozycję Nowy na pasku bocznym i wybierz pozycję Nowa ikonaNotes. Notes można również utworzyć w przeglądarce obszaru roboczego, klikając Menu Kebab i klikając pozycję Utwórz > notes.

  7. Nadaj notesowi nazwę i potwierdź, że język Python jest językiem domyślnym.

  8. Kliknij pozycję Utwórz.

  9. Wprowadź przykładowy kod w notesie.

    Uwaga

    Jeśli notes importuje moduły lub pakiety ze ścieżki plików obszaru roboczego lub ścieżki folderów Usługi Git innej niż katalog notesu, należy ręcznie dołączyć ścieżkę do plików przy użyciu polecenia sys.path.append().

    Jeśli importujesz plik z folderu Git, musisz wstępnie utworzyć ścieżkę /Workspace/ . Na przykład sys.path.append('/Workspace/...'). Pominięcie /Workspace/ ścieżki powoduje wystąpienie błędu.

    Jeśli moduły lub pakiety są przechowywane w tym samym katalogu co notes, nie trzeba ręcznie dołączać ścieżki. Nie trzeba również ręcznie dołączać ścieżki podczas importowania z katalogu głównego folderu Git, ponieważ katalog główny jest automatycznie dołączany do ścieżki.

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        dlt.read("clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    Zastąp <module-path> ciąg ścieżką do katalogu zawierającego moduły języka Python do zaimportowania.

  10. Utwórz potok przy użyciu nowego notesu.

  11. Aby uruchomić potok, na stronie Szczegóły potoku kliknij przycisk Start.

Możesz również zaimportować kod języka Python jako pakiet. Poniższy fragment kodu z notesu delta Live Tables importuje test_utils pakiet z dlt_packages katalogu w tym samym katalogu co notes. Katalog dlt_packages zawiera pliki test_utils.py i __init__.py, i test_utils.py definiuje funkcję create_test_table():

import dlt

@dlt.table
def my_table():
  return dlt.read(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)