How to write a full duplex server in Go
A full duplex connection offers several significant advantages in modern communication systems. One of the key benefits is the simultaneous bidirectional data transfer it enables.
In the previous article, How to write a concurrent TCP server in Go, we saw how to implement a concurrent TCP server in Go. This time we are going to take the server to the next level and allow it to broadcast messages to its clients. That way, either end of the connection can start a communication, which gives us a full duplex scenario.
We are going to implement a mechanism that allows the server to send messages to all of its clients at the same time. If we wanted to send messages to a specific client, it would just be a matter of keeping track of an ID for each client, associated with its corresponding connection, and using that mapping to send the messages to that client.
Client connection
We will start by modelling each client connection with a reference to the server, a reference to the actual network connection and a buffered channel of 256 strings. This channel will be used to buffer responses before sending them back to the client. That way, we can detach the process of sending messages back to the clients from the process of handling their requests.
// connection ties one client's network connection to the server that owns it
// and to a buffered queue of responses waiting to be written to that client.
type connection struct {
	s         *server     // server this client belongs to
	conn      net.Conn    // underlying network connection
	responses chan string // outbound messages, created with room for 256
}

// newConnection wraps conn in a connection owned by s, with an empty
// 256-slot response queue.
func newConnection(s *server, conn net.Conn) *connection {
	return &connection{
		s:         s,
		conn:      conn,
		responses: make(chan string, 256),
	}
}
The client connection processing will be composed of two goroutines. One of them will be in charge of reading requests from the client, and sending them to the server. Each client request will be a text line.
// readConnection reads newline-terminated requests from the client and hands
// each one, delimiter included, to the server. When a read fails (for example
// because the client disconnected) it stops, and the deferred call asks the
// server to remove this connection. A trailing line without a newline is
// discarded.
func (c *connection) readConnection() {
	defer c.s.removeConnection(c)

	reader := bufio.NewReader(c.conn)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			return
		}
		c.s.submitRequest(c, line)
	}
}
The other goroutine will iterate over the responses channel, and every time a new response is added to the channel, this goroutine will send it to the client through the network connection. It will keep iterating over the responses channel, so in case there are no more responses ready to be sent back to the client, this goroutine will wait until a new one gets to the channel.
// writeConnection takes queued responses one by one and writes each to the
// client's network connection. It blocks while the queue is empty and returns
// once the responses channel has been closed and drained. Write errors are
// not inspected.
func (c *connection) writeConnection() {
	for {
		message, open := <-c.responses
		if !open {
			return
		}
		c.conn.Write([]byte(message))
	}
}
Server
The server object will keep the client connections in a sync.Map. This is needed because the set of connections can be accessed from different goroutines at the same time: the ones that add and remove entries in the map, and the one that broadcasts responses to the clients. And similarly to the client connection, the server will contain a buffered channel of 256 requests. That way, it can buffer the requests coming from the clients without blocking any of them. It will also contain a handle function that allows the user to specify a function to process the requests.
// server holds the set of live client connections, the user-supplied request
// handler, and the queue of requests waiting for a worker.
type server struct {
	connections sync.Map      // live client connections
	handle      handleFn      // callback run for every request
	requests    chan *request // pending requests, created with room for 256
}

// newServer returns a server that processes requests with handle. The
// connections map starts empty and the request queue has 256 slots.
func newServer(handle handleFn) *server {
	return &server{
		handle:   handle,
		requests: make(chan *request, 256),
	}
}
The server processing will be composed of a goroutine that accepts new network connections and stores them in the sync.Map.
// serve listens on the given network and address and accepts clients forever.
// Every accepted connection is wrapped, stored in the connections map under
// its remote address, and started. A listen or accept failure terminates the
// process through log.Fatal.
//
// NOTE(review): two clients that report the same remote address string would
// share one map entry; confirm this cannot happen for the networks in use.
func (s *server) serve(network, address string) {
	listener, err := net.Listen(network, address)
	if err != nil {
		log.Fatal(err)
	}
	defer listener.Close()

	for {
		netConn, err := listener.Accept()
		if err != nil {
			log.Fatal(err)
		}
		client := newConnection(s, netConn)
		remote := netConn.RemoteAddr().String()
		s.connections.Store(remote, client)
		client.start()
		log.Printf("New connection %s", remote)
	}
}
And it will also contain a number of goroutines that handle client requests. This number of goroutines is the same as the number of CPUs available in the server machine. Each one of these worker goroutines will execute the handle function of the server to process the requests.
// start launches the accept loop and one request worker per CPU reported by
// runtime.NumCPU, each in its own goroutine, and returns immediately.
func (s *server) start(network, address string) {
	go s.serve(network, address)

	for remaining := runtime.NumCPU(); remaining > 0; remaining-- {
		go s.worker()
	}
}
// worker takes requests off the server's queue one at a time and runs the
// handle function on each. It returns once the requests channel has been
// closed and drained.
func (s *server) worker() {
	for {
		req, open := <-s.requests
		if !open {
			return
		}
		s.handle(req.c, req.data)
	}
}
The server will also expose a method to broadcast messages to the clients. This method will iterate over the client connections in the sync.Map and send the message to each client. That message will then be queued in the responses channel of each client connection.
// broadcast hands message to every connection currently stored in the
// connections map by calling send on each one.
func (s *server) broadcast(message string) {
	s.connections.Range(func(_, value interface{}) bool {
		value.(*connection).send(message)
		return true // keep iterating over the remaining connections
	})
}
Complete code
After going over the main parts of the example, here you have the complete code of the server.
package main
import (
"bufio"
"fmt"
"log"
"net"
"os"
"runtime"
"sync"
)
// connection ties one client's network connection to the server that owns it
// and to a buffered queue of responses waiting to be written to that client.
//
// A connection must always be used through a pointer: it contains a sync.Once.
type connection struct {
	s         *server       // server this client belongs to
	conn      net.Conn      // underlying network connection
	responses chan string   // outbound messages; never closed, see done
	done      chan struct{} // closed exactly once by stop
	stopOnce  sync.Once     // makes stop safe to call more than once
}

// newConnection wraps conn in a connection owned by s, with an empty
// 256-slot response queue.
func newConnection(s *server, conn net.Conn) *connection {
	return &connection{
		s:         s,
		conn:      conn,
		responses: make(chan string, 256),
		done:      make(chan struct{}),
	}
}

// start launches the reader and writer goroutines for this client.
func (c *connection) start() {
	go c.readConnection()
	go c.writeConnection()
}

// stop shuts the connection down: it signals done and closes the network
// connection. It is idempotent and may be called from any goroutine.
//
// The responses channel is deliberately left open. Other goroutines (request
// workers, broadcast) may still be calling send, and a send on a closed
// channel panics; they observe done instead.
func (c *connection) stop() {
	c.stopOnce.Do(func() {
		close(c.done)
		// Best effort: there is nothing useful to do if closing fails.
		c.conn.Close()
	})
}

// send queues data for delivery to the client. It blocks while the queue is
// full, and silently drops data once the connection has been stopped.
func (c *connection) send(data string) {
	// Check done on its own first: when both cases of the select below are
	// ready Go picks one at random, and nothing should be queued after stop.
	select {
	case <-c.done:
		return
	default:
	}

	select {
	case c.responses <- data:
	case <-c.done:
	}
}

// readConnection reads newline-terminated requests from the client and hands
// each one, delimiter included, to the server. When a read fails (for example
// because the client disconnected or stop closed the socket) it returns, and
// the deferred call asks the server to remove this connection.
func (c *connection) readConnection() {
	defer c.s.removeConnection(c)

	buf := bufio.NewReader(c.conn)
	for {
		data, err := buf.ReadString('\n')
		if err != nil {
			return
		}
		c.s.submitRequest(c, data)
	}
}

// writeConnection writes queued responses to the client until the connection
// is stopped or a write fails.
func (c *connection) writeConnection() {
	for {
		select {
		case <-c.done:
			return
		case message := <-c.responses:
			if _, err := c.conn.Write([]byte(message)); err != nil {
				// The socket is unusable. Closing it makes the pending read
				// in readConnection fail, and that goroutine performs the
				// removal from the server.
				c.stop()
				return
			}
		}
	}
}
type (
	// request pairs one piece of data received from a client with the
	// connection it arrived on.
	request struct {
		c    *connection
		data string
	}

	// handleFn is the signature of the callback the server runs for each
	// request: it receives the originating connection and the request data.
	handleFn func(*connection, string)
)
// server holds the set of live client connections, the user-supplied request
// handler, and the queue of requests waiting for a worker.
type server struct {
	connections sync.Map      // live client connections
	handle      handleFn      // callback run for every request
	requests    chan *request // pending requests, created with room for 256
}

// newServer returns a server that processes requests with handle. The
// connections map starts empty and the request queue has 256 slots.
func newServer(handle handleFn) *server {
	return &server{
		handle:   handle,
		requests: make(chan *request, 256),
	}
}
// submitRequest queues data, received on connection c, for the worker
// goroutines. It blocks while the request queue is full.
func (s *server) submitRequest(c *connection, data string) {
	s.requests <- &request{c: c, data: data}
}
// start launches the accept loop and one request worker per CPU reported by
// runtime.NumCPU, each in its own goroutine, and returns immediately.
func (s *server) start(network, address string) {
	go s.serve(network, address)

	for remaining := runtime.NumCPU(); remaining > 0; remaining-- {
		go s.worker()
	}
}
// worker takes requests off the server's queue one at a time and runs the
// handle function on each. It returns once the requests channel has been
// closed and drained.
func (s *server) worker() {
	for {
		req, open := <-s.requests
		if !open {
			return
		}
		s.handle(req.c, req.data)
	}
}
// serve listens on the given network and address and accepts clients forever.
// Every accepted connection is wrapped, registered in the connections map and
// started. A listen or accept failure terminates the process through
// log.Fatal.
func (s *server) serve(network, address string) {
	l, err := net.Listen(network, address)
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()

	for {
		conn, err := l.Accept()
		if err != nil {
			log.Fatal(err)
		}
		client := newConnection(s, conn)
		// Key the entry by the connection pointer, not by the remote address.
		// The address is not unique on every network: Unix-domain clients
		// that did not bind their socket all report the same one, so entries
		// keyed by address would overwrite each other.
		s.connections.Store(client, client)
		client.start()
		log.Printf("New connection %s", conn.RemoteAddr().String())
	}
}

// removeConnection stops c and unregisters it from the connections map. The
// map key is the connection pointer, matching what serve stores.
func (s *server) removeConnection(c *connection) {
	c.stop()
	s.connections.Delete(c)
	log.Printf("Closed connection %s", c.conn.RemoteAddr().String())
}
// broadcast hands message to every connection currently stored in the
// connections map by calling send on each one.
func (s *server) broadcast(message string) {
	s.connections.Range(func(_, value interface{}) bool {
		value.(*connection).send(message)
		return true // keep iterating over the remaining connections
	})
}
// main starts an echo server on the network and port/socket name given on the
// command line, then broadcasts every line typed on standard input to all
// connected clients.
func main() {
	arguments := os.Args
	if len(arguments) != 3 {
		log.Fatal("Usage: server <network> <address>")
	}
	network := arguments[1]
	// The second argument is always prefixed with ":". For "tcp" that yields
	// ":<port>"; for "unix" the socket file is literally named ":<name>".
	address := ":" + arguments[2]

	// The handler echoes each request back to the client that sent it.
	s := newServer(func(c *connection, request string) {
		c.send(request)
	})
	s.start(network, address)

	fmt.Print("Enter message: \n")
	reader := bufio.NewReader(os.Stdin)
	for {
		text, err := reader.ReadString('\n')
		// ReadString may return data together with an error (a final line
		// without a newline); deliver it, but never broadcast an empty string.
		if text != "" {
			s.broadcast(text)
		}
		if err != nil {
			// Once standard input is closed every further read fails at
			// once, so retrying would spin forever. Stop reading instead.
			log.Printf("Standard input closed (%v); console broadcasting disabled", err)
			break
		}
	}

	// Keep the process, and with it the server goroutines, alive.
	select {}
}
Testing the example
TCP sockets example
# TCP Server
./server tcp <port>
# TCP Client
nc localhost <port>
>nc localhost 8080
test1
test1
test2
test2
>./server tcp 8080
Enter message:
2023/08/11 12:32:50 New connection 127.0.0.1:34498
message1
>nc localhost 8080
test1
test1
test2
test2
message1
UNIX Domain Sockets example
# UNIX Server
./server unix <socket>
# example
./server unix test
# UNIX Client
nc -U <socket>
# example:
nc -U :test