Verificare le ricevute delle transazioni di scrittura del libro mastro riservato di Azure

Una ricevuta di scrittura del libro mastro riservato di Azure rappresenta una prova di merkle crittografica che indica che la transazione di scrittura corrispondente è stata sottoposta a commit globale dalla rete CCF. Gli utenti del libro mastro riservato di Azure possono ricevere una ricevuta su una transazione di scrittura di cui è stato eseguito il commit in qualsiasi momento per verificare che l'operazione di scrittura corrispondente sia stata registrata correttamente nel libro mastro non modificabile.

Per altre informazioni sulle ricevute delle transazioni di scrittura del libro mastro riservato di Azure, vedere l'articolo dedicato.

Passaggi di verifica della ricevuta

È possibile verificare una ricevuta di transazione di scrittura seguendo un set specifico di passaggi descritti nelle sottosezioni seguenti. Gli stessi passaggi sono descritti nella documentazione CCF.

Calcolo dei nodi foglia

Il primo passaggio consiste nel calcolare l'hash SHA-256 del nodo foglia nell'albero Merkle corrispondente alla transazione di cui è stato eseguito il commit. Un nodo foglia è costituito dalla concatenazione ordinata dei campi seguenti disponibili in una ricevuta di Contabilità riservata di Azure, in leafComponents:

  1. writeSetDigest
  2. Digest SHA-256 di commitEvidence
  3. claimsDigest Campi

Questi valori devono essere concatenati come matrici di byte: entrambi writeSetDigest e claimsDigest devono essere convertiti da stringhe di cifre esadecimali in matrici di byte. D'altra parte, l'hash di commitEvidence (come matrice di byte) può essere ottenuto applicando la funzione hash SHA-256 sulla stringa con commitEvidence codifica UTF-8.

Analogamente, il digest hash del nodo foglia può essere calcolato applicando la funzione hash SHA-256 sulla concatenazione dei byte risultanti.

Calcolo del nodo radice

Il secondo passaggio consiste nel calcolare l'hash SHA-256 della radice dell'albero Merkle al momento del commit della transazione. Il calcolo viene eseguito tramite la concatenazione iterativa e l'hashing del risultato dell'iterazione precedente (a partire dall'hash del nodo foglia calcolato nel passaggio precedente) con gli hash dei nodi ordinati forniti nel proof campo di una ricevuta. L'elenco proof viene fornito come elenco ordinato e i relativi elementi devono essere iterati nell'ordine specificato.

La concatenazione deve essere eseguita sulla rappresentazione dei byte rispetto all'ordine relativo indicato negli oggetti forniti nel proof campo ( left o right).

  • Se la chiave dell'elemento corrente in proof è left, il risultato dell'iterazione precedente deve essere aggiunto al valore dell'elemento corrente.
  • Se la chiave dell'elemento corrente in proof è right, il risultato dell'iterazione precedente deve essere anteporre al valore dell'elemento corrente.

Dopo ogni concatenazione, è necessario applicare la funzione SHA-256 per ottenere l'input per l'iterazione successiva. Questo processo segue i passaggi standard per calcolare il nodo radice di una struttura di dati Merkle Tree in base ai nodi necessari per il calcolo.

Verificare la firma sul nodo radice

Il terzo passaggio consiste nel verificare che la firma crittografica prodotta sull'hash del nodo radice sia valida usando il certificato del nodo di firma nella ricevuta. Il processo di verifica segue i passaggi standard per la verifica della firma digitale per i messaggi firmati usando l'algoritmo ECDSA (Elliptic Curve Digital Signature Algorithm). In particolare, i passaggi sono:

  1. Decodificare la stringa signature base64 in una matrice di byte.
  2. Estrarre la chiave pubblica ECDSA dal certificato certdel nodo di firma .
  3. Verificare che la firma sulla radice dell'albero Merkle (calcolata usando le istruzioni nella sottosezione precedente) sia autenticata usando la chiave pubblica estratta dal passaggio precedente. Questo passaggio corrisponde in modo efficace a un processo di verifica della firma digitale standard tramite ECDSA. Esistono molte librerie nei linguaggi di programmazione più diffusi che consentono di verificare una firma ECDSA usando un certificato di chiave pubblica su alcuni dati, ad esempio la libreria di crittografia per Python.

Verificare l'autenticità del certificato del nodo di firma

Oltre al passaggio precedente, è necessario anche verificare che il certificato del nodo di firma sia approvato (ovvero firmato) dal certificato del libro mastro corrente. Questo passaggio non dipende dagli altri tre passaggi precedenti e può essere eseguito in modo indipendente dagli altri.

È possibile che l'identità del servizio corrente che ha emesso la ricevuta sia diversa da quella che ha approvato il nodo di firma, ad esempio a causa di un rinnovo del certificato. In questo caso, è necessario verificare la catena di certificati attendibili dal certificato del nodo di firma( ovvero il cert campo nella ricevuta) fino all'autorità di certificazione radice attendibile (CA), ovvero il certificato di identità del servizio corrente, tramite altre identità del servizio precedenti(ovvero il serviceEndorsements campo elenco nella ricevuta). L'elenco serviceEndorsements viene fornito come elenco ordinato dal meno recente all'identità del servizio più recente.

L'autenticità del certificato deve essere verificata per l'intera catena e segue esattamente lo stesso processo di verifica della firma digitale descritto nella sottosezione precedente. Esistono librerie di crittografia open source più diffuse ( ad esempio OpenSSL) che in genere possono essere usate per eseguire un passaggio di verifica dell'autenticità dei certificati.

Verificare il digest delle attestazioni dell'applicazione

Come passaggio facoltativo, nel caso in cui le attestazioni dell'applicazione siano associate a una ricevuta, è possibile calcolare il digest delle attestazioni dalle attestazioni esposte (seguendo un algoritmo specifico) e verificare che il digest corrisponda a quello claimsDigest contenuto nel payload della ricevuta. Per calcolare il digest dagli oggetti attestazioni esposti, è necessario scorrere ogni oggetto attestazione dell'applicazione nell'elenco e controllarne kind il campo.

Se l'oggetto attestazione è di tipo LedgerEntry, l'ID raccolta libro mastro (collectionId) e il contenuto (contents) dell'attestazione devono essere estratti e usati per calcolare i digest HMAC usando la chiave privata (secretKey) specificata nell'oggetto attestazione. Questi due digest vengono quindi concatenati e viene calcolato l'hash SHA-256 della concatenazione. Il protocollo (protocol) e il digest dei dati attestazioni risultanti vengono quindi concatenati e viene calcolato un altro hash SHA-256 della concatenazione per ottenere il digest finale.

Se l'oggetto attestazione è di tipo ClaimDigest, il digest dell'attestazione (value) deve essere estratto, concatenato con il protocollo (protocol) e l'hash SHA-256 della concatenazione viene calcolato per ottenere il digest finale.

Dopo aver calcolato ogni singolo digest attestazione, è necessario concatenare tutti i digest calcolati da ogni oggetto attestazione dell'applicazione (nello stesso ordine in cui vengono presentati nella ricevuta). La concatenazione deve quindi essere anteporta al numero di attestazioni elaborate. L'hash SHA-256 della concatenazione precedente produce il digest delle attestazioni finale, che deve corrispondere al claimsDigest presente nell'oggetto ricevuta.

Altre risorse

Per altre informazioni sul contenuto di una ricevuta di transazione di scrittura del libro mastro riservato di Azure e sulla spiegazione di ogni campo, vedere l'articolo dedicato. La documentazione CCF contiene anche altre informazioni sulla verifica della ricezione e altre risorse correlate nei collegamenti seguenti:

Verificare le ricevute delle transazioni di scrittura

Utilità di verifica della ricevuta

La libreria client del libro mastro riservato di Azure per Python offre funzioni di utilità per verificare le ricevute delle transazioni di scrittura e calcolare il digest delle attestazioni da un elenco di attestazioni dell'applicazione. Per altre informazioni su come usare l'SDK del piano dati e le utilità specifiche della ricevuta, vedere questa sezione e questo codice di esempio.

Configurazione e prerequisiti

Ai fini di riferimento, viene fornito il codice di esempio in Python per verificare completamente le ricevute delle transazioni di scrittura del libro mastro riservato di Azure seguendo i passaggi descritti nella sezione precedente.

Per eseguire l'algoritmo di verifica completo, sono necessari il certificato di rete del servizio corrente e una ricevuta di transazione di scrittura da una risorsa Confidential Ledger in esecuzione. Per informazioni dettagliate su come recuperare una ricevuta di transazione di scrittura e il certificato del servizio da un'istanza di Confidential Ledger, vedere questo articolo .

Procedura dettagliata per il codice

Il codice seguente può essere usato per inizializzare gli oggetti necessari ed eseguire l'algoritmo di verifica della ricevuta. Un'utilità separata (verify_receipt) viene usata per eseguire l'algoritmo di verifica completo e accetta il contenuto del receipt campo in una GET_RECEIPT risposta come dizionario e il certificato del servizio come stringa semplice. La funzione genera un'eccezione se la ricevuta non è valida o se si è verificato un errore durante l'elaborazione.

Si presuppone che sia la ricevuta che il certificato del servizio possano essere caricati dai file. Assicurarsi di aggiornare le service_certificate_file_name costanti e receipt_file_name con i rispettivi nomi di file del certificato del servizio e la ricevuta da verificare.

import json 

# Constants
service_certificate_file_name = "<your-service-certificate-file>"
receipt_file_name = "<your-receipt-file>"

# Use the receipt and the service identity to verify the receipt content 
with open(service_certificate_file_name, "r") as service_certificate_file, open( 
    receipt_file_name, "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"] 
    service_certificate_cert = service_certificate_file.read() 

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack
        raise e 

Poiché il processo di verifica richiede alcune primitive di crittografia e hashing, vengono usate le librerie seguenti per facilitare il calcolo.

  • La libreria CCF Python: il modulo fornisce un set di strumenti per la verifica della ricezione.
  • La libreria di crittografia Python: una libreria ampiamente usata che include vari algoritmi di crittografia e primitive.
  • Modulo hashlib, parte della libreria standard Python: un modulo che fornisce un'interfaccia comune per gli algoritmi di hashing più diffusi.
from ccf.receipt import verify, check_endorsements, root 
from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

All'interno della verify_receipt funzione si verifica che la ricevuta specificata sia valida e contenga tutti i campi obbligatori.

# Check that all the fields are present in the receipt 
assert "cert" in receipt 
assert "leafComponents" in receipt 
assert "claimsDigest" in receipt["leafComponents"] 
assert "commitEvidence" in receipt["leafComponents"] 
assert "writeSetDigest" in receipt["leafComponents"] 
assert "proof" in receipt 
assert "signature" in receipt 

Inizializziamo le variabili che verranno usate nel resto del programma.

# Set the variables 
node_cert_pem = receipt["cert"] 
claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 
write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
proof_list = receipt["proof"] 
service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
root_node_signature = receipt["signature"] 

È possibile caricare i certificati PEM per l'identità del servizio, il nodo di firma e i certificati di verifica dell'autenticità delle identità del servizio precedenti usando la libreria di crittografia.

# Load service and node PEM certificates 
service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

# Load service endorsements PEM certificates 
service_endorsements_certs = [ 
    load_pem_x509_certificate(pem.encode()) 
    for pem in service_endorsements_certs_pem 
] 

Il primo passaggio del processo di verifica consiste nel calcolare il digest del nodo foglia.

# Compute leaf of the Merkle Tree corresponding to our transaction 
leaf_node_hex = compute_leaf_node( 
    claims_digest_hex, commit_evidence_str, write_set_digest_hex 
)

La compute_leaf_node funzione accetta come parametri i componenti foglia della ricevuta (, claimsDigestcommitEvidence, e writeSetDigest) e restituisce l'hash del nodo foglia in formato esadecimale.

Come descritto in precedenza, viene calcolato il digest di commitEvidence (usando la funzione SHA-256 hashlib ). Si convertono quindi sia che writeSetDigestclaimsDigest in matrici di byte. Infine, concatenare le tre matrici e digerire il risultato usando la funzione SHA256.

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string 
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes 
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes 
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

Dopo aver calcolato la foglia, è possibile calcolare la radice dell'albero Merkle.

# Compute root of the Merkle Tree 
root_node = root(leaf_node_hex, proof_list) 

La funzione root fornita come parte della libreria Python CCF viene usata. La funzione concatena successivamente il risultato dell'iterazione precedente con un nuovo elemento da proof, riepiloga la concatenazione e quindi ripete il passaggio per ogni elemento in proof con il digest calcolato in precedenza. La concatenazione deve rispettare l'ordine dei nodi nell'albero Merkle per assicurarsi che la radice venga ricalcocata correttamente.

def root(leaf: str, proof: List[dict]): 
    """ 
    Recompute root of Merkle tree from a leaf and a proof of the form: 
    [{"left": digest}, {"right": digest}, ...] 
    """ 

    current = bytes.fromhex(leaf) 

    for n in proof: 
        if "left" in n: 
            current = sha256(bytes.fromhex(n["left"]) + current).digest() 
        else: 
            current = sha256(current + bytes.fromhex(n["right"])).digest() 
    return current.hex() 

Dopo aver calcolato l'hash del nodo radice, è possibile verificare la firma contenuta nella ricevuta sulla radice per verificare che la firma sia corretta.

# Verify signature of the signing node over the root of the tree 
verify(root_node, root_node_signature, node_cert) 

Analogamente, la libreria CCF fornisce una funzione verify per eseguire questa verifica. Viene usata la chiave pubblica ECDSA del certificato del nodo di firma per verificare la firma sulla radice dell'albero.

def verify(root: str, signature: str, cert: Certificate):
    """ 
    Verify signature over root of Merkle Tree 
    """ 

    sig = base64.b64decode(signature) 
    pk = cert.public_key() 
    assert isinstance(pk, ec.EllipticCurvePublicKey) 
    pk.verify( 
        sig, 
        bytes.fromhex(root), 
        ec.ECDSA(utils.Prehashed(hashes.SHA256())), 
    )

L'ultimo passaggio della verifica della ricevuta consiste nel convalidare il certificato usato per firmare la radice dell'albero Merkle.

# Verify node certificate is endorsed by the service certificates through endorsements 
check_endorsements(node_cert, service_cert, service_endorsements_certs) 

Analogamente, è possibile usare l'utilità check_endorsements CCF per verificare che l'identità del servizio approvi il nodo di firma. La catena di certificati può essere costituita da certificati di servizio precedenti, pertanto è necessario verificare che l'approvazione venga applicata in modo transitivo se serviceEndorsements non è un elenco vuoto.

def check_endorsement(endorsee: Certificate, endorser: Certificate): 
    """ 
    Check endorser has endorsed endorsee 
    """ 

    digest_algo = endorsee.signature_hash_algorithm 
    assert digest_algo 
    digester = hashes.Hash(digest_algo) 
    digester.update(endorsee.tbs_certificate_bytes) 
    digest = digester.finalize() 
    endorser_pk = endorser.public_key() 
    assert isinstance(endorser_pk, ec.EllipticCurvePublicKey) 
    endorser_pk.verify( 
        endorsee.signature, digest, ec.ECDSA(utils.Prehashed(digest_algo)) 
    ) 

def check_endorsements( 
    node_cert: Certificate, service_cert: Certificate, endorsements: List[Certificate] 
): 
    """ 
    Check a node certificate is endorsed by a service certificate, transitively through a list of endorsements. 
    """ 

    cert_i = node_cert 
    for endorsement in endorsements: 
        check_endorsement(cert_i, endorsement) 
        cert_i = endorsement 
    check_endorsement(cert_i, service_cert) 

In alternativa, è anche possibile convalidare il certificato usando la libreria OpenSSL usando un metodo simile.

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
)

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store 
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates 
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate 
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate 
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

Codice di esempio

Viene fornito il codice di esempio completo usato nella procedura dettagliata del codice.

Programma principale

import json 

# Use the receipt and the service identity to verify the receipt content 
with open("network_certificate.pem", "r") as service_certificate_file, open( 
    "receipt.json", "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"]
    service_certificate_cert = service_certificate_file.read()

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack 
        raise e 

Verifica della ricevuta

from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
) 

from ccf.receipt import root, verify, check_endorsements 

def verify_receipt(receipt: Dict[str, Any], service_cert_pem: str) -> None: 
    """Function to verify that a given write transaction receipt is valid based 
    on its content and the service certificate. 
    Throws an exception if the verification fails.""" 

    # Check that all the fields are present in the receipt 
    assert "cert" in receipt 
    assert "leafComponents" in receipt 
    assert "claimsDigest" in receipt["leafComponents"] 
    assert "commitEvidence" in receipt["leafComponents"] 
    assert "writeSetDigest" in receipt["leafComponents"] 
    assert "proof" in receipt 
    assert "signature" in receipt 

    # Set the variables 
    node_cert_pem = receipt["cert"] 
    claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
    commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 

    write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
    proof_list = receipt["proof"] 
    service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
    root_node_signature = receipt["signature"] 

    # Load service and node PEM certificates
    service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
    node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

    # Load service endorsements PEM certificates
    service_endorsements_certs = [ 
        load_pem_x509_certificate(pem.encode()) 
        for pem in service_endorsements_certs_pem 
    ] 

    # Compute leaf of the Merkle Tree 
    leaf_node_hex = compute_leaf_node( 
        claims_digest_hex, commit_evidence_str, write_set_digest_hex 
    ) 

    # Compute root of the Merkle Tree
    root_node = root(leaf_node_hex, proof_list) 

    # Verify signature of the signing node over the root of the tree
    verify(root_node, root_node_signature, node_cert) 

    # Verify node certificate is endorsed by the service certificates through endorsements
    check_endorsements(node_cert, service_cert, service_endorsements_certs) 

    # Alternative: Verify node certificate is endorsed by the service certificates through endorsements 
    verify_openssl_certificate(node_cert, service_cert, service_endorsements_certs) 

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

Passaggi successivi