Share via


クエリのシャッフル

shuffle クエリは、 shuffle 戦略をサポートする一連の演算子で使用されるセマンティック保持変換です。 関連するデータに応じて、shuffle 戦略を使用してクエリを実行すると、パフォーマンスが向上する可能性があります。 キー (joinキー、キー、キー、またはpartitionキー) のカーディナリティが高く、summarize通常の演算子クエリがクエリの制限に達した場合shuffleは、make-seriesシャッフル クエリ戦略を使用することをお勧めします。

次の演算子をシャッフル コマンドと一緒に使用できます。

shuffle クエリ戦略を使用するには、 式 hint.strategy = shuffle または hint.shufflekey = <key> を追加します。 hint.strategy=shuffle を使用すると、すべてのキーによって演算子データがシャッフルされます。 複合キーが一意であるが、各キーが十分に一意でない場合は、この式を使用します。そのため、シャッフルされた演算子のすべてのキーを使用してデータをシャッフルします。

シャッフル戦略を使用してデータをパーティション分割すると、すべてのクラスター ノードでデータの負荷が共有されます。 各ノードは、データの 1 つのパーティションを処理します。 既定のパーティション数は、クラスター ノードの数と同じになります。

パーティション番号は、パーティションの数を制御する構文 hint.num_partitions = total_partitions を使用してオーバーライドできます。 これは、クラスターのクラスター ノード数が少ない場合に、既定のパーティション番号が小さかったり、クエリが失敗したり、実行時間が長かったりする場合に便利です。

Note

多数のパーティションを使用すると、より多くのクラスター リソースが消費され、パフォーマンスが低下する可能性があります。 hint.strategy = shuffle から始め、パーティション番号を慎重に選択し、パーティションを徐々に増やします。

場合によっては、 hint.strategy = shuffle は無視され、クエリは戦略で shuffle 実行されません。 これは、次の場合に発生することがあります。

  • join 演算子には、左側または右側に別の shuffle の互換性のある演算子 (joinsummarizemake-series または partition) があります。
  • summarize 演算子は、クエリ内の別 shuffle の互換性のある演算子 (joinsummarizemake-series または partition) の後に表示されます。

構文

With hint.strategy = shuffle

T|DataExpression|joinhint.strategy = shuffle()

T|summarizehint.strategy = shuffleDataExpression

T|Query| partition hint.strategy = shuffle(SubQuery)

With hint.shufflekey = key

T|DataExpression|joinhint.shufflekey = key(DataExpression)

T|summarizehint.shufflekey = keyDataExpression

T|make-serieshint.shufflekey = keyDataExpression

T|Query| partition hint.shufflekey = key(SubQuery)

構文規則について詳しく知る。

パラメーター

名前 必須 説明
T string ✔️ 演算子によって処理されるデータを持つ表形式のソース。
DataExpression string 暗黙的または明示的な表形式変換式。
クエリ string 変換式は 、T のレコードに対して実行されます。
key string キー、 join キー、 summarize キー、 make-series または partition キーを使用します。
サブクエリ string 変換式。

注意

選択した構文に応じて、 DataExpression または Query を指定する必要があります。

シャッフルで集計を使用する

演算子を使用したsummarize戦略クエリはshuffle、各ノードがデータの 1 つのパーティションを処理するすべてのクラスター ノードの負荷を共有します。

StormEvents
| summarize hint.strategy = shuffle count(), avg(InjuriesIndirect) by State
| count 

出力

Count
67

シャッフルで結合を使用する

StormEvents
| where State has "West"
| where EventType has "Flood"
| join hint.strategy=shuffle 
    (
    StormEvents
    | where EventType has "Hail"
    | project EpisodeId, State, DamageProperty
    )
    on State
| count

出力

Count
103

シャッフルで make-series を使用する

StormEvents
| where State has "North"
| make-series hint.shufflekey = State sum(DamageProperty) default = 0 on StartTime in range(datetime(2007-01-01 00:00:00.0000000), datetime(2007-01-31 23:59:00.0000000), 15d) by State

出力

State sum_DamageProperty StartTime
ノースダコタ [60000,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]
ノースカロライナ [20000,0,1000] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]
北大西洋 [0,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]

シャッフルでパーティションを使用する

StormEvents
| partition hint.strategy=shuffle by EpisodeId
(
    top 3 by DamageProperty
    | project EpisodeId, State, DamageProperty
)
| count

出力

Count
22345

hint.strategy=shuffle と hint.shufflekey=key を比較する

hint.strategy=shuffle を使用すると、すべてのキーによってシャッフル済みの演算子データがシャッフルされます。 次の例では、クエリはキーとして EpisodeIdEventId の両方を使用してデータをシャッフルします。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
| count

出力

Count
14

次のクエリでは hint.shufflekey = key を使用します。 上記のクエリは、このクエリと同等です。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.shufflekey = EpisodeId hint.shufflekey = EventId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

出力

Count
14

複数のキーを使用してデータをシャッフルする

場合によっては、 hint.strategy=shuffle は無視され、クエリはシャッフル戦略では実行されません。 たとえば、次の例では、結合の左側に集計があるため、 を使用 hint.strategy=shuffle してもクエリにシャッフル戦略は適用されません。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

出力

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

この問題を克服し、シャッフル戦略で実行するには、 と join 操作に共通するキーをsummarize選択します。 この場合、このキーは EpisodeId になります。 ヒント hint.shufflekey を使用して、 join のシャッフル キーを hint.shufflekey = EpisodeId に指定します。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.shufflekey=EpisodeId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

出力

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

集計とシャッフルを使用してパフォーマンスを向上させる

この例では、summarize 演算子を shuffle 戦略と一緒に使用して、パフォーマンスを向上させます。 ソース テーブルには 1 億 5,000 万のレコードが含まれ、キーによるグループのカーディナリティは 10M であり、10 クラスター ノードにまたがっています。

次のように、shuffle 戦略なしの summarize 演算子を使用すると、クエリは 1:08 より後に終了し、メモリ使用量のピークは最大 3 GB です。

orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

出力

Count
1086

次のように、summarizeshuffle 戦略を使用している間、クエリは約 7 秒後に終了し、メモリ使用量のピークは 0.43 GB です。

orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

出力

Count
1086

次の例は、2 つのクラスター ノードを持つクラスターのパフォーマンスを示しており、6,000 万のレコードを持つテーブルでは、キーによるグループのカーディナリティは 2M です。

次のように、hint.num_partitions なしでクエリを実行すると、(クラスター ノード番号として) 2 つのパーティションだけが使用され、次のクエリには最大 1 時間 10 分かかる場合があります。

lineitem 
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

次のように、パーティション番号を 10 に設定すると、クエリは 23 秒後に終了します。

lineitem 
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

結合とシャッフルを使用してパフォーマンスを向上させる

次の例は、 join 演算子で shuffle 戦略を使用してパフォーマンスを向上させる方法を示しています。

この例では、データがこれらすべてのノードに広がっている 10 のノードを含むクラスターでサンプリングされました。

クエリの左側のソース テーブルには、join キーのカーディナリティが最大 14M である 1 億 5,000 万のレコードがあります。 クエリの右側のソースには 1 億 5,000 万のレコードが含まれ、join キーのカーディナリティは 10M です。 次のように、クエリは最大 28 秒後に終了し、メモリ使用量のピークは 1.43 GB です。

customer
| join
    orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

次のように、join 演算子で shuffle 戦略を使用する場合、クエリは最大 4 秒後に終了し、メモリ使用量のピークは 0.3 GB です。

customer
| join
    hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

別の例では、次の条件を使用して、より大きなデータセットに対して同じクエリを試します。

  • join の左側のソースは 150M、キーのカーディナリティは 148M です。
  • join の右側のソースは 1.5B、キーのカーディナリティは 最大 100M です。

演算子だけを含むクエリは join 、4 分後に制限とタイムアウトに達します。 ただし、join 演算子で shuffle 戦略を使用する場合、クエリは最大 34 秒後に終了し、メモリ使用量のピークは 1.23 GB です。

次の例は、2 つのクラスター ノードを持つクラスターの機能強化を示しています。テーブルは 6,000 万のレコードがあり、join キーのカーディナリティは 2M です。 次のように、hint.num_partitions なしでクエリを実行すると、(クラスター ノード番号として) 2 つのパーティションだけが使用され、次のクエリには最大 1 時間 10 分かかる場合があります。

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey   part
on $left.l_partkey == $right.p_partkey
| consume

次のように、パーティション番号を 10 に設定する場合、クエリは 23 秒後に終了します。

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey  hint.num_partitions = 10    part
on $left.l_partkey == $right.p_partkey
| consume