Dela via


Orleans transaktioner

Orleans stöder distribuerade ACID-transaktioner mot beständiga korntillstånd. Transaktioner implementeras med hjälp av Microsoft.Orleans. NuGet-paket för transaktioner . Källkoden för exempelappen i den här artikeln består av fyra projekt:

  • Abstraktioner: Ett klassbibliotek som innehåller korngränssnitten och delade klasser.
  • Korn: Ett klassbibliotek som innehåller kornimplementeringarna.
  • Server: En konsolapp som använder abstraktioner och kornklassbibliotek och fungerar som Orleans silo.
  • Klient: En konsolapp som använder abstraktionsklassbiblioteket som representerar Orleans klienten.

Ställ in

Orleans transaktioner är anmälning. Både silo och klient måste konfigureras för att använda transaktioner. Om de inte har konfigurerats får OrleansTransactionsDisabledExceptionalla anrop till transaktionsmetoder på en kornig implementering . Om du vill aktivera transaktioner på en silo anropar du SiloBuilderExtensions.UseTransactions silovärdverktyget:

var builder = Host.CreateDefaultBuilder(args)
    UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

På samma sätt anropar du klientens värdbyggare för att aktivera transaktioner på klienten ClientBuilderExtensions.UseTransactions :

var builder = Host.CreateDefaultBuilder(args)
    UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

Transaktionstillståndslagring

Om du vill använda transaktioner måste du konfigurera ett datalager. För att stödja olika datalager med transaktioner används lagringsabstraktionen ITransactionalStateStorage<TState> . Den här abstraktionen är specifik för transaktionernas behov, till skillnad från allmän kornlagring (IGrainStorage). Om du vill använda transaktionsspecifik lagring konfigurerar du silon med valfri implementering av ITransactionalStateStorage, till exempel Azure (AddAzureTableTransactionalStateStorage).

Tänk till exempel på följande konfiguration av värdbyggaren:

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.ConfigureTableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

I utvecklingssyfte kan du använda en IGrainStorage implementering i stället om transaktionsspecifik lagring inte är tillgänglig för det datalager du behöver. För alla transaktionstillstånd som inte har konfigurerat ett arkiv försöker transaktioner redundansväxla till kornlagringen med hjälp av en brygga. Det är mindre effektivt att komma åt ett transaktionstillstånd via en brygga till kornlagring och kanske inte stöds i framtiden. Rekommendationen är därför att endast använda detta i utvecklingssyfte.

Gränssnitt för korn

För ett korn som stöder transaktioner måste transaktionsmetoder i ett korngränssnitt markeras som en del av en transaktion med hjälp TransactionAttributeav . Attributet måste ange hur kornanropet beter sig i en transaktionsmiljö enligt beskrivningen med följande TransactionOption värden:

  • TransactionOption.Create: Anropet är transaktionellt och skapar alltid en ny transaktionskontext (den startar en ny transaktion), även om den anropas i en befintlig transaktionskontext.
  • TransactionOption.Join: Anropet är transaktionellt men kan bara anropas inom ramen för en befintlig transaktion.
  • TransactionOption.CreateOrJoin: Anropet är transaktionellt. Om den anropas inom ramen för en transaktion kommer den att använda den kontexten, annars skapas en ny kontext.
  • TransactionOption.Suppress: Anropet är inte transaktionellt men kan anropas inifrån en transaktion. Om den anropas inom ramen för en transaktion skickas inte kontexten till anropet.
  • TransactionOption.Supported: Anropet är inte transaktionellt men stöder transaktioner. Om den anropas inom ramen för en transaktion skickas kontexten till anropet.
  • TransactionOption.NotAllowed: Anropet är inte transaktionellt och kan inte anropas inifrån en transaktion. Om den anropas i en transaktions kontext utlöser den NotSupportedException.

Anrop kan markeras som TransactionOption.Create, vilket innebär att anropet alltid startar transaktionen. Åtgärden i ATM-kornet nedan startar till exempel Transfer alltid en ny transaktion som omfattar de två refererade kontona.

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

Transaktionsåtgärderna Withdraw och Deposit på kontointervallet är markerade TransactionOption.Join, vilket anger att de bara kan anropas inom ramen för en befintlig transaktion, vilket skulle vara fallet om de anropades under IAtmGrain.Transfer. Anropet GetBalance är markerat CreateOrJoin så att det kan anropas från en befintlig transaktion, till exempel via IAtmGrain.Transfer, eller på egen hand.

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

Viktigt!

Det OnActivateAsync gick inte att markera som transaktionell eftersom ett sådant anrop kräver en korrekt konfiguration före anropet. Den finns bara för API:et för kornprogram. Det innebär att ett försök att läsa transaktionstillstånd som en del av dessa metoder genererar ett undantag i körningen.

Korniga implementeringar

En kornimplementering måste använda en ITransactionalState<TState> aspekt för att hantera korntillstånd via ACID-transaktioner.

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

All läs- eller skrivåtkomst till det bevarade tillståndet måste utföras via synkrona funktioner som skickas till transaktionstillståndets fasettering. Detta gör att transaktionssystemet kan utföra eller avbryta dessa åtgärder transaktionsmässigt. Om du vill använda ett transaktionstillstånd inom ett korn definierar du en serialiserbar tillståndsklass som ska bevaras och deklarera transaktionstillståndet i kornkonstruktorn med en TransactionalStateAttribute. Den senare deklarerar tillståndsnamnet och, om du vill, vilken transaktionstillståndslagring som ska användas. Mer information finns i Installation.

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

Till exempel definieras tillståndsobjektet Balance på följande sätt:

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

Föregående tillståndsobjekt:

  • Är dekorerad med GenerateSerializerAttribute för att instruera Orleans kodgeneratorn att generera en serialiserare.
  • Har en Value egenskap som är dekorerad med IdAttribute för att unikt identifiera medlemmen.

Tillståndsobjektet Balance används sedan i implementeringen enligt AccountGrain följande:

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

Viktigt!

Ett transaktionellt korn måste markeras med ReentrantAttribute för att säkerställa att transaktionskontexten skickas korrekt till kornanropet.

I föregående exempel TransactionalStateAttribute används för att deklarera att balance konstruktorparametern ska associeras med ett transaktionstillstånd med namnet "balance". Med den här deklarationen Orleans matar in en ITransactionalState<TState> instans med ett tillstånd som läses in från transaktionstillståndslagringen med namnet "TransactionStore". Tillståndet kan ändras via PerformUpdate eller läsas via PerformRead. Transaktionsinfrastrukturen säkerställer att alla sådana ändringar som utförs som en del av en transaktion, även bland flera korn som distribueras över ett Orleans kluster, antingen checkas in eller att alla ångras när kornanropet som skapade transaktionen har slutförts (IAtmGrain.Transfer i föregående exempel).

Anropa transaktionsmetoder från en klient

Det rekommenderade sättet att anropa en transaktionskornsmetod är att använda ITransactionClient. ITransactionClient Registreras automatiskt med providern för beroendeinmatningstjänsten när Orleans klienten har konfigurerats. ITransactionClient Används för att skapa en transaktionskontext och för att anropa transaktionsintervallmetoder i den kontexten. I följande exempel visas hur du använder ITransactionClient för att anropa transaktionsintervallmetoder.

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

I föregående klientkod:

  • IHostBuilder Är konfigurerad med UseOrleansClient.
    • IClientBuilder Använder localhost-klustring och transaktioner.
  • Gränssnitten IClusterClient och ITransactionClient hämtas från tjänstleverantören.
  • Variablerna from och to tilldelas sina IAccountGrain referenser.
  • ITransactionClient Används för att skapa en transaktion och anropar:
    • Withdraw på kontokornsreferensen from .
    • Deposit på kontokornsreferensen to .

Transaktioner utförs alltid om det inte finns ett undantag som genereras i transactionDelegate eller en motstridig transactionOption angiven. Det rekommenderade sättet att anropa metoder för transaktionsintervall är att använda ITransactionClient, men du kan också anropa transaktionsintervallmetoder direkt från ett annat korn.

Anropa transaktionsmetoder från ett annat korn

Transaktionsmetoder i ett korngränssnitt anropas som alla andra kornmetod. Som en alternativ metod med hjälp av ITransactionClientanropar Transfer implementeringen AtmGrain nedan metoden (som är transaktionell) i IAccountGrain gränssnittet.

Överväg implementeringen AtmGrain , som löser de två refererade kontokornen och gör lämpliga anrop till Withdraw och Deposit:

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

Din klientappskod kan anropa AtmGrain.Transfer på ett transaktionellt sätt enligt följande:

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

I föregående anrop används en IAtmGrain för att överföra 100 valutaenheter från ett konto till ett annat. När överföringen är klar efterfrågas båda kontona för att få sitt aktuella saldo. Valutaöverföringen samt båda kontofrågorna utförs som ACID-transaktioner.

Som du ser i föregående exempel kan transaktioner returnera värden inom en Task, som andra korniga anrop. Men vid anropsfel utlöser de inte programfel utan snarare en OrleansTransactionException eller TimeoutException. Om programmet utlöser ett undantag under transaktionen och det undantaget gör att transaktionen misslyckas (i stället för att misslyckas på grund av andra systemfel) är programundansundansen det inre undantaget för OrleansTransactionException.

Om ett transaktionsfel genereras av typen OrleansTransactionAbortedExceptionmisslyckades transaktionen och kan göras om. Alla andra undantag som utlöses indikerar att transaktionen avslutades med ett okänt tillstånd. Eftersom transaktioner är distribuerade åtgärder kan en transaktion i ett okänt tillstånd ha lyckats, misslyckats eller fortfarande pågår. Därför rekommenderar vi att du tillåter en tidsgräns för samtal (SiloMessagingOptions.SystemResponseTimeout) för att undvika sammanhängande avbrott innan du verifierar tillståndet eller försöker utföra åtgärden igen.