1 #include "connection_zk.hpp" 12 #include <system_error> 14 #include <zookeeper/zookeeper.h> 29 template <
typename FAction>
30 auto with_str(string_view src, FAction&& action)
31 -> decltype(std::forward<FAction>(action)(ptr<const char>()))
33 char buffer[src.size() + 1];
34 buffer[src.size()] =
'\0';
35 std::memcpy(buffer, src.data(), src.size());
36 return std::forward<FAction>(action)(buffer);
39 static ACL encode_acl_part(
const acl_rule& src)
42 out.perms =
static_cast<int>(src.permissions());
43 out.id.scheme =
const_cast<ptr<char>
>(src.scheme().c_str());
44 out.id.id =
const_cast<ptr<char>
>(src.id().c_str());
48 template <
typename FAction>
49 auto with_acl(
const acl& rules, FAction&& action)
50 -> decltype(std::forward<FAction>(action)(ptr<ACL_vector>()))
52 ACL parts[rules.size()];
53 for (std::size_t idx = 0; idx < rules.size(); ++idx)
54 parts[idx] = encode_acl_part(rules[idx]);
57 vec.count = int(rules.size());
59 return std::forward<FAction>(action)(&vec);
66 static error_code error_code_from_raw(
int raw)
68 return static_cast<error_code
>(raw);
76 static state state_from_raw(
int raw)
78 return static_cast<state>(raw);
81 static stat stat_from_raw(
const struct Stat& raw)
85 out.child_modified_transaction = transaction_id(raw.pzxid);
86 out.child_version = child_version(raw.cversion);
87 out.children_count = raw.numChildren;
88 out.create_time = stat::time_point() + std::chrono::milliseconds(raw.ctime);
89 out.create_transaction = transaction_id(raw.czxid);
90 out.data_size = raw.dataLength;
91 out.data_version = version(raw.version);
92 out.ephemeral_owner = raw.ephemeralOwner;
93 out.modified_time = stat::time_point() + std::chrono::milliseconds(raw.mtime);
94 out.modified_transaction = transaction_id(raw.mzxid);
98 static std::vector<std::string> string_vector_from_raw(
const struct String_vector& raw)
100 std::vector<std::string> out;
101 out.reserve(raw.count);
102 for (std::int32_t idx = 0; idx < raw.count; ++idx)
103 out.emplace_back(raw.data[idx]);
107 static acl acl_from_raw(
const struct ACL_vector& raw)
109 auto sz = std::size_t(raw.count);
113 for (std::size_t idx = 0; idx < sz; ++idx)
115 const auto& item = raw.data[idx];
116 out.emplace_back(item.id.scheme, item.id.id, static_cast<permission>(item.perms));
125 connection_zk::connection_zk(string_view conn_string, std::chrono::milliseconds recv_timeout) :
128 if (conn_string.find(
"zk://") != 0U)
129 throw std::invalid_argument(std::string(
"Invalid connection string \"") + std::string(conn_string) +
"\"");
130 conn_string.remove_prefix(5);
135 [&] (ptr<const char> conn_c_string)
137 return ::zookeeper_init(conn_c_string,
138 on_session_event_raw,
139 static_cast<int>(recv_timeout.count()),
147 std::system_error(errno, std::system_category(),
"Failed to create ZooKeeper client");
150 connection_zk::~connection_zk() noexcept
155 void connection_zk::close()
159 auto err = error_code_from_raw(::zookeeper_close(_handle));
160 if (err != error_code::ok)
170 return state_from_raw(::zoo_state(_handle));
175 future<get_result> connection_zk::get(string_view path)
177 ::data_completion_t callback =
178 [] (
int rc_in, ptr<const char> data,
int data_sz, ptr<const struct Stat> pstat, ptr<const void> prom_in) noexcept
180 std::unique_ptr<promise<get_result>> prom((
ptr<promise<get_result>>) prom_in);
181 auto rc = error_code_from_raw(rc_in);
182 if (rc == error_code::ok)
183 prom->set_value(get_result(
buffer(data, data + data_sz), stat_from_raw(*pstat)));
185 prom->set_exception(get_exception_ptr_of(rc));
188 return with_str(path, [&] (ptr<const char> path)
190 auto ppromise = std::make_unique<promise<get_result>>();
191 auto rc = error_code_from_raw(::zoo_aget(_handle, path, 0, callback, ppromise.get()));
192 if (rc == error_code::ok)
194 auto f = ppromise->get_future();
200 ppromise->set_exception(get_exception_ptr_of(rc));
201 return ppromise->get_future();
206 future<watch_result> connection_zk::watch(string_view path)
208 using watch_promises = std::pair<std::promise<watch_result>, std::promise<event>>;
210 ::data_completion_t data_callback =
211 [] (
int rc_in, ptr<const char> data,
int data_sz, ptr<const struct Stat> pstat, ptr<const void> prom_in) noexcept
213 std::unique_ptr<watch_promises> prom((ptr<watch_promises>) prom_in);
214 auto rc = error_code_from_raw(rc_in);
215 if (rc == error_code::ok)
217 prom->first.set_value(watch_result(get_result(
buffer(data, data + data_sz), stat_from_raw(*pstat)),
218 prom->second.get_future()
226 prom->first.set_exception(get_exception_ptr_of(rc));
230 ::watcher_fn watch_callback =
231 [] (ptr<zhandle_t>,
int type_in,
int state_in, ptr<const char>, ptr<void> proms_in)
233 std::unique_ptr<watch_promises> prom(
static_cast<ptr<watch_promises>
>(proms_in));
234 prom->second.set_value(event(event_from_raw(type_in), state_from_raw(state_in)));
237 return with_str(path, [&] (ptr<const char> path)
239 auto ppromises = std::make_unique<watch_promises>();
240 auto rc = error_code_from_raw(::zoo_awget(_handle,
248 if (rc == error_code::ok)
250 auto f = ppromises->first.get_future();
256 ppromises->first.set_exception(get_exception_ptr_of(rc));
257 return ppromises->first.get_future();
262 future<get_children_result> connection_zk::get_children(string_view path)
264 ::strings_stat_completion_t callback =
266 ptr<const struct String_vector> strings_in,
267 ptr<const struct Stat> stat_in,
268 ptr<const void> prom_in
271 std::unique_ptr<promise<get_children_result>> prom((
ptr<promise<get_children_result>>) prom_in);
272 auto rc = error_code_from_raw(rc_in);
275 if (rc != error_code::ok)
278 prom->set_value(get_children_result(string_vector_from_raw(*strings_in), stat_from_raw(*stat_in)));
282 prom->set_exception(std::current_exception());
286 return with_str(path, [&] (ptr<const char> path)
288 auto ppromise = std::make_unique<promise<get_children_result>>();
289 auto rc = error_code_from_raw(::zoo_aget_children2(_handle,
296 if (rc == error_code::ok)
298 auto f = ppromise->get_future();
304 ppromise->set_exception(get_exception_ptr_of(rc));
305 return ppromise->get_future();
310 future<watch_children_result> connection_zk::watch_children(string_view path)
312 using watch_promises = std::pair<std::promise<watch_children_result>, std::promise<event>>;
314 ::strings_stat_completion_t data_callback =
316 ptr<const struct String_vector> strings_in,
317 ptr<const struct Stat> stat_in,
318 ptr<const void> prom_in
321 std::unique_ptr<watch_promises> prom((ptr<watch_promises>) prom_in);
322 auto rc = error_code_from_raw(rc_in);
325 if (rc != error_code::ok)
328 prom->first.set_value(watch_children_result(get_children_result(string_vector_from_raw(*strings_in),
329 stat_from_raw(*stat_in)
331 prom->second.get_future()
338 prom->first.set_exception(std::current_exception());
342 ::watcher_fn watch_callback =
343 [] (ptr<zhandle_t>,
int type_in,
int state_in, ptr<const char>, ptr<void> proms_in)
345 std::unique_ptr<watch_promises> proms(
static_cast<ptr<watch_promises>
>(proms_in));
346 proms->second.set_value(event(event_from_raw(type_in), state_from_raw(state_in)));
349 return with_str(path, [&] (ptr<const char> path)
351 auto ppromises = std::make_unique<watch_promises>();
352 auto rc = error_code_from_raw(::zoo_awget_children2(_handle,
360 if (rc == error_code::ok)
362 auto f = ppromises->first.get_future();
368 ppromises->first.set_exception(get_exception_ptr_of(rc));
369 return ppromises->first.get_future();
374 future<exists_result> connection_zk::exists(string_view path)
376 ::stat_completion_t callback =
377 [] (
int rc_in, ptr<const struct Stat> stat_in, ptr<const void> prom_in)
379 std::unique_ptr<promise<exists_result>> prom((
ptr<promise<exists_result>>) prom_in);
380 auto rc = error_code_from_raw(rc_in);
381 if (rc == error_code::ok)
382 prom->set_value(exists_result(stat_from_raw(*stat_in)));
383 else if (rc == error_code::no_node)
384 prom->set_value(exists_result(nullopt));
386 prom->set_exception(get_exception_ptr_of(rc));
389 return with_str(path, [&] (ptr<const char> path)
391 auto ppromise = std::make_unique<promise<exists_result>>();
392 auto rc = error_code_from_raw(::zoo_aexists(_handle, path, 0, callback, ppromise.get()));
393 if (rc == error_code::ok)
395 auto f = ppromise->get_future();
401 ppromise->set_exception(get_exception_ptr_of(rc));
402 return ppromise->get_future();
407 future<watch_exists_result> connection_zk::watch_exists(string_view path)
409 using watch_promises = std::pair<std::promise<watch_exists_result>, std::promise<event>>;
411 ::stat_completion_t data_callback =
412 [] (
int rc_in, ptr<const struct Stat> stat_in, ptr<const void> proms_in)
414 std::unique_ptr<watch_promises> proms((ptr<watch_promises>) proms_in);
415 auto rc = error_code_from_raw(rc_in);
416 if (rc == error_code::ok)
418 proms->first.set_value(watch_exists_result(exists_result(stat_from_raw(*stat_in)),
419 proms->second.get_future()
424 else if (rc == error_code::no_node)
426 proms->first.set_value(watch_exists_result(exists_result(nullopt),
427 proms->second.get_future()
434 proms->first.set_exception(get_exception_ptr_of(rc));
438 ::watcher_fn watch_callback =
439 [] (ptr<zhandle_t>,
int type_in,
int state_in, ptr<const char>, ptr<void> proms_in)
441 std::unique_ptr<watch_promises> proms(
static_cast<ptr<watch_promises>
>(proms_in));
442 proms->second.set_value(event(event_from_raw(type_in), state_from_raw(state_in)));
445 return with_str(path, [&] (ptr<const char> path)
447 auto ppromises = std::make_unique<watch_promises>();
448 auto rc = error_code_from_raw(::zoo_awexists(_handle,
456 if (rc == error_code::ok)
458 auto f = ppromises->first.get_future();
464 ppromises->first.set_exception(get_exception_ptr_of(rc));
465 return ppromises->first.get_future();
470 future<create_result> connection_zk::create(string_view path,
476 ::string_completion_t callback =
477 [] (
int rc_in, ptr<const char> name_in, ptr<const void> prom_in)
479 std::unique_ptr<promise<create_result>> prom((
ptr<promise<create_result>>) prom_in);
480 auto rc = error_code_from_raw(rc_in);
481 if (rc == error_code::ok)
482 prom->set_value(create_result(std::string(name_in)));
484 prom->set_exception(get_exception_ptr_of(rc));
487 return with_str(path, [&] (ptr<const char> path)
489 auto ppromise = std::make_unique<promise<create_result>>();
490 auto rc = with_acl(rules, [&] (ptr<const ACL_vector> rules)
492 return error_code_from_raw(::zoo_acreate(_handle,
497 static_cast<int>(mode),
503 if (rc == error_code::ok)
505 auto f = ppromise->get_future();
511 ppromise->set_exception(get_exception_ptr_of(rc));
512 return ppromise->get_future();
517 future<set_result> connection_zk::set(string_view path,
const buffer& data, version check)
519 ::stat_completion_t callback =
520 [] (
int rc_in, ptr<const struct Stat> stat_raw, ptr<const void> prom_in)
522 std::unique_ptr<promise<set_result>> prom((
ptr<promise<set_result>>) prom_in);
523 auto rc = error_code_from_raw(rc_in);
524 if (rc == error_code::ok)
525 prom->set_value(set_result(stat_from_raw(*stat_raw)));
527 prom->set_exception(get_exception_ptr_of(rc));
530 return with_str(path, [&] (ptr<const char> path)
532 auto ppromise = std::make_unique<promise<set_result>>();
533 auto rc = error_code_from_raw(::zoo_aset(_handle,
541 if (rc == error_code::ok)
543 auto f = ppromise->get_future();
549 ppromise->set_exception(get_exception_ptr_of(rc));
550 return ppromise->get_future();
555 future<void> connection_zk::erase(string_view path, version check)
557 ::void_completion_t callback =
558 [] (
int rc_in, ptr<const void> prom_in)
560 std::unique_ptr<promise<void>> prom((
ptr<promise<void>>) prom_in);
561 auto rc = error_code_from_raw(rc_in);
562 if (rc == error_code::ok)
565 prom->set_exception(get_exception_ptr_of(rc));
568 return with_str(path, [&] (ptr<const char> path)
570 auto ppromise = std::make_unique<promise<void>>();
571 auto rc = error_code_from_raw(::zoo_adelete(_handle, path, check.value, callback, ppromise.get()));
572 if (rc == error_code::ok)
574 auto f = ppromise->get_future();
580 ppromise->set_exception(get_exception_ptr_of(rc));
581 return ppromise->get_future();
586 future<get_acl_result> connection_zk::get_acl(string_view path)
const 588 ::acl_completion_t callback =
589 [] (
int rc_in, ptr<struct ACL_vector> acl_raw, ptr<struct Stat> stat_raw, ptr<const void> prom_in) noexcept
591 std::unique_ptr<promise<get_acl_result>> prom((
ptr<promise<get_acl_result>>) prom_in);
592 auto rc = error_code_from_raw(rc_in);
593 if (rc == error_code::ok)
594 prom->set_value(get_acl_result(acl_from_raw(*acl_raw), stat_from_raw(*stat_raw)));
596 prom->set_exception(get_exception_ptr_of(rc));
599 return with_str(path, [&] (ptr<const char> path)
601 auto ppromise = std::make_unique<promise<get_acl_result>>();
602 auto rc = error_code_from_raw(::zoo_aget_acl(_handle, path, callback, ppromise.get()));
603 if (rc == error_code::ok)
605 auto f = ppromise->get_future();
611 ppromise->set_exception(get_exception_ptr_of(rc));
612 return ppromise->get_future();
617 future<void> connection_zk::set_acl(string_view path,
const acl& rules, acl_version check)
619 ::void_completion_t callback =
620 [] (
int rc_in, ptr<const void> prom_in)
622 std::unique_ptr<promise<void>> prom((
ptr<promise<void>>) prom_in);
623 auto rc = error_code_from_raw(rc_in);
624 if (rc == error_code::ok)
627 prom->set_exception(get_exception_ptr_of(rc));
630 return with_str(path, [&] (ptr<const char> path)
632 return with_acl(rules, [&] (ptr<struct ACL_vector> rules)
634 auto ppromise = std::make_unique<promise<void>>();
635 auto rc = error_code_from_raw(::zoo_aset_acl(_handle,
643 if (rc == error_code::ok)
645 auto f = ppromise->get_future();
651 ppromise->set_exception(get_exception_ptr_of(rc));
652 return ppromise->get_future();
661 promise<multi_result> prom;
662 std::vector<zoo_op_result_t> raw_results;
663 std::map<std::size_t, Stat> raw_stats;
664 std::map<std::size_t, std::vector<char>> path_buffers;
667 source_txn(std::move(src)),
668 raw_results(source_txn.size())
670 for (zoo_op_result_t& x : raw_results)
676 return &raw_stats[idx];
683 path_buffers[idx] = std::vector<char>(sz);
684 return &path_buffers[idx];
687 void deliver(error_code rc)
691 if (rc == error_code::ok)
694 out.reserve(raw_results.size());
695 for (std::size_t idx = 0; idx < source_txn.size(); ++idx)
697 const auto& raw_res = raw_results[idx];
699 switch (source_txn[idx].type())
701 case op_type::create:
705 out.emplace_back(
set_result(stat_from_raw(*raw_res.stat)));
708 out.emplace_back(source_txn[idx].type(),
nullptr);
713 prom.set_value(std::move(out));
715 else if (is_api_error(rc))
718 auto iter = std::partition_point(raw_results.begin(), raw_results.end(),
719 [] (
auto res) {
return res.err == 0; }
730 prom.set_exception(std::current_exception());
735 future<multi_result> connection_zk::commit(
multi_op&& txn_in)
737 ::void_completion_t callback =
740 std::unique_ptr<connection_zk_commit_completer>
742 completer->deliver(error_code_from_raw(rc_in));
745 auto pcompleter = std::make_unique<connection_zk_commit_completer>(std::move(txn_in));
746 auto& txn = pcompleter->source_txn;
749 ::zoo_op raw_ops[txn.size()];
750 std::size_t create_op_count = 0;
751 std::size_t acl_piece_count = 0;
752 for (
const auto& tx : txn)
754 if (tx.type() == op_type::create)
757 acl_piece_count += tx.as_create().rules.size();
760 ACL_vector encoded_acls[create_op_count];
761 ACL acl_pieces[acl_piece_count];
763 ptr<ACL> acl_piece_iter = acl_pieces;
765 for (std::size_t idx = 0; idx < txn.size(); ++idx)
767 auto& raw_op = raw_ops[idx];
768 auto& src_op = txn[idx];
769 switch (src_op.type())
772 zoo_check_op_init(&raw_op, src_op.as_check().path.c_str(), src_op.as_check().check.value);
774 case op_type::create:
776 const auto& cdata = src_op.as_create();
777 encoded_acl_iter->count = int(cdata.rules.size());
778 encoded_acl_iter->data = acl_piece_iter;
779 for (
const auto&
acl : cdata.rules)
781 *acl_piece_iter = encode_acl_part(
acl);
785 auto path_buf_ref = pcompleter->path_buffer_for(idx, cdata.path, cdata.mode);
786 zoo_create_op_init(&raw_op,
789 int(cdata.data.size()),
791 static_cast<int>(cdata.mode),
792 path_buf_ref->data(),
793 int(path_buf_ref->size())
799 zoo_delete_op_init(&raw_op, src_op.as_erase().path.c_str(), src_op.as_erase().check.value);
803 const auto& setting = src_op.as_set();
804 zoo_set_op_init(&raw_op,
805 setting.path.c_str(),
807 int(setting.data.size()),
809 pcompleter->raw_stat_at(idx)
815 using std::to_string;
816 throw std::invalid_argument(
"Invalid op_type at index=" + to_string(idx) +
": " 817 + to_string(src_op.type())
822 auto rc = error_code_from_raw(::zoo_amulti(_handle,
825 pcompleter->raw_results.data(),
830 if (rc == error_code::ok)
832 auto f = pcompleter->prom.get_future();
833 pcompleter.release();
838 pcompleter->prom.set_exception(get_exception_ptr_of(rc));
839 return pcompleter->prom.get_future();
844 pcompleter->prom.set_exception(std::current_exception());
845 return pcompleter->prom.get_future();
849 future<void> connection_zk::load_fence()
851 ::string_completion_t callback =
854 std::unique_ptr<promise<void>> prom((
ptr<promise<void>>) prom_in);
855 auto rc = error_code_from_raw(rc_in);
856 if (rc == error_code::ok)
859 prom->set_exception(get_exception_ptr_of(rc));
862 auto ppromise = std::make_unique<std::promise<void>>();
863 auto rc = error_code_from_raw(::zoo_async(_handle,
"/", callback, ppromise.get()));
864 if (rc == error_code::ok)
866 auto f = ppromise->get_future();
872 ppromise->set_exception(get_exception_ptr_of(rc));
873 return ppromise->get_future();
877 void connection_zk::on_session_event_raw(
ptr<zhandle_t> handle [[gnu::unused]],
880 ptr<const char> path_ptr,
888 assert(self->_handle ==
nullptr || self->_handle == handle);
889 auto ev = event_from_raw(ev_type);
890 auto st = state_from_raw(state);
891 auto path = string_view(path_ptr);
895 std::cerr <<
"WARNING: Got unexpected event " << ev <<
" in state=" << st <<
" with path=" << path << std::endl;
898 self->on_session_event(st);
T * ptr
A simple, unowned pointer.
This value is issued as part of an event when the state changes.
Describes the various result types of client operations.
constexpr bool is_set(create_mode self, create_mode flags)
Check that self has flags set.
The client is not connected to any server in the ensemble.
state
Enumeration of states the client may be at when a watch triggers.
The result type of client::create.
Thrown from client::commit when a transaction cannot be committed to the system.
create_mode
When used in client::create, this value determines how the znode is created on the server.
The result type of client::set.
event_type
Enumeration of types of events that may occur.
An access control list is a wrapper around acl_rule instances.
A monotonically increasing number will be appended to the name of the znode.
ZKPP_BUFFER_TYPE buffer
The buffer type.
zk::acl_version acl_version
The number of changes to the ACL of the znode.