zookeeper-cpp
ZooKeeper Client for C++
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Groups
server.cpp
1 #include "server.hpp"
2 
3 #include <zk/future.hpp>
4 
5 #include <cerrno>
6 #include <exception>
7 #include <iostream>
8 #include <system_error>
9 
10 #include <signal.h>
11 #include <sys/select.h>
12 
13 #include "classpath.hpp"
14 #include "configuration.hpp"
15 #include "detail/event_handle.hpp"
16 #include "detail/subprocess.hpp"
17 
18 namespace zk::server
19 {
20 
21 static void validate_settings(const configuration& settings)
22 {
23  if (settings.is_minimal())
24  {
25  return;
26  }
27  else if (!settings.source_file())
28  {
29  throw std::invalid_argument("Configuration has not been saved to a file");
30  }
31 }
32 
33 server::server(classpath packages, configuration settings) :
34  _running(true),
35  _shutdown_event(std::make_unique<detail::event_handle>())
36 {
37  validate_settings(settings);
38  _worker = std::thread([this, packages = std::move(packages), settings = std::move(settings)] ()
39  {
40  this->run_process(packages, settings);
41  }
42  );
43 }
44 
46  server(classpath::system_default(), std::move(settings))
47 { }
48 
49 server::~server() noexcept
50 {
51  shutdown(true);
52 }
53 
54 void server::shutdown(bool wait_for_stop)
55 {
56  _running.store(false, std::memory_order_release);
57  _shutdown_event->notify_one();
58 
59  if (wait_for_stop && _worker.joinable())
60  _worker.join();
61 }
62 
// Block until at least one of the three descriptors is ready for reading, or until the wait is interrupted by a
// signal (EINTR), in which case the function simply returns and the caller is expected to re-check its state.
//
// Throws std::system_error:
//  - with EBADF if a descriptor is negative or not below FD_SETSIZE -- such a value cannot be stored in an fd_set,
//    and passing it to FD_SET would be undefined behavior;
//  - with the errno reported by select for any failure other than EINTR.
static void wait_for_event(int fd1, int fd2, int fd3)
{
    // This could be implemented with epoll instead of select, but since N=3, it doesn't really matter
    const int fds[] = { fd1, fd2, fd3 };

    ::fd_set read_fds;
    FD_ZERO(&read_fds);

    int highest_fd = -1;
    for (int fd : fds)
    {
        // Validate before FD_SET: the macro does no range checking of its own.
        if (fd < 0 || fd >= FD_SETSIZE)
            throw std::system_error(EBADF, std::system_category(), "select");

        FD_SET(fd, &read_fds);
        if (fd > highest_fd)
            highest_fd = fd;
    }

    // No timeout: wait indefinitely for one of the descriptors.
    int rc = ::select(highest_fd + 1, &read_fds, nullptr, nullptr, nullptr);
    if (rc < 0)
    {
        // Snapshot errno immediately so nothing between here and the throw can overwrite it.
        const int error_code = errno;
        if (error_code != EINTR)
            throw std::system_error(error_code, std::system_category(), "select");
    }
}
82 
83 void server::run_process(const classpath& packages, const configuration& settings)
84 {
85  detail::subprocess::argument_list args = { "-cp", packages.command_line(),
86  "org.apache.zookeeper.server.quorum.QuorumPeerMain",
87  };
88  if (settings.is_minimal())
89  {
90  args.emplace_back(std::to_string(settings.client_port()));
91  args.emplace_back(settings.data_directory().value());
92  }
93  else
94  {
95  args.emplace_back(settings.source_file().value());
96  }
97 
98  detail::subprocess proc("java", std::move(args));
99 
100  auto drain_pipes = [&] ()
101  {
102  bool read_anything = true;
103  while (read_anything)
104  {
105  read_anything = false;
106 
107  auto out = proc.stdout().read();
108  if (!out.empty())
109  {
110  read_anything = true;
111  std::cout << out;
112  }
113 
114  auto err = proc.stderr().read();
115  if (!err.empty())
116  {
117  read_anything = true;
118  std::cerr << out;
119  }
120  }
121  };
122 
123  while (_running.load(std::memory_order_acquire))
124  {
125  wait_for_event(proc.stdout().native_read_handle(),
126  proc.stderr().native_read_handle(),
127  _shutdown_event->native_handle()
128  );
129 
130  drain_pipes();
131  }
132  proc.terminate();
133  drain_pipes();
134 }
135 
136 }
Represents a collection of JARs or other Java entities that should be provided as the --classpath to ...
Definition: classpath.hpp:18
Controls a ZooKeeper server process on this local machine.
Definition: server.hpp:29
Controls the import of future and promise types.
void shutdown(bool wait_for_stop=false)
Initiate shutting down the server process.
Definition: server.cpp:54
Represents a configuration which should be run by server instance.