2017-08-10 71 views
0

多个连接考虑,你需要保持与只是ocassionally发送命令的设备256个的TCP连接的情况。我想同时做到这一点(它需要阻止,直到它得到响应),我试图使用QThreadPool的这个目的,但我有一些怀疑,如果这是可能的。处理使用QThreadPool

我试图用QRunnable,但我不知道插座将如何线程之间的行为(即他们在创建插座应只用于线程?)

我也担心这个解决方案的效率如果有人能够提出一些替代方案,而不一定使用QT,我会很高兴。

下面我张贴的代码的某些代码段。

class Task : public QRunnable { 

    Task(){ 
     //creating TaskSubclass instance and socket in it 
    } 

private: 
    TaskSubclass    *sub; 

    void run() override { 
     //some debug info and variable setting... 
     sub->doSomething(args); 
     return; 
    } 
}; 

class TaskSubclass { 
    Socket   *sock;   // socket instance 
    //... 
    void doSomething(args) 
    { 
     //writing to socket here 
    } 
} 

class MainProgram : public QObject{ 
    Q_OBJECT 
private: 
    QThreadPool *pool; 
    Task *tasks; 

public: 
    MainProgram(){ 
     pool = new QThreadPool(this); 
     //create tasks here 
    } 

    void run(){ 
     //decide which task to start 
     pool->start(tasks[i]); 
    } 
}; 
+1

请问为什么你要使用多线程的解决方案? Qt的事件系统能够通过一个线程处理数百个Tcp连接,而不会出现麻烦,或者你的“套接字数据处理程序” - 逻辑阻塞主线程? – Spiek

+0

感谢评论,我刚刚编辑帖子。它需要阻塞直到得到响应。 – Vido

+0

有没有你不使用信号和插槽机制的原因?你可以使用QTcpSocket的readyRead-Signal并将它连接到一个插槽。 – Spiek

回答

0

我对这个问题的方案最是通过复用select()您的插座。这样你就不需要创建额外的线程,而且这是一个“非常POSIX”的方式。

见例如请参见本教程:

http://www.binarytides.com/multiple-socket-connections-fdset-select-linux/

或相关问题:

Using select(..) on client

+0

我想通过使用像QTcpSockets或boost套接字这样的包装来保持更高的抽象层,所以我不需要在原始内存上操作。 – Vido

+0

思考过后,我可以从这些wrappes中仅使用select()的那一刻获取原始描述符,但我想使它平行。这仍然是很好的选择,谢谢你。 – Vido

0

由于OMD_AT有媒体链接指出,最好的解决办法是使用选择()并让内核做的工作适合你:-)

在这里你有作为的一个例子ync方法和Syncron多线程方法。

在这个例子中,我们创建10个连接到一个谷歌的互联网服务和做一个简单的GET请求到服务器,我们测量所需的每一种方法的所有连接多长时间才能收到来自谷歌服务器的响应。

请注意您应该使用更快的网络服务器进行真正的测试,例如本地主机,因为网络延迟对结果有很大影响。

#include <QCoreApplication> 
#include <QTcpSocket> 
#include <QtConcurrent/QtConcurrentRun> 
#include <QElapsedTimer> 
#include <QAtomicInt> 

class Task : public QRunnable 
{ 
    public: 
     Task() : QRunnable() {} 
     static QAtomicInt counter; 
     static QElapsedTimer timer; 
     virtual void run() override 
     { 
      QTcpSocket* socket = new QTcpSocket(); 
      socket->connectToHost("www.google.com", 80); 
      socket->write("GET/HTTP/1.1\r\nHost: www.google.com\r\n\r\n"); 
      socket->waitForReadyRead(); 
      if(!--counter) { 
       qDebug("Multiple Threads elapsed: %lld nanoseconds", timer.nsecsElapsed()); 
      } 
     } 
}; 

QAtomicInt Task::counter; 
QElapsedTimer Task::timer; 

int main(int argc, char *argv[]) 
{ 
    QCoreApplication app(argc, argv); 

    // init 
    int connections = 10; 
    Task::counter = connections; 
    QElapsedTimer timer; 

    /// Async via One Thread (Select) 

    // handle the data 
    auto dataHandler = [&timer,&connections](QByteArray data) { 
     Q_UNUSED(data); 
     if(!--connections) qDebug(" Single Threads elapsed: %lld nanoseconds", timer.nsecsElapsed()); 
    }; 

    // create 10 connection to google.com and send an http get request 
    timer.start(); 
    for(int i = 0; i < connections; i++) { 
     QTcpSocket* socket = new QTcpSocket(); 
     socket->connectToHost("www.google.com", 80); 
     socket->write("GET/HTTP/1.1\r\nHost: www.google.com\r\n\r\n"); 
     QObject::connect(socket, &QTcpSocket::readyRead, [dataHandler,socket]() { 
      dataHandler(socket->readAll()); 
     }); 
    } 


    /// Async via Multiple Threads 

    Task::timer.start(); 
    for(int i = 0; i < connections; i++) { 
     QThreadPool::globalInstance()->start(new Task()); 
    } 

    return app.exec(); 
} 

打印:

Multiple Threads elapsed: 62324598 nanoseconds 
    Single Threads elapsed: 63613967 nanoseconds 
+0

正如我在评论中提到的那样,我真的想要利用计算机中的许多内核,这些内核将专用于此软件。 – Vido

+0

我已添加多线程处理支持 – Spiek

+0

在主线程中创建套接字是否安全并在其中对其执行一些操作?为了提高效率,您还可以比较此解决方案和select()方法的性能? (是否有很大的区别) – Vido

0

虽然,答案已经被接受了,我想和大家分享我的)

我从你的问题明白了什么:有256当前活动连接,您不时发送一个请求(命名为“command”)给其中一个请求并等待响应。同时,要进行这个过程中多线程和,虽然你说:“这需要阻塞,直到它获得响应”,我想你暗示阻止它处理请求 - 响应进程中的线程,但不是主线程。

如果我确实了解这个问题吧,这里是我建议使用Qt来做到这一点:

#include <functional> 

#include <QObject>   // need to add "QT += core" in .pro 
#include <QTcpSocket>  // QT += network 
#include <QtConcurrent>  // QT += concurrent 
#include <QFuture>   
#include <QFutureWatcher> 

class CommandSender : public QObject 
{ 
public: 
    // Sends a command via connection and blocks 
    // until the response arrives or timeout occurs 
    // then passes the response to a handler 
    // when the handler is done - unblocks 
    void SendCommand(
     QTcpSocket* connection, 
     const Command& command, 
     void(*responseHandler)(Response&&)) 
    { 
     const int timeout = 1000; // milliseconds, set it to -1 if you want no timeouts 

     // Sending a command (blocking) 
     connection.write(command.ToByteArray()); // Look QByteArray for more details 
     if (connection.waitForBytesWritten(timeout) { 
      qDebug() << connection.errorString() << endl; 
      emit error(connection); 
      return; 
     } 

     // Waiting for a response (blocking) 
     QDataStream in{ connection, QIODevice::ReadOnly }; 
     QString message; 
     do { 
      if (!connection.waitForReadyRead(timeout)) { 
       qDebug() << connection.errorString() << endl; 
       emit error(connection); 
       return; 
      } 
      in.startTransaction(); 
      in >> message; 
     } while (!in.commitTransaction()); 

     responseHandler(Response{ message }); // Translate message to a response and handle it 
    } 

    // Non-blocking version of SendCommand 
    void SendCommandAsync(
     QTcpSocket* connection, 
     const Command& command, 
     void(*responseHandler) (Response&&)) 
    { 
     QFutureWatcher<void>* watcher = new QFutureWatcher<void>{ this }; 
     connect(watcher, &QFutureWatcher<void>::finished, [connection, watcher]() 
     { 
      emit done(connection); 
      watcher->deleteLater(); 
     }); 

     // Does not block, 
     // emits "done" when finished 
     QFuture<void> future 
      = QtConcurrent::run(this, &CommandSender::SendCommand, connection, command, responseHandler); 
     watcher->setFuture(future); 
    } 

signals: 
    void done(QTcpSocket* connection); 
    void error(QTcpSocket* connection); 
} 

现在你可以使用从线程池取出一个单独的线程发送命令到插座:下引擎盖QtConcurrent::run()使用由Qt为您提供的QThreadPool的全局实例。该线程阻塞,直到它得到一个响应,然后用responseHandler来处理它。同时,管理所有命令和套接字的主线程将保持畅通。只需抓住done()信号即可知道响应已成功接收和处理。

有一点需要注意:异步版本只有在线程池中有空闲线程时才发送请求,否则就等待它。当然,这是任何线程池的行为(这正是这种模式的要点),但不要忘记这一点。

另外我写的没有Qt的代码很方便,所以可能会包含一些错误。

编辑:事实证明,这不是线程安全的,因为套接字在Qt中不可重入。

你可以做的是将一个互斥量与一个套接字关联,并在每次执行其功能时将其锁定。这可以很容易地创建QTcpSocket类的包装。请纠正我,如果我错了。

+0

谢谢,它看起来不错,但如果我们将指针传递给QtConcureent.run()中的套接字,不应该存在线程安全问题?我听说套接字只能用于创建它们的线程,并且在不同线程之间传递它们是不安全的(我不知道应该创建哪些套接字,因为主线程会不安全?)。如果我错了,或者你有一些解释说这是安全的,请纠正我。 – Vido

+0

你是对的,它不是线程安全的。原因是 - Qt中的套接字不可重入(我忘记了,很抱歉)。所以我遇到了这种情况,我尝试管理套接字从主线程连接/断开连接并读取/写入另一个工作线程。我的错。 **但是**它导致了一个合乎逻辑的结论,即不管什么(至少在Qt中),套接字上的所有**操作都必须在同一个线程中执行。 – WindyFields