Share via


確認 Azure 機密總帳寫入交易收據

Azure 機密總帳寫入交易收據代表密碼編譯 Merkle 證明對應的寫入交易已由 CCF 網路全域認可。 Azure 機密總帳使用者可以在任何時間點取得認可寫入交易的收據,以確認對應的寫入作業已成功記錄到不可變的總帳中。

如需 Azure 機密總帳寫入交易收據的詳細資訊,請參閱 專用文章

收據驗證步驟

寫入交易收據可以依照下列小節中所述的特定步驟集進行驗證。 CCF 檔 概述 相同的步驟。

分葉節點計算

第一個步驟是計算對應至已認可交易之 Merkle Tree 中分葉節點的 SHA-256 雜湊。 分葉節點是由下欄欄位的已排序串連所組成,可在 Azure 機密總帳收據的下方 leafComponents 找到:

  1. writeSetDigest
  2. SHA-256 摘要 commitEvidence
  3. claimsDigest 領域

這些值必須串連為位元組陣列:和 writeSetDigestclaimsDigest 都必須從十六進位數位字串轉換成位元組陣列;另一方面,可以將 SHA-256 雜湊函式套用至 UTF-8 編碼字串,以取得雜湊。另一方面, commitEvidence 可以將 SHA-256 雜湊函數套用至 UTF-8 編碼 commitEvidence 字串。

同樣地,分葉節點雜湊摘要可以藉由將 SHA-256 雜湊函式套用至產生的位元組結果串連來計算。

根節點計算

第二個步驟是在認可交易時計算 Merkle Tree 根目錄的 SHA-256 雜湊。 計算是反復串連和雜湊上一個反復專案的結果(從上一個步驟中 proof 計算的分葉節點雜湊開始),以及收據欄位中提供的已排序節點雜湊來完成。 此 proof 清單會以已排序的清單的形式提供,且其元素必須依指定順序進行查看。

串連必須在位元組表示上完成,以相對於欄位 (或 leftright 中提供之 物件中所指示的 proof 相對順序。

  • 如果 中 proof 目前專案的索引鍵是 left ,則先前反復專案的結果應該附加至目前的專案值。
  • 如果 中 proof 目前專案的索引鍵為 right ,則先前反復專案的結果應該前面加上目前的專案值。

每個串連之後,必須套用 SHA-256 函式,才能取得下一個反復專案的輸入。 此程式遵循標準步驟,以針對計算所需的節點,計算 Merkle 樹 狀結構之根節點

驗證根節點的簽章

第三個步驟是使用收據中的簽署節點憑證,確認透過根節點雜湊產生的密碼編譯簽章是否有效。 驗證程式會遵循使用 橢圓曲線數位簽章演算法 (ECDSA) 簽署之訊息的數位簽章驗證標準步驟。 更具體來說,步驟如下:

  1. 將 base64 字串 signature 解碼為位元組陣列。
  2. 從簽署節點憑證 cert 擷取 ECDSA 公開金鑰。
  3. 使用上一個步驟中擷取的公開金鑰,確認透過 Merkle Tree 根目錄的簽章(使用上一個子區段中的指示計算)的簽章是真實的。 此步驟有效地對應到使用 ECDSA 的標準 數位簽章 驗證程式。 在一些資料上使用公開金鑰憑證來驗證 ECDSA 簽章的最熱門程式設計語言有許多程式庫(例如 Python 的密碼編譯程式庫 )。

確認簽署節點憑證簽署

除了上一個步驟之外,還必須確認目前的總帳憑證已背書簽署節點憑證(也就是已簽署)。 此步驟不相依于先前的三個步驟,而且可以與其他步驟分開執行。

發出收據的目前服務身分識別可能不同于背書簽署節點的服務身分識別(例如,因為憑證更新所致)。 在此情況下,必須透過其他先前的服務身分識別(也就是收據中的清單欄位)來驗證簽署節點憑證( cert 也就是收據中的欄位)的憑證信任鏈結,才能通過其他先前的服務身分識別(也就是 serviceEndorsements 收據中的清單欄位)。 此 serviceEndorsements 清單會以從最舊到最新服務識別的已排序清單提供。

憑證簽署必須針對整個鏈結進行驗證,並遵循上一個小節中所述的完全相同的數位簽章驗證程式。 有常用的開放原始碼密碼編譯程式庫(例如 OpenSSL ),通常可以用來執行憑證簽署步驟。

驗證應用程式宣告摘要

在選擇性步驟中,如果應用程式宣告附加至收據,就可以從公開的宣告計算宣告摘要(遵循特定演算法),並確認摘要符合 claimsDigest 收據承載中包含的 。 若要從公開的宣告物件計算摘要,必須逐一查看清單中的每個應用程式宣告物件,並檢查其 kind 欄位。

如果宣告物件的類型為 LedgerEntry ,則應該擷取宣告的總帳集合識別碼 ( collectionId ) 和內容 ( contents ),並使用宣告物件中指定的秘密金鑰 ( secretKey ) 來計算其 HMAC 摘要。 接著會串連這兩個摘要,並計算串連的 SHA-256 雜湊。 接著會串連通訊協定 ( protocol ) 和產生的宣告資料摘要,並計算串連的另一個 SHA-256 雜湊,以取得最終摘要。

如果宣告物件的類型為 ClaimDigest ,則應該擷取宣告摘要 ( value ),並串連通訊協定 ( protocol ),並計算串連的 SHA-256 雜湊,以取得最終摘要。

計算每個單一宣告摘要之後,必須串連每個應用程式宣告物件的所有計算摘要(按照收據中呈現的順序相同)。 接著,串連應該前面加上已處理的宣告數目。 先前串連的 SHA-256 雜湊會產生最終宣告摘要,這應該符合 claimsDigest 收據物件中的存在。

更多資源

如需有關 Azure 機密總帳寫入交易收據和每個欄位說明之內容的詳細資訊,請參閱 專用文章 CCF 檔 也包含有關收據驗證和其他相關資源的詳細資訊,請參閱下列連結:

確認寫入交易收據

收據驗證公用程式

適用于 Python Azure 機密總帳用戶端程式庫提供公用程式函式,以驗證寫入交易收據,並從應用程式宣告清單計算宣告摘要。 如需如何使用資料平面 SDK 和收據特定公用程式的詳細資訊,請參閱 本節 此範例程式碼

設定和必要條件

為了參考目的,我們在 Python 中提供範例程式碼,以在上一節所述的步驟中完整驗證 Azure 機密總帳寫入交易收據。

若要執行完整驗證演算法,則需要執行中機密總帳資源的目前服務網路憑證和寫入交易收據。 如需如何從機密總帳實例擷取寫入交易收據和服務憑證的詳細資料,請參閱這篇文章

程式碼逐步解說

下列程式碼可用來初始化必要的物件,並執行收據驗證演算法。 另一個公用程式 ( verify_receipt ) 可用來執行完整的驗證演算法,並接受回應中 GET_RECEIPT 欄位的內容 receipt 做為字典,並將服務憑證當做簡單的字串。 如果收據無效,或處理期間發生任何錯誤,函式會擲回例外狀況。

假設收據和服務憑證都可以從檔案載入。 請務必使用 service_certificate_file_name 您想要驗證的服務憑證和收據各自的檔案名來更新 和 receipt_file_name 常數。

import json 

# Constants
service_certificate_file_name = "<your-service-certificate-file>"
receipt_file_name = "<your-receipt-file>"

# Use the receipt and the service identity to verify the receipt content 
with open(service_certificate_file_name, "r") as service_certificate_file, open( 
    receipt_file_name, "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"] 
    service_certificate_cert = service_certificate_file.read() 

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack
        raise e 

由於驗證程式需要一些密碼編譯和雜湊基本類型,下列程式庫可用來協助計算。

from ccf.receipt import verify, check_endorsements, root 
from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

在 函式中 verify_receipt ,我們會檢查指定的收據是否有效,並包含所有必要的欄位。

# Check that all the fields are present in the receipt 
assert "cert" in receipt 
assert "leafComponents" in receipt 
assert "claimsDigest" in receipt["leafComponents"] 
assert "commitEvidence" in receipt["leafComponents"] 
assert "writeSetDigest" in receipt["leafComponents"] 
assert "proof" in receipt 
assert "signature" in receipt 

我們會初始化要用於程式其餘部分的變數。

# Set the variables 
node_cert_pem = receipt["cert"] 
claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 
write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
proof_list = receipt["proof"] 
service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
root_node_signature = receipt["signature"] 

我們可以使用密碼編譯程式庫,載入服務識別的 PEM 憑證、簽署節點,以及先前服務識別的簽署憑證。

# Load service and node PEM certificates 
service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

# Load service endorsements PEM certificates 
service_endorsements_certs = [ 
    load_pem_x509_certificate(pem.encode()) 
    for pem in service_endorsements_certs_pem 
] 

驗證程式的第一個步驟是計算分葉節點的摘要。

# Compute leaf of the Merkle Tree corresponding to our transaction 
leaf_node_hex = compute_leaf_node( 
    claims_digest_hex, commit_evidence_str, write_set_digest_hex 
)

compute_leaf_node 式會接受作為收據的分葉元件參數,並以 claimsDigestcommitEvidencewriteSetDigest 十六進位形式傳回分葉節點雜湊。

如先前所述,我們會計算 的摘要 commitEvidence (使用 SHA-256 hashlib 函式)。 然後,我們會將 writeSetDigestclaimsDigest 轉換成位元組陣列。 最後,我們會串連三個數組,並使用 SHA256 函式來摘要結果。

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string 
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes 
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes 
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

計算分葉之後,我們可以計算 Merkle 樹狀結構的根目錄。

# Compute root of the Merkle Tree 
root_node = root(leaf_node_hex, proof_list) 

我們會使用提供的 函 root 式作為 CCF Python 程式庫的一部分。 函式會連續串連上一個反復專案與 的新元素來自 proof 的結果,摘要串連串連,然後針對 中 proof 每個元素重複步驟,並搭配先前計算的摘要。 串連必須遵守 Merkle 樹狀結構中節點的順序,以確保根目錄已正確重新計算。

def root(leaf: str, proof: List[dict]): 
    """ 
    Recompute root of Merkle tree from a leaf and a proof of the form: 
    [{"left": digest}, {"right": digest}, ...] 
    """ 

    current = bytes.fromhex(leaf) 

    for n in proof: 
        if "left" in n: 
            current = sha256(bytes.fromhex(n["left"]) + current).digest() 
        else: 
            current = sha256(current + bytes.fromhex(n["right"])).digest() 
    return current.hex() 

計算根節點雜湊之後,我們可以透過根目錄驗證收據中包含的簽章,以驗證簽章是否正確。

# Verify signature of the signing node over the root of the tree 
verify(root_node, root_node_signature, node_cert) 

同樣地,CCF 程式庫會提供函 verify 式來執行這項驗證。 我們使用簽署節點憑證的 ECDSA 公開金鑰來驗證樹狀結構根目錄的簽章。

def verify(root: str, signature: str, cert: Certificate):
    """ 
    Verify signature over root of Merkle Tree 
    """ 

    sig = base64.b64decode(signature) 
    pk = cert.public_key() 
    assert isinstance(pk, ec.EllipticCurvePublicKey) 
    pk.verify( 
        sig, 
        bytes.fromhex(root), 
        ec.ECDSA(utils.Prehashed(hashes.SHA256())), 
    )

收據驗證的最後一個步驟是驗證用來簽署 Merkle 樹狀目錄根目錄的憑證。

# Verify node certificate is endorsed by the service certificates through endorsements 
check_endorsements(node_cert, service_cert, service_endorsements_certs) 

同樣地,我們也可以使用 CCF 公用程式 check_endorsements 來驗證服務身分識別是否背書簽署節點。 憑證鏈結可以由先前的服務憑證組成,因此,如果 serviceEndorsements 不是空清單,我們應該驗證背書是否可轉移套用。

def check_endorsement(endorsee: Certificate, endorser: Certificate): 
    """ 
    Check endorser has endorsed endorsee 
    """ 

    digest_algo = endorsee.signature_hash_algorithm 
    assert digest_algo 
    digester = hashes.Hash(digest_algo) 
    digester.update(endorsee.tbs_certificate_bytes) 
    digest = digester.finalize() 
    endorser_pk = endorser.public_key() 
    assert isinstance(endorser_pk, ec.EllipticCurvePublicKey) 
    endorser_pk.verify( 
        endorsee.signature, digest, ec.ECDSA(utils.Prehashed(digest_algo)) 
    ) 

def check_endorsements( 
    node_cert: Certificate, service_cert: Certificate, endorsements: List[Certificate] 
): 
    """ 
    Check a node certificate is endorsed by a service certificate, transitively through a list of endorsements. 
    """ 

    cert_i = node_cert 
    for endorsement in endorsements: 
        check_endorsement(cert_i, endorsement) 
        cert_i = endorsement 
    check_endorsement(cert_i, service_cert) 

或者,我們也可以使用類似的方法來使用 OpenSSL 程式庫來驗證憑證。

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
)

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store 
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates 
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate 
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate 
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

範例指令碼

提供程式碼逐步解說中使用的完整範例程式碼。

主要程式

import json 

# Use the receipt and the service identity to verify the receipt content 
with open("network_certificate.pem", "r") as service_certificate_file, open( 
    "receipt.json", "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"]
    service_certificate_cert = service_certificate_file.read()

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack 
        raise e 

收據驗證

from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
) 

from ccf.receipt import root, verify, check_endorsements 

def verify_receipt(receipt: Dict[str, Any], service_cert_pem: str) -> None: 
    """Function to verify that a given write transaction receipt is valid based 
    on its content and the service certificate. 
    Throws an exception if the verification fails.""" 

    # Check that all the fields are present in the receipt 
    assert "cert" in receipt 
    assert "leafComponents" in receipt 
    assert "claimsDigest" in receipt["leafComponents"] 
    assert "commitEvidence" in receipt["leafComponents"] 
    assert "writeSetDigest" in receipt["leafComponents"] 
    assert "proof" in receipt 
    assert "signature" in receipt 

    # Set the variables 
    node_cert_pem = receipt["cert"] 
    claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
    commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 

    write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
    proof_list = receipt["proof"] 
    service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
    root_node_signature = receipt["signature"] 

    # Load service and node PEM certificates
    service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
    node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

    # Load service endorsements PEM certificates
    service_endorsements_certs = [ 
        load_pem_x509_certificate(pem.encode()) 
        for pem in service_endorsements_certs_pem 
    ] 

    # Compute leaf of the Merkle Tree 
    leaf_node_hex = compute_leaf_node( 
        claims_digest_hex, commit_evidence_str, write_set_digest_hex 
    ) 

    # Compute root of the Merkle Tree
    root_node = root(leaf_node_hex, proof_list) 

    # Verify signature of the signing node over the root of the tree
    verify(root_node, root_node_signature, node_cert) 

    # Verify node certificate is endorsed by the service certificates through endorsements
    check_endorsements(node_cert, service_cert, service_endorsements_certs) 

    # Alternative: Verify node certificate is endorsed by the service certificates through endorsements 
    verify_openssl_certificate(node_cert, service_cert, service_endorsements_certs) 

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

下一步