1 #include "connection_zk.hpp"
13 #include <system_error>
16 #include <zookeeper/zookeeper.h>
31 template <
typename FAction>
32 auto with_str(string_view src, FAction&& action)
33 -> decltype(std::forward<FAction>(action)(ptr<const char>()))
35 char buffer[src.size() + 1];
36 buffer[src.size()] =
'\0';
37 std::memcpy(buffer, src.data(), src.size());
38 return std::forward<FAction>(action)(buffer);
41 static ACL encode_acl_part(
const acl_rule& src)
44 out.perms =
static_cast<int>(src.permissions());
45 out.id.scheme =
const_cast<ptr<char>
>(src.scheme().c_str());
46 out.id.id =
const_cast<ptr<char>
>(src.id().c_str());
50 template <
typename FAction>
51 auto with_acl(
const acl& rules, FAction&& action)
52 -> decltype(std::forward<FAction>(action)(ptr<ACL_vector>()))
54 ACL parts[rules.size()];
55 for (std::size_t idx = 0; idx < rules.size(); ++idx)
56 parts[idx] = encode_acl_part(rules[idx]);
59 vec.count = int(rules.size());
61 return std::forward<FAction>(action)(&vec);
68 static error_code error_code_from_raw(
int raw)
70 return static_cast<error_code
>(raw);
78 static state state_from_raw(
int raw)
80 return static_cast<state>(raw);
83 static stat stat_from_raw(
const struct Stat& raw)
87 out.child_modified_transaction = transaction_id(raw.pzxid);
88 out.child_version = child_version(raw.cversion);
89 out.children_count = raw.numChildren;
90 out.create_time = stat::time_point() + std::chrono::milliseconds(raw.ctime);
91 out.create_transaction = transaction_id(raw.czxid);
92 out.data_size = raw.dataLength;
93 out.data_version = version(raw.version);
94 out.ephemeral_owner = raw.ephemeralOwner;
95 out.modified_time = stat::time_point() + std::chrono::milliseconds(raw.mtime);
96 out.modified_transaction = transaction_id(raw.mzxid);
100 static std::vector<std::string> string_vector_from_raw(
const struct String_vector& raw)
102 std::vector<std::string> out;
103 out.reserve(raw.count);
104 for (std::int32_t idx = 0; idx < raw.count; ++idx)
105 out.emplace_back(raw.data[idx]);
109 static acl acl_from_raw(
const struct ACL_vector& raw)
111 auto sz = std::size_t(raw.count);
115 for (std::size_t idx = 0; idx < sz; ++idx)
117 const auto& item = raw.data[idx];
118 out.emplace_back(item.id.scheme, item.id.id, static_cast<permission>(item.perms));
127 connection_zk::connection_zk(
const connection_params& params) :
130 if (params.connection_schema() !=
"zk")
131 throw std::invalid_argument(std::string(
"Invalid connection string \"") + to_string(params) +
"\"");
133 auto conn_string = [&] ()
135 std::ostringstream os;
137 for (
const auto& host : params.hosts())
149 _handle = ::zookeeper_init(conn_string.c_str(),
150 on_session_event_raw,
151 static_cast<int>(params.timeout().count()),
158 std::system_error(errno, std::system_category(),
"Failed to create ZooKeeper client");
161 connection_zk::~connection_zk() noexcept
170 _event_delivered(
false)
175 virtual void deliver_event(
event ev)
177 if (!_event_delivered.exchange(
true, std::memory_order_relaxed))
179 _event_promise.set_value(std::move(ev));
183 future<event> get_event_future()
185 return _event_promise.get_future();
189 std::atomic<bool> _event_delivered;
190 promise<event> _event_promise;
193 template <
typename TResult>
199 _data_delivered(
false)
202 future<TResult> get_data_future()
204 return _data_promise.get_future();
207 virtual void deliver_event(
event ev)
override
209 if (!_data_delivered.load(std::memory_order_relaxed))
211 deliver_data(nullopt, get_exception_ptr_of(error_code::closing));
214 watcher::deliver_event(std::move(ev));
217 void deliver_data(optional<TResult> data, std::exception_ptr ex_ptr)
219 if (!_data_delivered.exchange(
true, std::memory_order_relaxed))
223 _data_promise.set_exception(std::move(ex_ptr));
227 _data_promise.set_value(std::move(*data));
233 std::atomic<bool> _data_delivered;
234 promise<TResult> _data_promise;
237 std::shared_ptr<connection_zk::watcher> connection_zk::try_extract_watch(
ptr<const void> addr)
239 std::unique_lock<std::mutex> ax(_watches_protect);
240 auto iter = _watches.find(addr);
241 if (iter != _watches.end())
242 return _watches.extract(iter).mapped();
252 void connection_zk::deliver_watch(ptr<zhandle_t> zh,
255 ptr<const char> path [[gnu::unused]],
259 auto&
self = *connection_from_context(zh);
260 if (
auto watcher =
self.try_extract_watch(proms_in))
261 watcher->deliver_event(event(event_from_raw(type_in), state_from_raw(state_in)));
264 void connection_zk::close()
268 auto err = error_code_from_raw(::zookeeper_close(_handle));
269 if (err != error_code::ok)
275 std::unique_lock<std::mutex> ax(_watches_protect);
276 auto l_watches = std::move(_watches);
278 for (
const auto& pair : l_watches)
286 return state_from_raw(::zoo_state(_handle));
291 future<get_result> connection_zk::get(string_view path)
293 ::data_completion_t callback =
294 [] (
int rc_in, ptr<const char> data,
int data_sz, ptr<const struct Stat> pstat, ptr<const void> prom_in) noexcept
296 std::unique_ptr<promise<get_result>> prom((
ptr<promise<get_result>>) prom_in);
297 auto rc = error_code_from_raw(rc_in);
298 if (rc == error_code::ok)
299 prom->set_value(get_result(
buffer(data, data + data_sz), stat_from_raw(*pstat)));
301 prom->set_exception(get_exception_ptr_of(rc));
304 return with_str(path, [&] (ptr<const char> path)
306 auto ppromise = std::make_unique<promise<get_result>>();
307 auto rc = error_code_from_raw(::zoo_aget(_handle, path, 0, callback, ppromise.get()));
308 if (rc == error_code::ok)
310 auto f = ppromise->get_future();
316 ppromise->set_exception(get_exception_ptr_of(rc));
317 return ppromise->get_future();
326 static void deliver_raw(
int rc_in,
334 auto rc = error_code_from_raw(rc_in);
336 if (rc == error_code::ok)
339 self.get_event_future()
346 self.deliver_data(nullopt, get_exception_ptr_of(rc));
351 future<watch_result> connection_zk::watch(string_view path)
355 std::unique_lock<std::mutex> ax(_watches_protect);
356 auto watcher = std::make_shared<data_watcher>();
357 auto rc = error_code_from_raw(::zoo_awget(_handle,
361 data_watcher::deliver_raw,
365 if (rc == error_code::ok)
368 watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
370 return watcher->get_data_future();
374 future<get_children_result> connection_zk::get_children(string_view path)
376 ::strings_stat_completion_t callback =
378 ptr<const struct String_vector> strings_in,
379 ptr<const struct Stat> stat_in,
380 ptr<const void> prom_in
383 std::unique_ptr<promise<get_children_result>> prom((
ptr<promise<get_children_result>>) prom_in);
384 auto rc = error_code_from_raw(rc_in);
387 if (rc != error_code::ok)
390 prom->set_value(get_children_result(string_vector_from_raw(*strings_in), stat_from_raw(*stat_in)));
394 prom->set_exception(std::current_exception());
398 return with_str(path, [&] (ptr<const char> path)
400 auto ppromise = std::make_unique<promise<get_children_result>>();
401 auto rc = error_code_from_raw(::zoo_aget_children2(_handle,
408 if (rc == error_code::ok)
410 auto f = ppromise->get_future();
416 ppromise->set_exception(get_exception_ptr_of(rc));
417 return ppromise->get_future();
426 static void deliver_raw(
int rc_in,
433 auto rc = error_code_from_raw(rc_in);
437 if (rc != error_code::ok)
441 stat_from_raw(*stat_in)
443 self.get_event_future()
450 self.deliver_data(nullopt, std::current_exception());
456 future<watch_children_result> connection_zk::watch_children(string_view path)
460 std::unique_lock<std::mutex> ax(_watches_protect);
461 auto watcher = std::make_shared<child_watcher>();
462 auto rc = error_code_from_raw(::zoo_awget_children2(_handle,
466 child_watcher::deliver_raw,
470 if (rc == error_code::ok)
473 watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
475 return watcher->get_data_future();
479 future<exists_result> connection_zk::exists(string_view path)
481 ::stat_completion_t callback =
482 [] (
int rc_in, ptr<const struct Stat> stat_in, ptr<const void> prom_in)
484 std::unique_ptr<promise<exists_result>> prom((
ptr<promise<exists_result>>) prom_in);
485 auto rc = error_code_from_raw(rc_in);
486 if (rc == error_code::ok)
487 prom->set_value(exists_result(stat_from_raw(*stat_in)));
488 else if (rc == error_code::no_node)
489 prom->set_value(exists_result(nullopt));
491 prom->set_exception(get_exception_ptr_of(rc));
494 return with_str(path, [&] (ptr<const char> path)
496 auto ppromise = std::make_unique<promise<exists_result>>();
497 auto rc = error_code_from_raw(::zoo_aexists(_handle, path, 0, callback, ppromise.get()));
498 if (rc == error_code::ok)
500 auto f = ppromise->get_future();
506 ppromise->set_exception(get_exception_ptr_of(rc));
507 return ppromise->get_future();
519 auto rc = error_code_from_raw(rc_in);
521 if (rc == error_code::ok)
527 else if (rc == error_code::no_node)
535 self.deliver_data(nullopt, get_exception_ptr_of(rc));
540 future<watch_exists_result> connection_zk::watch_exists(string_view path)
544 std::unique_lock<std::mutex> ax(_watches_protect);
545 auto watcher = std::make_shared<exists_watcher>();
546 auto rc = error_code_from_raw(::zoo_awexists(_handle,
550 exists_watcher::deliver_raw,
554 if (rc == error_code::ok)
557 watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
559 return watcher->get_data_future();
563 future<create_result> connection_zk::create(string_view path,
569 ::string_completion_t callback =
570 [] (
int rc_in, ptr<const char> name_in, ptr<const void> prom_in)
572 std::unique_ptr<promise<create_result>> prom((
ptr<promise<create_result>>) prom_in);
573 auto rc = error_code_from_raw(rc_in);
574 if (rc == error_code::ok)
575 prom->set_value(create_result(std::string(name_in)));
577 prom->set_exception(get_exception_ptr_of(rc));
580 return with_str(path, [&] (ptr<const char> path)
582 auto ppromise = std::make_unique<promise<create_result>>();
583 auto rc = with_acl(rules, [&] (ptr<const ACL_vector> rules)
585 return error_code_from_raw(::zoo_acreate(_handle,
590 static_cast<int>(mode),
596 if (rc == error_code::ok)
598 auto f = ppromise->get_future();
604 ppromise->set_exception(get_exception_ptr_of(rc));
605 return ppromise->get_future();
610 future<set_result> connection_zk::set(string_view path,
const buffer& data, version check)
612 ::stat_completion_t callback =
613 [] (
int rc_in, ptr<const struct Stat> stat_raw, ptr<const void> prom_in)
615 std::unique_ptr<promise<set_result>> prom((
ptr<promise<set_result>>) prom_in);
616 auto rc = error_code_from_raw(rc_in);
617 if (rc == error_code::ok)
618 prom->set_value(set_result(stat_from_raw(*stat_raw)));
620 prom->set_exception(get_exception_ptr_of(rc));
623 return with_str(path, [&] (ptr<const char> path)
625 auto ppromise = std::make_unique<promise<set_result>>();
626 auto rc = error_code_from_raw(::zoo_aset(_handle,
634 if (rc == error_code::ok)
636 auto f = ppromise->get_future();
642 ppromise->set_exception(get_exception_ptr_of(rc));
643 return ppromise->get_future();
648 future<void> connection_zk::erase(string_view path, version check)
650 ::void_completion_t callback =
651 [] (
int rc_in, ptr<const void> prom_in)
653 std::unique_ptr<promise<void>> prom((
ptr<promise<void>>) prom_in);
654 auto rc = error_code_from_raw(rc_in);
655 if (rc == error_code::ok)
658 prom->set_exception(get_exception_ptr_of(rc));
661 return with_str(path, [&] (ptr<const char> path)
663 auto ppromise = std::make_unique<promise<void>>();
664 auto rc = error_code_from_raw(::zoo_adelete(_handle, path, check.value, callback, ppromise.get()));
665 if (rc == error_code::ok)
667 auto f = ppromise->get_future();
673 ppromise->set_exception(get_exception_ptr_of(rc));
674 return ppromise->get_future();
679 future<get_acl_result> connection_zk::get_acl(string_view path)
const
681 ::acl_completion_t callback =
682 [] (
int rc_in, ptr<struct ACL_vector> acl_raw, ptr<struct Stat> stat_raw, ptr<const void> prom_in) noexcept
684 std::unique_ptr<promise<get_acl_result>> prom((
ptr<promise<get_acl_result>>) prom_in);
685 auto rc = error_code_from_raw(rc_in);
686 if (rc == error_code::ok)
687 prom->set_value(get_acl_result(acl_from_raw(*acl_raw), stat_from_raw(*stat_raw)));
689 prom->set_exception(get_exception_ptr_of(rc));
692 return with_str(path, [&] (ptr<const char> path)
694 auto ppromise = std::make_unique<promise<get_acl_result>>();
695 auto rc = error_code_from_raw(::zoo_aget_acl(_handle, path, callback, ppromise.get()));
696 if (rc == error_code::ok)
698 auto f = ppromise->get_future();
704 ppromise->set_exception(get_exception_ptr_of(rc));
705 return ppromise->get_future();
710 future<void> connection_zk::set_acl(string_view path,
const acl& rules, acl_version check)
712 ::void_completion_t callback =
713 [] (
int rc_in, ptr<const void> prom_in)
715 std::unique_ptr<promise<void>> prom((
ptr<promise<void>>) prom_in);
716 auto rc = error_code_from_raw(rc_in);
717 if (rc == error_code::ok)
720 prom->set_exception(get_exception_ptr_of(rc));
723 return with_str(path, [&] (ptr<const char> path)
725 return with_acl(rules, [&] (ptr<struct ACL_vector> rules)
727 auto ppromise = std::make_unique<promise<void>>();
728 auto rc = error_code_from_raw(::zoo_aset_acl(_handle,
736 if (rc == error_code::ok)
738 auto f = ppromise->get_future();
744 ppromise->set_exception(get_exception_ptr_of(rc));
745 return ppromise->get_future();
754 promise<multi_result> prom;
755 std::vector<zoo_op_result_t> raw_results;
756 std::map<std::size_t, Stat> raw_stats;
757 std::map<std::size_t, std::vector<char>> path_buffers;
760 source_txn(std::move(src)),
761 raw_results(source_txn.size())
763 for (zoo_op_result_t& x : raw_results)
769 return &raw_stats[idx];
776 path_buffers[idx] = std::vector<char>(sz);
777 return &path_buffers[idx];
780 void deliver(error_code rc)
784 if (rc == error_code::ok)
787 out.reserve(raw_results.size());
788 for (std::size_t idx = 0; idx < source_txn.size(); ++idx)
790 const auto& raw_res = raw_results[idx];
792 switch (source_txn[idx].type())
794 case op_type::create:
798 out.emplace_back(
set_result(stat_from_raw(*raw_res.stat)));
801 out.emplace_back(source_txn[idx].type(),
nullptr);
806 prom.set_value(std::move(out));
808 else if (is_api_error(rc))
811 auto iter = std::partition_point(raw_results.begin(), raw_results.end(),
812 [] (
auto res) {
return res.err == 0; }
823 prom.set_exception(std::current_exception());
828 future<multi_result> connection_zk::commit(
multi_op&& txn_in)
830 ::void_completion_t callback =
833 std::unique_ptr<connection_zk_commit_completer>
835 completer->deliver(error_code_from_raw(rc_in));
838 auto pcompleter = std::make_unique<connection_zk_commit_completer>(std::move(txn_in));
839 auto& txn = pcompleter->source_txn;
842 ::zoo_op raw_ops[txn.size()];
843 std::size_t create_op_count = 0;
844 std::size_t acl_piece_count = 0;
845 for (
const auto& tx : txn)
847 if (tx.type() == op_type::create)
850 acl_piece_count += tx.as_create().rules.size();
853 ACL_vector encoded_acls[create_op_count];
854 ACL acl_pieces[acl_piece_count];
855 ptr<ACL_vector> encoded_acl_iter = encoded_acls;
856 ptr<ACL> acl_piece_iter = acl_pieces;
858 for (std::size_t idx = 0; idx < txn.size(); ++idx)
860 auto& raw_op = raw_ops[idx];
861 auto& src_op = txn[idx];
862 switch (src_op.type())
865 zoo_check_op_init(&raw_op, src_op.as_check().path.c_str(), src_op.as_check().check.value);
867 case op_type::create:
869 const auto& cdata = src_op.as_create();
870 encoded_acl_iter->count = int(cdata.rules.size());
871 encoded_acl_iter->data = acl_piece_iter;
872 for (
const auto& acl : cdata.rules)
874 *acl_piece_iter = encode_acl_part(acl);
878 auto path_buf_ref = pcompleter->path_buffer_for(idx, cdata.path, cdata.mode);
879 zoo_create_op_init(&raw_op,
882 int(cdata.data.size()),
884 static_cast<int>(cdata.mode),
885 path_buf_ref->data(),
886 int(path_buf_ref->size())
892 zoo_delete_op_init(&raw_op, src_op.as_erase().path.c_str(), src_op.as_erase().check.value);
896 const auto& setting = src_op.as_set();
897 zoo_set_op_init(&raw_op,
898 setting.path.c_str(),
900 int(setting.data.size()),
902 pcompleter->raw_stat_at(idx)
908 using std::to_string;
909 throw std::invalid_argument(
"Invalid op_type at index=" + to_string(idx) +
": "
910 + to_string(src_op.type())
915 auto rc = error_code_from_raw(::zoo_amulti(_handle,
918 pcompleter->raw_results.data(),
923 if (rc == error_code::ok)
925 auto f = pcompleter->prom.get_future();
926 pcompleter.release();
931 pcompleter->prom.set_exception(get_exception_ptr_of(rc));
932 return pcompleter->prom.get_future();
937 pcompleter->prom.set_exception(std::current_exception());
938 return pcompleter->prom.get_future();
942 future<void> connection_zk::load_fence()
944 ::string_completion_t callback =
945 [] (
int rc_in, ptr<const char>, ptr<const void> prom_in)
947 std::unique_ptr<promise<void>> prom((
ptr<promise<void>>) prom_in);
948 auto rc = error_code_from_raw(rc_in);
949 if (rc == error_code::ok)
952 prom->set_exception(get_exception_ptr_of(rc));
955 auto ppromise = std::make_unique<std::promise<void>>();
956 auto rc = error_code_from_raw(::zoo_async(_handle,
"/", callback, ppromise.get()));
957 if (rc == error_code::ok)
959 auto f = ppromise->get_future();
965 ppromise->set_exception(get_exception_ptr_of(rc));
966 return ppromise->get_future();
970 void connection_zk::on_session_event_raw(ptr<zhandle_t> handle [[gnu::unused]],
973 ptr<const char> path_ptr,
974 ptr<void> watcher_ctx
977 auto self =
static_cast<ptr<connection_zk>
>(watcher_ctx);
981 assert(self->_handle ==
nullptr || self->_handle == handle);
982 auto ev = event_from_raw(ev_type);
983 auto st = state_from_raw(
state);
984 auto path = string_view(path_ptr);
989 std::cerr <<
"WARNING: Got unexpected event " << ev <<
" in state=" << st <<
" with path=" << path << std::endl;
992 self->on_session_event(st);
Data delivered when a watched event triggers.
T * ptr
A simple, unowned pointer.
This value is issued as part of an event when the state changes.
Describes the various result types of client operations.
constexpr bool is_set(create_mode self, create_mode flags)
Check that self has flags set.
The client is not connected to any server in the ensemble.
state
Enumeration of states the client may be at when a watch triggers.
The result type of client::watch_exists.
The result type of client::create.
The result type of client::exists.
The result type of client::watch.
Thrown from client::commit when a transaction cannot be committed to the system.
create_mode
When used in client::set, this value determines how the znode is created on the server.
The result type of client::watch_children.
The result type of client::set.
event_type
Enumeration of types of events that may occur.
The name of the znode will be appended with a monotonically increasing number.
The result type of client::get_children.
ZKPP_BUFFER_TYPE buffer
The buffer type.
The result type of client::get.
zk::acl_version acl_version
The number of changes to the ACL of the znode.