Apache Kafka HDInsight 叢集的效能最佳化

本文會提供在 HDInsight 中最佳化 Apache Kafka 工作負載效能的建議。 重點在調整產生者、訊息代理程式和取用者設定。 有時候,也需要調整 OS 設定,以調整大量工作負載的效能。 測量效能的方式各有不同,而適用的最佳化措施也會決於商務需求。

架構概觀

Kafka 主題用以組織記錄。 產生者會產生記錄,而取用者會取用它們。 產生者會將記錄傳送至 Kafka 訊息代理程式,然後儲存資料。 HDInsight 叢集中的每個背景工作節點都是 Kafka 訊息代理程式。

主題會將記錄分割至各個訊息代理程式。 取用記錄時,您可以針對每個分割區最多使用一個取用者,以達到資料平行處理。

複寫用於將分割區複製到各個節點。 此分割區可防止節點 (訊息代理程式) 中斷。 複本群組中的單一分割區會被指定為分割區領導者。 使用 ZooKeeper 所管理的狀態,可將生產者流量路由傳送至每個節點的前端項目。

識別您的案例

Apache Kafka 效能有兩大層面 – 輸送量和延遲。 輸送量是資料處理的最高速率。 輸送量愈高愈好。 延遲是儲存或擷取資料所需的時間。 延遲愈低愈好。 找到輸送量、延遲和應用程式基礎結構成本間的正確平衡,會是一項挑戰。 您的效能需求應符合以下三種常見情況之一 (取決於您是否需要高輸送量、低延遲或兩者):

  • 高輸送量,低延遲。 此案例需要兼具高輸送量和低延遲 (~100 毫秒)。 這種類型的應用程式範例是服務可用性監視。
  • 高輸送量,高延遲。 此案例需要高輸送量 (~1.5 GBps),但可以容忍較高的延遲 (< 250 毫秒)。 這種類型的應用程式範例是近乎即時的處理遙測資料擷取,例如安全性和入侵偵測應用程式。
  • 低輸送量,低延遲。 此案例需要低延遲 (< 10 毫秒) 的即時處理,但可以容忍較低的輸送量。 這種類型的應用程式範例是線上拼字和文法檢查。

生產者設定

下列各節強調一些可最佳化 Kafka 產生者效能的最重要一般設定屬性。 如需所有設定屬性的詳細說明,請參閱有關產生者設定的 Apache Kafka 文件

批次大小

Apache Kafka 產生者會組合訊息群組 (稱為批次),以單元形式傳送,儲存在單一儲存體分割區中。 批次大小表示在傳輸該群組之前必須存在的位元組數目。 增加 batch.size 參數會增加輸送量,因為其可減少網路和 IO 要求的處理額外負荷。 在輕量負載下,因為產生者要等候批次準備就緒,所以增加的批次大小可能會增加 Kafka 傳送延遲。 大量負載則建議增加批次大小,以改善輸送量和延遲。

產生者必要通知

需要 acks 設定的產生者會決定分割區領導者在寫入要求被視為完成之前所需的通知數目。 此設定會影響資料可靠性,而且會接受 01-1 的值。 -1 的值表示在寫入完成之前,必須收到來自所有複本的通知。 設定 acks = -1 可針對資料遺失提供更強大的保證,但也會導致延遲更高和輸送量更低。 如果您的應用程式需求要求較高的輸送量,請嘗試設定 acks = 0acks = 1。 請記住,不認可所有複本會降低資料可靠性。

壓縮

Kafka 產生者可設定為先壓縮訊息,再傳送至訊息代理程式。 compression.type 設定會指定要使用的壓縮轉碼器。 支援的壓縮轉碼器為 “gzip”、“snappy” 和 “lz4”。 壓縮很有用,如果磁碟容量有限制,建議考慮使用。

gzipsnappy 這兩個常用的壓縮轉碼器中,gzip 的壓縮比率較高,但 CPU 負載成本較高而磁碟使用量較低。 snappy 轉碼器提供較少的壓縮,CPU 額外負荷也較低。 您可以根據訊息代理程式磁碟或產生者 CPU 限制決定要使用的轉碼器。 gzip 資料壓縮比率是 snappy 的五倍。

資料壓縮會增加磁碟上可以儲存的記錄數目。 如果產生者和訊息代理程式所用的壓縮格式不符,也可能會增加 CPU 額外負荷。 因為資料必須先壓縮再傳送,接著解壓縮後再處理。

訊息代理程式設定

下列各節強調一些可最佳化 Kafka 訊息代理程式效能的最重要設定。 如需所有訊息代理程式設定的詳細說明,請參閱有關代理程式設定的 Apache Kafka 文件

磁碟數目

儲存體磁碟的 IOPS (每秒的輸入/輸出作業) 和每秒讀取/寫入位元組有限。 建立新的分割區時,Kafka 會將每個新的分割區儲存在現有分割區最少的磁碟上,以平衡可用磁碟的分割區。 儘管有儲存體策略,但在處理每個磁碟的數百個分割區複本時,Kafka 很容易就讓可用的磁碟輸送量達到飽和。 這時要在輸送量和成本間有所取捨。 如果您的應用程式需要更高的輸送量,請為每個訊息代理程式建立具有更多受控磁碟的叢集。 HDInsight 目前不支援將受控磁碟新增至執行中的叢集。 如需如何設定受控磁碟數目的詳細資訊,請參閱 在 HDInsight 上設定 Apache Kafka 的儲存體和可擴縮性。 了解增加叢集節點儲存空間的成本影響。

主題和分割區數量

Kafka 產生者寫入主題。 Kafka 取用者讀取主題。 主題與記錄相關聯,這是磁碟上的資料結構。 Kafka 會將產生者的記錄附加至主題記錄的結尾。 主題記錄包含許多分散在多個檔案的分割區。 這些檔案接著會分散到多個 Kafka 叢集節點。 取用者依自己的節奏讀取 Kafka 主題,並可在主題記錄中挑選位置 (位移)。

每個 Kafka 分割區都是系統上的記錄檔,而產生者執行緒可以同時寫入多個記錄。 同樣地,因為每個取用者執行緒都會從一個分割區讀取訊息,所以也會平行處理從多個分割區取用訊息。

增加分割區密度 (每個訊息代理程式的分割區數目) 會增加與中繼資料作業相關的額外負荷,以及分割區領導者與其追蹤者之間的每個資料區分割要求/回應。 即使沒有資料流過,分割區複本仍會從領導者擷取資料,導致透過網路額外處理收送要求。

對於 Apache Kafka 叢集 2.1 和 2.4 以及先前在 HDInsight 中所述,我們建議每個訊息代理程式最多有 2000 個分割區,包括複本。 增加每個訊息代理程式的分割區數目會降低輸送量,也可能會導致主題無法使用。 如需 Kafka 分割區支援的詳細資訊,請參閱 Apache Kafka 官方部落格文章,以了解 1.1.0 版增加的支援分割區數目。 如需修改主題的詳細資訊,請參閱 Apache Kafka:修改主題 (英文)。

複本數目

較高的複寫係數會導致分割區領導者和追蹤者之間的其他要求。 因此,較高的複寫係數會消耗更多磁碟和 CPU 來處理其他要求,增加寫入延遲並降低輸送量。

建議在 Azure HDInsight 中至少使用 3 倍的 Kafka 複寫。 大部分的 Azure 區域都有三個容錯網域,但在只有兩個容錯網域的區域中,建議使用者使用 4 倍複寫。

如需複寫的詳細資訊,請參閱 Apache Kafka:複寫 (英文) 和 Apache Kafka:增加複寫係數 (英文)。

取用者設定

下一節強調一些可最佳化 Kafka 取用者效能的重要一般設定。 如需所有設定的詳細說明,請參閱有關取用者設定的 Apache Kafka 文件

取用者數目

分割區數目等於取用者數目是不錯的做法。 如果取用者數目小於分割區數目,則有些取用者會從多個分割區讀取,增加取用者延遲。

如果取用者數目大於分割區數目,則取用者資源會因為取用者閒置而浪費。

避免經常重新平衡取用者

分割區擁有權變更 (即取用者擴增或縮小)、訊息代理程式當機 (因為訊息代理程式是取用者群組的群組協調器)、取用者損毀、新增新主題或新增新分割區,都會觸發取用者重新平衡。 在重新平衡期間,因無法取用取用者而增加延遲。

如果取用者可以將活動訊號傳送給 session.timeout.ms 內的訊息代理程式,則可視為作用中。 否則,取用者會被視為無效或失敗。 這種延遲會導致取用者重新平衡。 降低取用者 session.timeout.ms,我們可以更快速地偵測到這些失敗。

如果 session.timeout.ms 太低,取用者可能會因為批次訊息需要較長的處理時間,或 JVM GC 暫停耗時過久等情況,而一再發生不必要的重新平衡。 如果您的取用者花費太多時間處理訊息,您可以使用 max.poll.interval.ms 增加擷取更多記錄前的取用者閒置時間上限,或使用設定參數 max.poll.records 減少傳回的批次大小上限,來解決此問題。

批次處理

和生產者一樣,我們可以為取用者新增批次處理。 您可以變更設定 fetch.min.bytes,以設定每個擷取要求中可取得的資料取用者數量。 此參數會定義取用者擷取回應所預期的位元組下限。 增加此值會減少對訊息代理程式提出的擷取要求數目,從而減少額外負荷。 此值預設為 1。 同樣地,還有另一個設定 fetch.max.wait.ms。 如果擷取要求沒有如 fetch.min.bytes 大小規定的足夠訊息,則會等到依此設定 fetch.max.wait.ms 為準的等候時間到期為止。

注意

在少數情況下,當無法處理訊息時,取用者似乎會變慢。 如果在例外狀況後未認可位移,取用者會卡在無限迴圈的特定位移,不會向前移動,以致增加取用者端的延隔時間。

大量工作負載的 Linux OS 調整

記憶體對應

vm.max_map_count 定義程序可擁有的 mmap 數目上限。 在 HDInsight Apache Kafka 叢集 Linux VM 上,此值預設為 65535。

在 Apache Kafka 中,每個記錄區段都需要一對索引/timeindex 檔案,且每個檔案各耗用 1 個 mmap。 換言之,每個記錄區段都會使用 2 個 mmap。 因此,如果每個分割區裝載一個記錄區段,至少需要 2 個 mmap。 每個分割區的記錄區段數目會因區段大小、負載強度、保留原則、復原期間而異,而且通常不只一個原因。 Mmap value = 2*((partition size)/(segment size))*(partitions)

如果需要的 mmap 值超過 vm.max_map_count,則訊息代理程式會引發「對應失敗」例外狀況。

若要避免此例外狀況,請使用下列命令來檢查 vm 中的 mmap 大小,如有需要,則增加每個背景工作角色節點的大小。

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

注意

請小心勿將此值設定過高,因為會佔用 VM 的記憶體。 JVM 在記憶體對應上可使用的記憶體數量取決於設定 MaxDirectMemory。 預設值為 64MB。 可能已達到限制。 您可以透過 Ambari 將 -XX:MaxDirectMemorySize=amount of memory used 新增至 JVM 設定,以增加此值。 請留意節點上使用的記憶體數量,以及是否有足夠可用的 RAM 支援。

下一步