2017-07-25 860 views
-1

我们在使GRPC在RHEL 7下运行方面取得了良好进展。
我们的应用程序有一个相当复杂的结构,其中三层嵌套与外层实现“oneof”关键词。
我们发现我们所有的其他结构运行良好,但是这个给了我们一个代码为14的RPC失败。
我们尽可能简化了这部分应用程序,希望能够轻松地重新编译和运行。我们的简单GRPC示例程序中的RPC失败代码14

这里的.proto文件,更新,以适应乌利的问题:

syntax = "proto3"; 

option java_multiple_files = true; 
option java_package = "io.grpc.examples.debug"; 
option java_outer_classname = "DebugProto"; 
option objc_class_prefix = "DEBUG"; 

package DEBUGpackage; 

service DEBUGservice { 
    rpc DEBUG_val_container_get (input_int32_request) returns (outer_container) {} 
} 

message input_int32_request { 
    int32 ival = 1; 
} 

message inner_container { 
    repeated uint32 val_array = 1; 
} 

message middle_container { 
    inner_container vac = 1; 
} 

message other_container { 
    int32 other_val = 1; 
} 

message outer_container { 
    oneof reply { 
    middle_container r1 = 1; 
    other_container r2 = 2; 
    } 
} 

(请注意,在这个原型代码的Java行只是在那里,因为他们是在GRPC网站的例子我们的代码是完全是C++,没有java。不知道这是否意味着我们可以在没有这些“选项java ...”行的情况下完成)。

这是我们的客户端源代码:

#include <iostream> 
#include <memory> 
#include <string> 

#include <grpc++/grpc++.h> 
#include <grpc/support/log.h> 
#include <thread> 
#include <unistd.h> 

#include "debug.grpc.pb.h" 

using grpc::Channel; 
using grpc::ClientAsyncResponseReader; 
using grpc::ClientContext; 
using grpc::CompletionQueue; 
using grpc::Status; 
using DEBUGpackage::input_int32_request; 
using DEBUGpackage::inner_container; 
using DEBUGpackage::middle_container; 
using DEBUGpackage::outer_container; 
using DEBUGpackage::DEBUGservice; 

class DEBUGClient { 
    public: 

    explicit DEBUGClient(std::shared_ptr<Channel> channel) 
      : stub_(DEBUGservice::NewStub(channel)) {} 

    void DEBUG_val_container_get() { 
     std::cout << "in DEBUG_val_container_get" << std::endl; 
     // Data we are sending to the server 
     input_int32_request val; 
     val.set_ival(0); 
     AsyncClientCall* call = new AsyncClientCall; 
     call->response_reader = stub_->AsyncDEBUG_val_container_get(&call->context, val, &cq_); 
     call->response_reader->Finish(&call->reply_, &call->status, (void*)call); 

    } 

    void AsyncCompleteRpc() { 
     void* got_tag; 
     bool ok = false; 

     while (cq_.Next(&got_tag, &ok)) { 
      AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag); 
      GPR_ASSERT(ok); 
      if (call->status.ok()) { 
       if (call->reply_.has_r1()) { 
        std::cout << call << " DEBUG received: " 
          << call->reply_.r1().vac().val_array(0) << std::endl; 
       } 
      } 
      else { 
       std::cout << call << " RPC failed" << std::endl; 
       std::cout << " RPC failure code = " << call->status.error_code() << std::endl; 
       std::cout << " RPC failure message = " << call->status.error_message() << std::endl; 
      } 
      delete call; 
     } 
    } 

    private: 
    struct AsyncClientCall { 
     outer_container reply_; 
     ClientContext context; 
     Status status; 
     std::unique_ptr<ClientAsyncResponseReader<outer_container>> response_reader; 
    }; 

    std::unique_ptr<DEBUGservice::Stub> stub_; 
    CompletionQueue cq_; 
}; 

int main(int argc, char** argv) { 
    DEBUGClient DEBUG0(grpc::CreateChannel("172.16.17.46:50050", grpc::InsecureChannelCredentials())); 
    std::thread thread0_ = std::thread(&DEBUGClient::AsyncCompleteRpc, &DEBUG0); 
    DEBUG0.DEBUG_val_container_get(); 
    sleep(1); 
    std::cout << "Press control-c to quit" << std::endl << std::endl; 
    thread0_.join(); //blocks forever 
    return 0; 
} 

而且,这里是我们的服务器的源代码:

#include <memory> 
#include <iostream> 
#include <string> 
#include <thread> 

#include <grpc++/grpc++.h> 
#include <grpc/support/log.h> 

#include "debug.grpc.pb.h" 

#include <time.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <stdio.h> 

using grpc::Server; 
using grpc::ServerAsyncResponseWriter; 
using grpc::ServerBuilder; 
using grpc::ServerContext; 
using grpc::ServerCompletionQueue; 
using grpc::Status; 
using DEBUGpackage::inner_container; 
using DEBUGpackage::input_int32_request; 
using DEBUGpackage::middle_container; 
using DEBUGpackage::outer_container; 
using DEBUGpackage::DEBUGservice; 

std::string save_server_address; 

class ServerImpl final { 

    public: 

    ~ServerImpl() { 
     server_->Shutdown(); 
     cq_->Shutdown(); 
    } 

    void Run() { 
     std::string server_address("0.0.0.0:50050"); 
     ServerBuilder builder; 
     builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); 
     builder.RegisterService(&service_); 
     cq_ = builder.AddCompletionQueue(); 
     server_ = builder.BuildAndStart(); 
     std::cout << "Server listening on " << server_address << std::endl; 
     save_server_address = server_address; 
     HandleRpcs(); 
    } 

    private: 

    class CallData { 
     public: 
     virtual void Proceed() = 0; 
    }; 

    class DebugGetCallData final : public CallData{ 

     public: 

     DebugGetCallData(DEBUGservice::AsyncService* service, ServerCompletionQueue* cq) 
      : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) { 
      Proceed(); 
     } 
     void Proceed() { 
      if (status_ == CREATE) { 
      status_ = PROCESS; 
      service_->RequestDEBUG_val_container_get(&ctx_, &request_, &responder_, cq_, cq_, this); 
      } else if (status_ == PROCESS) { 
      new DebugGetCallData(service_, cq_); 
      char *portchar; 
      portchar = (char *) save_server_address.c_str(); 
      long cq_addr = (long) cq_; 
      int cq_addr32 = (int) (cq_addr & 0xfffffff); 
      srand(cq_addr32); 
      fprintf(stderr, "%s task started\n", portchar); fflush(stderr); 
      unsigned int return_val = 10; 
      inner_container ic; 
      ic.add_val_array(return_val); 
      middle_container reply_temp; 
      reply_temp.set_allocated_vac(&ic); 
      reply_.set_allocated_r1(&reply_temp); 
      fprintf(stderr, "%s %s task done\n", portchar, "val_container_get"); fflush(stderr); 
      status_ = FINISH; 
      responder_.Finish(reply_, Status::OK, this); 
      } else { 
      GPR_ASSERT(status_ == FINISH); 
      } 
     } 

     private: 

     DEBUGservice::AsyncService* service_; 
     ServerCompletionQueue* cq_; 
     ServerContext ctx_; 
     input_int32_request request_; 
     outer_container reply_; 
     ServerAsyncResponseWriter<outer_container> responder_; 
     enum CallStatus { CREATE, PROCESS, FINISH }; 
     CallStatus status_; 
    }; 

    void HandleRpcs() { 
     new DebugGetCallData(&service_, cq_.get()); 
     void* tag; 
     bool ok; 
     while (true) { 
     GPR_ASSERT(cq_->Next(&tag, &ok)); 
     GPR_ASSERT(ok); 
     static_cast<CallData*>(tag)->Proceed(); 
     } 
    } 

    std::unique_ptr<ServerCompletionQueue> cq_; 
    DEBUGservice::AsyncService service_; 
    std::unique_ptr<Server> server_; 
}; 

int main() { 
    ServerImpl server; 
    server.Run(); 
    return 0; 
} 

输出,当我运行它看起来像这样:

[[email protected] debug]$ DEBUG_client2 
in DEBUG_val_container_get 
0xb73ff0 RPC failed 
RPC failure code = 14 
RPC failure message = Endpoint read failed 
Press control-c to quit 

我们在gdb下运行服务器,并在生成的 文件“debug.pb.cc”中找到一个地方,如果我们有t注释掉一行,这一切都开始工作。

下面是相关的部分生成的文件“debug.pb.cc”的:

middle_container::~middle_container() { 
    // @@protoc_insertion_point(destructor:DEBUGpackage.middle_container) 
    SharedDtor(); 
} 

void middle_container::SharedDtor() { 
    if (this != internal_default_instance()) { 
    delete vac_; // comment out this one line, to make the problem go away 
    } 
} 

的“删除vac_”行似乎是一个试图删除,要么已被删除的存储,或者是即将被删除的地方。请有人看看这个吗? [下面的文件仍然是我们用来生成此代码的文件,并且将问题调试到这一点]

我不知道我是否发现了GRPC中的错误,或者我是否编写了错误代码。

+0

当你真的把多个子类型放入其中时会发生什么?现在它不是很重要的一个。 – Uli

+0

@uli 感谢您的快速回复!我添加了另一个子类型,行为也是一样的。我还添加了我看到的行为的打印输出。 –

+0

好的。不幸的是,我不知道grpc C的实现。在grpc.io网站或https://github.com/grpc/grpc/tree/master/doc上是否有关于错误代码14的更多信息?我似乎想起了一些带有错误代码的文档。如果您在此没有得到答复,也许grpc Google群组中的某人可以提供帮助。 – Uli

回答

0

问题是您在服务器的堆栈中分配了middle_container reply_tmp。因此,只要你超出范围,它就会被破坏。那时候,你叫Finish,但还没有等到它的结果。由于这是一个异步服务器,数据必须保持活动状态,直到您收到该标签为止。这就是为什么手动编辑你的析构函数在你的情况下工作的原因;你基本上无效析构函数(并因此导致内存泄漏)。

+0

@VijayPai谢谢你。 我在这整个“等待结束的结果”事情上有点虚弱。 我们的代码是基于在 实例/ CPP /的HelloWorld/greeter_async_server.cc 和 实例/ CPP /的HelloWorld/greeter_async_client2.cc发现GRPC示例代码。 所以,如果你能指出在这段代码中他们正在等待Finish的结果,我会非常感激。 –