자동 로더

자동 로더는 클라우드 스토리지에 도착하는 새 데이터 파일을 증분하고 효율적으로 처리합니다.

자동 로더는 라는 구조적 스트리밍 원본을 cloudFiles 제공합니다. 클라우드 파일 스토리지의 입력 디렉터리 경로가 지정된 경우 원본은 도착하는 새 cloudFiles 파일을 자동으로 처리하고 해당 디렉터리에 있는 기존 파일도 처리하는 옵션을 제공합니다.

자동 로더를 사용하는 방법에 대한 자세한 내용은 다음을 참조하세요.

예제

다음 코드 예제에서는 자동 로더가 클라우드 스토리지에 도착하는 새 데이터 파일을 검색하는 방법을 보여 있습니다. Azure Databricks 클러스터에연결된 Notebook 내에서 예제 코드를 실행할 수 있습니다.

  1. 파일 업로드 디렉터리를 만듭니다. 예를 들면 다음과 같습니다.

    Python

    user_dir = '<my-name>@<my-organization.com>'
    upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    
    dbutils.fs.mkdirs(upload_path)
    

    Scala

    val user_dir = "<my-name>@<my-organization.com>"
    val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    
    dbutils.fs.mkdirs(upload_path)
    
  2. 다음 샘플 CSV 파일을 만든 다음 DBFS파일 브라우저 를 사용하여 파일 업로드 디렉터리에 업로드합니다.

    WA.csv:

    city,year,population
    Seattle metro,2019,3406000
    Seattle metro,2020,3433000
    

    OR.csv:

    city,year,population
    Portland metro,2019,2127000
    Portland metro,2020,2151000
    
  3. 다음 코드를 실행하여 자동 로더를 시작합니다.

    Python

    checkpoint_path = '/tmp/delta/population_data/_checkpoints'
    write_path = '/tmp/delta/population_data'
    
    # Set up the stream to begin reading incoming files from the
    # upload_path location.
    df = spark.readStream.format('cloudFiles') \
      .option('cloudFiles.format', 'csv') \
      .option('header', 'true') \
      .schema('city string, year int, population long') \
      .load(upload_path)
    
    # Start the stream.
    # Use the checkpoint_path location to keep a record of all files that
    # have already been uploaded to the upload_path location.
    # For those that have been uploaded since the last check,
    # write the newly-uploaded files' data to the write_path location.
    df.writeStream.format('delta') \
      .option('checkpointLocation', checkpoint_path) \
      .start(write_path)
    

    Scala

    val checkpoint_path = "/tmp/delta/population_data/_checkpoints"
    val write_path = "/tmp/delta/population_data"
    
    // Set up the stream to begin reading incoming files from the
    // upload_path location.
    val df = spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .schema("city string, year int, population long")
      .load(upload_path)
    
    // Start the stream.
    // Use the checkpoint_path location to keep a record of all files that
    // have already been uploaded to the upload_path location.
    // For those that have been uploaded since the last check,
    // write the newly-uploaded files' data to the write_path location.
    df.writeStream.format("delta")
      .option("checkpointLocation", checkpoint_path)
      .start(write_path)
    
  4. 3단계의 코드가 계속 실행 중인 상태에서 다음 코드를 실행하여 쓰기 디렉터리에서 데이터를 쿼리합니다.

    Python

    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    '''
    

    Scala

    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    */
    
  5. 3단계의 코드가 계속 실행 중인 상태에서 다음 추가 CSV 파일을 만든 다음 DBFS 파일 브라우저를사용하여 업로드 디렉터리에 업로드합니다.

    ID.csv:

    city,year,population
    Boise,2019,438000
    Boise,2020,447000
    

    MT.csv:

    city,year,population
    Helena,2019,81653
    Helena,2020,82590
    

    Misc.csv:

    city,year,population
    Seattle metro,2021,3461000
    Portland metro,2021,2174000
    Boise,2021,455000
    Helena,2021,81653
    
  6. 3단계의 코드가 계속 실행 중인 상태에서 다음 코드를 실행하여 자동 로더가 업로드 디렉터리에서 검색한 다음 쓰기 디렉터리에 쓴 파일의 새 데이터 외에도 쓰기 디렉터리에 있는 기존 데이터를 쿼리합니다.

    Python

    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    '''
    

    Scala

    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    */
    
  7. 정리하려면 3단계에서 실행 중인 코드를 취소한 다음 업로드, 검사점 및 쓰기 디렉터리를 삭제하는 다음 코드를 실행합니다.

    Python

    dbutils.fs.rm(write_path, True)
    dbutils.fs.rm(upload_path, True)
    

    Scala

    dbutils.fs.rm(write_path, true)
    dbutils.fs.rm(upload_path, true)