Parallel TCP connection using threads

Time:11-06

I am trying to build a system that opens parallel TCP sockets using threads. My threads are triggered via message queue IPC: every time a packet arrives at the message queue, a thread "wakes up", opens a TCP connection to the remote server and sends the packet. My problem is that in Wireshark I can see that the time it takes to send a file is smaller using threads instead of one connection, but the throughput does not change.
My questions are :

  1. How can I verify my threads are working in parallel?
  2. What is the best way to check the throughput in Wireshark?
  3. Is the concept of parallel TCP supposed to increase the throughput?

I am using a virtual machine to run the multithreaded clients. The IDE I am using is CLion, and the language is C. My code:

#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h>    // for ethernet header
#include<netinet/ip.h>      // for ip header
#include<netinet/udp.h>     // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max queue length = 10 messages
#define MAX_MSG_SIZE 4105 //Max message size in bytes
#define MSG_BUFFER_SIZE (MAX_MSG_SIZE + 10)
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;

typedef struct frag
{
    int packet_number;
    int seq;
    uint8_t data[4096];
    bool lastfrag;
} fragma;

void * middlemanThread(void *arg)
{
    ///========================================///
    ///**** Waiting for message queue trigger!:///
    ///=======================================///
    long id = (long)arg; // thread index passed from main
    mqd_t qd; //queue descriptor
    //open the queue for reading//
    qd= mq_open(QUEUE_NAME,O_RDONLY);
    assert(qd != -1);
    struct mq_attr attr;
    assert(mq_getattr(qd,&attr) != -1);
    uint8_t *income_buf = calloc(attr.mq_msgsize,1);
    uint8_t *cast_buf; // walking pointer into income_buf
    assert(income_buf);
    fragma frag;
    struct timespec timeout;
    clock_gettime(CLOCK_REALTIME,&timeout);
    timeout.tv_sec += 50; // mq_timedreceive takes an absolute deadline: now + 50s
    //bool closesoc =false;
    printf("Waiting for messages ..... \n\n");
    while(1){
        ///========================================///
        ///**** Open message queue for receive:///
        ///=======================================///

        if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0){
            printf("Failed to receive message for 50 sec \n");
            //closesoc =true;
            pthread_exit(NULL);
        }
        else{
            cast_buf = income_buf;
            printf("Received successfully , your msg :\n");
            frag.packet_number = *(int *)cast_buf;
            cast_buf += sizeof(int);
            frag.seq = *(int *)cast_buf;
            cast_buf += sizeof(int);
            memcpy(frag.data, cast_buf, Nbytes);
            cast_buf += Nbytes;
            frag.lastfrag = *cast_buf;
        }
        pthread_mutex_lock(&lock);

        ///========================================///
        ///**** Connecting to Server and send Fragment:///
        ///=======================================///

        int size = sizeof(frag.packet_number) + sizeof(frag.seq) + sizeof(frag.data) + sizeof(frag.lastfrag);
        printf("In thread\n");
        int clientSocket;
        struct sockaddr_in serverAddr;
        socklen_t addr_size;

        // Create the socket.
        clientSocket = socket(PF_INET, SOCK_STREAM, 0);
        if (clientSocket < 0) {
            perror("socket");
            pthread_mutex_unlock(&lock);
            pthread_exit(NULL);
        }

        //Configure settings of the server address
        // Address family is Internet
        serverAddr.sin_family = AF_INET;

        //Set port number, using htons function
        serverAddr.sin_port = htons(8081);

        //Set IP address to localhost
        serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
        memset(serverAddr.sin_zero, '\0', sizeof serverAddr.sin_zero);

        //Connect the socket to the server using the address
        addr_size = sizeof serverAddr;
        if (connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size) < 0) {
            perror("connect");
            close(clientSocket);
            pthread_mutex_unlock(&lock);
            continue;
        }
        if(send(clientSocket , income_buf , size,0) < 0)
        {
            printf("Send failed\n");
        }
        printf("Thread Id : %ld \n" , id);
        printf("Packet number : %d \n Seq = %d  \n lastfrag = %d\n\n",frag.packet_number,frag.seq,(int)frag.lastfrag);
        pthread_mutex_unlock(&lock);
        //if(closesoc)
        close(clientSocket);
        usleep(20000);
    }
}
int main(){
    int i = 0;
    pthread_t tid[5];

    while(i< 5)
    {
        if( pthread_create(&tid[i], NULL, middlemanThread, (void*)(long)i) != 0 )
            printf("Failed to create thread\n");
        i++;
    }
    sleep(2);
    i = 0;
    while(i< 5)
    {
        pthread_join(tid[i],NULL);
        printf("Thread ID : %d:\n",i);
        i++;
    }
    return 0;
}

CodePudding user response:

Here is a partial answer to:

3. Is the concept of parallel TCP supposed to increase the throughput?

Kinda. It really depends on what the bottleneck is.

The first possible bottleneck is congestion control. A TCP sender has a limit on how many packets can be in flight at once (before an ACK for the first of the bunch is received), called the congestion window. This number starts small and grows over time. Also, if a packet is lost this number is cut in half and then grows slowly back until the next drop occurs. The limit is, however, per TCP connection, so if you spread your data over several parallel connections the overall congestion window (the sum of the windows of all flows) will grow faster and drop by a smaller amount. (This is a summary; for the details you need to know how congestion control works, and it is a big topic.) This happens regardless of whether you are using threads or not. You can open several connections in one thread and achieve the same effect.
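For instance, a single thread can hold several connections and stripe the data across them. A minimal sketch of that idea (the helper names, the 4096-byte slice size, and the striping scheme are mine, not from the question; error handling and short-write loops are omitted):

```c
#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Open n TCP connections to ip:port from ONE thread.
 * Returns how many actually connected; descriptors land in socks[]. */
int open_connections(int socks[], int n, const char *ip, uint16_t port)
{
    struct sockaddr_in srv;
    memset(&srv, 0, sizeof srv);
    srv.sin_family = AF_INET;
    srv.sin_port = htons(port);
    srv.sin_addr.s_addr = inet_addr(ip);

    int opened = 0;
    while (opened < n) {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0 || connect(fd, (struct sockaddr *)&srv, sizeof srv) < 0) {
            if (fd >= 0)
                close(fd);
            break; /* stop on first failure */
        }
        socks[opened++] = fd;
    }
    return opened;
}

/* Round-robin the payload over the connections in 4096-byte slices:
 * each flow grows its own congestion window, so the aggregate window
 * grows faster and a single loss only halves one flow's share. */
void fan_out(const int socks[], int n, const uint8_t *data, size_t len)
{
    size_t off = 0;
    if (n <= 0)
        return;
    for (int i = 0; off < len; i = (i + 1) % n) {
        size_t slice = len - off < 4096 ? len - off : 4096;
        send(socks[i], data + off, slice, 0); /* sketch: no short-write loop */
        off += slice;
    }
}
```

Note there is no thread anywhere in this sketch: the parallelism the answer talks about lives in the flows, not in the caller.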

The second possible bottleneck is network processing in the OS. AFAIK this becomes an issue starting with 10Gb links. Maybe 1Gb, but probably not. TCP processing happens in the OS, not in your application. You may achieve better performance if the OS spreads that processing across processors (there should be parameters to enable this), and maybe slightly better performance because of caches.

If you are reading files from disk, your disk I/O can also very well be the bottleneck. In that case I don't think spreading the sending across different threads is going to help.

CodePudding user response:

thus every time a packet arrives at the message queue a thread "wakes up", opens a TCP connection to the remote server and sends the packet

If you're at all concerned about speed or efficiency, don't do this. The single most expensive thing you can do with a TCP socket is the initial connection. You're doing a 3-way handshake just to send a single message!

Then, you're holding a global mutex while doing this entire operation - which, again, is the slowest operation in your program.

The current design is effectively single-threaded, but in the most complicated and expensive possible way.
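The usual fix is to hold the lock only for the shared-state update and keep all network I/O outside it. A minimal sketch of that locking scope (the `packets_sent` counter and the `worker`/`account_packet` names are hypothetical stand-ins for whatever state your threads actually share):

```c
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static long packets_sent = 0; /* hypothetical shared state */

/* Hold the mutex only for the shared update, never across I/O. */
static void account_packet(void)
{
    pthread_mutex_lock(&lock);
    packets_sent++;
    pthread_mutex_unlock(&lock);
}

static void *worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 1000; i++) {
        /* connect()/send() would run HERE, with no lock held,
         * so the threads actually overlap on the network. */
        account_packet();
    }
    return NULL;
}
```

Compared to the question's code, only the bookkeeping is serialized; the expensive connect/send would run concurrently.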

I can see that the time it takes to send a file is smaller using threads instead of one connection, but the throughput does not change

I have no idea what you're actually measuring, and it's not at all clear that you do either. What is a file? One fragment? Multiple fragments? How big is it compared to your MTU? Have you checked that the fragments are actually received in the correct order, because it looks to me like the only possible parallelism is the spot where that could break.

How is it possible to have lower latency and unchanged throughput for a single file?

How can I verify my threads are working in parallel?

If you see multiple TCP connections in Wireshark with different source ports, and their packets are interleaved, you have effective parallelism. This is unlikely though, as you explicitly prohibited it with your global mutex!

What is the best way to check the throughput in Wireshark?

Don't. Use Wireshark for inspecting packets; use the server to determine throughput. That's where the results actually matter.

3. Is the concept of parallel TCP supposed to increase the throughput?

Why did you implement all this complexity if you don't know what it's for?

There's a good chance a single thread (correctly coded with no spurious mutex thrashing) can saturate your network, so: no. Having multiple I/O threads is generally about conveniently partitioning your logic and state (ie, having one client per thread, or different unrelated I/O subsystems in different threads), rather than performance.


If you want to pull packets off a message queue and send them to TCP, the performant way is to:

  1. use a single thread just doing this (your program may have other threads doing other things - avoid synchronizing with them if possible)
  2. open a single persistent TCP connection to the server and don't connect/close it for every fragment
  3. that's it. It's much simpler than what you have and will perform much better.
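A minimal sketch of that design, reusing the queue name, server address and port from the question (all assumptions, as are the `send_all` and `run_sender` names; real code needs more error handling):

```c
#include <arpa/inet.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* send() on a stream socket may accept fewer bytes than asked:
 * loop until the whole message is handed to the kernel. */
static int send_all(int fd, const void *buf, size_t len)
{
    const char *p = buf;
    while (len > 0) {
        ssize_t w = send(fd, p, len, 0);
        if (w < 0)
            return -1;
        p += w;
        len -= (size_t)w;
    }
    return 0;
}

/* One thread, one persistent connection: connect once, then drain
 * the queue and forward every message over the same socket. */
int run_sender(const char *qname, const char *ip, uint16_t port)
{
    mqd_t qd = mq_open(qname, O_RDONLY);
    if (qd == (mqd_t)-1) { perror("mq_open"); return -1; }

    struct mq_attr attr;
    mq_getattr(qd, &attr);
    char *buf = malloc(attr.mq_msgsize);

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in srv;
    memset(&srv, 0, sizeof srv);
    srv.sin_family = AF_INET;
    srv.sin_port = htons(port);
    srv.sin_addr.s_addr = inet_addr(ip);
    if (connect(sock, (struct sockaddr *)&srv, sizeof srv) < 0) {
        perror("connect");
        free(buf);
        mq_close(qd);
        return -1;
    }

    ssize_t n;
    while ((n = mq_receive(qd, buf, attr.mq_msgsize, NULL)) >= 0)
        if (send_all(sock, buf, (size_t)n) < 0)
            break;

    close(sock);
    mq_close(qd);
    free(buf);
    return 0;
}
```

Call it as `run_sender("/ServerDan_Queue", "192.168.14.149", 8081);` from `main`, and link with `-lrt` on Linux for the POSIX message queue functions.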

You can realistically have one thread handling multiple different connections, but I can't see any way this would be useful in your case, so keep it simple.
