zookeeper-cpp
ZooKeeper Client for C++
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Groups
connection.cpp
1 #include "connection.hpp"
2 #include "connection_zk.hpp"
3 #include "error.hpp"
4 #include "types.hpp"
5 
6 #include <algorithm>
7 #include <regex>
8 #include <ostream>
9 #include <sstream>
10 #include <stdexcept>
11 #include <unordered_set>
12 
13 #include <zookeeper/zookeeper.h>
14 
15 namespace zk
16 {
17 
19 // connection //
21 
22 connection::~connection() noexcept
23 { }
24 
25 std::shared_ptr<connection> connection::connect(const connection_params& params)
26 {
27  return std::make_shared<connection_zk>(params);
28 }
29 
30 std::shared_ptr<connection> connection::connect(string_view conn_string)
31 {
32  return connect(connection_params::parse(conn_string));
33 }
34 
35 future<zk::state> connection::watch_state()
36 {
37  std::unique_lock<std::mutex> ax(_state_change_promises_protect);
38  _state_change_promises.emplace_back();
39  return _state_change_promises.rbegin()->get_future();
40 }
41 
43 {
44  std::unique_lock<std::mutex> ax(_state_change_promises_protect);
45  auto l_state_change_promises = std::move(_state_change_promises);
46  ax.unlock();
47 
48  auto ex = new_state == zk::state::expired_session ? get_exception_ptr_of(error_code::session_expired)
49  : new_state == zk::state::authentication_failed ? get_exception_ptr_of(error_code::authentication_failed)
50  : std::exception_ptr();
51 
52  for (auto& p : l_state_change_promises)
53  {
54  if (ex)
55  p.set_exception(ex);
56  else
57  p.set_value(new_state);
58  }
59 }
60 
62 // connection_params //
64 
66  _connection_schema("zk"),
67  _hosts({}),
68  _chroot("/"),
69  _randomize_hosts(true),
70  _read_only(false),
71  _timeout(default_timeout)
72 { }
73 
74 connection_params::~connection_params() noexcept
75 { }
76 
77 template <typename TMatch>
78 static string_view sv_from_match(const TMatch& src)
79 {
80  return string_view(src.first, std::distance(src.first, src.second));
81 }
82 
83 template <typename FAction>
84 void split_each_substr(string_view src, char delim, const FAction& action)
85 {
86  while (!src.empty())
87  {
88  // if next_div is src.end, the logic still works
89  auto next_div = std::find(src.begin(), src.end(), delim);
90  action(string_view(src.data(), std::distance(src.begin(), next_div)));
91  src.remove_prefix(std::distance(src.begin(), next_div));
92  if (!src.empty())
93  src.remove_prefix(1U);
94  }
95 }
96 
97 static connection_params::host_list extract_host_list(string_view src)
98 {
99  connection_params::host_list out;
100  out.reserve(std::count(src.begin(), src.end(), ','));
101  split_each_substr(src, ',', [&] (string_view sub) { out.emplace_back(std::string(sub)); });
102  return out;
103 }
104 
105 static bool extract_bool(string_view key, string_view val)
106 {
107  if (val.empty())
108  throw std::invalid_argument(std::string("Key ") + std::string(key) + " has blank value");
109 
110  switch (val[0])
111  {
112  case '1':
113  case 't':
114  case 'T':
115  return true;
116  case '0':
117  case 'f':
118  case 'F':
119  return false;
120  default:
121  throw std::invalid_argument(std::string("Invalid value for ") + std::string(key) + std::string(" \"")
122  + std::string(val) + "\" -- expected a boolean"
123  );
124  }
125 }
126 
127 static std::chrono::milliseconds extract_millis(string_view key, string_view val)
128 {
129  if (val.empty())
130  throw std::invalid_argument(std::string("Key ") + std::string(key) + " has blank value");
131 
132  if (val[0] == 'P')
133  {
134  throw std::invalid_argument("ISO 8601 duration is not supported (yet).");
135  }
136  else
137  {
138  double seconds = std::stod(std::string(val));
139  return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration<double>(seconds));
140  }
141 }
142 
143 static void extract_advanced_options(string_view src, connection_params& out)
144 {
145  if (src.empty() || src.size() == 1U)
146  return;
147  else
148  src.remove_prefix(1U);
149 
150  std::string invalid_keys_msg;
151  auto invalid_key = [&] (string_view key)
152  {
153  if (invalid_keys_msg.empty())
154  invalid_keys_msg = "Invalid key in querystring: ";
155  else
156  invalid_keys_msg += ", ";
157 
158  invalid_keys_msg += std::string(key);
159  };
160 
161  split_each_substr(src, '&', [&] (string_view qp_part)
162  {
163  auto eq_it = std::find(qp_part.begin(), qp_part.end(), '=');
164  if (eq_it == qp_part.end())
165  throw std::invalid_argument("Invalid connection string -- query string must be specified as "
166  "\"key1=value1&key2=value2...\""
167  );
168 
169  auto key = qp_part.substr(0, std::distance(qp_part.begin(), eq_it));
170  auto val = qp_part.substr(std::distance(qp_part.begin(), eq_it) + 1);
171 
172  if (key == "randomize_hosts")
173  out.randomize_hosts() = extract_bool(key, val);
174  else if (key == "read_only")
175  out.read_only() = extract_bool(key, val);
176  else if (key == "timeout")
177  out.timeout() = extract_millis(key, val);
178  else
179  invalid_key(key);
180  });
181 
182  if (!invalid_keys_msg.empty())
183  throw std::invalid_argument(std::move(invalid_keys_msg));
184 }
185 
187 {
188  static const std::regex expr(R"(([^:]+)://([^/]+)((/[^\?]*)(\?.*)?)?)",
189  std::regex_constants::ECMAScript | std::regex_constants::optimize
190  );
191  constexpr auto schema_idx = 1U;
192  constexpr auto hostaddr_idx = 2U;
193  constexpr auto path_idx = 4U;
194  constexpr auto querystring_idx = 5U;
195 
196  std::cmatch match;
197  if (!std::regex_match(conn_string.begin(), conn_string.end(), match, expr))
198  throw std::invalid_argument(std::string("Invalid connection string (") + std::string(conn_string)
199  + " -- format is \"schema://[auth@]${host_addrs}/[path][?options]\""
200  );
201 
202  connection_params out;
203  out.connection_schema() = match[schema_idx].str();
204  out.hosts() = extract_host_list(sv_from_match(match[hostaddr_idx]));
205  out.chroot() = match[path_idx].str();
206  if (out.chroot().empty())
207  out.chroot() = "/";
208 
209  extract_advanced_options(sv_from_match(match[querystring_idx]), out);
210 
211  return out;
212 }
213 
214 bool operator==(const connection_params& lhs, const connection_params& rhs)
215 {
216  return lhs.connection_schema() == rhs.connection_schema()
217  && lhs.hosts() == rhs.hosts()
218  && lhs.chroot() == rhs.chroot()
219  && lhs.randomize_hosts() == rhs.randomize_hosts()
220  && lhs.read_only() == rhs.read_only()
221  && lhs.timeout() == rhs.timeout();
222 }
223 
224 bool operator!=(const connection_params& lhs, const connection_params& rhs)
225 {
226  return !(lhs == rhs);
227 }
228 
229 std::ostream& operator<<(std::ostream& os, const connection_params& x)
230 {
231  os << x.connection_schema() << "://";
232  bool first = true;
233  for (const auto& host : x.hosts())
234  {
235  if (first)
236  first = false;
237  else
238  os << ',';
239 
240  os << host;
241  }
242  os << x.chroot();
243 
244  first = true;
245  auto query_string = [&] (ptr<const char> key, const auto& val)
246  {
247  if (first)
248  {
249  first = false;
250  os << '?';
251  }
252  else
253  {
254  os << '&';
255  }
256 
257  os << key << '=' << val;
258  };
259  if (!x.randomize_hosts())
260  query_string("randomize_hosts", "false");
261  if (x.read_only())
262  query_string("read_only", "true");
263  if (x.timeout() != connection_params::default_timeout)
264  query_string("timeout", std::chrono::duration<double>(x.timeout()).count());
265  return os;
266 }
267 
268 std::string to_string(const connection_params& x)
269 {
270  std::ostringstream os;
271  os << x;
272  return os.str();
273 }
274 
275 }
std::chrono::milliseconds timeout() const
The session timeout between this client and the server.
Definition: connection.hpp:169
static connection_params parse(string_view conn_string)
Create an instance from a connection string.
Definition: connection.cpp:186
state
Enumeration of states the client may be at when a watch triggers.
Definition: types.hpp:320
bool read_only() const
Allow connections to read-only servers? The default (false) is to disallow.
Definition: connection.hpp:159
connection_params() noexcept
Create an instance with default values.
Definition: connection.cpp:65
const std::string & connection_schema() const
Determines the underlying zk::connection implementation to use.
Definition: connection.hpp:132
zk::state::authentication_failed
Authentication has failed – connection requires a new connection instance with different credentials...
const host_list & hosts() const
Addresses for the ensemble to connect to.
Definition: connection.hpp:141
zk::connection_params
Used to specify parameters for a connection.
Definition: connection.hpp:84
const std::string & chroot() const
Specifying a value for chroot as something aside from "" or "/" will run the client commands while in...
Definition: connection.hpp:149
virtual future< zk::state > watch_state()
Watch for a state change.
Definition: connection.cpp:35
zk::state::expired_session
The serving cluster has expired this session.
bool randomize_hosts() const
Connect to a host at random (as opposed to attempting connections in order)? The default is to random...
Definition: connection.hpp:155
virtual void on_session_event(zk::state new_state)
Call this from derived classes when a session event happens.
Definition: connection.cpp:42