C# Warteschlangen – Die BlockingCollection
Oft werden Warteschlangen benötigt, um Aufgaben zu Verwalten oder durch etwaige Parallelität die Performance zu Steigern. Mit .NET 4.0 hat Microsoft einen großen Schritt in diese Richtung getan und den Entwicklern durch die sogenannten ConcurrentCollections viel Arbeit abgenommen.
Die für diesen Beitrag interessante Klasse ist die BlockingCollection, die den sogenannten Producer and Consumer-Pattern umsetzt. Mit Hilfe dieses Patterns können sehr einfach und effizient Aufgaben in Warteschlangen gelegt und von anderen Threads zur Verarbeitung entnommen werden.
Die Funktionweise ist meist, dass eine Schleife in Thread Nr. 1 die Warteschlange füllt und mehrere Threads (Nr. 2 - Nr. x) solange warten, bis die Warteschlange neue Elemente erhält. Die konsumierenden Threads legen sich dann so lange schlafen, bis neue Elemente eintrudeln um diese zu verarbeiten - ohne dabei die CPU zu belasten.
Das Umsetzen dieses Patterns unter .NET 3.5 erforderte vom Entwickler noch relativ viel Code, der zudem Stoplerfallen durch die Thread-Synchronisation beachten musste.
/// <summary>
/// Element für die Warteschlange
/// </summary>
public class MyWarteschlangenElement
{
public MyWarteschlangenElement( Int32 zahl1, Int32 zahl2 )
{
Zahl1 = zahl1;
Zahl2 = zahl2;
}
public Int32 Zahl1 { get; private set; }
public Int32 Zahl2 { get; private set; }
}
public class MyWarteschlange
{
private readonly object _warteschlangenSynchronisierer = new object( );
/// <summary>
/// Alle aktiven konsumierenden Threads
/// </summary>
private readonly Thread[] _arbeitendeThreads;
/// <summary>
/// Warteschlange
/// </summary>
private readonly Queue<MyWarteschlangenElement> _itemQ = new Queue<MyWarteschlangenElement>( );
public MyWarteschlange( int anzahlThreads ) // Anzahl Threads = Anzahl der Konsumenten, die Elemente
{ // aus der Warteschlange nehmen und bearbeiten
_arbeitendeThreads = new Thread[ anzahlThreads ];
// Erstelle für jeden Arbeiter einen eignen Thread
for ( var i = 0 ; i < anzahlThreads ; i++ )
{
( _arbeitendeThreads[ i ] = new Thread( Consume ) ).Start( );
}
}
/// <summary>
/// Für ein Element, hier zwei Zahlen, zur Warteschlange hinzu
/// </summary>
/// <param name="warteschlangenElement"></param>
public void AufgabeHinzufuegen( MyWarteschlangenElement warteschlangenElement )
{
// Ein lock benötigen wir, da die Warteschlange hier nicht Thread-sicher ist
lock ( _warteschlangenSynchronisierer )
{
// Element zur Warteschlange hinzufügen
_itemQ.Enqueue( warteschlangenElement );
// Bekanntgeben, dass ein neues Element hinzugefügt wurde
Monitor.Pulse( _warteschlangenSynchronisierer );
}
}
/// <summary>
/// Beendet die Warteschlange
/// </summary>
public void Abschliessen( )
{
// Wir definieren null als das Element, das den verarbeitenden Threads sagt, dass nun abgeschlossen werden soll
// Aufgrund des locks und des Pulse müssen wir für jeden Thread ein null einwerfern
for ( var i = 0 ; i < _arbeitendeThreads.Length ; i++ )
{
AufgabeHinzufuegen( null );
}
// Warten, bis sich alle Threads beendet haben
foreach ( var worker in _arbeitendeThreads )
{
worker.Join( );
}
}
/// <summary>
/// Verarbeitet ein Element
/// </summary>
private void Consume( )
{
// Dauerhaft schauen, ob ein Element in der Warteschlange auf die Verarbeitung wartet
while ( true )
{
MyWarteschlangenElement zubearbeitendesElement;
// Ein lock benötigen wir, da die Warteschlange hier nicht Thread-sicher ist
lock ( _warteschlangenSynchronisierer )
{
while ( _itemQ.Count == 0 )
{
// Wir warten hier, bis ein neues Element der Warteschlange hinzugefügt wurde
// Dies schont die CPU, da nicht ständig das while(true) durchlaufen wird!
Monitor.Wait( _warteschlangenSynchronisierer );
}
// Wir holen ein Element aus der Warteschlange
zubearbeitendesElement = _itemQ.Dequeue( );
}
// Wir haben null als Abbruchkriterium definiert; schauen, ob wir abbrechen sollen
if ( zubearbeitendesElement == null )
{
// Thread beenden
return;
}
// Ausführung der Aufgabe
BerechneSumme( zubearbeitendesElement );
}
}
/// <summary>
/// Berechnet die Summe zweier Zahlen
/// </summary>
/// <param name="warteschlangenElement"></param>
private static void BerechneSumme( MyWarteschlangenElement warteschlangenElement )
{
// Berechne die Summe aus zahl1 und zahl2
var summe = warteschlangenElement.Zahl1 + warteschlangenElement.Zahl2;
}
```
Unter **.NET 4.0** ist die Implementierung nun deutlich einfacher.
```csharp
public class MyWarteschlange
{
/// <summary>
/// Warteschlange, die die Elemente zur Verarbeitung enthält
/// </summary>
private readonly BlockingCollection<MyWarteschlangenElement> _warteschlange = new BlockingCollection<MyWarteschlangenElement>( );
/// <summary>
/// Mit Hilfe dessen kann die Bearbeitung beendet oder abgebrochen werden
/// </summary>
private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource( );
/// <summary>
/// Array, um die aktuell konsumierenden Threads zwischen zu speichern
/// </summary>
private readonly Task[] _tasks;
public MyWarteschlange( int anzahlThreads ) // Anzahl Threads = Anzahl der Konsumenten, die Elemente
{ // aus der Warteschlange nehmen und bearbeiten
// Array mit der Anzahl der Threads definieren
// Geringer 1 macht natürlich keinen Sinn und sollte abgefangen werden
_tasks = new Task[ anzahlThreads ];
for ( var i = 0 ; i < anzahlThreads ; i++ )
{
// Neuen Task anlegen, der Elemente für die Verarbeitung entnimmt
var consumeTask = Task.Factory.StartNew( ( ) =>
{
try
{
// GetConsumingEnumerable() blockiert hier so lange, bis ein Element entnommen werden konnte oder wenn die BlockingCollection komplettiert wurde.
// Ebenso wird abgebrochen, wenn ein Abschließen, also das Beenden durch den CancellationTokenSource angefordert wurde
foreach ( var warteschlangenElement in _warteschlange.GetConsumingEnumerable( TokenSource.Token ) )
{
// Schauen, ob Abschliessen() aufgerufen wurde
if ( TokenSource.IsCancellationRequested )
{
// Die Bearbeitung wurde abgebrochen
// Abschliessen() wurde ausgeführt
return;
}
// Berechnen der Summe des aktuellen Elements
var summe = warteschlangenElement.Zahl1 + warteschlangenElement.Zahl2;
}
}
catch ( OperationCanceledException )
{
// Die Bearbeitung wurde abgebrochen
// Abschliessen() wurde ausgeführt
}
} );
// Neuen Task zur Liste der Tasks hinzufügen
_tasks[ i ] = consumeTask;
}
}
public void Hinzufuegen( MyWarteschlangenElement warteschlangenElement )
{
// Wir benötigen beim Hinzufügen kein Lock, da die BlockingCollection absolut Thread-sicher ist
_warteschlange.Add( warteschlangenElement );
}
public void Abschliessen( )
{
// Abbrechen anfordern
TokenSource.Cancel( );
// Warten, bis sich alle Tasks beendet haben
Task.WaitAll( _tasks );
}
}
Wir sehen, dass die Variante mit der BlockingCollection folgende Vorteile bietet:
- Wir müssen uns um Locks nicht selbst kümmern > Thread-Sicherheit
- Deutlich weniger Code-Aufwand
- Und damit auch eine gesteigerte Übersicht und Code-Qualität<