Sdílet prostřednictvím


Orleans transakcí

Orleans podporuje distribuované transakce ACID proti trvalému stavu zrnitosti. Transakce se implementují pomocí Microsoftu .Orleans Transakce balíčku NuGet. Zdrojový kód ukázkové aplikace v tomto článku se skládá ze čtyř projektů:

  • Abstrakce: Knihovna tříd obsahující rozhraní grain a sdílené třídy.
  • Zrna: Knihovna tříd obsahující implementace zrnitosti.
  • Server: Konzolová aplikace, která využívá abstrakce a knihovny tříd zrn a funguje jako Orleans sila.
  • Klient: Konzolová aplikace, která využívá knihovnu Orleans abstrakcí tříd, která představuje klienta.

Nastavení

Orleans transakce jsou opt-in. Pro použití transakcí musí být nakonfigurovaný silo i klient. Pokud nejsou nakonfigurované, obdrží všechna volání transakčních metod v implementaci agregačního intervalu OrleansTransactionsDisabledException. Pokud chcete povolit transakce na silu, zavolejte SiloBuilderExtensions.UseTransactions tvůrce hostitelů sila:

var builder = Host.CreateDefaultBuilder(args)
    UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

Podobně pokud chcete povolit transakce na klientovi, zavolejte ClientBuilderExtensions.UseTransactions tvůrce hostitelů klienta:

var builder = Host.CreateDefaultBuilder(args)
    UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

Úložiště transakčního stavu

Pokud chcete používat transakce, musíte nakonfigurovat úložiště dat. K podpoře různých úložišť dat s transakcemi se používá abstrakce ITransactionalStateStorage<TState> úložiště. Tato abstrakce je specifická pro potřebytransakcíchIGrainStorage Pokud chcete použít úložiště specifické pro transakce, nakonfigurujte silo pomocí jakékoli implementace ITransactionalStateStorage, jako je Azure (AddAzureTableTransactionalStateStorage).

Představte si například následující konfiguraci tvůrce hostitelů:

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.ConfigureTableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

Pro účely vývoje platí, že pokud pro úložiště dat, které potřebujete, není k dispozici úložiště specifické pro transakce, můžete místo toho použít IGrainStorage implementaci. V případě jakéhokoli transakčního stavu, který nemá nakonfigurované úložiště, se transakce pokusí převzít služby při selhání do úložiště grain pomocí mostu. Přístup k transakčnímu stavu prostřednictvím mostu do úložiště zrnek je méně efektivní a nemusí být v budoucnu podporován. Proto doporučujeme toto použít pouze pro účely vývoje.

Rozhraní pro agregační intervaly

Aby bylo možné podporovat transakce, transakční metody v rozhraní grain musí být označeny jako součást transakce pomocí TransactionAttribute. Atribut musí indikovat, jak se volání agregace chová v transakčním prostředí, jak je podrobně popsáno s následujícími TransactionOption hodnotami:

  • TransactionOption.Create: Volání je transakční a vždy vytvoří nový kontext transakce (spustí novou transakci), i když je volána v existujícím kontextu transakce.
  • TransactionOption.Join: Volání je transakční, ale lze volat pouze v kontextu existující transakce.
  • TransactionOption.CreateOrJoin: Volání je transakční. Pokud je volána v kontextu transakce, použije tento kontext, jinak vytvoří nový kontext.
  • TransactionOption.Suppress: Volání není transakční, ale lze volat z transakce. Pokud je volána v kontextu transakce, kontext se nepředá volání.
  • TransactionOption.Supported: Volání není transakční, ale podporuje transakce. Pokud je volána v kontextu transakce, kontext se předá volání.
  • TransactionOption.NotAllowed: Volání není transakční a nelze volat z transakce. Pokud je volána v kontextu transakce, vyvolá NotSupportedException.

Volání lze označit jako TransactionOption.Create, což znamená, že volání vždy spustí svou transakci. Například Transfer operace v níže uvedeném agregačním intervalu atm vždy spustí novou transakci, která zahrnuje dva odkazované účty.

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

Transakční operace Withdraw a agregační agregace účtů jsou označeny TransactionOption.Join, což znamená, že je lze volat pouze v kontextu existující transakce, což by byl případ, kdyby byly volány během IAtmGrain.TransferDeposit . Volání GetBalance je označené CreateOrJoin tak, aby bylo možné volat z existující transakce, například prostřednictvím IAtmGrain.Transfer, nebo samostatně.

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

Důležitá poznámka

OnActivateAsync Před voláním nelze označit jako transakční jako jakékoli takové volání, které vyžaduje správné nastavení. Existuje pouze pro rozhraní API odstupňované aplikace. To znamená, že pokus o čtení transakčního stavu jako součást těchto metod vyvolá výjimku v modulu runtime.

Implementace agregačních intervalů

Implementace zrnitosti potřebuje ke ITransactionalState<TState> správě stavu zrnitosti prostřednictvím transakcí ACID omezující vlastnost.

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

Veškerý přístup pro čtení nebo zápis k trvalému stavu musí být proveden prostřednictvím synchronních funkcí předaných omezující vlastnosti transakčního stavu. To umožňuje transakčnímu systému provádět nebo rušit tyto operace transakční transakce. Chcete-li použít transakční stav v rámci agregačního intervalu, definujete serializovatelnou třídu stavu, která má být zachována a deklarovat transakční stav v konstruktoru grainu pomocí TransactionalStateAttribute. Druhá deklaruje název stavu a volitelně i úložiště transakčního stavu, které se má použít. Další informace naleznete v tématu Instalace.

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

Jako příklad Balance je objekt stavu definován takto:

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

Předchozí stav objekt:

  • Je vyzdoben pomocí GenerateSerializerAttribute pokyn generátoru Orleans kódu k vygenerování serializátoru.
  • Value Má vlastnost, která je zdobena IdAttribute jedinečnou identifikací člena.

Objekt Balance stavu se pak použije v implementaci AccountGrain následujícím způsobem:

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

Důležité

Transakční agregační agregační interval musí být označený, ReentrantAttribute aby se zajistilo, že kontext transakce je správně předán do volání agregačního intervalu.

V předchozím příkladu TransactionalStateAttribute se používá k deklaraci, že balance konstruktor parametr by měl být přidružen k transakční stav s názvem "balance". S touto deklarací Orleans vloží ITransactionalState<TState> instanci se stavem načteným z úložiště transakčního stavu s názvem "TransactionStore". Stav lze upravit prostřednictvím PerformUpdate nebo přečíst prostřednictvím PerformRead. Infrastruktura transakcí zajistí, že všechny takové změny provedené jako součást transakce, a to i mezi více zrn distribuovanými v clusteru Orleans , budou potvrzeny nebo všechny budou vráceny zpět po dokončení volání agregace, která vytvořila transakci (IAtmGrain.Transfer v předchozím příkladu).

Volání metod transakcí z klienta

Doporučeným způsobem volání metody agregační transakce je použít ITransactionClient. Služba ITransactionClient injektáže závislostí se při konfiguraci klienta automaticky zaregistruje u poskytovatele Orleans služby injektáže závislostí. Slouží ITransactionClient k vytvoření kontextu transakce a k volání transakčních metod agregačního intervalu v tomto kontextu. Následující příklad ukazuje, jak použít ITransactionClient k volání transakčních metod agregačního intervalu.

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

V předchozím klientském kódu:

  • Konfiguruje se IHostBuilder s UseOrleansClient.
    • Používá IClientBuilder clustering localhost a transakce.
  • Rozhraní IClusterClient a ITransactionClient rozhraní se načítají od poskytovatele služeb.
  • Proměnné from jsou to přiřazeny jejich IAccountGrain odkazy.
  • Slouží ITransactionClient k vytvoření transakce, volání:
    • Withdraw v referenčních informacích o agregačním intervalu from účtu.
    • Deposit v referenčních informacích o agregačním intervalu to účtu.

Transakce jsou vždy potvrzeny, pokud není výjimka, která je vyvolán v transactionDelegate zadaném nebo v rozporu transactionOption . I když doporučený způsob volání transakčních odstupňovaných metod je použít ITransactionClient, můžete také volat transakční odstupňované metody přímo z jiného zrnitého.

Volání metod transakcí z jiného agregace

Transakční metody v rozhraní grain jsou volána jako jakákoli jiná metoda agregace. Jako alternativní přístup pomocí ITransactionClient, AtmGrain implementace níže volá metodu Transfer (což je transakční) v IAccountGrain rozhraní.

Zvažte implementaci AtmGrain , která řeší dvě odkazovaná zrna účtů a provádí příslušná volání Withdraw a Deposit:

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

Kód klientské aplikace může volat AtmGrain.Transfer transakčním způsobem následujícím způsobem:

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

V předchozích voláních IAtmGrain se používá k přenosu 100 jednotek měny z jednoho účtu do druhého. Po dokončení převodu se oba účty dotazují, aby získaly aktuální zůstatek. Převod měny i dotazy na účty se provádějí jako transakce ACID.

Jak je znázorněno v předchozím příkladu, transakce mohou vracet hodnoty v rámci , Taskjako jsou jiné volání agregace. Ale při selhání volání nebudou vyvolány výjimky aplikace, ale spíše nebo OrleansTransactionExceptionTimeoutException. Pokud aplikace vyvolá výjimku během transakce a tato výjimka způsobí selhání transakce (na rozdíl od selhání kvůli jiným systémovým selháním), výjimka aplikace bude vnitřní výjimkou OrleansTransactionException.

Pokud je vyvolána výjimka transakce typu OrleansTransactionAbortedException, transakce selhala a lze ji opakovat. Jakákoli jiná vyvolaná výjimka značí, že transakce byla ukončena neznámým stavem. Vzhledem k tomu, že transakce jsou distribuované operace, transakce v neznámém stavu mohla být úspěšná, neúspěšná nebo stále probíhá. Z tohoto důvodu je vhodné před ověřením stavu nebo opakováním operace umožnit předáníSiloMessagingOptions.SystemResponseTimeout časového limitu časového limitu () volání, abyste se vyhnuli kaskádovým přerušením.