Il presente articolo è stato tradotto automaticamente.

C++

Nuove funzionalità di concorrenza in Visual C++ 11

Diego Dagum

Scaricare il codice di esempio

L'iterazione più recente di C++, conosciuto come C + + 11 e approvato dall'International Organization for Standardization (ISO), l'anno scorso, formalizza una nuova serie di librerie e poche parole riservate per affrontare la concorrenza.Molti sviluppatori hanno usato concorrenza in C++ prima, ma sempre attraverso una libreria di terze parti — spesso direttamente esporre API OS.

Herb Sutter ha annunciato nel dicembre 2004 che il "pranzo prestazioni libero" era finita nel senso che i produttori di CPU vennero impediti di spedizione più veloce CPU da consumo di energia fisica e aumentando i motivi di calore.Questo ha portato all'attuale, mainstream era multi-core, una nuova realtà a cui C++ — quella standard — ha appena fatto un importante salto di adattarsi.

Il resto di questo articolo è organizzato in due sezioni principali e più piccoli sottosezioni.La prima sezione principale, iniziando con l'esecuzione parallela, copre le tecnologie che consentono alle applicazioni di eseguire inde­pendenti o semi-indipendente attività in parallelo.La seconda sezione principale, a partire da Syncing fino l'esecuzione contemporanea, Esplora i meccanismi per la sincronizzazione il modo in cui che queste attività di gestire dati, evitando così race condition.

Questo articolo è basato sulle funzionalità incluse nella prossima versione di Visual C++ (per ora, chiamato Visual C++ 11).Alcuni di loro sono già disponibili nella versione attuale, Visual C++ 2010.Anche se non una guida per modellare algoritmi paralleli, né un'esauriente documentazione su tutte le opzioni disponibili, questo articolo è una solida introduzione al nuovo C + + 11 caratteristiche della concorrenza.

Esecuzione parallela

Quando il modello di progettazione algoritmi e processi sui dati, c'è una tendenza naturale per specificarli in una sequenza di passaggi.Fintanto che le prestazioni sono entro limiti accettabili, questo è lo schema più consigliabile perché è in genere più facile da capire — un requisito per le basi di codice gestibile.

Quando le prestazioni diventano un fattore preoccupante, un classico iniziale tentativo di superare la situazione è per ottimizzare l'algoritmo sequenza al fine di ridurre i cicli di CPU consumati.Questo può essere fatto, fino ad arrivare a un punto in cui non sono disponibili ulteriori ottimizzazioni — o sono difficili da raggiungere.Allora è giunto il momento di dividere le serie sequenziale di passaggi nelle attività dell'occorrenza simultanea.

Nella prima sezione apprenderete le seguenti:

  • Attività asincrone: quelle più piccole parti dell'algoritmo originale collegato solo dai dati, essi producono o consumare.
  • Thread: unità di esecuzione amministrato dall'ambiente di runtime.Essi riguardano attività nel senso che le attività sono eseguite sui thread.
  • Filo internals: legato al thread variabili, eccezioni propagato da thread e così via.

Attività asincrone

Nel codice complementare a questo articolo, troverete un progetto chiamato caso sequenziale, come mostrato nella Figura 1.

Figura 1 codice Case sequenziale

int a, b, c;
int calculateA()
{
  return a+a*b;
}
int calculateB()
{
  return a*(a+a*(a+1));
}
int calculateC()
{
  return b*(b+1)-b;
}
int main(int argc, char *argv[])
{
  getUserData(); // initializes a and b
  c = calculateA() * (calculateB() + calculateC());
  showResult();
}

La funzione principale chiede all'utente alcuni dati e poi sostiene che i dati a tre funzioni: calculateA, calculateB e calculateC. I risultati sono più tardi combinati per produrre output di alcune informazioni per l'utente.

Le funzioni di calcolare nel materiale di accompagnamento sono codificate in modo tale che un ritardo casuale tra uno e tre secondi è stato introdotto in ciascuno. Considerando che queste operazioni vengono eseguite in sequenza, questo porta ad un tempo complessivo di esecuzione — una volta i dati di input sono entrati — di nove secondi in cui lo scenario peggiore. Si può provare questo codice premendo F5 e esecuzione dell'esempio.

Così ho bisogno di rivedere la sequenza di esecuzione e di trovare passaggi da eseguire contemporaneamente. Queste funzioni sono indipendenti, io posso eseguirli in parallelo utilizzando la funzione async:

int main(int argc, char * argv[])
{
  getUserData();
  futuro < int > F1 = async(calculateB), f2 = async(calculateC);
  c = (calculateA() + f1.get()) * f2.get();
  showResult();
}

Ive ' introdurre due concetti qui: Async e futuro, entrambi definiti in <future> intestazione e il namespace std. Il primo si riceve una funzione, un lambda o un oggetto di funzione (funtore) e restituisce un futuro. Si può capire il concetto di un futuro come segnaposto per un eventuale risultato. Quale risultato? Quello restituito dalla funzione chiamata in modo asincrono.

Ad un certo punto avrete bisogno i risultati di queste funzioni di esecuzione parallela. Chiamando il metodo get su ogni futura blocchi l'esecuzione fino a quando il valore è disponibile.

È possibile verificare e confrontare il codice modificato con il caso sequenza eseguendo il progetto AsyncTasks del campione del compagno. Il ritardo peggiore di questa modifica è circa tre secondi contro nove secondi per la versione sequenza.

Questo è un modello di programmazione leggero che rilascia lo sviluppatore dal dazio della creazione di thread. Tuttavia, è possibile specificare criteri di threading, ma quelli non coprirà qui.

Threads

Il modello di attività asincrona presentato nella sezione precedente potrebbe essere sufficiente in alcuni scenari di data, ma se avete bisogno di una più profonda gestione e controllo dell'esecuzione dei thread, C + + 11 viene fornito con la classe thread, dichiarata nel <thread> intestazione e si trova nello spazio dei nomi std.

Pur essendo un modello di programmazione più complesso, discussioni offrono migliori metodi per la sincronizzazione e coordinamento, permettendo loro di cedere l'esecuzione di un altro thread e aspettare per una determinata quantità di tempo o fino a quando un altro thread è finito prima di continuare.

Nell'esempio seguente (disponibile nel progetto di codice complementare thread), hanno una funzione lambda, che, dato un intero argomento, stampe suoi multipli di meno di 100.000 nella console:

auto multiple_finder = {
  [](int n)
  per(int i = 0; Ho < 100000; i + +)
  Se(ho % n = = 0)
  cout << io << " è un multiplo di" << n << Endl;
};
int main(int argc, char * argv[])
{
  filettatura th(multiple_finder, 23456);
  multiple_finder(34567);
  Th.join();
}

Come si vedrà in esempi successivi, il fatto che ho superato un lambda al thread è circostanziale; una funzione o un funtore sarebbe stato sufficiente pure.

Nella funzione main eseguire questa funzione in due thread con parametri diversi. Date un'occhiata al mio risultato (che potrebbe variare tra le diverse esecuzioni a causa di timing):

0 is a multiple of 23456
0 is a multiple of 34567
23456 is a multiple of 23456
34567 is a multiple of 34567
46912 is a multiple of 23456
69134 is a multiple of 34567
70368 is a multiple of 23456
93824 is a multiple of 23456

Io potrei implementare l'esempio su attività asincrone nella sezione precedente con fili. Per questo, è necessario introdurre il concetto di una promessa. Una promessa può essere inteso come un sink attraverso il quale un risultato sarà abbandonato quando disponibile. Dove quel risultato verrà fuori una volta caduto? Ogni promessa ha un futuro associato.

Il codice mostrato nella Figura 2, disponibile nel progetto di codice di esempio, soci tre fili (invece di attività) con promesse e rende ogni thread di chiamare una funzione calcola promesse. Confrontare questi dati con la versione più leggera di AsyncTasks.

Figura 2 associando Futures con promesse

typedef int (*calculate)(void);
void func2promise(calculate f, promise<int> &p)
{
  p.set_value(f());
}
int main(int argc, char *argv[])
{
  getUserData();
  promise<int> p1, p2;
  future<int> f1 = p1.get_future(), f2 = p2.get_future();
  thread t1(&func2promise, calculateB, std::ref(p1)),
    t2(&func2promise, calculateC, std::ref(p2));
  c = (calculateA() + f1.get()) * f2.get();
  t1.join(); t2.join();
  showResult();
}

Variabili legato al thread e le eccezioni

In C++, è possibile definire le variabili globali in cui ambito è associato all'intera applicazione, comprese le discussioni. Ma rispetto al thread, ora c'è un modo per definire queste variabili globali, tale che ogni thread mantiene la propria copia. Questo concetto è conosciuto come memoria locale di thread e si è dichiarato come segue:

thread_local int subtotal = 0;

Se la dichiarazione è fatto nell'ambito di una funzione, la visibilità della variabile sarà essere ridotto a che la funzione ma ogni thread sarà mantenere mantenendo la propria copia statica. Vale a dire, i valori della variabile per ogni thread sono tenuti tra le chiamate di funzione.

Anche se thread_local non è disponibile in Visual C++ 11, può essere simulato con un'estensione di Microsoft non standard:

#define  thread_local __declspec(thread)

Che cosa accadrebbe se un'eccezione all'interno di un thread? Ci saranno casi in cui l'eccezione può essere catturato e gestito nello stack di chiamate all'interno del thread. Ma se il thread non tratta con l'eccezione, è necessario un modo per trasportare l'eccezione al thread dell'initiator. C + + 11 introduce tali meccanismi.

In Figura 3, disponibile nel codice complementare al progetto ThreadInternals, c'è una funzione sum_until_element_with_threshold, che attraversa un vettore, finché trova un elemento specifico, sommando tutti gli elementi lungo la strada. Se la somma supera una soglia, viene generata un'eccezione.

Figura 3 memoria locale di Thread e le eccezioni di Thread

thread_local unsigned sum_total = 0;
void sum_until_element_with_threshold(unsigned element,
  unsigned threshold, exception_ptr& pExc)
{
  try{
    find_if_not(begin(v), end(v), [=](const unsigned i) -> bool {
      bool ret = (i!=element);
      sum_total+= i;
      if (sum_total>threshold)
        throw runtime_error("Sum exceeded threshold.");
      return ret;
    });
    cout << "(Thread #" << this_thread::get_id() << ") " <<
      "Sum of elements until " << element << " is found: " << sum_total << endl;
  } catch (...) {
    pExc = current_exception();
  }
}

Se ciò accade, l'eccezione viene acquisito tramite current_exception in un exception_ptr.

La funzione principale si innesca un thread su sum_until_element_with_threshold, durante la chiamata a tale funzione stessa con un parametro diverso. Quando avete finito di entrambe le chiamate (quello nel thread principale) e quella nel thread innescato da esso, sarà analizzata loro rispettivi exception_ptrs:

const unsigned THRESHOLD = 100000;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  exception_ptr pExc1, pExc2;
  scramble_vector(1000);
  thread th(sum_until_element_with_threshold, 0, THRESHOLD, ref(pExc1));
  sum_until_element_with_threshold(100, THRESHOLD, ref(pExc2));
  th.join();
  dealWithExceptionIfAny(pExc1);
  dealWithExceptionIfAny(pExc2);
}

Se uno qualsiasi di questi exception_ptrs vengono inizializzato — un segno che qualche eccezione di successo — loro eccezioni sono scatenate torna con rethrow_exception:

void dealWithExceptionIfAny(exception_ptr pExc)
{
  try
  {
    if (!(pExc==exception_ptr()))
      rethrow_exception(pExc);
    } catch (const exception& exc) {
      cout << "(Main thread) Exception received from thread: " <<
        exc.what() << endl;
  }
}

Questo è il risultato della nostra esecuzione, come la somma nel secondo thread ha superato la soglia di:

(Thread #10164) Sum of elements until 0 is found: 94574
(Main thread) Exception received from thread: Sum exceeded threshold.

L'esecuzione contemporanea di sincronizzazione

Sarebbe auspicabile se tutte le applicazioni potrebbero essere suddiviso in un insieme indipendente di 100 per cento di attività asincrone. In pratica questo quasi mai è possibile, come ci sono almeno le dipendenze sui dati che tutte le parti di gestire contemporaneamente. Questa sezione introduce il nuovo C + + 11 tecnologie per evitare race condition.

Apprenderete:

  • Tipi atomici: simile a tipi di dati primitivi, ma abilitante modifica thread-safe.
  • Mutex e blocchi: elementi che ci permettono di definire le aree critiche di thread-safe.
  • Variabili di condizione: un modo per congelare il thread di esecuzione fino ad alcuni criteri è soddisfatta.

Tipi atomici

<atomic> intestazione introduce una serie di tipi primitivi — atomic_char, atomic_int e così via — implementato in termini di operazioni ad incastro. Così, questi tipi sono equivalenti a loro omonimi senza il prefisso atomic_ ma con la differenza che tutti i loro operatori di assegnazione (= =, + +, --, + =, * = e così via) sono protetti da race condition. Così che non accadrà nel bel mezzo di un'assegnazione per questi tipi di dati, un altro thread irrompe e modifica i valori prima che abbiamo finito.

Nell'esempio seguente ci sono due thread paralleli (uno è il principale) alla ricerca di elementi diversi all'interno lo stesso vettore:

atomic_uint total_iterations;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  total_iterations = 0;
  scramble_vector(1000);
  thread th(find_element, 0);
  find_element(100);
  th.join();
  cout << total_iterations << " total iterations." << endl;
 }

Quando ogni elemento viene trovato, viene stampato un messaggio dall'interno del thread, raccontando la posizione in formato vettoriale (o iterazione) in cui l'elemento è stato trovato:

void find_element(unsigned element)
{
  unsigned iterations = 0;
  find_if(begin(v), end(v), [=, &iterations](const unsigned i) -> bool {
    ++iterations;
    return (i==element);
  });
  total_iterations+= iterations;
  cout << "Thread #" << this_thread::get_id() << ": found after " <<
    iterations << " iterations." << endl;
}

C'è anche una variabile comune, total_iterations, che viene aggiornata con il numero di iterazioni applicate da entrambi i thread di aggravata. Così, total_iterations deve essere atomica per impedire che entrambi i thread aggiornandolo allo stesso tempo. Nell'esempio precedente, anche se non è necessario stampare il parziale numero di iterazioni nel find_element, si sarebbero ancora accumulano iterazioni in tale variabile invece di total_iterations, per evitare la contesa sopra la variabile atomica.

Troverete l'esempio precedente nel progetto Atomics nel download del codice compagno. Mi sono imbattuto, ottenendo i seguenti:

Thread #8064: found after 168 iterations.
Thread #6948: found after 395 iterations.
563 total iterations.

Serrature e Mut(ual) Ex(clusion)

Sezione precedente raffigurato un caso particolare di mutua esclusione per la scrittura di accesso su tipi primitivi. <mutex> intestazione definisce una serie di classi con serratura per definire le aree critiche. In questo modo, è possibile definire un mutex per stabilire un'area critica in tutta una serie di funzioni o metodi, nel senso che un solo thread alla volta sarà in grado di accedere a qualsiasi membro in questa serie bloccando con successo i suoi mutex.

Un thread tenta di bloccare un mutex può rimanere bloccato fino a quando il mutex è disponibile o non riescono proprio nel tentativo. Al centro di questi due estremi, la classe timed_mutex alternativo può rimanere bloccata per un piccolo intervallo di tempo prima di fallire. Che consente di bloccare i tentativi di desistere aiuta a prevenzione il deadlock.

Un mutex bloccato deve essere sbloccato in modo esplicito per gli altri di bloccarlo. Non riuscendo a farlo potrebbe portare ad un comportamento dell'applicazione indeterminato — che potrebbe essere soggetto a errori, simili a dimenticare rilasciare la memoria dinamica. Dimenticando di rilasciare un blocco è in realtà molto peggio, perché potrebbe significare che l'applicazione non può funzionare correttamente più se altro codice mantiene in attesa di quel blocco. Fortunatamente, C + + 11 è disponibile anche con classi di bloccaggio. Un blocco agisce su un mutex, ma relativo distruttore fa in modo di rilasciarlo se bloccato.

Il seguente codice (disponibile nel progetto Mutex nel download del codice) definisce un'area critica intorno a un mutex mx:

mutex mx;
void funcA();
void funcB();
int main()
{
  thread th(funcA)
  funcB();
  th.join();
}

Questo mutex viene utilizzata per garantire che due funzioni, funcA e funcB, può eseguire in parallelo senza che si uniscono nella regione critica.

La funzione funcA dovranno attendere, se necessario, al fine di venire a regione critica. Per rendere più farlo, basta semplicemente il meccanismo di blocco più semplice — lock_guard:

void funcA()
{
  for (int i = 0; i<3; ++i)
  {
    this_thread::sleep_for(chrono::seconds(1));
    cout << this_thread::get_id() << ": locking with wait... "
<< endl;
    lock_guard<mutex> lg(mx);
    ...
// Do something in the critical region.
cout << this_thread::get_id() << ": releasing lock." << endl;
  }
}

La strada che è definito, funcA dovrebbe accedere all'area critica tre volte. La funzione funcB, invece, tenterà di bloccare, ma se il mutex è da allora già bloccato, funcB basta aspettare un secondo prima di tentare nuovamente ottenere l'accesso alla regione critica. Il meccanismo utilizza è unique_lock con la try_to_lock_t della politica, come mostrato nella Figura 4.

Figura 4 Lock con attesa

void funcB()
{
  int successful_attempts = 0;
  for (int i = 0; i<5; ++i)
  {
    unique_lock<mutex> ul(mx, try_to_lock_t());
    if (ul)
    {
      ++successful_attempts;
      cout << this_thread::get_id() << ": lock attempt successful." <<
        endl;
      ...
// Do something in the critical region
      cout << this_thread::get_id() << ": releasing lock." << endl;
    } else {
      cout << this_thread::get_id() <<
        ": lock attempt unsuccessful.
Hibernating..." << endl;
      this_thread::sleep_for(chrono::seconds(1));
    }
  }
  cout << this_thread::get_id() << ": " << successful_attempts
    << " successful attempts." << endl;
}

La strada che è definito, funcB cercherà fino a cinque volte di immettere la regione critica. Figura 5 mostra il risultato dell'esecuzione. Fuori dei cinque tentativi, funcB potrebbe venire solo nella regione critica due volte.

Figura 5 l'esecuzione del campione progetto Mutex

funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcA: releasing lock.
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcA: releasing lock.
funcB: 2 successful attempts.
funcA: locking with wait ...
funcA: lock secured ...
funcA: releasing lock.

Variabili di condizione

L'intestazione <condition_variable> viene fornito con l'ultimo impianto trattato in questo articolo, fondamentale per quei casi in cui il coordinamento tra thread è legata agli eventi.

Nell'esempio seguente, disponibile nel progetto CondVar nel download del codice, una funzione di produttore spinge gli elementi in una coda:

mutex mq;
condition_variable cv;
queue<int> q;
void producer()
{
  for (int i = 0;i<3;++i)
  {
    ...
// Produce element
    cout << "Producer: element " << i << " queued." << endl;
    mq.lock();      q.push(i);  mq.unlock();
    cv.
notify_all();
  }
}

La coda standard non è thread-safe, quindi è necessario assicurarsi che nessun altro è l'uso (cioè, il consumatore non è schioccando in qualsiasi elemento) quando Accodamento messaggi.

La funzione dei consumatori tenta di recuperare gli elementi dalla coda quando è disponibile, o se esso appena attende per un po' sulla variabile di condizione prima di tentare nuovamente; Dopo due consecutivi tentativi falliti, il consumatore si conclude (vedere Figura 6).

Figura 6 svegliarsi discussioni le variabili condizionale

void consumer()
{
  unique_lock<mutex> l(m);
  int failed_attempts = 0;
  while (true)
  {
    mq.lock();
    if (q.size())
    {
      int elem = q.front();
      q.pop();
      mq.unlock();
      failed_attempts = 0;
      cout << "Consumer: fetching " << elem << " from queue." << endl;
      ...
// Consume elem
    } else {
      mq.unlock();
      if (++failed_attempts>1)
      {
        cout << "Consumer: too many failed attempts -> Exiting." << endl;
        break;
      } else {
        cout << "Consumer: queue not ready -> going to sleep." << endl;
        cv.wait_for(l, chrono::seconds(5));
      }
    }
  }
}

Il consumatore deve essere risvegliato tramite notify_all dal produttore ogni volta un nuovo elemento è disponibile. In questo modo, il produttore evita di avere il sonno dei consumatori per tutto l'intervallo se gli elementi sono pronti.

Figura 7 mostra il risultato del mio run.

Figura 7 sincronizzazione con variabili di condizione

Consumer: queue not ready -> going to sleep.
Producer: element 0 queued.
Consumer: fetching 0 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 1 queued.
Consumer: fetching 1 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 2 queued.
Producer: element 3 queued.
Consumer: fetching 2 from queue.
Producer: element 4 queued.
Consumer: fetching 3 from queue.
Consumer: fetching 4 from queue.
Consumer: queue not ready -> going to sleep.
Consumer: two consecutive failed attempts -> Exiting.

Una visione olistica

Per ricapitolare, questo articolo ha mostrato un panorama concettuale dei meccanismi introdotti in C + + 11 per consentire l'esecuzione parallela in un'epoca in cui sono tradizionali ambienti multi-core.

Attività asincrone attivare un modello di programmazione leggero parallelizzare l'esecuzione. I risultati di ogni attività possono essere recuperati tramite un futuro associato.

Discussioni offrono maggiore granularità rispetto a compiti — anche se sono più pesanti — insieme con meccanismi per mantenere separati copie di variabili statiche e trasportando le eccezioni tra thread.

Come thread paralleli agiscono sui dati comuni, C + + 11 fornisce risorse per evitare race condition. I tipi atomici consentono un modo affidabile per garantire che i dati vengono modificati da un thread alla volta.

Mutex aiutano a definire le aree critiche in tutto il codice — regioni a cui discussioni non sono in grado di accedere contemporaneamente. Serrature avvolgono i mutex, legando l'apertura di quest'ultimo per il ciclo di vita dell'ex.

Infine, le variabili di condizione concedere maggiore efficienza di sincronizzazione dei thread, come alcuni thread può attendere gli eventi notificati dagli altri thread.

Questo articolo non ha coperto tutti i molti modi per configurare e utilizzare ciascuna di queste caratteristiche, ma il lettore ora ha una visione olistica di loro ed è pronto a scavare più a fondo.

Diego Dagum è uno sviluppatore di software con più di 20 anni di esperienza. Egli è attualmente un Visual C++ comunità program manager di Microsoft.

Grazie ai seguenti esperti tecnici per la revisione di questo articolo: David Cravey, Alon Fliess, Fabio Galuppo e Marc Gregoire