Проверка квитанций о транзакциях в конфиденциальном реестре Azure

Получение транзакций записи конфиденциального реестра Azure представляет собой криптографическое доказательство того, что соответствующая транзакция записи была глобально зафиксирована сетью CCF. Пользователи конфиденциального реестра Azure могут получить квитанцию о фиксации транзакции записи в любой момент времени, чтобы убедиться, что соответствующая операция записи успешно записана в неизменяемый реестр.

Дополнительные сведения о квитанциях о транзакциях в конфиденциальном реестре Azure см. в выделенной статье.

Этапы проверки квитанций

Получение транзакции записи можно проверить после определенного набора шагов, описанных в следующих подразделах. Те же действия описаны в документации по CCF.

Вычисление конечного узла

Первым шагом является вычисление хэша SHA-256 конечного узла в дереве Merkle, соответствующего зафиксированной транзакции. Конечный узел состоит из упорядоченного объединения следующих полей, которые можно найти в квитанции конфиденциального реестра Azure в разделе leafComponents:

  1. writeSetDigest
  2. Дайджест SHA-256 commitEvidence
  3. claimsDigest Поля

Эти значения необходимо объединить в виде массивов байтов: оба writeSetDigestclaimsDigest и потребуется преобразовать из строк шестнадцатеричных цифр в массивы байтов; с другой стороны, хэш (как массив байтов) можно получить, применяя хэш-функцию commitEvidence SHA-256 в кодировке commitEvidence UTF-8.

Аналогичным образом, хэш-хэш конечного узла можно вычислить, применяя хэш-функцию SHA-256 к результирующей объединения результирующих байтов.

Вычисление корневого узла

Второй шаг — вычислить хэш SHA-256 корня дерева Merkle во время фиксации транзакции. Вычисление выполняется путем итеративного объединения и хэширования результата предыдущей итерации (начиная с хэша конечного узла, вычисляемого на предыдущем шаге), с хэшами упорядоченных узлов, предоставленными в proof поле квитанции. Список proof предоставляется как упорядоченный список и его элементы должны быть итерированы в указанном порядке.

Объединение необходимо выполнить в представлении байтов относительного порядка, указанного в объектах, предоставленных в proof поле ( left или right).

  • Если ключ текущего элемента имеет proof значение left, результат предыдущей итерации должен быть добавлен к текущему значению элемента.
  • Если ключ текущего элемента находится proofright, результат предыдущей итерации должен быть предопределен к текущему значению элемента.

После каждого объединения необходимо применить функцию SHA-256, чтобы получить входные данные для следующей итерации. Этот процесс выполняет стандартные шаги по вычислению корневого узла структуры данных Дерева Merkle, учитывая необходимые узлы для вычисления.

Проверка подписи по корневому узлу

Третий шаг — убедиться, что криптографическая подпись, созданная по хэшу корневого узла, действительна с помощью сертификата узла подписи в квитанции. Процесс проверки выполняет стандартные шаги проверки цифровой подписи для сообщений, подписанных с помощью алгоритма цифровой подписи эллиптической кривой (ECDSA). В частности, ниже приведены действия.

  1. Декодирование строки signature base64 в массив байтов.
  2. Извлеките открытый ключ ECDSA из сертификата certузла подписи.
  3. Убедитесь, что подпись в корне дерева Merkle (вычисленная с помощью инструкций в предыдущем подразделе) является аутентичной с помощью извлеченного открытого ключа из предыдущего шага. Этот шаг эффективно соответствует стандартному процессу проверки цифровой подписи с помощью ECDSA. Существует множество библиотек на самых популярных языках программирования, которые позволяют проверять подпись ECDSA с помощью сертификата открытого ключа по некоторым данным (например, библиотеке шифрования для Python).

Проверка подтверждения сертификата узла подписи

Помимо предыдущего шага, также необходимо убедиться, что сертификат узла подписи одобрен (т. е. подписан) текущим сертификатом реестра. Этот шаг не зависит от остальных трех предыдущих шагов и может выполняться независимо от других.

Возможно, что текущее удостоверение службы, выдавающее квитанцию, отличается от того, что одобрил узел подписи (например, из-за продления сертификата). В этом случае необходимо проверить цепочку сертификатов доверия от сертификата узла подписи (т cert . е. поля в квитанции) до доверенного корневого центра сертификации (ЦС) (то есть текущего сертификата удостоверения службы) с помощью других предыдущих удостоверений службы (т serviceEndorsements . е. поля списка в квитанции). Список serviceEndorsements предоставляется как упорядоченный список от самого старого до последнего удостоверения службы.

Подтверждение сертификата должно быть проверено для всей цепочки и следует точно такой же процедуре проверки цифровой подписи, описанной в предыдущем подразделе. Существуют популярные криптографические библиотеки с открытым кодом (например, OpenSSL), которые обычно можно использовать для выполнения шага подтверждения сертификата.

Проверка дайджеста утверждений приложения

В качестве дополнительного шага, если утверждения приложения присоединены к квитанции, можно вычислить дайджест утверждений из предоставленных утверждений (после определенного алгоритма) и убедиться, что дайджест соответствует claimsDigest содержащимся в полезных данных квитанции. Чтобы вычислить дайджест из предоставленных объектов утверждений, необходимо выполнить итерацию по каждому объекту утверждения приложения в списке и проверка его kind поле.

Если объект утверждения имеет LedgerEntryвид, идентификатор коллекции реестра () и содержимоеcontents утверждения должны быть извлечены и использованы для вычисления дайджестов HMAC с помощью секретного ключа (collectionIdsecretKey), указанного в объекте утверждения. Эти два дайджеста затем объединяются, а хэш SHA-256 объединения вычисляется. Затем протокол (protocol) и результирующий дайджест данных утверждения объединяются, а затем вычисляется другой хэш SHA-256 объединения, чтобы получить окончательный дайджест.

Если объект утверждения имеет ClaimDigestвид, то должен быть извлечен дайджест утверждений (value), сцеплен с протоколом (protocol), а хэш SHA-256 объединения вычисляется для получения окончательного дайджеста.

После вычисления каждого дайджеста утверждений необходимо объединить все вычисляемые дайджесты из каждого объекта утверждения приложения (в том же порядке, что они представлены в квитанции). Затем объединение должно быть предопределено числом обработанных утверждений. Хэш SHA-256 предыдущего объединения создает окончательный дайджест утверждений, который должен соответствовать claimsDigest настоящему объекту квитанции.

Дополнительные ресурсы

Дополнительные сведения о содержимом уведомления о транзакциях в конфиденциальном реестре Azure см. в выделенной статье. Документация по CCF также содержит дополнительные сведения о проверке квитанций и других связанных ресурсах по следующим ссылкам:

Проверка квитанций о транзакциях записи

Служебные программы проверки квитанций

Клиентская библиотека конфиденциального реестра Azure для Python предоставляет служебные функции для проверки квитанций о транзакциях записи и вычисления дайджеста утверждений из списка утверждений приложения. Дополнительные сведения об использовании пакета SDK плоскости данных и служебных программ, относящихся к квитанциям, см . в этом разделе и этом примере кода.

Настройка и необходимые компоненты

В справочных целях мы предоставляем пример кода в Python для полной проверки квитанций о транзакциях конфиденциального реестра Azure, описанных в предыдущем разделе.

Для выполнения полного алгоритма проверки требуется текущий сетевой сертификат службы и получение транзакции записи из запущенного ресурса конфиденциального реестра. Дополнительные сведения о получении квитанции о транзакциях записи и сертификате службы из экземпляра конфиденциального реестра см. в этой статье.

Пошаговое руководство по написанию кода

Следующий код можно использовать для инициализации необходимых объектов и запуска алгоритма проверки квитанций. Отдельная служебная программа (verify_receipt) используется для выполнения полного алгоритма проверки и принимает содержимое receipt поля в ответе в GET_RECEIPT качестве словаря и сертификата службы в виде простой строки. Функция создает исключение, если квитанция недействительна или возникла какая-либо ошибка во время обработки.

Предполагается, что как квитанция, так и сертификат службы можно загрузить из файлов. Обязательно обновите как константы, так service_certificate_file_name и receipt_file_name соответствующие имена файлов сертификата службы и квитанции, которые вы хотите проверить.

import json 

# Constants
service_certificate_file_name = "<your-service-certificate-file>"
receipt_file_name = "<your-receipt-file>"

# Use the receipt and the service identity to verify the receipt content 
with open(service_certificate_file_name, "r") as service_certificate_file, open( 
    receipt_file_name, "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"] 
    service_certificate_cert = service_certificate_file.read() 

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack
        raise e 

Так как процесс проверки требует некоторых примитивов шифрования и хэширования, для упрощения вычислений используются следующие библиотеки.

from ccf.receipt import verify, check_endorsements, root 
from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

verify_receipt Внутри функции мы проверка, что указанная квитанция действительна и содержит все обязательные поля.

# Check that all the fields are present in the receipt 
assert "cert" in receipt 
assert "leafComponents" in receipt 
assert "claimsDigest" in receipt["leafComponents"] 
assert "commitEvidence" in receipt["leafComponents"] 
assert "writeSetDigest" in receipt["leafComponents"] 
assert "proof" in receipt 
assert "signature" in receipt 

Мы инициализируем переменные, которые будут использоваться в остальной части программы.

# Set the variables 
node_cert_pem = receipt["cert"] 
claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 
write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
proof_list = receipt["proof"] 
service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
root_node_signature = receipt["signature"] 

Вы можете загрузить сертификаты PEM для удостоверения службы, узла подписи и подтверждения сертификатов из предыдущих удостоверений службы с помощью библиотеки шифрования.

# Load service and node PEM certificates 
service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

# Load service endorsements PEM certificates 
service_endorsements_certs = [ 
    load_pem_x509_certificate(pem.encode()) 
    for pem in service_endorsements_certs_pem 
] 

Первым шагом процесса проверки является вычисление дайджеста конечного узла.

# Compute leaf of the Merkle Tree corresponding to our transaction 
leaf_node_hex = compute_leaf_node( 
    claims_digest_hex, commit_evidence_str, write_set_digest_hex 
)

Функция compute_leaf_node принимает в качестве параметров конечные компоненты квитанции (теclaimsDigestcommitEvidence, и и) writeSetDigestи возвращает хэш конечного узла в шестнадцатеричной форме.

Как описано ранее, мы вычисляем дайджест commitEvidence (с помощью функции SHA-256 hashlib ). Затем мы преобразуем оба writeSetDigestclaimsDigest и в массивы байтов. Наконец, мы сцепляем три массива и дайджестируем результат с помощью функции SHA256.

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string 
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes 
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes 
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

После вычисления листа можно вычислить корень дерева Merkle.

# Compute root of the Merkle Tree 
root_node = root(leaf_node_hex, proof_list) 

Мы используем функцию root , предоставляемую в рамках библиотеки Python CCF. Функция последовательно объединяет результат предыдущей итерации с новым элементом из proof, дайджестирует объединение, а затем повторяет шаг для каждого элемента с proof ранее вычисляемым дайджестом. Объединение должно соблюдать порядок узлов в дереве Merkle, чтобы убедиться, что корень перекомпилируется правильно.

def root(leaf: str, proof: List[dict]): 
    """ 
    Recompute root of Merkle tree from a leaf and a proof of the form: 
    [{"left": digest}, {"right": digest}, ...] 
    """ 

    current = bytes.fromhex(leaf) 

    for n in proof: 
        if "left" in n: 
            current = sha256(bytes.fromhex(n["left"]) + current).digest() 
        else: 
            current = sha256(current + bytes.fromhex(n["right"])).digest() 
    return current.hex() 

После вычисления хэша корневого узла мы можем проверить подпись, содержащуюся в квитанции по корню, чтобы проверить правильность подписи.

# Verify signature of the signing node over the root of the tree 
verify(root_node, root_node_signature, node_cert) 

Аналогичным образом библиотека CCF предоставляет функцию verify для этой проверки. Открытый ключ ECDSA сертификата узла подписывания используется для проверки подписи в корне дерева.

def verify(root: str, signature: str, cert: Certificate):
    """ 
    Verify signature over root of Merkle Tree 
    """ 

    sig = base64.b64decode(signature) 
    pk = cert.public_key() 
    assert isinstance(pk, ec.EllipticCurvePublicKey) 
    pk.verify( 
        sig, 
        bytes.fromhex(root), 
        ec.ECDSA(utils.Prehashed(hashes.SHA256())), 
    )

Последний шаг проверки квитанции — проверка сертификата, который использовался для подписывания корня дерева Merkle.

# Verify node certificate is endorsed by the service certificates through endorsements 
check_endorsements(node_cert, service_cert, service_endorsements_certs) 

Аналогичным образом мы можем использовать служебную check_endorsements программу CCF для проверки того, что удостоверение службы поддерживает узел подписывания. Цепочка сертификатов может состоять из предыдущих сертификатов службы, поэтому мы должны проверить, применяется ли подтверждение транзитивно, если serviceEndorsements это не пустой список.

def check_endorsement(endorsee: Certificate, endorser: Certificate): 
    """ 
    Check endorser has endorsed endorsee 
    """ 

    digest_algo = endorsee.signature_hash_algorithm 
    assert digest_algo 
    digester = hashes.Hash(digest_algo) 
    digester.update(endorsee.tbs_certificate_bytes) 
    digest = digester.finalize() 
    endorser_pk = endorser.public_key() 
    assert isinstance(endorser_pk, ec.EllipticCurvePublicKey) 
    endorser_pk.verify( 
        endorsee.signature, digest, ec.ECDSA(utils.Prehashed(digest_algo)) 
    ) 

def check_endorsements( 
    node_cert: Certificate, service_cert: Certificate, endorsements: List[Certificate] 
): 
    """ 
    Check a node certificate is endorsed by a service certificate, transitively through a list of endorsements. 
    """ 

    cert_i = node_cert 
    for endorsement in endorsements: 
        check_endorsement(cert_i, endorsement) 
        cert_i = endorsement 
    check_endorsement(cert_i, service_cert) 

В качестве альтернативы можно также проверить сертификат с помощью библиотеки OpenSSL с помощью аналогичного метода.

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
)

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store 
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates 
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate 
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate 
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

Пример кода

Предоставляется полный пример кода, используемый в пошаговом руководстве по коду.

Основная программа

import json 

# Use the receipt and the service identity to verify the receipt content 
with open("network_certificate.pem", "r") as service_certificate_file, open( 
    "receipt.json", "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"]
    service_certificate_cert = service_certificate_file.read()

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack 
        raise e 

Проверка квитанции

from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
) 

from ccf.receipt import root, verify, check_endorsements 

def verify_receipt(receipt: Dict[str, Any], service_cert_pem: str) -> None: 
    """Function to verify that a given write transaction receipt is valid based 
    on its content and the service certificate. 
    Throws an exception if the verification fails.""" 

    # Check that all the fields are present in the receipt 
    assert "cert" in receipt 
    assert "leafComponents" in receipt 
    assert "claimsDigest" in receipt["leafComponents"] 
    assert "commitEvidence" in receipt["leafComponents"] 
    assert "writeSetDigest" in receipt["leafComponents"] 
    assert "proof" in receipt 
    assert "signature" in receipt 

    # Set the variables 
    node_cert_pem = receipt["cert"] 
    claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
    commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 

    write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
    proof_list = receipt["proof"] 
    service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
    root_node_signature = receipt["signature"] 

    # Load service and node PEM certificates
    service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
    node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

    # Load service endorsements PEM certificates
    service_endorsements_certs = [ 
        load_pem_x509_certificate(pem.encode()) 
        for pem in service_endorsements_certs_pem 
    ] 

    # Compute leaf of the Merkle Tree 
    leaf_node_hex = compute_leaf_node( 
        claims_digest_hex, commit_evidence_str, write_set_digest_hex 
    ) 

    # Compute root of the Merkle Tree
    root_node = root(leaf_node_hex, proof_list) 

    # Verify signature of the signing node over the root of the tree
    verify(root_node, root_node_signature, node_cert) 

    # Verify node certificate is endorsed by the service certificates through endorsements
    check_endorsements(node_cert, service_cert, service_endorsements_certs) 

    # Alternative: Verify node certificate is endorsed by the service certificates through endorsements 
    verify_openssl_certificate(node_cert, service_cert, service_endorsements_certs) 

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

Следующие шаги