Server to server communication example

In the previous example we have learnt how a client can communicate to a server. There are cases when one service may need to communicate to another service. Now we will learn how to make a server to call functions on another server.
Now we will extend the previous example. Let us introduce an intermediate server and call it server_mid. This server will implement the Calculator service but instead of calculating the value itself, it will call add function on another Calculator server and then upon receiving the response, it will simply reply to the client with whatever it received from the other server.
Thrift internal diagram
This server simply acts as a proxy. But in a real world problem, a server may be doing much more than simply proxying. Purpose of this example is only to demonstrate how a server can communicate to another server so the logic inside the proxy server is kept very minimal.


Now we have three processes

The proxy server (server_mid.cpp)

#include <iostream>

#include "gen-cpp2/Calculator.h" // From generated code
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include <pthread.h>

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::async;
using namespace example::cpp2;
using namespace folly::wangle;

// This wrapper is used to capture unique_ptr<Callback> inside
// Lambda. There is no proper way to capture uniqie_ptr
// in lambdas, this wrapper is simply a workaround
struct CallbackWrapper {
  std::unique_ptr<HandlerCallback<int64_t>> ptr;
  CallbackWrapper(std::unique_ptr<HandlerCallback<int64_t>>&p)
    : ptr(std::move(p)) { }
  CallbackWrapper(const CallbackWrapper& o) {
    // The static cast is not a good idea but done to workaround
    CallbackWrapper& cw = const_cast<CallbackWrapper&>(o);
    ptr = std::move(cw.ptr);
  }
};

// The calculator service. This service is a proxy service.
// It does not do the calculation itself, instead it calls
// add function on other server and sends the same response
class CalculatorSvc : public CalculatorSvIf
{
private:
  // This function create a client object which will be used to
  // to call add function on real calculator server
  CalculatorAsyncClient* getClient(TEventBase* ev)
  {
    int port_ = 8080;
    // Create async client socket
    std::shared_ptr<TAsyncSocket> socket(
      TAsyncSocket::newSocket(ev, "127.0.0.1", port_));
    // Create HeaderClientChannel
    auto client_channel = HeaderClientChannel::newChannel(socket);
    // Create client object
    return new CalculatorAsyncClient(std::move(client_channel));
  }
public:
  virtual ~CalculatorSvc() {}
  void async_tm_add(
    std::unique_ptr<apache::thrift::HandlerCallback<int64_t>> callback,
    int32_t num1,
    int32_t num2)
  {
    cout << "Got async request " << num1 << " + " << num2 << endl;
    TEventBase* te = callback->getEventBase();
    // Create Callback wrapper so that it can be captured in lambda
    CallbackWrapper cb(callback);
    // The async_tm_add function is called in a different thread than the
    // thread which received the request from the network. Since we are going
    // make an async request to another server, current thread must be in an
    // event loop but the current thread is not in any event loop. So will
    // make the network call in the thread which actually received request
    // from client.
    te->runInEventBaseThread([num1, num2, cb, this]() {
      // Get the client handle of the real calculator server
      CalculatorAsyncClient* client = getClient(cb.ptr->getEventBase());;
      // Make the add function call on the real server
      folly::wangle::Future<int64_t> f = client->future_add(num1, num2);
      f.then(
        [cb](Try<int64_t>&& t) {
          // Now we have received the response from real calculator server
          // respond to the client with the same result
          cout << "Received response from other server" << endl;
          cb.ptr->result(t.value());
        });
    });
  }
};

int main(int argc, char **argv) {
  // The code in the main function same as in previous example except
  // the port number. This proxy calculator will listen on 8081. So the
  // client will have to connect to 8081 now.
  std::shared_ptr<CalculatorSvc> ptr(new  CalculatorSvc());
  ThriftServer* s = new ThriftServer();
  s->setInterface(ptr);
  s->setPort(8081);
  s->serve();

  return 0;
}

Compile the server_mid.cpp as

g++ -o server_mid server_mid.cpp gen-cpp2/Calculator.cpp gen-cpp2/calculator_constants.cpp gen-cpp2/calculator_types.cpp -std=c++11 -I gen-cpp -lboost_system -lpthread -lglog -lfolly -lthrift -lthriftcpp2


From the previous example, change the port number in client.cpp to 8081 and compile the client.


Now open 3 bash shells. From the first, run server from the 2nd, run server_mid then run client from the 3rd shell.


If you want more explanation of why te->runInEventBaseThread was done, then refer to Thrift internals.


up

Do you collaborate using whiteboard? Please try Lekh Board - An Intelligent Collaborate Whiteboard App