Home > Enterprise >  Go : Cancel Context inside a For loop
Go : Cancel Context inside a For loop

Time:02-23

I am trying to create a UDP server in Golang to Listen at a port for eg. 1234. I have a client which sends the start/stop message to this server.

On receiving of message "start", the server will start sending random data to this client and on the stop, the server will stop sending to the client.

For this purpose, I am using context to create a goroutine to send the data and cancel it when it gets "stop".

The error I am getting is the program works fine for one client, but if I start the client again the data is not sent again.

Any help would be appreciated?

UDP server Code:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "net"
    "time"
)

func generateMessageToUDP(ctx context.Context, addr *net.UDPAddr) {
    // stop writing to UDP
    done := false
    fmt.Println("Generating message to UDP client", addr)
    conn, err := net.DialUDP("udp", nil, addr)
    if err != nil {
        fmt.Println("Error: ", err)
    }
    defer func(conn *net.UDPConn) {
        err := conn.Close()
        if err != nil {
            fmt.Println("Error in closing the UDP Connection: ", err)
        }
    }(conn)
    // write to address using UDP connection
    go func() {
        for i := 0; !done; i   {
            RandomInt := rand.Intn(100)
            fmt.Println("Random Int: ", RandomInt)
            _, err = conn.Write([]byte(fmt.Sprintf("%d", RandomInt)))
            fmt.Println("Sent ", RandomInt, " to ", addr)
            time.Sleep(time.Second * 1)
        }
    }()
    <-ctx.Done()
    fmt.Println("Stopping writing to UDP client", addr)
    done = true
}

//var addr *net.UDPAddr
//var conn *net.UDPConn

func main() {
    fmt.Println("Hi this is a UDP server")
    udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
    if err != nil {
        fmt.Println("Error: ", err)
    }
    defer func(udpServer *net.UDPConn) {
        err := udpServer.Close()
        if err != nil {
            fmt.Println("Error in closing the UDP Connection: ", err)
        }
    }(udpServer)
    // create a buffer to read data into
    buffer := make([]byte, 1024)
    ctx, cancel := context.WithCancel(context.Background())
    for {
        // read the incoming connection into the buffer
        n, addr, err := udpServer.ReadFromUDP(buffer)
        fmt.Println("Recieved ", string(buffer[0:n]), " from ", addr)
        if err != nil {
            fmt.Println("Error: ", err)
        }
        fmt.Println("Received ", string(buffer[0:n]), " from ", addr)
        if string(buffer[0:n]) == "stop" {
            fmt.Println("Stopped listening")
            cancel()
            continue
        } else if string(buffer[0:n]) == "start" {
            // send a response back to the client
            _, err = udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr)
            if err != nil {
                fmt.Println("Error: ", err)
            }
            // start a routine to generate messages to the client
            generateMessageToUDP(ctx, addr)
        } else {
            fmt.Println("Unknown command")
        }
    }
}

Client Code:

package main

import (
    "fmt"
    "net"
    "time"
)

func main() {
    fmt.Println("Hello, I am a client")

    // Create a new client
    localAddr, err := net.ResolveUDPAddr("udp", ":5011")
    client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
    if err != nil {
        fmt.Println(err)
        return
    }
    defer client3.Close()
    _, err = client3.Write([]byte("start"))
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("Message sent. Sleeping for 5 seconds")
    time.Sleep(time.Second * 5)
    fmt.Println("Sending stop message")
    _, err = client3.Write([]byte("stop"))
    if err != nil {
        fmt.Println(err)
    }
}

CodePudding user response:

Okay, I did a simple hack on the server and added a label Start before creating a context and when I cancel the context, I addded goto label. This means when the task get cancelled it will again create the context and start doings its job

CodePudding user response:

You must take care to what you are doing.

  • avoid data races (done variable is read/write by two different routines without synchronization mechanism) https://go.dev/doc/articles/race_detector

  • dont make a new dialer everytime the program start sending messages to a new client. This will open a new local address and use it to send it to the client. The client will receive messages from another address, which it should normally ignore, because it did not initiated any exchange with that remote.

  • dont mixup client lifetime span with the program context lifetime span. In the code provided a client sending a stop message will trigger the cancel function of the whole program, it will stop all clients. Make a new context for each client, derived from the program context, cancel the related client context upon receiving a stop message.

  • UDP conns are shared by all clients, they must not be stopped from listening incoming packets because the program is serving a client. IE the call to generateMessageToUDP should be executed into another routine.

Following is a revised version accounting for those comments.

A var peers map[string]peer is added to match a remote address with a context. The type peer is defined as struct {stop func();since time.Time}. Upon receiving a start message, the peer is added to the map with a derived context, pctx, pcancel := context.WithCancel(ctx). The new client is then served in a different routine, go generateMessageToUDP(pctx, udpServer, addr), which is bond to the newly created context and the server socket. Upon receiving a stop message, the program performs a lookup peer, ok := peers[addr.String()], it then cancels the associated peer context peer.stop(); delete(peers, addr.String()) and forgets the peer.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "net"
    "time"
)

func generateMessageToUDP(ctx context.Context, conn *net.UDPConn, addr *net.UDPAddr) {
    fmt.Println("Generating message to UDP client", addr)
    go func() {
        for i := 0; ; i   {
            RandomInt := rand.Intn(100)
            d := []byte(fmt.Sprintf("%d", RandomInt))
            conn.WriteTo(d, addr)
            time.Sleep(time.Second * 1)
        }
    }()
    <-ctx.Done()
    fmt.Println("Stopping writing to UDP client", addr)
}

//var addr *net.UDPAddr
//var conn *net.UDPConn

func main() {
    fmt.Println("Hi this is a UDP server")
    udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
    if err != nil {
        fmt.Println("Error: ", err)
    }
    defer func(udpServer *net.UDPConn) {
        err := udpServer.Close()
        if err != nil {
            fmt.Println("Error in closing the UDP Connection: ", err)
        }
    }(udpServer)
    // create a buffer to read data into
    type peer struct {
        stop  func()
        since time.Time
    }
    peers := map[string]peer{}
    buffer := make([]byte, 1024)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    for {
        // read the incoming connection into the buffer
        n, addr, err := udpServer.ReadFromUDP(buffer)
        if err != nil {
            fmt.Println("Error: ", err)
        }
        fmt.Println("Received ", string(buffer[0:n]), " from ", addr)
        if string(buffer[0:n]) == "stop" {
            fmt.Println("Stopped listening")
            peer, ok := peers[addr.String()]
            if !ok {
                continue
            }
            peer.stop()
            delete(peers, addr.String())
            continue
        } else if string(buffer[0:n]) == "start" {
            peer, ok := peers[addr.String()]
            if ok {
                continue
            }
            pctx, pcancel := context.WithCancel(ctx)
            peer.stop = pcancel
            peer.since = time.Now()
            peers[addr.String()] = peer
            // send a response back to the client
            _, err = udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr)
            if err != nil {
                fmt.Println("Error: ", err)
            }
            // start a routine to generate messages to the client
            go generateMessageToUDP(pctx, udpServer, addr)
        } else if string(buffer[0:n]) == "ping" {
            peer, ok := peers[addr.String()]
            if !ok {
                continue
            }
            peer.since = time.Now()
            peers[addr.String()] = peer
        } else {
            fmt.Println("Unknown command")
        }
        for addr, p := range peers {
            if time.Since(p.since) > time.Minute {
                fmt.Println("Peer timedout")
                p.stop()
                delete(peers, addr)
            }
        }
    }
}
-- go.mod --
module play.ground
-- client.go --
package main

import (
    "fmt"
    "log"
    "net"
    "time"
)

func main() {
    fmt.Println("Hello, I am a client")

    // Create a new client
    localAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:5011")
    client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
    if err != nil {
        fmt.Println(err)
        return
    }
    defer client3.Close()
    var n int
    n, err = client3.Write([]byte("start"))
    if err != nil {
        fmt.Println(err)
        return
    }
    log.Println(n)
    now := time.Now()
    b := make([]byte, 2048)
    for time.Since(now) < time.Second*10 {
        n, addr, err := client3.ReadFrom(b)
        fmt.Println(n, addr, err)
        if err != nil {
            fmt.Println(err)
            continue
        }
        if addr.String() == "127.0.0.1:5010" {
            m := b[:n]
            fmt.Println("message:", string(m))
        }
    }
    fmt.Println("Sending stop message")
    _, err = client3.Write([]byte("stop"))
    if err != nil {
        fmt.Println(err)
    }
}

In

    go func() {
        for i := 0; ; i   {
            RandomInt := rand.Intn(100)
            d := []byte(fmt.Sprintf("%d", RandomInt))
            conn.WriteTo(d, addr)
            time.Sleep(time.Second * 1)
        }
    }()

I left as an exercise to the reader the writing of the missing select on the context channel to figure out if the routine should exit.

  • Related