C# Warteschlangen – Die BlockingCollection

Oft werden Warteschlangen benötigt, um Aufgaben zu Verwalten oder durch etwaige Parallelität die Performance zu Steigern. Mit .NET 4.0 hat Microsoft einen großen Schritt in diese Richtung getan und den Entwicklern durch die sogenannten ConcurrentCollections viel Arbeit abgenommen.

Die für diesen Beitrag interessante Klasse ist die BlockingCollection, die den sogenannten Producer and Consumer-Pattern umsetzt. Mit Hilfe dieses Patterns können sehr einfach und effizient Aufgaben in Warteschlangen gelegt und von anderen Threads zur Verarbeitung entnommen werden.

Die Funktionweise ist meist, dass eine Schleife in Thread Nr. 1 die Warteschlange füllt und mehrere Threads (Nr. 2 - Nr. x) solange warten, bis die Warteschlange neue Elemente erhält. Die konsumierenden Threads legen sich dann so lange schlafen, bis neue Elemente eintrudeln um diese zu verarbeiten - ohne dabei die CPU zu belasten.

Das Umsetzen dieses Patterns unter .NET 3.5 erforderte vom Entwickler noch relativ viel Code, der zudem Stoplerfallen durch die Thread-Synchronisation beachten musste.

    /// <summary>
    /// Element für die Warteschlange
    /// </summary>
    public class MyWarteschlangenElement
    {
        public MyWarteschlangenElement( Int32 zahl1, Int32 zahl2 )
        {
            Zahl1 = zahl1;
            Zahl2 = zahl2;
        }
        public Int32 Zahl1 { get; private set; }
        public Int32 Zahl2 { get; private set; }
    }

    public class MyWarteschlange
    {
        private readonly object _warteschlangenSynchronisierer = new object( );

        /// <summary>
        /// Alle aktiven konsumierenden Threads
        /// </summary>
        private    readonly Thread[] _arbeitendeThreads;

        /// <summary>
        /// Warteschlange
        /// </summary>
        private   readonly Queue<MyWarteschlangenElement> _itemQ = new Queue<MyWarteschlangenElement>( );

        public MyWarteschlange( int anzahlThreads )  // Anzahl Threads = Anzahl der Konsumenten, die Elemente 
        {                                            // aus der Warteschlange nehmen und bearbeiten
            _arbeitendeThreads = new Thread[ anzahlThreads ];

            // Erstelle für jeden Arbeiter einen eignen Thread
            for ( var i = 0 ; i < anzahlThreads ; i++ )
            {
                ( _arbeitendeThreads[ i ] = new Thread( Consume ) ).Start( );
            }
        }

        /// <summary>
        /// Für ein Element, hier zwei Zahlen, zur Warteschlange hinzu
        /// </summary>
        /// <param name="warteschlangenElement"></param>
        public void AufgabeHinzufuegen( MyWarteschlangenElement warteschlangenElement )
        {
            // Ein lock benötigen wir, da die Warteschlange hier nicht Thread-sicher ist
            lock ( _warteschlangenSynchronisierer )
            {
                // Element zur Warteschlange hinzufügen
                _itemQ.Enqueue( warteschlangenElement );

                // Bekanntgeben, dass ein neues Element hinzugefügt wurde
                Monitor.Pulse( _warteschlangenSynchronisierer );
            }
        }

        /// <summary>
        /// Beendet die Warteschlange
        /// </summary>
        public void Abschliessen( )
        {
            // Wir definieren null als das Element, das den verarbeitenden Threads sagt, dass nun abgeschlossen werden soll
            // Aufgrund des locks und des Pulse müssen wir für jeden Thread ein null einwerfern
            for ( var i = 0 ; i < _arbeitendeThreads.Length ; i++ )
            {
                AufgabeHinzufuegen( null );
            }

            // Warten, bis sich alle Threads beendet haben
            foreach ( var worker in _arbeitendeThreads )
            {
                worker.Join( );
            }
        }

        /// <summary>
        /// Verarbeitet ein Element
        /// </summary>
        private void Consume( )
        {
            // Dauerhaft schauen, ob ein Element in der Warteschlange auf die Verarbeitung wartet
            while ( true )
            {
                MyWarteschlangenElement zubearbeitendesElement;

                // Ein lock benötigen wir, da die Warteschlange hier nicht Thread-sicher ist
                lock ( _warteschlangenSynchronisierer )
                {
                    while ( _itemQ.Count == 0 )
                    {
                        // Wir warten hier, bis ein neues Element der Warteschlange hinzugefügt wurde
                        // Dies schont die CPU, da nicht ständig das while(true) durchlaufen wird!
                        Monitor.Wait( _warteschlangenSynchronisierer );
                    }

                    // Wir holen ein Element aus der Warteschlange
                    zubearbeitendesElement = _itemQ.Dequeue( );
                }

                // Wir haben null als Abbruchkriterium definiert; schauen, ob wir abbrechen sollen
                if ( zubearbeitendesElement == null )
                {
                    // Thread beenden
                    return;
                }

                // Ausführung der Aufgabe
                BerechneSumme( zubearbeitendesElement );
            }
        }

        /// <summary>
        /// Berechnet die Summe zweier Zahlen
        /// </summary>
        /// <param name="warteschlangenElement"></param>
        private static void BerechneSumme( MyWarteschlangenElement warteschlangenElement )
        {
            // Berechne die Summe aus zahl1 und zahl2
            var summe = warteschlangenElement.Zahl1 + warteschlangenElement.Zahl2;
        }
        ```
        
Unter **.NET 4.0** ist die Implementierung nun deutlich einfacher.

```csharp
public class MyWarteschlange
    {
        /// <summary>
        /// Warteschlange, die die Elemente zur Verarbeitung enthält
        /// </summary>
        private   readonly BlockingCollection<MyWarteschlangenElement> _warteschlange = new BlockingCollection<MyWarteschlangenElement>( );

        /// <summary>
        /// Mit Hilfe dessen kann die Bearbeitung beendet oder abgebrochen werden
        /// </summary>
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource( );

        /// <summary>
        /// Array, um die aktuell konsumierenden Threads zwischen zu speichern
        /// </summary>
        private readonly Task[] _tasks;

        public MyWarteschlange( int anzahlThreads )  // Anzahl Threads = Anzahl der Konsumenten, die Elemente 
        {                                            // aus der Warteschlange nehmen und bearbeiten

            // Array mit der Anzahl der Threads definieren
            // Geringer 1 macht natürlich keinen Sinn und sollte abgefangen werden
            _tasks = new Task[ anzahlThreads ];

            for ( var i = 0 ; i < anzahlThreads ; i++ )
            {
                // Neuen Task anlegen, der Elemente für die Verarbeitung entnimmt
                var consumeTask = Task.Factory.StartNew( ( ) =>
                    {
                        try
                        {
                            // GetConsumingEnumerable() blockiert hier so lange, bis ein Element entnommen werden konnte oder wenn die BlockingCollection komplettiert wurde.
                            // Ebenso wird abgebrochen, wenn ein Abschließen, also das Beenden durch den CancellationTokenSource angefordert wurde
                            foreach ( var warteschlangenElement in _warteschlange.GetConsumingEnumerable( TokenSource.Token ) )
                            {
                                // Schauen, ob Abschliessen() aufgerufen wurde
                                if ( TokenSource.IsCancellationRequested )
                                {
                                    // Die Bearbeitung wurde abgebrochen
                                    // Abschliessen() wurde ausgeführt
                                    return;
                                }

                                // Berechnen der Summe des aktuellen Elements
                                var summe = warteschlangenElement.Zahl1 + warteschlangenElement.Zahl2;
                            }
                        }
                        catch ( OperationCanceledException )
                        {
                            // Die Bearbeitung wurde abgebrochen
                            // Abschliessen() wurde ausgeführt
                        }
                    } );

                // Neuen Task zur Liste der Tasks hinzufügen
                _tasks[ i ] = consumeTask;
            }
        }

        public void Hinzufuegen( MyWarteschlangenElement warteschlangenElement )
        {
            // Wir benötigen beim Hinzufügen kein Lock, da die BlockingCollection absolut Thread-sicher ist
            _warteschlange.Add( warteschlangenElement );
        }

        public void Abschliessen( )
        {
            // Abbrechen anfordern
            TokenSource.Cancel( );

            // Warten, bis sich alle Tasks beendet haben
            Task.WaitAll( _tasks );
        }
    }

Wir sehen, dass die Variante mit der BlockingCollection folgende Vorteile bietet:

  • Wir müssen uns um Locks nicht selbst kümmern > Thread-Sicherheit
  • Deutlich weniger Code-Aufwand
  • Und damit auch eine gesteigerte Übersicht und Code-Qualität<