Home > Software design >  Is there any way to send and recive messages using one connection with socket in Kotlin?
Is there any way to send and recive messages using one connection with socket in Kotlin?

Time:09-10

From what I know already, one socket should handle multiple clients (connections) and one client (connection) should handle send and recive(listen) actions. I can achive this by running a few clients in CLI on the golang server.

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "log"
    "net"
)

var clients = make(map[string]net.Conn) //Connected 
var registredClients = make(map[int]net.Conn) //Recognized by id
var messages = make(chan message)

type message struct {
    text        string
    address     string
    RecipientID int
    senderId    int
}

type comunicateJson struct {
    Mode        int    `json:"mode"`
    SenderID    int    `json:"senderId"`
    RecipientID int    `json:"recipientId"`
    Message     string `json:"message"`
    Param1      int    `json:"param1"`
    Param2      int    `json:"param2"`
}

type uRegisterJson struct {
    ClientId int `json:"clientId"`
}

func main() {

    listen, err := net.Listen("tcp", "XXX.XXX.XXX.XXX:9999")

    if err != nil {
        log.Fatal(err)
    }

    go broadcaster()

    for {
        conn, err := listen.Accept()
        if err != nil {
            log.Print(err)
            continue
        }
        go handle(conn)
    }

}
func handle(conn net.Conn) {
    clients[conn.RemoteAddr().String()] = conn

    fmt.Println("CONNECT "   conn.RemoteAddr().String())
    

    input := bufio.NewScanner(conn)
    for input.Scan() {

        fmt.Println("Scan "   conn.RemoteAddr().String())
        fmt.Println(input.Text())
        messages <- newMessage(input.Text(), conn)
    }
    

    //Delete client form map
    rmUser(conn)
    delete(clients, conn.RemoteAddr().String())
    conn.Close() // NOTE: ignoring network errors
}

func isJSON(s string) bool {
    var js map[string]interface{}
    return json.Unmarshal([]byte(s), &js) == nil
}

func rmUser(con net.Conn) {
    for i, s := range registredClients {
        if s == con {
            delete(registredClients, i)
        }
    }
}

func newMessage(msg string, conn net.Conn) message {
    fmt.Println("Proccces message")
    fmt.Println(msg)
    if isJSON(msg) {
        res := comunicateJson{}
        json.Unmarshal([]byte(msg), &res)

        res2 := uRegisterJson{}
        json.Unmarshal([]byte(msg), &res2)

        if res.RecipientID != 0 && res.SenderID != 0 { //NOTE: parssing individual message

            addr := conn.RemoteAddr().String()
            senderId := res.SenderID
            reciverId := res.RecipientID

            
            fmt.Printf("Message from clientId %d to clientId %d\n", res.SenderID, res.RecipientID)
            return message{
                text:        msg,
                address:     addr,
                senderId:    senderId,
                RecipientID: reciverId,
            }


        } else if res2.ClientId != 0 { //NOTE: client register

            fmt.Printf("New clientId %d\n", res2.ClientId)
            registredClients[res2.ClientId] = conn
        }

    }

    addr := conn.RemoteAddr().String()

    return message{
        text:    msg,
        address: addr,
    }

}

func broadcaster() {
    for {
        select {
        case msg := <-messages:
            /*
                for i, v := range registredClients {
                    if msg.RecipientID == i {
                        fmt.Fprintln(v, msg.text)
                    }

                }
            */
            for _, conn := range clients {
                if msg.address == conn.RemoteAddr().String() { //NOTE: ignoring sender
                    continue
                }
                fmt.Fprintln(conn, msg.text)
            }

        }
    }
}

The interesting part is the commented loop in the broadcaster. Every client who shares his ID (json) can became a registredClient. If the messages recipientId matches the clients id the message is send to that specific client.

My problem is that I cannot build such client in Kotlin. The only way I can send a message is by setting socket.close() and my server has no reason to register such a client. For obvious reasons, I do not want to send a message to every connected clinet.

For sending messages to socket I'am using a kotlin service executed in Fragment/Activity

class SendMessageService: Service() {



    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        var messageType = intent?.getStringExtra("messageType")
        when(messageType) {
            "m1" -> messageOne(startId)
            "m2" -> messageTwo(startId)
        }
        return super.onStartCommand(intent, flags, startId);
    }



    private fun messageOne(startId: Int) {
        val jsonObj = JSONObject()
        jsonObj.put("mode",1)
        jsonObj.put("senderId",getUserId())
        jsonObj.put("recipientId",100)
        jsonObj.put("message","messageOne")
        jsonObj.put("param1",1)
        jsonObj.put("param2",2)


        CoroutineScope(Dispatchers.IO).launch() {
            val socket = Socket(GlobalVars().socketHost, GlobalVars().socketPort)
            socket.outputStream.write(jsonObj.toString().toByteArray())
            socket.close()
        }
        stopSelf(startId)
    }



    private fun messageTwo(startId: Int) {
        val jsonObj = JSONObject()
        jsonObj.put("mode",1)
        jsonObj.put("senderId",getUserId())
        jsonObj.put("recipientId",100)
        jsonObj.put("message","messageTwo")
        jsonObj.put("param1",1)
        jsonObj.put("param2",2)
        CoroutineScope(Dispatchers.IO).launch() {
            val socket = Socket(GlobalVars().socketHost, GlobalVars().socketPort)
            socket.outputStream.write(jsonObj.toString().toByteArray())
            socket.close()
        }
        stopSelf(startId)
    }


    ...
}

I don't realy care about the socket.close() here, as long as the message contains the receiverId, the server should know where to deliver it.

For reciving messages from socket am using a Kotlin CoroutineWorker witch is executed in anather Worker.

class SocketWorker(
    context: Context,
    workerParams: WorkerParameters
) : CoroutineWorker(context, workerParams) {
    var notifyid : Int = 0

    override suspend fun doWork(): Result = withContext(Dispatchers.IO) {

        val jsonObj = JSONObject()
        jsonObj.put("clientId",getUserId())

        try {
            var socket = Socket(GlobalVars().socketHost, GlobalVars().socketPort)
            PrintWriter(socket.outputStream, true).write(jsonObj.toString()) //This line does not do anything.
            //socket.close()
            var text = ""
            while (true) {
                text = BufferedReader(InputStreamReader(socket.inputStream)).readLine()
                if (text.contains("recipientId")){
                    val gson = Gson()
                    val socketObj: SocketMassage = gson.fromJson(text, SocketMassage::class.java)
                    if (socketObj.recipientId == getUserId()) { //Socket should be sending to one Client
                        generateNotification("Title", "Worker")
                    }
                }
            }

            Result.failure()



        } catch (ex: IOException) {
            Log.e("felixb", "Socket Error Worker Down")
            Result.failure()
        }

    }

    ...
}

I am trying here to send a clients id that can be captured by the server and turn the socket thing to listening for new messages. If I close the socket the server will remove the client from the registredClients and there will be no message for that specific client provided by the servers brodcaster commented loop.

Is there any way to send and recive messages using one connection with socket in Kotlin?

Ktor runBlocking new aproach

class SocketWorker(appContext: Context, workerParams: WorkerParameters): Worker(appContext, workerParams) {

    var notifyid : Int = 0


    @WorkerThread
    override fun doWork(): Result {
        Log.e("felixb","WORK BEFORE runBlocking")
        return runBlocking {
            Log.e("felixb","WORK in runBlocking")
            val jsonObj = JSONObject()
            jsonObj.put("clientId",getUserId())
            val selectorManager = SelectorManager(Dispatchers.IO)
            val socket = aSocket(selectorManager).tcp().connect(GlobalVars().socketHost, GlobalVars().socketPort)

            val receiveChannel = socket.openReadChannel()
            val sendChannel = socket.openWriteChannel(autoFlush = true)

            launch(Dispatchers.IO) {
                while (true) {
                    val incomingMessage =  receiveChannel.readUTF8Line()
                    if (incomingMessage != null) {
                        Log.e("felixb",incomingMessage)
                    } else {

                        Log.e("felixb","Server closed a connection")
                        socket.close()
                        selectorManager.close()
                        exitProcess(0)
                    }
                }
            }

            while (true) {
                Log.e("felixb","Send message ")
                val greeting = jsonObj.toString()
                sendChannel.writeStringUtf8(greeting)
            }

            Result.failure()
        }
    }

    ...


}

I've change the doWork body for anather aproach exchanging messages with socket using ktor and runBlocking. The things got creazy in ktor. I've got multiple connection around hundred (server log) and each of them is immidietly closed and after that there are caming multiple messages sended (client log) and never reach the server couz is no connection. Seems the ktor is trying to send a message with no connection. I belive that my ktor client is not compatible with the go server.

android log

In the android log the red text is from messageOne and the white is from messageTwo i think that is a proof that the methods ware executed.

server log

In the server log there are two consoles the lef one is showing what is going on in the server and the right one is the for the go client reciving all messages captured by the server while for now the server is sendig them to every connected client.

The blue arrow shows the go client The red arrow shows the messageOne android client The green arrow shows the messagaTwo android client

CodePudding user response:

Add a newline:

socket.outputStream.write((jsonObj.toString() "\n").toByteArray())

And add a flush().

  • Related