1 #include "connection.hpp"
2 #include "connection_zk.hpp"
11 #include <unordered_set>
13 #include <zookeeper/zookeeper.h>
22 connection::~connection() noexcept
25 std::shared_ptr<connection> connection::connect(
const connection_params& params)
27 return std::make_shared<connection_zk>(params);
30 std::shared_ptr<connection> connection::connect(string_view conn_string)
37 std::unique_lock<std::mutex> ax(_state_change_promises_protect);
38 _state_change_promises.emplace_back();
39 return _state_change_promises.rbegin()->get_future();
44 std::unique_lock<std::mutex> ax(_state_change_promises_protect);
45 auto l_state_change_promises = std::move(_state_change_promises);
50 : std::exception_ptr();
52 for (
auto& p : l_state_change_promises)
57 p.set_value(new_state);
66 _connection_schema("zk"),
69 _randomize_hosts(
true),
71 _timeout(default_timeout)
74 connection_params::~connection_params() noexcept
// View the characters delimited by a match-like object without copying them.
//
// TMatch must expose `first` and `second` members (as std::sub_match does) that delimit the range, and `first`
// must be usable as the data pointer of a string_view. The result refers to the same characters as the range,
// so it must not outlive the text that was searched. An empty range yields an empty view.
template <typename TMatch>
static string_view sv_from_match(const TMatch& src)
{
    const auto length = std::distance(src.first, src.second);
    return string_view(src.first, static_cast<string_view::size_type>(length));
}
// Invoke `action` once for every piece of `src` separated by `delim`, in order of appearance.
//
// Each piece is passed as a string_view into `src` (no copies are made). Adjacent delimiters produce an empty
// piece; an empty `src` produces no calls at all, and a single trailing delimiter does not produce a trailing
// empty piece.
template <typename FAction>
void split_each_substr(string_view src, char delim, const FAction& action)
{
    while (!src.empty())
    {
        // When no delimiter is left, next_div is src.end() and the piece is simply the whole remainder.
        const auto next_div  = std::find(src.begin(), src.end(), delim);
        const auto piece_len = static_cast<string_view::size_type>(std::distance(src.begin(), next_div));

        action(string_view(src.data(), piece_len));
        src.remove_prefix(piece_len);

        // Step over the delimiter itself. The guard matters: after the last piece nothing is left to remove.
        if (!src.empty())
            src.remove_prefix(1U);
    }
}
97 static connection_params::host_list extract_host_list(string_view src)
99 connection_params::host_list out;
100 out.reserve(std::count(src.begin(), src.end(),
','));
101 split_each_substr(src,
',', [&] (string_view sub) { out.emplace_back(std::string(sub)); });
// Interpret the value of a boolean query-string option.
//
// In the lines visible here, `key` is used only to build error messages. Two failures are reported as
// std::invalid_argument: "Key <key> has blank value" and "Invalid value for <key> "<val>" -- expected a boolean".
// NOTE(review): the checks that choose between success and these errors, and the accepted spellings of
// true/false, sit on lines missing from this listing -- confirm against the full file.
105 static bool extract_bool(string_view key, string_view val)
// Blank-value error; presumably guarded by an emptiness test on val -- TODO confirm.
108 throw std::invalid_argument(std::string(
"Key ") + std::string(key) +
" has blank value");
// Error for a value that was not recognized as a boolean; the message quotes the offending value.
121 throw std::invalid_argument(std::string(
"Invalid value for ") + std::string(key) + std::string(
" \"")
122 + std::string(val) +
"\" -- expected a boolean"
// Interpret the value of a duration option, given as a decimal number of seconds, as milliseconds.
//
// key  Name of the option; used only in error messages.
// val  Text to parse. Parsing is done by std::stod, so its rules apply (leading whitespace is skipped and
//      characters after the number are ignored).
//
// Returns the duration converted to milliseconds with duration_cast (fractions of a millisecond are dropped).
//
// Throws std::invalid_argument if val is blank, looks like an ISO 8601 duration, is not a number, or is not a
// finite value that fits in std::chrono::milliseconds. std::stod may also throw std::out_of_range.
static std::chrono::milliseconds extract_millis(string_view key, string_view val)
{
    if (val.empty())
        throw std::invalid_argument(std::string("Key ") + std::string(key) + " has blank value");

    // ISO 8601 durations always begin with the designator 'P' (for example "PT30S").
    if (val[0] == 'P')
        throw std::invalid_argument("ISO 8601 duration is not supported (yet).");

    const double seconds = std::stod(std::string(val));

    // Converting a NaN, an infinity or an out-of-range double to the integral tick count is undefined
    // behaviour, so such values are rejected up front. The comparison is written so that NaN fails it.
    const double tick_limit = static_cast<double>(std::chrono::milliseconds::max().count());
    const double ticks      = seconds * 1000.0;
    if (!(ticks > -tick_limit && ticks < tick_limit))
        throw std::invalid_argument(std::string("Invalid value for ") + std::string(key) + std::string(" \"")
                                    + std::string(val) + "\" -- expected a finite number of seconds"
                                   );

    return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration<double>(seconds));
}
// Apply the "key=value" pairs of a connection string's query part to `out`.
//
// The keys handled in the visible lines are "randomize_hosts" and "read_only" (parsed with extract_bool) and
// "timeout" (parsed with extract_millis). std::invalid_argument is thrown for a pair without '=' and, at the
// end, when any invalid key was collected.
// NOTE(review): several control-flow lines are missing from this listing, including the call site of
// invalid_key -- confirm the details against the full file.
143 static void extract_advanced_options(string_view src, connection_params& out)
// Nothing to parse when the text is empty or a single character (the action taken is on an elided line).
145 if (src.empty() || src.size() == 1U)
// Drop the first character -- presumably the leading '?' of the query string; verify against the caller.
148 src.remove_prefix(1U);
// Rejected keys are accumulated into one message so they can all be reported together.
150 std::string invalid_keys_msg;
151 auto invalid_key = [&] (string_view key)
// The first rejected key starts the message with a fixed prefix ...
153 if (invalid_keys_msg.empty())
154 invalid_keys_msg =
"Invalid key in querystring: ";
// ... while later keys are separated by ", " (presumably the else branch; the `else` line is elided).
156 invalid_keys_msg +=
", ";
158 invalid_keys_msg += std::string(key);
// Visit every '&'-separated piece of the query string.
161 split_each_substr(src,
'&', [&] (string_view qp_part)
// Each piece must contain '=' separating key from value.
163 auto eq_it = std::find(qp_part.begin(), qp_part.end(),
'=');
164 if (eq_it == qp_part.end())
165 throw std::invalid_argument(
"Invalid connection string -- query string must be specified as "
166 "\"key1=value1&key2=value2...\""
// key is the text before the first '=', val everything after it.
169 auto key = qp_part.substr(0, std::distance(qp_part.begin(), eq_it));
170 auto val = qp_part.substr(std::distance(qp_part.begin(), eq_it) + 1);
// Dispatch on the key name and store the parsed value in `out`.
172 if (key ==
"randomize_hosts")
173 out.randomize_hosts() = extract_bool(key, val);
174 else if (key ==
"read_only")
175 out.read_only() = extract_bool(key, val);
176 else if (key ==
"timeout")
177 out.timeout() = extract_millis(key, val);
// After all pieces were visited, fail once with the full list of invalid keys, if there were any.
182 if (!invalid_keys_msg.empty())
183 throw std::invalid_argument(std::move(invalid_keys_msg));
188 static const std::regex expr(R
"(([^:]+)://([^/]+)((/[^\?]*)(\?.*)?)?)",
189 std::regex_constants::ECMAScript | std::regex_constants::optimize
191 constexpr auto schema_idx = 1U;
192 constexpr
auto hostaddr_idx = 2U;
193 constexpr
auto path_idx = 4U;
194 constexpr
auto querystring_idx = 5U;
197 if (!std::regex_match(conn_string.begin(), conn_string.end(), match, expr))
198 throw std::invalid_argument(std::string(
"Invalid connection string (") + std::string(conn_string)
199 +
" -- format is \"schema://[auth@]${host_addrs}/[path][?options]\""
204 out.
hosts() = extract_host_list(sv_from_match(match[hostaddr_idx]));
205 out.
chroot() = match[path_idx].str();
209 extract_advanced_options(sv_from_match(match[querystring_idx]), out);
224 bool operator!=(
const connection_params& lhs,
const connection_params& rhs)
226 return !(lhs == rhs);
// Write `x` to `os` in connection-string form: the schema, "://", the hosts, and then options as key=value.
// NOTE(review): the body of the host loop, parts of the query_string helper and the final return are on lines
// missing from this listing -- confirm separators and path handling against the full file.
229 std::ostream& operator<<(std::ostream& os,
const connection_params& x)
231 os << x.connection_schema() <<
"://";
// One iteration per configured host (loop body elided).
233 for (
const auto& host : x.hosts())
// Helper that emits one "key=value" option; whatever it writes before the key is on elided lines.
245 auto query_string = [&] (ptr<const char> key,
const auto& val)
257 os << key <<
'=' << val;
// randomize_hosts is written only when it is false.
259 if (!x.randomize_hosts())
260 query_string(
"randomize_hosts",
"false");
// Presumably guarded by a check of x.read_only() on the elided line above -- TODO confirm.
262 query_string(
"read_only",
"true");
// The timeout is written only when it differs from the default, as a floating-point number of seconds.
// NOTE(review): the value is streamed as a double, so the stream's precision setting decides how many digits
// survive -- check that large or fractional timeouts round-trip.
263 if (x.timeout() != connection_params::default_timeout)
264 query_string(
"timeout", std::chrono::duration<double>(x.timeout()).count());
// Produce a std::string for `x`. The visible line sets up a std::ostringstream.
// NOTE(review): the lines that fill the stream and return its contents are missing from this listing;
// presumably they stream `x` with operator<< -- confirm against the full file.
268 std::string to_string(
const connection_params& x)
270 std::ostringstream os;
std::chrono::milliseconds timeout() const
static connection_params parse(string_view conn_string)
Create an instance from a connection string.
state
Enumeration of states the client may be in when a watch triggers.
connection_params() noexcept
Create an instance with default values.
std::exception_ptr get_exception_ptr_of(error_code code)
Get an std::exception_ptr containing an exception with the proper type for the given code...
const std::string & connection_schema() const
Authentication has failed – connection requires a new connection instance with different credentials...
const host_list & hosts() const
Used to specify parameters for a connection.
const std::string & chroot() const
virtual future< zk::state > watch_state()
Watch for a state change.
The serving cluster has expired this session.
bool randomize_hosts() const
virtual void on_session_event(zk::state new_state)
Call this from derived classes when a session event happens.
Code for authentication_failed.
Code for session_expired.