2017-08-18 79 views
0

我正在尝试使用boost单元测试来测试我的boost asio套接字侦听器。监听器的目的是简单地监听一个端口并读取所有内容并将其保存到一个队列中并发回一个http响应头。即使长时间等待,C++线程也不会加入

作为第一步,我创建了一个客户端向侦听器发送消息并读取来自侦听器的响应消息。我还创建了一个线程来启动侦听器。主线程将发送和接收来自Listener的消息。我可以在客户端和列表程序之间发送和接收消息。但是当我尝试加入时,它并没有加入并继续等待加入(我猜)。

请帮助我出了什么问题。

在此先感谢。

Listener.hpp

#ifndef Listener_hpp 
#define Listener_hpp 
#include <iostream> 
#include <stdio.h> 
#include <atomic> 
#include <boost/asio.hpp> 
#include "sharedQueue.hpp" 

using namespace std; 
using namespace boost; 
using boost::asio::ip::tcp; 

class Listener{ 

private: 
int port; 
std::atomic<bool> m_stop; 
boost::system::error_code error; 
boost::asio::io_service io_service; 
std::shared_ptr<sharedQueue> queue_ptr; 
void saveMessageToQueue(std::string,std::string); 
Listener(int portToListen, std::shared_ptr<sharedQueue> queue_unique_ptr); 
Listener(const Listener&); 
Listener& operator=(const Listener&); 
public: 
static Listener& getInstance(int portToListenOn,   std::shared_ptr<sharedQueue> queue_unique_ptr); 
void listen(); 
}; 
#endif /* Listener_hpp */ 

Listener.cpp

#include "Listener.hpp" 
Listener::Listener(int portToListen, std::shared_ptr<sharedQueue> queue_unique_ptr):port(portToListen), queue_ptr(queue_unique_ptr),m_stop(false) 
{;} 
Listener& Listener::getInstance(int portToListenOn, std::shared_ptr<sharedQueue> queue_unique_ptr) 
{ 
    static Listener instance(portToListenOn, queue_unique_ptr); 
    return instance; 
} 
void Listener::stopListen(){ 
m_stop = true; 
} 
void Listener::listen() 
{ 
    try{ 
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port)); 
    while(!m_stop) 
    { 
     std::cout<<"Server is listening on port "<<port<<std::endl; 
     boost::asio::ip::tcp::socket socket(io_service); 
     acceptor.accept(socket); 
     std::cout<<"Connection Request Accepted "<<std::endl; 
     std::array<char, 512> buf; 
     socket.read_some(boost::asio::buffer(buf), error); 
     std::string responseBack =" HTTP:300"; 
     boost::asio::write(socket, boost::asio::buffer(responseBack, responseBack.size()), error); 
    } 
    } 
    catch (std::exception& e) 
    { 
    std::cerr <<"Listener Exception : "<< e.what() << std::endl; 
    exit(0); 
    } 
    } 

sharedQueue.hpp

#ifndef SharedQueue_hpp 
    #define SharedQueue_hpp 
    #include <stdio.h> 
    #include <iostream> 
    class sharedQueue{ 
    public: 
    sharedQueue(){}; 
    void pushToQueue(std::string str){}; 
    }; 
    #endif /* SharedQueue_hpp */ 

ClientTestHelper.hpp

#ifndef ClientTestHelper_hpp 
    #define ClientTestHelper_hpp 

    #include <iostream> 
    #include <boost/asio.hpp> 
    using namespace std; 
    class ClientTestHelper 
    { 
    string address = ""; 
    public: 
    ClientTestHelper(std::string addressToPush){ 
    this->address = addressToPush; 
    }; 
    ~ClientTestHelper(){}; 
    std::string sendMessage(string message); 
    }; 

    #endif /* ClientTestHelper_hpp */ 

ClientTestHelper.cpp

#include "ClientTestHelper.hpp" 
std::string ClientTestHelper::sendMessage(string message){ 
boost::asio::io_service io_service; 
boost::asio::ip::tcp::resolver resolver(io_service); 
boost::asio::ip::tcp::resolver::query query(address, boost::asio::ip::resolver_query_base::numeric_service); 
boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); 
boost::asio::ip::tcp::socket socket(io_service); 
boost::asio::connect(socket, endpoint_iterator); 
boost::asio::streambuf writeBuffer; 
boost::asio::streambuf readBuffer; 
std::string str = message; 
boost::system::error_code error; 
boost::asio::write(socket, boost::asio::buffer(str, str.size()), error); 
std::array<char,1024> buf; 
socket.read_some(boost::asio::buffer(buf), error); 
std::string respo = buf.data(); 
cout<<respo<<std::endl; 
return respo; 
} 

ListenerTest.hpp

#ifndef ListenerTest_hpp 
#define ListenerTest_hpp 

#define BOOST_TEST_DYN_LINK 
#define BOOST_TEST_MAIN 

#include <iostream> 
#include <stdio.h> 
#include <thread> 
#include <boost/test/unit_test.hpp> 
#include "Listener.hpp" 
#include "ClientTestHelper.hpp" 
#include "SharedQueue.hpp" 

struct ListenerTestFixture{ 
public: 
void startListen(Listener &receiver){ 
    receiver.listen(); 
} 
std::shared_ptr<sharedQueue> myQueue = std::make_shared<sharedQueue>(); 
Listener &receiver = Listener::getInstance(8282, myQueue); 
void doRun(){ 
    receiver.listen(); 
} 
ClientTestHelper *client = new ClientTestHelper("8282"); 
ListenerTestFixture(){}; 
~ListenerTestFixture(){}; 
}; 
#endif /* ListenerTest_hpp */ 

ListenerTest.cpp

#include "ListenerTest.hpp" 

BOOST_FIXTURE_TEST_CASE(connectionTest, ListenerTestFixture) 
{ 
std::thread t1(&ListenerTestFixture::doRun, *this); 
std::string response = ""; 
response = client->sendMessage("SomeRandom Messages \r\n\r\n\r\n"); 
//receiver.~Listener(); 
receiver.stopListen(); 
std::cout<<"Response Received "<<response; 
t1.join(); 
std::cout<<"Finished Testing "<<endl; 
} 

控制台输出对于澄清:

Running 1 test case... 
Server is listening on port 8585 
Connection Request Accepted 
Server is listening on port 8585 
    HTTP:300 

在这个执行没有停止之后等待加入。

编辑:修改后的监听器类添加成员m_stop和stopListen(),更改为(;;)成while(!m_stop)。包含头文件。

+3

为什么你手动调用一个对象的析构函数? – Blacktempel

+1

为了加入,线程函数必须退出。 “听”会退出吗? –

+0

请提供示例代码SSCCE。由于错误 – sehe

回答

2

receiver.~Listener();不符合你的想法。 该行代码无法工作,必须出来。

它所做的是导致在receiver的所有成员上调用析构函数。它不会(直接)中断对listen()的呼叫。 可能的结果是执行listen()的线程将执行未定义的行为,几乎肯定意味着无法以通知join()的方式结束。

总之,您的主线程将永远等待,因为您已从receiver()下拉出地毯。

即使这种方式工作,编译器也会在main()的末尾添加一个隐含的receiver.~Listener()调用,并且如果偶然的话它可能会崩溃。

正常的解决方案是将成员添加到Listenerstd::atomic<bool> m_stop;(或者,如果你喜欢,如果你正在使用升压Boost当量)它初始化到false在构造函数(不listen()),然后调用成员称为stop()这在拨打join()之前立即将其设置为true

然后您将listen()中的非终止循环替换为while(!stop)

这样,你的主线程就可以通知听到循环结束时有序的结论。

没有简单的方法让线程“正好”或“突然”停止干净。你无法预测它在某个过程中的意义或者它正在使用的资源。 它可能会在库调用中损坏堆,操作系统,文件系统,任何东西。

另外Java有一个叫做stop()的线程方法,它在添加之后立即被弃用。没有安全的方式来实施或使用它。


这是罕见的,你应该直接调用析构函数。我知道的唯一用例是你已经使用了放置位置new,并且不希望将空间返回到堆中。在现代C中更为罕见,因为像容器回收空间这样的东西全都交给std容器,如std::vector<>

std::vector<>的大多数实现调用析构函数,因为如果您调整矢量大小,它需要破坏对象但不重新分配内部存储。

你几乎不应该在静态分配的对象上调用析构函数。

+0

Hi @Persixty,谢谢你的回复。我已经实现你的逻辑。但问题仍然存在。据我所知,从控制台输出中,执行'listen()'方法的第二个线程正在等待接受新连接,即'已经在循环内部'。在执行这些代码之后,只有这个线程检查'm_stop'的值。这就是为什么通过将'm_stop'设置为'false'来终止循环的原因。 – Kid

+0

@kid你看过我对OP的评论吗? Boost库中显然有办法迫使'等待'活动停止使用'cancel()'。你尝试过吗?它也似乎有办法使对象无阻塞。正如我所说我不是Boost专家。 – Persixty

+0

对于非常晚的回复感到抱歉,我为了测试而有一个想法(由于时间限制)。 'BOOST_FIXTURE_TEST_CASE(connectionTest,pushMessageReceiverFixture) { std :: thread t1(&pushMessageReceiverFixture :: doRun,* this); std :: string response =“”; response = pushServer-> pushEmptyMessage(“SomeRandom Messages \ r \ n \ r \ n \ r \ n”); receiver。〜pushMessageReceiver(); std :: cout <<“收到的响应”<<响应; pushServer-> pushEmptyMessage(“停止收听\ r \ n \ r \ n \ r \ n”); t1.join(); std :: cout <<“Finished Testing”<< endl; }' – Kid

0

,如果你正在运行ASIO功能,如

acceptor.accept(socket); 

,他们将阻止,除非他们已经完成或有错误或异常返回(除了自己的基础套接字已被设置为非阻塞模式!)。因此,如果m_stop已设置或没有设置,则不会进行测试。

你需要取消stop()函数内的任何运行的操作,即:

void Listener::stopListen(){ 
    std::error_code _ignore; 
    m_stop = true; 
    m_listener.cancel(_ignore); // pass in an ec, it may throw otherwise 
    m_sock.cancel(_ignore); 
} 

当然,你需要使嵌入类里面的IO对象(插座/受体)使它们在stopListen()内可到达。

但是,说实话,我强烈推荐编写与asynchroneous模式的代码,并使用

m_acceptor.async_accept(m_sock, [&](const std::error_code &ec) { 
    if (ec) { 
     return handle_error(ec, "accept"); // bail out 
    } 

    handle_new_client(std::move(m_sock)); // move out socket 
    // restart listen here 
}); 

课程的线程的方式在更多的潜在的竞争条件,这是有时很难拉正确处理。

克里斯科尔霍夫有一些echo server examples具有不同的接受方式从阻塞到异步,显示工作模式。

干杯, A.

S:read_some()将最有可能不是你想要的。它只是返回已经到达您的计算机TCP堆栈的前几个字节。但是不能保证完整的请求可用,所以你需要多次调用async_read。如果协议是http,则asio :: async_read_until会更好,并使用“\ r \ n”分隔符。