Simulace vysokofrekvenčního obchodování pomocí Stream Analytics

Kombinace jazyka SQL s uživatelem definovanými funkcemi (UDF) a uživatelem definovanými agregacemi (UDA) JavaScriptu v Azure Stream Analytics umožňuje uživatelům provádět pokročilé analýzy. Mezi pokročilé analýzy může patřit online trénování a vyhodnocování machine learningu a také simulace stavových procesů. Tento článek popisuje, jak provádět lineární regresi v úloze Azure Stream Analytics, která provádí průběžné trénování a vyhodnocování ve scénáři vysokofrekvenčního obchodování.

Vysokofrekvenční obchodování

Logický tok vysokofrekvenčního obchodování spočívá v:

  1. Získávání nabídek z burzy cenných papírů v reálném čase.
  2. Sestavení prediktivního modelu z těchto nabídek, abychom mohli předvídat pohyb cen.
  3. Zadávání pokynů k nákupu nebo prodeji, abychom na úspěšné predikci pohybu cen vydělali.

Ve výsledku potřebujeme:

  • Informační kanál nabídek v reálném čase.
  • Prediktivní model schopný pracovat s nabídkami v reálném čase.
  • Simulaci obchodování ukazující zisk nebo ztrátu algoritmu obchodování.

Informační kanál nabídek v reálném čase

IEX nabízí bezplatně nabídky kupních a prodejních cen v reálném čase pomocí socket.io. Je možné napsat jednoduchý konzolový program, který přijímá nabídky v reálném čase a nabízí je jako zdroj dat do služby Azure Event Hubs. Následující kód představuje kostru programu. Kód pro zkrácení vynechává zpracování chyb. Do svého projektu také musíte zahrnout balíčky NuGet SocketIoClientDotNet a WindowsAzure.ServiceBus.

using Quobject.SocketIoClientDotNet.Client;
using Microsoft.ServiceBus.Messaging;
var symbols = "msft,fb,amzn,goog";
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
var socket = IO.Socket("https://ws-api.iextrading.com/1.0/tops");
socket.On(Socket.EVENT_MESSAGE, (message) =>
{
    eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes((string)message)));
});
socket.On(Socket.EVENT_CONNECT, () =>
{
    socket.Emit("subscribe", symbols);
});

Tady je několik vygenerovaných ukázkových událostí:

{"symbol":"MSFT","marketPercent":0.03246,"bidSize":100,"bidPrice":74.8,"askSize":300,"askPrice":74.83,volume":70572,"lastSalePrice":74.825,"lastSaleSize":100,"lastSaleTime":1506953355123,lastUpdated":1506953357170,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04825,"bidSize":114,"bidPrice":870,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953357633,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"MSFT","marketPercent":0.03244,"bidSize":100,"bidPrice":74.8,"askSize":100,"askPrice":74.83,volume":70572,"lastSalePrice":74.825,"lastSaleSize":100,"lastSaleTime":1506953355123,lastUpdated":1506953359118,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"FB","marketPercent":0.01211,"bidSize":100,"bidPrice":169.9,"askSize":100,"askPrice":170.67,volume":39042,"lastSalePrice":170.67,"lastSaleSize":100,"lastSaleTime":1506953351912,lastUpdated":1506953359641,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04795,"bidSize":100,"bidPrice":959.19,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953360949,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"FB","marketPercent":0.0121,"bidSize":100,"bidPrice":169.9,"askSize":100,"askPrice":170.7,volume":39042,"lastSalePrice":170.67,"lastSaleSize":100,"lastSaleTime":1506953351912,lastUpdated":1506953362205,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04795,"bidSize":114,"bidPrice":870,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953362629,"sector":"softwareservices","securityType":"commonstock"}

Poznámka

Časové razítko události je hodnota lastUpdated v unixovém čase.

Prediktivní model pro vysokofrekvenční obchodování

Pro účely ukázky používáme lineární model, který ve své studii popsal Darryl Shen.

VOI (Volume Order Imbalance) je funkce aktuální kupní/prodejní ceny a objemu a kupní/prodejní ceny a objemu z posledního impulzu. Studie identifikuje korelaci mezi funkcí VOI a budoucím pohybem ceny. Sestavuje lineární model mezi posledními 5 hodnotami VOI a změnou ceny v dalších 10 impulzech. Model se trénuje pomocí dat z předchozího dne a lineární regrese.

Natrénovaný model se pak použije k předvídání změny cen u nabídek v aktuálním obchodním dni v reálném čase. Pokud je předpokládaná změna ceny dostatečně velká, uzavře se obchod. V závislosti na nastavení prahové hodnoty se dají během trénovacího dne očekávat tisíce obchodů jedné akcie.

Definice nevyrovnanosti objednávky svazku

Teď vyjádříme operace trénování a predikce v úloze Azure Stream Analytics.

Nejprve se vyčistí vstupy. Unixový čas se prostřednictvím funkce DATEADD převede na datový typ datetime. Funkce TRY_CAST slouží k vynucení datových typů, aniž by došlo k selhání dotazu. Vstupní pole je vždy vhodné přetypovat na očekávané datové typy, aby nedocházelo k neočekávanému chování při manipulaci s poli nebo jejich porovnávání.

WITH
typeconvertedquotes AS (
    /* convert all input fields to proper types */
    SELECT
        System.Timestamp AS lastUpdated,
        symbol,
        DATEADD(millisecond, CAST(lastSaleTime as bigint), '1970-01-01T00:00:00Z') AS lastSaleTime,
        TRY_CAST(bidSize as bigint) AS bidSize,
        TRY_CAST(bidPrice as float) AS bidPrice,
        TRY_CAST(askSize as bigint) AS askSize,
        TRY_CAST(askPrice as float) AS askPrice,
        TRY_CAST(volume as bigint) AS volume,
        TRY_CAST(lastSaleSize as bigint) AS lastSaleSize,
        TRY_CAST(lastSalePrice as float) AS lastSalePrice
    FROM quotes TIMESTAMP BY DATEADD(millisecond, CAST(lastUpdated as bigint), '1970-01-01T00:00:00Z')
),
timefilteredquotes AS (
    /* filter between 7am and 1pm PST, 14:00 to 20:00 UTC */
    /* clean up invalid data points */
    SELECT * FROM typeconvertedquotes
    WHERE DATEPART(hour, lastUpdated) >= 14 AND DATEPART(hour, lastUpdated) < 20 AND bidSize > 0 AND askSize > 0 AND bidPrice > 0 AND askPrice > 0
),

Dále použijeme funkci LAG k získání hodnot z posledního impulzu. Jedna hodina pro hodnotu LIMIT DURATION je zvolena náhodně. S ohledem na frekvenci nabídek se dá předpokládat, že předchozí impulz najdete v poslední hodině.

shiftedquotes AS (
    /* get previous bid/ask price and size in order to calculate VOI */
    SELECT
        symbol,
        (bidPrice + askPrice)/2 AS midPrice,
        bidPrice,
        bidSize,
        askPrice,
        askSize,
        LAG(bidPrice) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS bidPricePrev,
        LAG(bidSize) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS bidSizePrev,
        LAG(askPrice) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS askPricePrev,
        LAG(askSize) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS askSizePrev
    FROM timefilteredquotes
),

Následně můžeme vypočítat hodnotu VOI. Pro jistotu vyfiltrujeme hodnoty null, pokud předchozí impulz neexistuje.

currentPriceAndVOI AS (
    /* calculate VOI */
    SELECT
        symbol,
        midPrice,
        (CASE WHEN (bidPrice < bidPricePrev) THEN 0
            ELSE (CASE WHEN (bidPrice = bidPricePrev) THEN (bidSize - bidSizePrev) ELSE bidSize END)
         END) -
        (CASE WHEN (askPrice < askPricePrev) THEN askSize
            ELSE (CASE WHEN (askPrice = askPricePrev) THEN (askSize - askSizePrev) ELSE 0 END)
         END) AS VOI
    FROM shiftedquotes
    WHERE
        bidPrice IS NOT NULL AND
        bidSize IS NOT NULL AND
        askPrice IS NOT NULL AND
        askSize IS NOT NULL AND
        bidPricePrev IS NOT NULL AND
        bidSizePrev IS NOT NULL AND
        askPricePrev IS NOT NULL AND
        askSizePrev IS NOT NULL
),

Teď znovu použijeme funkci LAG k vytvoření sekvence se 2 po sobě jdoucími hodnotami VOI následovanými 10 po sobě jdoucími hodnotami střední ceny.

shiftedPriceAndShiftedVOI AS (
    /* get 10 future prices and 2 previous VOIs */
    SELECT
        symbol,
        midPrice AS midPrice10,
        LAG(midPrice, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice9,
        LAG(midPrice, 2) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice8,
        LAG(midPrice, 3) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice7,
        LAG(midPrice, 4) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice6,
        LAG(midPrice, 5) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice5,
        LAG(midPrice, 6) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice4,
        LAG(midPrice, 7) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice3,
        LAG(midPrice, 8) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice2,
        LAG(midPrice, 9) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice1,
        LAG(midPrice, 10) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice,
        LAG(VOI, 10) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI1,
        LAG(VOI, 11) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI2
    FROM currentPriceAndVOI
),

Následně data strukturujeme do vstupů pro lineární model se dvěma proměnnými. Znovu vyfiltrujeme události, pro které nemáme všechna data.

modelInput AS (
    /* create feature vector, x being VOI, y being delta price */
    SELECT
        symbol,
        (midPrice1 + midPrice2 + midPrice3 + midPrice4 + midPrice5 + midPrice6 + midPrice7 + midPrice8 + midPrice9 + midPrice10)/10.0 - midPrice AS y,
        VOI1 AS x1,
        VOI2 AS x2
    FROM shiftedPriceAndShiftedVOI
    WHERE
        midPrice1 IS NOT NULL AND
        midPrice2 IS NOT NULL AND
        midPrice3 IS NOT NULL AND
        midPrice4 IS NOT NULL AND
        midPrice5 IS NOT NULL AND
        midPrice6 IS NOT NULL AND
        midPrice7 IS NOT NULL AND
        midPrice8 IS NOT NULL AND
        midPrice9 IS NOT NULL AND
        midPrice10 IS NOT NULL AND
        midPrice IS NOT NULL AND
        VOI1 IS NOT NULL AND
        VOI2 IS NOT NULL
),

Vzhledem k tomu, že Azure Stream Analytics neobsahuje integrovanou funkci lineární regrese, koeficienty pro lineární model vypočítáme pomocí agregací SUM a AVG.

Matematický vzorec lineární regrese

modelagg AS (
    /* get aggregates for linear regression calculation,
     http://faculty.cas.usf.edu/mbrannick/regression/Reg2IV.html */
    SELECT
        symbol,
        SUM(x1 * x1) AS x1x1,
        SUM(x2 * x2) AS x2x2,
        SUM(x1 * y) AS x1y,
        SUM(x2 * y) AS x2y,
        SUM(x1 * x2) AS x1x2,
        AVG(y) AS avgy,
        AVG(x1) AS avgx1,
        AVG(x2) AS avgx2
    FROM modelInput
    GROUP BY symbol, TumblingWindow(hour, 24, -4)
),
modelparambs AS (
    /* calculate b1 and b2 for the linear model */
    SELECT
        symbol,
        (x2x2 * x1y - x1x2 * x2y)/(x1x1 * x2x2 - x1x2 * x1x2) AS b1,
        (x1x1 * x2y - x1x2 * x1y)/(x1x1 * x2x2 - x1x2 * x1x2) AS b2,
        avgy,
        avgx1,
        avgx2
    FROM modelagg
),
model AS (
    /* calculate a for the linear model */
    SELECT
        symbol,
        avgy - b1 * avgx1 - b2 * avgx2 AS a,
        b1,
        b2
    FROM modelparambs
),

Abychom pro vyhodnocení aktuálních událostí mohli použít model z předchozího dne, chceme nabídky spojit s modelem. Ale místo použití operátoru JOIN sjednotíme události modelu a události nabídek pomocí operátoru UNION. Potom pomocí funkce LAG spárujeme události s modelem z předchozího dne, abychom získali přesně jednu shodu. Kvůli víkendu se musíme podívat o tři dny zpět. Kdybychom použili přímé spojení operátorem JOIN, dostali bychom pro každou událost nabídky tři modely.

shiftedVOI AS (
    /* get two consecutive VOIs */
    SELECT
        symbol,
        midPrice,
        VOI AS VOI1,        
        LAG(VOI, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI2
    FROM currentPriceAndVOI
),
VOIAndModel AS (
    /* combine VOIs and models */
    SELECT
        'voi' AS type,
        symbol,
        midPrice,
        VOI1,
        VOI2,
        0.0 AS a,
        0.0 AS b1,
        0.0 AS b2
    FROM shiftedVOI
    UNION
    SELECT
        'model' AS type,
        symbol,
        0.0 AS midPrice,
        0 AS VOI1,
        0 AS VOI2,
        a,
        b1,
        b2
    FROM model
),
VOIANDModelJoined AS (
    /* match VOIs with the latest model within 3 days (72 hours, to take the weekend into account) */
    SELECT
        symbol,
        midPrice,
        VOI1 as x1,
        VOI2 as x2,
        LAG(a, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS a,
        LAG(b1, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS b1,
        LAG(b2, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS b2
    FROM VOIAndModel
    WHERE type = 'voi'
),

Teď můžeme provést předpovědi a na základě modelu vygenerovat signály pro nákup nebo prodej s prahovou hodnotou 0,02. Hodnota obchodu 10 znamená nákup. Hodnota obchodu −10 znamená prodej.

prediction AS (
    /* make prediction if there is a model */
    SELECT
        symbol,
        midPrice,
        a + b1 * x1 + b2 * x2 AS efpc
    FROM VOIANDModelJoined
    WHERE
        a IS NOT NULL AND
        b1 IS NOT NULL AND
        b2 IS NOT NULL AND
        x1 IS NOT NULL AND
        x2 IS NOT NULL
),
tradeSignal AS (
    /* generate buy/sell signals */
    SELECT
        DateAdd(hour, -7, System.Timestamp) AS time,
        symbol,     
        midPrice,
        efpc,
        CASE WHEN (efpc > 0.02) THEN 10 ELSE (CASE WHEN (efpc < -0.02) THEN -10 ELSE 0 END) END AS trade,
        DATETIMEFROMPARTS(DATEPART(year, System.Timestamp), DATEPART(month, System.Timestamp), DATEPART(day, System.Timestamp), 0, 0, 0, 0) as date
    FROM prediction
),

Simulace obchodování

Jakmile budeme mít obchodní signály, chceme otestovat efektivitu této strategie obchodování, aniž bychom skutečně obchodovali.

Tento test provedeme pomocí UDA se skákajícím oknem, kdy se skok provádí každou minutu. Další seskupení podle data a klauzule Having umožňují zahrnout do okna pouze účty pro události, které patří do stejného dne. V případě skákajícího okna trvajícího dva dny se pomocí seskupení podle data operátorem GROUP BY toto seskupení rozdělí na předchozí den a aktuální den. Klauzule HAVING vyfiltruje okna, která končí aktuálním dnem, ale byla seskupená předchozí den.

simulation AS
(
    /* perform trade simulation for the past 7 hours to cover an entire trading day, and generate output every minute */
    SELECT
        DateAdd(hour, -7, System.Timestamp) AS time,
        symbol,
        date,
        uda.TradeSimulation(tradeSignal) AS s
    FROM tradeSignal
    GROUP BY HoppingWindow(minute, 420, 1), symbol, date
    Having DateDiff(day, date, time) < 1 AND DATEPART(hour, time) < 13
)

UDA JavaScriptu ve funkci init inicializuje všechny průběžné součty, při každém přidání události do okna vypočítá přechod stavu a na konci okna vrátí výsledky simulace. Obecný průběh obchodování je:

  • Nákup akcií, když se obdrží signál k nákupu a žádné akcie nedržíme.
  • Prodej akcií, když se obdrží signál k prodeji a nějaké akcie držíme.
  • Otevření krátké pozice, pokud žádné akcie nedržíme.

Pokud je otevřená krátká pozice a obdrží se signál k nákupu, provedeme nákup a uzavřeme pozici. V této simulaci podržíme nebo krátká 10 akcií akcií. Transakční náklady jsou pevně nastavené na 8 USD.

function main() {
    var TRADE_COST = 8.0;
    var SHARES = 10;
    this.init = function () {
        this.own = false;
        this.pos = 0;
        this.pnl = 0.0;
        this.tradeCosts = 0.0;
        this.buyPrice = 0.0;
        this.sellPrice = 0.0;
        this.buySize = 0;
        this.sellSize = 0;
        this.buyTotal = 0.0;
        this.sellTotal = 0.0;
    }
    this.accumulate = function (tradeSignal, timestamp) {
        if(!this.own && tradeSignal.trade == 10) {
          // Buy to open
          this.own = true;
          this.pos = 1;
          this.buyPrice = tradeSignal.midprice;
          this.tradeCosts += TRADE_COST;
          this.buySize += SHARES;
          this.buyTotal += SHARES * tradeSignal.midprice;
        } else if(!this.own && tradeSignal.trade == -10) {
          // Sell to open
          this.own = true;
          this.pos = -1
          this.sellPrice = tradeSignal.midprice;
          this.tradeCosts += TRADE_COST;
          this.sellSize += SHARES;
          this.sellTotal += SHARES * tradeSignal.midprice;
        } else if(this.own && this.pos == 1 && tradeSignal.trade == -10) {
          // Sell to close
          this.own = false;
          this.pos = 0;
          this.sellPrice = tradeSignal.midprice;
          this.tradeCosts += TRADE_COST;
          this.pnl += (this.sellPrice - this.buyPrice)*SHARES - 2*TRADE_COST;
          this.sellSize += SHARES;
          this.sellTotal += SHARES * tradeSignal.midprice;
          // Sell to open
          this.own = true;
          this.pos = -1;
          this.sellPrice = tradeSignal.midprice;
          this.tradeCosts += TRADE_COST;
          this.sellSize += SHARES;        
          this.sellTotal += SHARES * tradeSignal.midprice;
        } else if(this.own && this.pos == -1 && tradeSignal.trade == 10) {
          // Buy to close
          this.own = false;
          this.pos = 0;
          this.buyPrice = tradeSignal.midprice;
          this.tradeCosts += TRADE_COST;
          this.pnl += (this.sellPrice - this.buyPrice)*SHARES - 2*TRADE_COST;
          this.buySize += SHARES;
          this.buyTotal += SHARES * tradeSignal.midprice;
          // Buy to open
          this.own = true;
          this.pos = 1;
          this.buyPrice = tradeSignal.midprice;
          this.tradeCosts += TRADE_COST;
          this.buySize += SHARES;         
          this.buyTotal += SHARES * tradeSignal.midprice;
        }
    }
    this.computeResult = function () {
        var result = {
            "pnl": this.pnl,
            "buySize": this.buySize,
            "sellSize": this.sellSize,
            "buyTotal": this.buyTotal,
            "sellTotal": this.sellTotal,
            "tradeCost": this.tradeCost
            };
        return result;
    }
}

Nakonec odešleme výstup na řídicí panel Power BI, který zobrazí vizualizaci.

SELECT * INTO tradeSignalDashboard FROM tradeSignal /* output tradeSignal to PBI */
SELECT
    symbol,
    time,
    date,
    TRY_CAST(s.pnl as float) AS pnl,
    TRY_CAST(s.buySize as bigint) AS buySize,
    TRY_CAST(s.sellSize as bigint) AS sellSize,
    TRY_CAST(s.buyTotal as float) AS buyTotal,
    TRY_CAST(s.sellTotal as float) AS sellTotal
    INTO pnlDashboard
FROM simulation /* output trade simulation to PBI */

Vizuál obchodu Power BI grafu

Vizuál grafu Power BI PNL

Souhrn

Realistický model vysokofrekvenčního obchodování můžeme implementovat pomocí mírně složitého dotazu v Azure Stream Analytics. Vzhledem k chybějící integrované funkci lineární regrese musíme model zjednodušit a místo pěti vstupních proměnných použít dvě. Odhodlaný uživatel však možná dokáže jako UDA JavaScriptu implementovat i sofistikovanější algoritmy vyšších dimenzí.

Za zmínku stojí, že většinu dotazu, kromě UDA JavaScriptu, je možné testovat a ladit v sadě Visual Studio prostřednictvím nástrojů Azure Stream Analytics pro Visual Studio. Od napsání počátečního dotazu strávil autor testováním a laděním dotazu v sadě Visual Studio méně než 30 minut.

UDA v současné době není možné ladit v sadě Visual Studio. Pracujeme na povolení této funkce s možností procházet kód JavaScriptu. Navíc si všimněte, že názvy polí přicházejících do UDA jsou malými písmeny. Během testování dotazu to nebylo zřejmé chování. Díky úrovni kompatibility Azure Stream Analytics 1.1 však zachováváme velikosti písmen v názvech polí, takže je chování přirozenější.

Doufám, že tento článek poslouží jako inspirace pro všechny uživatele Azure Stream Analytics, kteří můžou naši službu využít k průběžnému provádění pokročilých analýz v reálném čase. Podělte se s námi o svůj názor, usnadníte nám tím implementaci dotazů pro scénáře pokročilých analýz.