1 #include "connection.hpp" 
    2 #include "connection_zk.hpp" 
   11 #include <unordered_set> 
   13 #include <zookeeper/zookeeper.h> 
   22 connection::~connection() noexcept
 
   25 std::shared_ptr<connection> connection::connect(
const connection_params& params)
 
   27     return std::make_shared<connection_zk>(params);
 
   30 std::shared_ptr<connection> connection::connect(string_view conn_string)
 
   37     std::unique_lock<std::mutex> ax(_state_change_promises_protect);
 
   38     _state_change_promises.emplace_back();
 
   39     return _state_change_promises.rbegin()->get_future();
 
   44     std::unique_lock<std::mutex> ax(_state_change_promises_protect);
 
   45     auto l_state_change_promises = std::move(_state_change_promises);
 
   50             :                                                 std::exception_ptr();
 
   52     for (
auto& p : l_state_change_promises)
 
   57             p.set_value(new_state);
 
   66         _connection_schema("zk"),
 
   69         _randomize_hosts(
true),
 
   71         _timeout(default_timeout)
 
   74 connection_params::~connection_params() noexcept
 
   77 template <
typename TMatch>
 
   78 static string_view sv_from_match(
const TMatch& src)
 
   80     return string_view(src.first, std::distance(src.first, src.second));
 
   83 template <
typename FAction>
 
   84 void split_each_substr(string_view src, 
char delim, 
const FAction& action)
 
   89         auto next_div = std::find(src.begin(), src.end(), delim);
 
   90         action(string_view(src.data(), std::distance(src.begin(), next_div)));
 
   91         src.remove_prefix(std::distance(src.begin(), next_div));
 
   93             src.remove_prefix(1U);
 
   97 static connection_params::host_list extract_host_list(string_view src)
 
   99     connection_params::host_list out;
 
  100     out.reserve(std::count(src.begin(), src.end(), 
','));
 
  101     split_each_substr(src, 
',', [&] (string_view sub) { out.emplace_back(std::string(sub)); });
 
  105 static bool extract_bool(string_view key, string_view val)
 
  108         throw std::invalid_argument(std::string(
"Key ") + std::string(key) + 
" has blank value");
 
  121         throw std::invalid_argument(std::string(
"Invalid value for ") + std::string(key) + std::string(
" \"")
 
  122                                     + std::string(val) + 
"\" -- expected a boolean" 
  127 static std::chrono::milliseconds extract_millis(string_view key, string_view val)
 
  130         throw std::invalid_argument(std::string(
"Key ") + std::string(key) + 
" has blank value");
 
  134         throw std::invalid_argument(
"ISO 8601 duration is not supported (yet).");
 
  138         double seconds = std::stod(std::string(val));
 
  139         return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration<double>(seconds));
 
  143 static void extract_advanced_options(string_view src, connection_params& out)
 
  145     if (src.empty() || src.size() == 1U)
 
  148         src.remove_prefix(1U);
 
  150     std::string invalid_keys_msg;
 
  151     auto invalid_key = [&] (string_view key)
 
  153                            if (invalid_keys_msg.empty())
 
  154                                invalid_keys_msg = 
"Invalid key in querystring: ";
 
  156                                invalid_keys_msg += 
", ";
 
  158                            invalid_keys_msg += std::string(key);
 
  161     split_each_substr(src, 
'&', [&] (string_view qp_part)
 
  163         auto eq_it = std::find(qp_part.begin(), qp_part.end(), 
'=');
 
  164         if (eq_it == qp_part.end())
 
  165             throw std::invalid_argument(
"Invalid connection string -- query string must be specified as " 
  166                                         "\"key1=value1&key2=value2...\"" 
  169         auto key = qp_part.substr(0, std::distance(qp_part.begin(), eq_it));
 
  170         auto val = qp_part.substr(std::distance(qp_part.begin(), eq_it) + 1);
 
  172         if (key == 
"randomize_hosts")
 
  173             out.randomize_hosts() = extract_bool(key, val);
 
  174         else if (key == 
"read_only")
 
  175             out.read_only() = extract_bool(key, val);
 
  176         else if (key == 
"timeout")
 
  177             out.timeout() = extract_millis(key, val);
 
  182     if (!invalid_keys_msg.empty())
 
  183         throw std::invalid_argument(std::move(invalid_keys_msg));
 
  188     static const std::regex expr(R
"(([^:]+)://([^/]+)((/[^\?]*)(\?.*)?)?)", 
  189                                  std::regex_constants::ECMAScript | std::regex_constants::optimize 
  191     constexpr auto schema_idx      = 1U;
 
  192     constexpr 
auto hostaddr_idx    = 2U;
 
  193     constexpr 
auto path_idx        = 4U;
 
  194     constexpr 
auto querystring_idx = 5U;
 
  197     if (!std::regex_match(conn_string.begin(), conn_string.end(), match, expr))
 
  198         throw std::invalid_argument(std::string(
"Invalid connection string (") + std::string(conn_string)
 
  199                                     + 
" -- format is \"schema://[auth@]${host_addrs}/[path][?options]\"" 
  204     out.
hosts()             = extract_host_list(sv_from_match(match[hostaddr_idx]));
 
  205     out.
chroot()            = match[path_idx].str();
 
  209     extract_advanced_options(sv_from_match(match[querystring_idx]), out);
 
  224 bool operator!=(
const connection_params& lhs, 
const connection_params& rhs)
 
  226     return !(lhs == rhs);
 
  229 std::ostream& operator<<(std::ostream& os, 
const connection_params& x)
 
  231     os << x.connection_schema() << 
"://";
 
  233     for (
const auto& host : x.hosts())
 
  245     auto query_string = [&] (ptr<const char> key, 
const auto& val)
 
  257                             os << key << 
'=' << val;
 
  259     if (!x.randomize_hosts())
 
  260         query_string(
"randomize_hosts", 
"false");
 
  262         query_string(
"read_only", 
"true");
 
  263     if (x.timeout() != connection_params::default_timeout)
 
  264         query_string(
"timeout", std::chrono::duration<double>(x.timeout()).count());
 
  268 std::string to_string(
const connection_params& x)
 
  270     std::ostringstream os;
 
std::chrono::milliseconds timeout() const 
 
static connection_params parse(string_view conn_string)
Create an instance from a connection string. 
 
state
Enumeration of states the client may be at when a watch triggers. 
 
connection_params() noexcept
Create an instance with default values. 
 
std::exception_ptr get_exception_ptr_of(error_code code)
Get an std::exception_ptr containing an exception with the proper type for the given code...
 
const std::string & connection_schema() const 
 
Authentication has failed – connection requires a new connection instance with different credentials...
 
const host_list & hosts() const 
 
Used to specify parameters for a connection. 
 
const std::string & chroot() const 
 
virtual future< zk::state > watch_state()
Watch for a state change. 
 
The serving cluster has expired this session. 
 
bool randomize_hosts() const 
 
virtual void on_session_event(zk::state new_state)
Call this from derived classes when a session event happens. 
 
Code for authentication_failed. 
 
Code for session_expired.