c ++ – multithreaded local server

I write several local servers that have almost the code in the main.cpp. Enjoy comments, suggestions for improvement and especially notes on potential memory leaks Thank you!

#understand 
#understand 
#understand 
#understand 
#understand 

#include "UdsServer.hpp"
#include "RequestManager.hpp"
#include "MSutils.hpp" // MS :: log ()



void pullRequests ();


const std :: string pathToSocket = "/var/run/SomeServer.sock";
const std :: string SERVICE_NAME = "SomeServer";

RequestManager Service; // Is the current treatment of the request


std :: unordered_map RequestQueue;
std :: mutex requestQueue_mutex;
std :: condition_variable processorsThreadSwitch;
bool gotNewRequests = false;



int main ()
{
UdsServer application; // Server listening on a Unix domain socket

try
{
app.createServer (pathToSocket);
}
catch (const std :: string & err)
{
MS :: log (SERVICE_NAME, "Can not start service." Error: "+ err, MS :: MessageType :: FatalException);

return -1;
}

unsigned n_concThreads = std :: thread :: hardware_concurrency ();

if (! n_concThreads) // if the request failed ...
{
std :: ifstream cpuinfo ("/ proc / cpuinfo");

n_concThreads = std :: count (std :: istream_iterator(Cpuinfo)
std :: istream_iterator(),
std :: string ("processor"));

if (! n_concThreads)
n_concThreads = 6; // ~ number of processor cores. TODO: Make the number of processes / worker threads configurable with the help of a configuration file
}

for (int i = 0; i < n_concThreads; ++i)
    {
        std::thread t (pullRequests);
        t.detach();
    }



    while ((int clientConnection = app.newConnectionEstablished()) > -1) // Use accept () internally
{
std :: string command = app.getMsg (clientConnection); // Use read () internally

if (command.empty ())
app.closeConnection (clientConnection);
else if (command == "SHUTDOWN")
{
app.closeConnection (clientConnection);

returns 0;
}
other
{
{// anonymous reach just to get rid of the lock before notifying a thread
std :: lock_guard writeLock (requestQueue_mutex);

RequestQueue[clientConnection] = std :: move (command);

gotNewRequests = true;
}

processorsThreadSwitch.notify_one (); // nothing happens here if all the threads are busy
}
}
}



void pullRequests ()
{
UnixDomainSocket uds;

std :: unique_lock writeLock (requestQueue_mutex);

while (true) // let the thread run "forever"
{
while (! gotNewRequests)
processorsThreadSwitch.wait (writeLock);


std :: unordered_map queueCopy (std :: move (requestQueue));
requestQueue.clear ();

gotNewRequests = false;

writeLock.unlock (); // Do not let other threads wait when these threads do not need to access shared data anymore


if (queueCopy.empty ())
Carry on;
else if (queueCopy.size () == 1)
{
std :: string response = service.pullRequests (queueCopy.cbegin () -> second);


if (response.length ())
{
auto sendResult = uds.sendMsg (queueCopy.cbegin () -> first, response);

if (! sendResult.isValid ())
MS :: log (SERVICE_NAME, "Unable to send response to request:" + queueCopy.begin () -> second, MS :: MessageType :: Error);
}

if (! uds.closeConnection (queueCopy.begin () -> first))
MS :: log (SERVICE_NAME, "Unable to close the connection.", MS :: MessageType :: Error);
}
otherwise // Multiplex
{
std :: unordered_map <std :: string, std :: vector> multiplexed requests;

for (auto & request: queueCopy)
multiplexed requests[std::move(request.second)].push_back (request.first);

for (const auto & request: multiplexedRequests)
{
std :: string response = service.pullRequests (request.first);

if (response.length ())
for (auto socket: request.second)
{
auto sendResult = uds.sendMsg (socket, response);

if (! sendResult.isValid ())
MS :: log (SERVICE_NAME, "Unable to send response for query:" + request.first, MS :: MessageType :: Error);

if (! uds.closeConnection (socket))
MS :: log (SERVICE_NAME, "Unable to close the connection.", MS :: MessageType :: Error);
}
}
}


writeLock.lock ();
}
}