Asynchronität mit C# und Go Blog

TPL Dataflow vs. Go Channels

Rainer Stropek
May 4, 2021

Wie können wir asynchron Nachrichten zwischen Verarbeitungsschritten austauschen? Dazu existieren in C# und Go verschiedene Ansätze – ein guter Anlass, einen Blick über den Tellerrand zu wagen und sich die Programmiersprache Go einmal näher anzusehen. Bevor wir damit starten, rufen wir uns zunächst die TPL Dataflow Library in Erinnerung.

Nebenläufige Programmierung ist heutzutage die Regel, nicht die Ausnahme. Daten werden gelesen oder empfangen, asynchron innerhalb des Prozesses verarbeitet und die Ergebnisse werden danach ausgegeben oder an einen Empfänger gesendet. Natürlich lässt sich die Kommunikation zwischen den Threads eines Prozesses über Puffer im Speicher und Synchronisationsobjekte wie Locks oder Semaphore lösen. Dieses Programmiermodell ist jedoch fehleranfällig und führt nicht selten zu ineffizienten Algorithmen.

Die meisten Programmierplattformen, die ich kenne, enthalten auf Programmiersprachen- oder Framework-Ebene alternative Mechanismen, um asynchron Messages zwischen Verarbeitungsschritten auszutauschen. In diesem Artikel vergleiche ich die Ansätze von C# und Go, wobei der Schwerpunkt auf den Besonderheiten von Go liegt. Der Message-Austausch zwischen den sogenannten Goroutines über sogenannte Channels ist eine charakteristische Eigenschaft der Sprache Go. Dieser Artikel soll C#-Entwicklerinnen und Entwicklern einen Blick über den Tellerrand bieten und vielleicht sogar Lust darauf machen, im einen oder anderen Projekt Go auszuprobieren.

TPL Dataflow Library

Bevor wir auf Go und seine Channels zu sprechen kommen, möchte ich kurz in Erinnerung rufen, was C# und .NET in Sachen asynchronem Message-Austausch innerhalb von Prozessen eingebaut hat: die TPL (Task Parallel Library) Dataflow Library [1]. Diese Bibliothek basiert auf den folgenden Grundprinzipien:

  • Source Blocks (ISourceBlock<T>) sind Datenquellen. Man kann Messages von ihnen lesen.

  • Target Blocks (ITargetBlock<T>) empfangen Messages. Man kann Messages auf sie schreiben.

  • Propagator Blocks (IPropagatorBlock<TIn, TOut>) sind gleichzeitig Source und Target Blocks. Sie verarbeiten Daten in irgendeiner Form (Projizieren, Filtern, Gruppieren etc.).

Die Blöcke kann man flexibel zu Pipelines kombinieren. .NET kommt mit vielen vordefinierten Blöcken (Namespace: System.Threading.Tasks.Dataflow), die man je nach Anwendungsfall zu einer Pipeline zusammenstellt.

Listing 1 zeigt exemplarisch, wie eine Datenverarbeitung mit der TPL Dataflow Library programmiert wird. Ein Producer erzeugt Daten. In der Praxis würden diese beispielsweise aus Dateien oder Datenbanken geladen beziehungsweise über das Netzwerk empfangen. Im Beispiel werden Zufallsdaten in einen BufferBlock geschrieben. Er kann je nach Anwendungsfall als Source-, Propagator- oder Target-Block eingesetzt werden. Der Transformer verarbeitet die Daten und gibt das Verarbeitungsergebnis an den nächsten Schritt der Pipeline weiter. Im Beispiel wird der Mittelwert aus Zahlen in einem Array berechnet. In der Praxis können hier beliebig komplexe Logiken zur Verarbeitung ablaufen. Eine Besonderheit der TPL Dataflow Library ist, dass sie sich automatisch um die Parallelisierung der Message-Verarbeitung kümmert, wenn der jeweilige Algorithmus das erlaubt. Im Beispiel wird MaxDegreeOfParallelism angegeben, wodurch mehrere Transformer parallel laufen. Die Verarbeitung kann dadurch entsprechend beschleunigt werden. Der Consumer ist im Beispiel der letzte Schritt der Pipeline. Er erwartet das Eintreffen von Messages und verarbeitet sie.

Listing 1: TPL Dataflow Pipeline

using System;
using static System.Console;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
 
// Create a buffer (=target block) into which we can write messages
var buffer = new BufferBlock<byte[]>();
 
// Create a transformer (=source and target block) that processes data.
// For demo purposes, we specify a degree of parallelism. This allows
// .NET to run multiple transformers concurrently.
var transform = new TransformBlock<byte[], double>(Transform, new() { MaxDegreeOfParallelism = 5 });
 
// Link buffer with transformer
buffer.LinkTo(transform);
 
// Start asynchronous consumer
var consumerTask = ConsumeAsync(transform);
 
// Start producer
Produce(buffer);
 
// Producer is done, we can mark transformer as completed.
// This will stop the consumer after it will have been finished
// consuming buffered messages.
transform.Complete();
 
// Wait for consumer to finish and print number of processed messages
var bytesProcessed = await consumerTask;
WriteLine($"Processed {bytesProcessed} messages.");
 
/// <summary>
/// Produces values and writes them into <c>target</c>
/// </summary>
static void Produce(ITargetBlock<byte[]> target)
{
  // Here we generate random bytes. In practice, the producer
  // would e.g. read data from disk, receive data over the network,
  // get data from a database, etc.
  var rand = new Random();
  for (int i = 0; i < 100; ++i)
  {
    var buffer = new byte[1024];
    rand.NextBytes(buffer);
 
    // Send message into target block
    WriteLine("Sending message");
    target.Post(buffer);
  }
 
  // Mark as completed
  target.Complete();
}
 
/// <summary>
/// Transforms incoming message (byte array -> average value)
/// </summary>
static double Transform(byte[] bytes)
{
  // For debug purposes, we print the thread id. If you run the program,
  // you will see that transformers run in parallel on multiple threads.
  WriteLine($"Transforming message on thread ${Thread.CurrentThread.ManagedThreadId}");
  return bytes.Average(val => (double)val);
}
 
/// <summary>
/// Consumes message
/// </summary>
static async Task<int> ConsumeAsync(ISourceBlock<double> source)
{
  var messagesProcessed = 0;
 
  // Await incoming message
  while (await source.OutputAvailableAsync())
  {
    // Receive message
    var average = await source.ReceiveAsync();
 
    // Process message. In this demo we are just printing its content.
    WriteLine($"Consumed message, average value is {average}");
    messagesProcessed++;
  }
 
  return messagesProcessed;
}

Die TPL Dataflow Library ist gut geeignet für CPU- oder I/O-intensive Algorithmen, bei denen eine große Datenmenge idealerweise in mehreren, parallel laufenden Threads verarbeitet werden soll. Als Entwicklerin oder Entwickler muss man sich nicht mit Shared Memory, Thread-Synchronisation und manuellem Scheduling der Aufgaben herumplagen. Das geschieht automatisch im Hintergrund durch die Library.

Auftritt: Go

Bevor wir ins Thema Go Channels einsteigen, zunächst ein paar einleitende Worte zu dieser Programmiersprache für die, die mit ihr noch keine Berührungspunkte hatten. Go stammt aus dem Hause Google. Die Grundidee hinter Go ist, eine Programmiersprache zu haben, die besonders flott beim Kompilieren, einfach in der Nutzung und performant ist, was den generierten Code betrifft. Go wird im Gegensatz zu C# in Maschinensprache übersetzt. Es gibt keine Intermediate Language. Es handelt sich trotzdem wie bei C# um eine Managed Language inklusive Garbage Collector.

Der auffälligste Unterschied zwischen C# und Go, der jeder Entwicklerin und jedem Entwickler beim Umstieg sofort auffällt, ist die Einfachheit der Sprache. Go hat im Vergleich zu C# nur einen Bruchteil an Schlüsselwörtern. Ein einprägsames Beispiel dafür ist der Increment-Operator (++). In Go ist er ein Statement, keine Expression. Etwas wie if (++x > 5) gibt es in Go nicht. Das Go-Entwicklungsteam ist sehr zurückhaltend, neue Sprachfeatures hinzuzufügen, die nur dazu dienen, Code kürzer zu machen. Es wird nicht als Nachteil empfunden, wenn man als Entwickler ein paar Zeilen mehr Code schreiben muss, wenn im Gegenzug die Sprache dadurch einfach und schlank gehalten werden kann. Ein weiteres Beispiel, das die Philosophie hinter Go klar macht, sind Generics. Es gibt Stand heute in Go keine Generics, obwohl die Sprache schon über 10 Jahre alt und speziell im Bereich Cloud-Computing und Containertechnologie weit verbreitet ist. Generics (inklusive Generic Channels) sind gerade erst in Planung und werden voraussichtlich nächstes Jahr zu Go hinzugefügt.

Kommt man von C#, erfordert das eine gewisse Bereitschaft zum Umdenken. Gerade in den letzten Sprachversionen von C# wurde vieles hinzugefügt, was nur dazu dient, kürzeren und prägnanteren Code zu schreiben. Go ist im Vergleich zu C# meiner Erfahrung nach eine Art „Coding Detox“. Man arbeitet mit einer auf das Wesentliche reduzierten Sprache, die sich langsam, aber stetig weiterentwickelt und langfristige Stabilität bietet. Wenn man sich auf diesen Grundgedanken einlässt, ist Entwickeln mit Go eine angenehme Erfahrung.

Dieser Artikel ist keine generelle Einleitung in Go. Wir konzentrieren uns auf zwei charakteristische Sprachfunktionen, Goroutines und Channels. Die Beispiele sind so einfach gehalten, dass auch Leserinnen und Leser ohne Go-Know-how ohne Weiteres folgen können.

Goroutines

Wir wollen uns in diesem Artikel mit dem Austauschen von Nachrichten zwischen nebenläufig ausgeführten Programmteilen in einem Prozess beschäftigen. Daher müssen wir uns ansehen, welche Analogie es in Go für C# Tasks gibt. Das Grundkonstrukt für nebenläufige Threads in Go-Programmen sind sogenannte Goroutines. Wie C# Tasks sind Goroutines viel leichtgewichtiger als Betriebssystem-Threads, es kann daher viel mehr Goroutines geben als darunterliegende Threads. Goroutines brauchen nur wenig Speicher (wenige KB). Das Scheduling wird durch die Go Runtime erledigt.

Schluss mit der Theorie, werfen wir einen Blick auf ein Codebeispiel. Listing 2 zeigt ein ganz einfaches Beispiel einer Goroutine. Achten Sie beim Durchsehen des Codes darauf, dass in der main-Methode dem ersten Aufruf von sayHello das Schlüsselwort go vorangestellt ist. Dadurch startet die aufgerufene Methode in einer eigenen Goroutine und ist nebenläufig zum Hauptprogramm.

Listing 2: Goroutine

package main
 
import (
  "fmt"
  "time"
)
 
func sayHello(source string) {
  fmt.Printf("Hello World from %s!\n", source)
  time.Sleep(5 * time.Millisecond)
}
 
func main() {
  go sayHello("goroutine")
  sayHello("direct call")
 
  time.Sleep(10 * time.Millisecond)
}

Es gäbe noch eine Menge über Goroutines zu sagen. In diesem Artikel wollen wir uns aber in Folge mit Channels beschäftigen. Die erwähnten Grundlagen von Goroutines sind für das Verständnis von Channels ausreichend. Wer Go in der Praxis einsetzen möchte, ist gut beraten, noch etwas Zeit in das Lesen der Dokumentation über Goroutines zu investieren. Aber keine Angst, das ist kein sprichwörtliches Rabbit Hole, in das man abtauchen muss, um tagelang Feinheiten des Goroutine-Schedulers von Go zu studieren. Die Grundphilosophie von Go ist, dass die Sprache einfach sein soll und man für ihre Verwendung die Implementierungsdetails nicht zu kennen braucht. Aus eigener Erfahrung kann ich sagen, dass der Einstieg in Go für erfahrene C#-Entwicklerinnen und Entwickler nicht schwer ist. Wer C# Tasks verstanden hat, kann in kürzester Zeit Anwendungen mit Goroutines schreiben.

Channels

Channels in Go dienen dazu, Daten zwischen Goroutines auszutauschen. Channels sind typsicher und man erspart sich durch ihre Verwendung die manuelle Synchronisation nebenläufiger Prozesse beim Zugriff auf gemeinsame Speicherbereiche. Insofern haben Channels in Go eine gewisse Ähnlichkeit mit der C# Dataflow Library. Der große Unterschied ist, dass Channels viel tiefer in die Programmiersprache Go und die zugehörigen Bibliotheken integriert sind als Dataflow Blocks in C# und .NET.

Listing 3 zeigt an einem einfachen Beispiel, wie mit Go Channels programmiert wird. Beachten Sie beim Durchsehen des Codes die Codekommentare. Sie weisen auf wichtige Dinge hin, die für das Verständnis der Channels wichtig sind.

Listing 3: Grundlagen von Go Channels

package main
 
import (
  "fmt"
  "time"
)
 
// Note that our function receives a 
// write-only channel. The channel will be used
// to send back the result.
func getValueAsync(result chan<- int) {
  // Simulate some longer running operation. Could
  // be reading data from disk, accessing DB or
  // network, etc.
  time.Sleep(10 * time.Millisecond)
 
  // Return result and print some debug output
  fmt.Println("Before sending result")
  result <- 42
  fmt.Println("After sending result")
}
 
func doSomethingComplex(done chan<- bool) {
  // Simulate a longer operation again
  time.Sleep(10 * time.Millisecond)
 
  // This time, we do not return a meaningful
  // result. We only signal that processing has
  // been completed by sending something into
  // the channel.
  done <- true
}
 
func main() {
  // Create a channel for receiving an async result
  result := make(chan int)
 
  // Run operation asynchronously
  go getValueAsync(result)
 
  // Wait for result by receiving from channel
  fmt.Println("Before receiving result")
  fmt.Println(<-result)
  fmt.Println("After receiving result")
 
  // Create another channel and start 
  // another Goroutine
  done := make(chan bool)
  go doSomethingComplex(done)
 
  // The next statement is not interested in the
  // Goroutine’s result. We just want to block this
  // method’s execution until something has been 
  // sent to the channel.
  <-done
  fmt.Println("Complex operation is done")
}

Wie man bereits an diesem ersten Beispiel sieht, gibt es zwar gewisse Ähnlichkeiten zwischen der TPL Dataflow Library und den Go Channels, der typische Anwendungsbereich ist jedoch unterschiedlich. Die Dataflow Library ist im Speziellen interessant, wenn man Anwendungen hat, bei denen größere Datenmengen asynchron abgearbeitet werden sollen und der Algorithmus von automatischer Parallelisierung profitiert. Bei Go Channels spielt die automatisierte Parallelverarbeitung keine so große Rolle. Natürlich gibt es Anwendungsbereiche, bei denen beide Lösungsansätze einsetzbar sind. Die beim Design und bei der Entwicklung vordergründig wichtigsten Ziele waren aber nicht ident.

Blocking Channels

Wenn man keine besonderen Vorkehrungen trifft, blockieren Channels die lesende Goroutine, falls der Channel keinen Wert enthält, und die schreibende Goroutine, falls bereits ein Wert im Channel steckt und noch nicht empfangen wurde. „Blockieren“ bedeutet in diesem Fall nicht, dass der darunterliegende Betriebssystem-Thread blockiert ist. Blockiert wird nur die Goroutine. Der Go-interne Scheduler wird versuchen, die Zeit zu nutzen und übergibt die Kontrolle an Goroutines, die gerade nicht blockiert sind und ausgeführt werden könnten. Die Eigenschaft des Blockierens von Channels kann verwendet werden, um Goroutines zu synchronisieren. Was ist aber, wenn man ein Producer-Consumer-Pattern umsetzen möchte, bei dem der Producer manchmal etwas schneller Daten liefert als Consumer sie verarbeiten können? Für diese Zwecke gibt es Buffered Channels.

Buffered Channels

Listing 4 zeigt Buffered Channels im Einsatz. Der Code enthält wieder viele Kommentare, die auf wichtige Aspekte hinweisen. Besondere Aufmerksamkeit verdient die for/range-Schleife in der main-Methode. Man verwendet sie in Go, um über die mit Hilfe des Channels empfangenen Messages zu iterieren. Das ist nur der Anfang der Integration von Channels in die Sprache Go. Channels sind fundamentaler Bestandteil der Sprache und sie werden an allen Ecken und Enden in Go-Bibliotheken eingesetzt. Das ist ein weiterer Unterschied zwischen der TPL Dataflow Library in C# und den Channels in Go. Die Dataflow Library wird in C# für ganz spezielle Anwendungsfälle eingesetzt. Natürlich ist das API der Library so gestaltet, dass man sie angenehm mit C# nutzen kann. Von einer tiefen Integration in die Sprachsyntax von C# kann man aber ganz im Gegensatz zu Channels in Go nicht sprechen.

Listing 4: Buffered Channels

package main
 
import (
  "fmt"
  "math/rand"
  "time"
)
 
// Note that our function receives a write-only channel.
// The channel will be used to send back the result.
func producer(result chan<- int) {
  // Send 50 values through channel
  for i := 0; i < 20; i++ {
    // Simulate a longer operation by waiting. In practice,
    // this would be e.g. an I/O operation.
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
 
    // Send back result. Note that sending will be blocked
    // if buffered channel is currently full.
    fmt.Println("Sending")
    result <- rand.Intn(42)
  }
 
  // Close channel to signal caller that no further
  // values can be expected.
  close(result)
}
 
func main() {
  // Call async operation that produces values
  values := make(chan int, 10)
  go producer(values)
 
  // Note that we can use for/range to iterate over
  // values coming in through a channel. The loop will block
  // if no value is currently available in the channel.
  for val := range values {
    fmt.Printf("Received %d\n", val)
    time.Sleep(1 * time.Second)
  }
}

Non-Blocking Channel Operations

Ein weiteres Beispiel für die Sprachintegration von Channels in Go ist das select-Statement. Entwicklerinnen und Entwickler, die aus C# kommen, werden überrascht sein, denn das select-Statement hat in Go eine andere Funktion als in C#. Es wird verwendet, um Werte aus verschiedenen Channels zu empfangen. Wenn es einen default-Zweig enthält, wird es zu einer Non-Blocking Channel Operation. Das Codebeispiel in Listing 5 zeigt das Prinzip. Das erste select-Statement in main enthält einen default-Zweig. Der Code kann daher auch bei Nichtvorhandensein eines Wertes im Channel weiterlaufen. Das zweite select-Statement enthält Empfangsoperationen für zwei verschiedene Channels. Ausgeführt wird der Zweig, dessen Channel als erstes einen Wert empfängt.

Listing 5: Non-Blocking Channel Operations

package main
 
import (
  "fmt"
  "time"
)
 
func getValueAsync(result chan int) {
  time.Sleep(10 * time.Millisecond)
 
  fmt.Println("Before sending result")
  result <- 42
  fmt.Println("After sending result")
}
 
func main() {
  // Create channel and trigger asynchronous operation
  result := make(chan int)
  go getValueAsync(result)
 
  // Wait for some milliseconds
  time.Sleep(5 * time.Millisecond)
 
  // Note that we are using Go’s select statement here.
  // It is non-blocking because of the default case.
  select {
  case m := <-result:
    fmt.Printf("We have a value: %d\n", m)
  default:
    fmt.Println("Sorry, no value")
  }
 
  // Empty channel and restart async operation
  <-result
  go getValueAsync(result)
 
  // Use select statement without default case. Here
  // we combine it with time.After, which is a timer sending
  // a value in a channel after a configurable amount of time.
  select {
  case m := <-result:
    fmt.Println(m)
  case <-time.After(5 * time.Millisecond):
    fmt.Println("timed out")
  }
}

Um mögliche Einsatzbereiche des switch-Statements zu verdeutlichen, wurde in Listing 6 ein Beispiel entwickelt, bei dem eine zentrale Goroutine verschiedene mathematische Operationen anbietet. Die Operationen werden nicht direkt aufgerufen, sondern durch Senden einer Message an einen Channel ausgelöst. Anwenden könnte man dieses Entwurfsmuster beispielsweise in einer Webanwendung, bei der eine einzelne Goroutine Dienste anbietet, die von den parallellaufenden Goroutines, die für die Abarbeitung eingehender HTTP Requests gestartet wurden, aufgerufen werden.

Listing 6: Asynchrone Nachrichtenverarbeitung

package main
 
import "fmt"
 
// Create structures for operation parameters
type binaryOperationParameter struct {
  x int
  y int
}
 
// Create structure with channels used to trigger
// asynchronous operations
type operationChannels struct {
  add     chan binaryOperationParameter
  sub     chan binaryOperationParameter
  square chan int
  negate chan int
  exit     chan bool
}
 
// Helper methods to create channels
func createChannels() operationChannels {
  return operationChannels{
    add:     make(chan binaryOperationParameter),
    sub:     make(chan binaryOperationParameter),
    square: make(chan int),
    negate: make(chan int),
    exit:     make(chan bool),
  }
}
 
func calculator(channels operationChannels, result chan<- int) {
  // Run until exit operation is received
  for {
    select {
      case p := <-channels.add:
        result <- p.x + p.y
      case p := <-channels.sub:
        result <- p.x - p.y
      case p := <-channels.square:
        result <- p * p
      case p := <-channels.negate:
        result <- -p
      case <-channels.exit:
        close(result)
        fmt.Println("Good bye from calculator")
        return
    }
  }
 
}
 
func main() {
  // Start calculator in a separate Goroutine
  channels := createChannels()
  result := make(chan int)
  go calculator(channels, result)
 
  // Async add
  channels.add <- binaryOperationParameter{x: 21, y: 21}
  fmt.Printf("21 + 21 = %d\n", <-result)
 
  // Async square
  channels.square <- 2
  fmt.Printf("2^2 = %d\n", <-result)
 
  // Send exit message and wait until result channel is closed
  channels.exit <- true
  <-result
 
  fmt.Println("We are done")
}

Fazit

Wenn ich mit C#-Entwicklerinnen und -Entwicklern über Go spreche und von meiner Begeisterung für Channels und die damit verbundenen Sprachkonstrukte in Go erzähle, bekomme ich oft als Gegenargument zu hören, dass .NET mit der TPL Dataflow Library im Wesentlichen das Gleiche eingebaut habe. Zugegeben, beide Technologien haben Ähnlichkeiten. Ich stimme allerdings der Aussage, dass sie die gleichen Probleme lösen und sich beim Programmieren gleich anfühlen, nicht zu. Das Besondere an den Channels in Go ist die tiefe Integration in die Programmiersprache. Seit ich speziell im Bereich von Web-APIs Go einsetze, habe ich das zu schätzen gelernt und ich hoffe, dass ich manche Leserinnen und Leser auf Go neugierig machen konnte. Es geht dabei nicht darum, C# durch Go zu ersetzen, sondern neue Sichtweisen kennenzulernen, seine Komfortzone hin und wieder zu verlassen und bei manchen Projekten auf neue Technologien zurückzugreifen, die sich im jeweiligen Fall besonders gut eignen.

Links & Literatur

[1] https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library

Top Articles About Blog

Ihr aktueller Zugang zur .NET- und Microsoft-Welt.
Der BASTA! Newsletter:

Behind the Tracks

.NET Framework & C#
Visual Studio, .NET, Git, C# & mehr

Agile & DevOps
Agile Methoden, wie Scrum oder Kanban und Tools wie Visual Studio, Azure DevOps usw.

Web Development
Alle Wege führen ins Web

Data Access & Storage
Alles rund um´s Thema Data

JavaScript
Leichtegewichtig entwickeln

UI Technology
Alles rund um UI- und UX-Aspekte

Microservices & APIs
Services, die sich über APIs via REST und JavaScript nutzen lassen

Security
Tools und Methoden für sicherere Applikationen

Cloud & Azure
Cloud-basierte & Native Apps