zookeeper-cpp
ZooKeeper Client for C++
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Groups
connection_zk.cpp
1 #include "connection_zk.hpp"
2 
3 #include <algorithm>
4 #include <cassert>
5 #include <cerrno>
6 #include <cstring>
7 #include <iostream>
8 #include <map>
9 #include <memory>
10 #include <sstream>
11 #include <stdexcept>
12 #include <string>
13 #include <system_error>
14 #include <tuple>
15 
16 #include <zookeeper/zookeeper.h>
17 
18 #include "acl.hpp"
19 #include "error.hpp"
20 #include "multi.hpp"
21 #include "results.hpp"
22 #include "types.hpp"
23 
24 namespace zk
25 {
26 
28 // Utility Functions //
30 
31 template <typename FAction>
32 auto with_str(string_view src, FAction&& action)
33  -> decltype(std::forward<FAction>(action)(ptr<const char>()))
34 {
35  char buffer[src.size() + 1];
36  buffer[src.size()] = '\0';
37  std::memcpy(buffer, src.data(), src.size());
38  return std::forward<FAction>(action)(buffer);
39 }
40 
41 static ACL encode_acl_part(const acl_rule& src)
42 {
43  ACL out;
44  out.perms = static_cast<int>(src.permissions());
45  out.id.scheme = const_cast<ptr<char>>(src.scheme().c_str());
46  out.id.id = const_cast<ptr<char>>(src.id().c_str());
47  return out;
48 }
49 
50 template <typename FAction>
51 auto with_acl(const acl& rules, FAction&& action)
52  -> decltype(std::forward<FAction>(action)(ptr<ACL_vector>()))
53 {
54  ACL parts[rules.size()];
55  for (std::size_t idx = 0; idx < rules.size(); ++idx)
56  parts[idx] = encode_acl_part(rules[idx]);
57 
58  ACL_vector vec;
59  vec.count = int(rules.size());
60  vec.data = parts;
61  return std::forward<FAction>(action)(&vec);
62 }
63 
65 // Native Adaptors //
67 
68 static error_code error_code_from_raw(int raw)
69 {
70  return static_cast<error_code>(raw);
71 }
72 
73 static event_type event_from_raw(int raw)
74 {
75  return static_cast<event_type>(raw);
76 }
77 
78 static state state_from_raw(int raw)
79 {
80  return static_cast<state>(raw);
81 }
82 
83 static stat stat_from_raw(const struct Stat& raw)
84 {
85  stat out;
86  out.acl_version = acl_version(raw.aversion);
87  out.child_modified_transaction = transaction_id(raw.pzxid);
88  out.child_version = child_version(raw.cversion);
89  out.children_count = raw.numChildren;
90  out.create_time = stat::time_point() + std::chrono::milliseconds(raw.ctime);
91  out.create_transaction = transaction_id(raw.czxid);
92  out.data_size = raw.dataLength;
93  out.data_version = version(raw.version);
94  out.ephemeral_owner = raw.ephemeralOwner;
95  out.modified_time = stat::time_point() + std::chrono::milliseconds(raw.mtime);
96  out.modified_transaction = transaction_id(raw.mzxid);
97  return out;
98 }
99 
100 static std::vector<std::string> string_vector_from_raw(const struct String_vector& raw)
101 {
102  std::vector<std::string> out;
103  out.reserve(raw.count);
104  for (std::int32_t idx = 0; idx < raw.count; ++idx)
105  out.emplace_back(raw.data[idx]);
106  return out;
107 }
108 
109 static acl acl_from_raw(const struct ACL_vector& raw)
110 {
111  auto sz = std::size_t(raw.count);
112 
113  acl out;
114  out.reserve(sz);
115  for (std::size_t idx = 0; idx < sz; ++idx)
116  {
117  const auto& item = raw.data[idx];
118  out.emplace_back(item.id.scheme, item.id.id, static_cast<permission>(item.perms));
119  }
120  return out;
121 }
122 
124 // connection_zk //
126 
127 connection_zk::connection_zk(const connection_params& params) :
128  _handle(nullptr)
129 {
130  if (params.connection_schema() != "zk")
131  throw std::invalid_argument(std::string("Invalid connection string \"") + to_string(params) + "\"");
132 
133  auto conn_string = [&] ()
134  {
135  std::ostringstream os;
136  bool first = true;
137  for (const auto& host : params.hosts())
138  {
139  if (first)
140  first = false;
141  else
142  os << ',';
143 
144  os << host;
145  }
146  return os.str();
147  }();
148 
149  _handle = ::zookeeper_init(conn_string.c_str(),
150  on_session_event_raw,
151  static_cast<int>(params.timeout().count()),
152  nullptr,
153  this,
154  0
155  );
156 
157  if (!_handle)
158  std::system_error(errno, std::system_category(), "Failed to create ZooKeeper client");
159 }
160 
161 connection_zk::~connection_zk() noexcept
162 {
163  close();
164 }
165 
167 {
168 public:
169  watcher() :
170  _event_delivered(false)
171  { }
172 
173  virtual ~watcher() noexcept {}
174 
175  virtual void deliver_event(event ev)
176  {
177  if (!_event_delivered.exchange(true, std::memory_order_relaxed))
178  {
179  _event_promise.set_value(std::move(ev));
180  }
181  }
182 
183  future<event> get_event_future()
184  {
185  return _event_promise.get_future();
186  }
187 
188 protected:
189  std::atomic<bool> _event_delivered;
190  promise<event> _event_promise;
191 };
192 
193 template <typename TResult>
196 {
197 public:
198  basic_watcher() :
199  _data_delivered(false)
200  { }
201 
202  future<TResult> get_data_future()
203  {
204  return _data_promise.get_future();
205  }
206 
207  virtual void deliver_event(event ev) override
208  {
209  if (!_data_delivered.load(std::memory_order_relaxed))
210  {
211  deliver_data(nullopt, get_exception_ptr_of(error_code::closing));
212  }
213 
214  watcher::deliver_event(std::move(ev));
215  }
216 
217  void deliver_data(optional<TResult> data, std::exception_ptr ex_ptr)
218  {
219  if (!_data_delivered.exchange(true, std::memory_order_relaxed))
220  {
221  if (ex_ptr)
222  {
223  _data_promise.set_exception(std::move(ex_ptr));
224  }
225  else
226  {
227  _data_promise.set_value(std::move(*data));
228  }
229  }
230  }
231 
232 private:
233  std::atomic<bool> _data_delivered;
234  promise<TResult> _data_promise;
235 };
236 
237 std::shared_ptr<connection_zk::watcher> connection_zk::try_extract_watch(ptr<const void> addr)
238 {
239  std::unique_lock<std::mutex> ax(_watches_protect);
240  auto iter = _watches.find(addr);
241  if (iter != _watches.end())
242  return _watches.extract(iter).mapped();
243  else
244  return nullptr;
245 }
246 
247 static ptr<connection_zk> connection_from_context(ptr<zhandle_t> zh)
248 {
249  return (ptr<connection_zk>) zoo_get_context(zh);
250 }
251 
252 void connection_zk::deliver_watch(ptr<zhandle_t> zh,
253  int type_in,
254  int state_in,
255  ptr<const char> path [[gnu::unused]],
256  ptr<void> proms_in
257  )
258 {
259  auto& self = *connection_from_context(zh);
260  if (auto watcher = self.try_extract_watch(proms_in))
261  watcher->deliver_event(event(event_from_raw(type_in), state_from_raw(state_in)));
262 }
263 
264 void connection_zk::close()
265 {
266  if (_handle)
267  {
268  auto err = error_code_from_raw(::zookeeper_close(_handle));
269  if (err != error_code::ok)
270  throw_error(err);
271 
272  _handle = nullptr;
273 
274  // Deliver a session event as if there was a close.
275  std::unique_lock<std::mutex> ax(_watches_protect);
276  auto l_watches = std::move(_watches);
277  ax.unlock();
278  for (const auto& pair : l_watches)
279  pair.second->deliver_event(event(event_type::session, zk::state::closed));
280  }
281 }
282 
283 zk::state connection_zk::state() const
284 {
285  if (_handle)
286  return state_from_raw(::zoo_state(_handle));
287  else
288  return zk::state::closed;
289 }
290 
291 future<get_result> connection_zk::get(string_view path)
292 {
293  ::data_completion_t callback =
294  [] (int rc_in, ptr<const char> data, int data_sz, ptr<const struct Stat> pstat, ptr<const void> prom_in) noexcept
295  {
296  std::unique_ptr<promise<get_result>> prom((ptr<promise<get_result>>) prom_in);
297  auto rc = error_code_from_raw(rc_in);
298  if (rc == error_code::ok)
299  prom->set_value(get_result(buffer(data, data + data_sz), stat_from_raw(*pstat)));
300  else
301  prom->set_exception(get_exception_ptr_of(rc));
302  };
303 
304  return with_str(path, [&] (ptr<const char> path)
305  {
306  auto ppromise = std::make_unique<promise<get_result>>();
307  auto rc = error_code_from_raw(::zoo_aget(_handle, path, 0, callback, ppromise.get()));
308  if (rc == error_code::ok)
309  {
310  auto f = ppromise->get_future();
311  ppromise.release();
312  return f;
313  }
314  else
315  {
316  ppromise->set_exception(get_exception_ptr_of(rc));
317  return ppromise->get_future();
318  }
319  });
320 }
321 
323  public connection_zk::basic_watcher<watch_result>
324 {
325 public:
326  static void deliver_raw(int rc_in,
327  ptr<const char> data,
328  int data_sz,
330  ptr<const void> self_in
331  ) noexcept
332  {
333  auto& self = *static_cast<ptr<data_watcher>>(const_cast<ptr<void>>(self_in));
334  auto rc = error_code_from_raw(rc_in);
335 
336  if (rc == error_code::ok)
337  {
338  self.deliver_data(watch_result(get_result(buffer(data, data + data_sz), stat_from_raw(*pstat)),
339  self.get_event_future()
340  ),
341  std::exception_ptr()
342  );
343  }
344  else
345  {
346  self.deliver_data(nullopt, get_exception_ptr_of(rc));
347  }
348  }
349 };
350 
351 future<watch_result> connection_zk::watch(string_view path)
352 {
353  return with_str(path, [&] (ptr<const char> path)
354  {
355  std::unique_lock<std::mutex> ax(_watches_protect);
356  auto watcher = std::make_shared<data_watcher>();
357  auto rc = error_code_from_raw(::zoo_awget(_handle,
358  path,
359  deliver_watch,
360  watcher.get(),
361  data_watcher::deliver_raw,
362  watcher.get()
363  )
364  );
365  if (rc == error_code::ok)
366  _watches.emplace(watcher.get(), watcher);
367  else
368  watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
369 
370  return watcher->get_data_future();
371  });
372 }
373 
374 future<get_children_result> connection_zk::get_children(string_view path)
375 {
376  ::strings_stat_completion_t callback =
377  [] (int rc_in,
378  ptr<const struct String_vector> strings_in,
379  ptr<const struct Stat> stat_in,
380  ptr<const void> prom_in
381  )
382  {
383  std::unique_ptr<promise<get_children_result>> prom((ptr<promise<get_children_result>>) prom_in);
384  auto rc = error_code_from_raw(rc_in);
385  try
386  {
387  if (rc != error_code::ok)
388  throw_error(rc);
389 
390  prom->set_value(get_children_result(string_vector_from_raw(*strings_in), stat_from_raw(*stat_in)));
391  }
392  catch (...)
393  {
394  prom->set_exception(std::current_exception());
395  }
396  };
397 
398  return with_str(path, [&] (ptr<const char> path)
399  {
400  auto ppromise = std::make_unique<promise<get_children_result>>();
401  auto rc = error_code_from_raw(::zoo_aget_children2(_handle,
402  path,
403  0,
404  callback,
405  ppromise.get()
406  )
407  );
408  if (rc == error_code::ok)
409  {
410  auto f = ppromise->get_future();
411  ppromise.release();
412  return f;
413  }
414  else
415  {
416  ppromise->set_exception(get_exception_ptr_of(rc));
417  return ppromise->get_future();
418  }
419  });
420 }
421 
423  public connection_zk::basic_watcher<watch_children_result>
424 {
425 public:
426  static void deliver_raw(int rc_in,
428  ptr<const struct Stat> stat_in,
429  ptr<const void> prom_in
430  ) noexcept
431  {
432  auto& self = *static_cast<ptr<child_watcher>>(const_cast<ptr<void>>(prom_in));
433  auto rc = error_code_from_raw(rc_in);
434 
435  try
436  {
437  if (rc != error_code::ok)
438  throw_error(rc);
439 
440  self.deliver_data(watch_children_result(get_children_result(string_vector_from_raw(*strings_in),
441  stat_from_raw(*stat_in)
442  ),
443  self.get_event_future()
444  ),
445  std::exception_ptr()
446  );
447  }
448  catch (...)
449  {
450  self.deliver_data(nullopt, std::current_exception());
451  }
452 
453  }
454 };
455 
456 future<watch_children_result> connection_zk::watch_children(string_view path)
457 {
458  return with_str(path, [&] (ptr<const char> path)
459  {
460  std::unique_lock<std::mutex> ax(_watches_protect);
461  auto watcher = std::make_shared<child_watcher>();
462  auto rc = error_code_from_raw(::zoo_awget_children2(_handle,
463  path,
464  deliver_watch,
465  watcher.get(),
466  child_watcher::deliver_raw,
467  watcher.get()
468  )
469  );
470  if (rc == error_code::ok)
471  _watches.emplace(watcher.get(), watcher);
472  else
473  watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
474 
475  return watcher->get_data_future();
476  });
477 }
478 
479 future<exists_result> connection_zk::exists(string_view path)
480 {
481  ::stat_completion_t callback =
482  [] (int rc_in, ptr<const struct Stat> stat_in, ptr<const void> prom_in)
483  {
484  std::unique_ptr<promise<exists_result>> prom((ptr<promise<exists_result>>) prom_in);
485  auto rc = error_code_from_raw(rc_in);
486  if (rc == error_code::ok)
487  prom->set_value(exists_result(stat_from_raw(*stat_in)));
488  else if (rc == error_code::no_node)
489  prom->set_value(exists_result(nullopt));
490  else
491  prom->set_exception(get_exception_ptr_of(rc));
492  };
493 
494  return with_str(path, [&] (ptr<const char> path)
495  {
496  auto ppromise = std::make_unique<promise<exists_result>>();
497  auto rc = error_code_from_raw(::zoo_aexists(_handle, path, 0, callback, ppromise.get()));
498  if (rc == error_code::ok)
499  {
500  auto f = ppromise->get_future();
501  ppromise.release();
502  return f;
503  }
504  else
505  {
506  ppromise->set_exception(get_exception_ptr_of(rc));
507  return ppromise->get_future();
508  }
509  });
510 }
511 
513  public connection_zk::basic_watcher<watch_exists_result>
514 {
515 public:
516  static void deliver_raw(int rc_in, ptr<const struct Stat> stat_in, ptr<const void> self_in)
517  {
518  auto& self = *static_cast<ptr<exists_watcher>>(const_cast<ptr<void>>(self_in));
519  auto rc = error_code_from_raw(rc_in);
520 
521  if (rc == error_code::ok)
522  {
523  self.deliver_data(watch_exists_result(exists_result(stat_from_raw(*stat_in)), self.get_event_future()),
524  std::exception_ptr()
525  );
526  }
527  else if (rc == error_code::no_node)
528  {
529  self.deliver_data(watch_exists_result(exists_result(nullopt), self.get_event_future()),
530  std::exception_ptr()
531  );
532  }
533  else
534  {
535  self.deliver_data(nullopt, get_exception_ptr_of(rc));
536  }
537  }
538 };
539 
540 future<watch_exists_result> connection_zk::watch_exists(string_view path)
541 {
542  return with_str(path, [&] (ptr<const char> path)
543  {
544  std::unique_lock<std::mutex> ax(_watches_protect);
545  auto watcher = std::make_shared<exists_watcher>();
546  auto rc = error_code_from_raw(::zoo_awexists(_handle,
547  path,
548  deliver_watch,
549  watcher.get(),
550  exists_watcher::deliver_raw,
551  watcher.get()
552  )
553  );
554  if (rc == error_code::ok)
555  _watches.emplace(watcher.get(), watcher);
556  else
557  watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
558 
559  return watcher->get_data_future();
560  });
561 }
562 
563 future<create_result> connection_zk::create(string_view path,
564  const buffer& data,
565  const acl& rules,
566  create_mode mode
567  )
568 {
569  ::string_completion_t callback =
570  [] (int rc_in, ptr<const char> name_in, ptr<const void> prom_in)
571  {
572  std::unique_ptr<promise<create_result>> prom((ptr<promise<create_result>>) prom_in);
573  auto rc = error_code_from_raw(rc_in);
574  if (rc == error_code::ok)
575  prom->set_value(create_result(std::string(name_in)));
576  else
577  prom->set_exception(get_exception_ptr_of(rc));
578  };
579 
580  return with_str(path, [&] (ptr<const char> path)
581  {
582  auto ppromise = std::make_unique<promise<create_result>>();
583  auto rc = with_acl(rules, [&] (ptr<const ACL_vector> rules)
584  {
585  return error_code_from_raw(::zoo_acreate(_handle,
586  path,
587  data.data(),
588  int(data.size()),
589  rules,
590  static_cast<int>(mode),
591  callback,
592  ppromise.get()
593  )
594  );
595  });
596  if (rc == error_code::ok)
597  {
598  auto f = ppromise->get_future();
599  ppromise.release();
600  return f;
601  }
602  else
603  {
604  ppromise->set_exception(get_exception_ptr_of(rc));
605  return ppromise->get_future();
606  }
607  });
608 }
609 
610 future<set_result> connection_zk::set(string_view path, const buffer& data, version check)
611 {
612  ::stat_completion_t callback =
613  [] (int rc_in, ptr<const struct Stat> stat_raw, ptr<const void> prom_in)
614  {
615  std::unique_ptr<promise<set_result>> prom((ptr<promise<set_result>>) prom_in);
616  auto rc = error_code_from_raw(rc_in);
617  if (rc == error_code::ok)
618  prom->set_value(set_result(stat_from_raw(*stat_raw)));
619  else
620  prom->set_exception(get_exception_ptr_of(rc));
621  };
622 
623  return with_str(path, [&] (ptr<const char> path)
624  {
625  auto ppromise = std::make_unique<promise<set_result>>();
626  auto rc = error_code_from_raw(::zoo_aset(_handle,
627  path,
628  data.data(),
629  int(data.size()),
630  check.value,
631  callback,
632  ppromise.get()
633  ));
634  if (rc == error_code::ok)
635  {
636  auto f = ppromise->get_future();
637  ppromise.release();
638  return f;
639  }
640  else
641  {
642  ppromise->set_exception(get_exception_ptr_of(rc));
643  return ppromise->get_future();
644  }
645  });
646 }
647 
648 future<void> connection_zk::erase(string_view path, version check)
649 {
650  ::void_completion_t callback =
651  [] (int rc_in, ptr<const void> prom_in)
652  {
653  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
654  auto rc = error_code_from_raw(rc_in);
655  if (rc == error_code::ok)
656  prom->set_value();
657  else
658  prom->set_exception(get_exception_ptr_of(rc));
659  };
660 
661  return with_str(path, [&] (ptr<const char> path)
662  {
663  auto ppromise = std::make_unique<promise<void>>();
664  auto rc = error_code_from_raw(::zoo_adelete(_handle, path, check.value, callback, ppromise.get()));
665  if (rc == error_code::ok)
666  {
667  auto f = ppromise->get_future();
668  ppromise.release();
669  return f;
670  }
671  else
672  {
673  ppromise->set_exception(get_exception_ptr_of(rc));
674  return ppromise->get_future();
675  }
676  });
677 }
678 
679 future<get_acl_result> connection_zk::get_acl(string_view path) const
680 {
681  ::acl_completion_t callback =
682  [] (int rc_in, ptr<struct ACL_vector> acl_raw, ptr<struct Stat> stat_raw, ptr<const void> prom_in) noexcept
683  {
684  std::unique_ptr<promise<get_acl_result>> prom((ptr<promise<get_acl_result>>) prom_in);
685  auto rc = error_code_from_raw(rc_in);
686  if (rc == error_code::ok)
687  prom->set_value(get_acl_result(acl_from_raw(*acl_raw), stat_from_raw(*stat_raw)));
688  else
689  prom->set_exception(get_exception_ptr_of(rc));
690  };
691 
692  return with_str(path, [&] (ptr<const char> path)
693  {
694  auto ppromise = std::make_unique<promise<get_acl_result>>();
695  auto rc = error_code_from_raw(::zoo_aget_acl(_handle, path, callback, ppromise.get()));
696  if (rc == error_code::ok)
697  {
698  auto f = ppromise->get_future();
699  ppromise.release();
700  return f;
701  }
702  else
703  {
704  ppromise->set_exception(get_exception_ptr_of(rc));
705  return ppromise->get_future();
706  }
707  });
708 }
709 
710 future<void> connection_zk::set_acl(string_view path, const acl& rules, acl_version check)
711 {
712  ::void_completion_t callback =
713  [] (int rc_in, ptr<const void> prom_in)
714  {
715  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
716  auto rc = error_code_from_raw(rc_in);
717  if (rc == error_code::ok)
718  prom->set_value();
719  else
720  prom->set_exception(get_exception_ptr_of(rc));
721  };
722 
723  return with_str(path, [&] (ptr<const char> path)
724  {
725  return with_acl(rules, [&] (ptr<struct ACL_vector> rules)
726  {
727  auto ppromise = std::make_unique<promise<void>>();
728  auto rc = error_code_from_raw(::zoo_aset_acl(_handle,
729  path,
730  check.value,
731  rules,
732  callback,
733  ppromise.get()
734  )
735  );
736  if (rc == error_code::ok)
737  {
738  auto f = ppromise->get_future();
739  ppromise.release();
740  return f;
741  }
742  else
743  {
744  ppromise->set_exception(get_exception_ptr_of(rc));
745  return ppromise->get_future();
746  }
747  });
748  });
749 }
750 
752 {
753  multi_op source_txn;
754  promise<multi_result> prom;
755  std::vector<zoo_op_result_t> raw_results;
756  std::map<std::size_t, Stat> raw_stats;
757  std::map<std::size_t, std::vector<char>> path_buffers;
758 
759  explicit connection_zk_commit_completer(multi_op&& src) :
760  source_txn(std::move(src)),
761  raw_results(source_txn.size())
762  {
763  for (zoo_op_result_t& x : raw_results)
764  x.err = -42;
765  }
766 
767  ptr<Stat> raw_stat_at(std::size_t idx)
768  {
769  return &raw_stats[idx];
770  }
771 
772  ptr<std::vector<char>> path_buffer_for(std::size_t idx, const std::string& path, create_mode mode)
773  {
774  // If the creation is sequential, append 12 extra characters to store the digits
775  auto sz = path.size() + (is_set(mode, create_mode::sequential) ? 12 : 1);
776  path_buffers[idx] = std::vector<char>(sz);
777  return &path_buffers[idx];
778  }
779 
780  void deliver(error_code rc)
781  {
782  try
783  {
784  if (rc == error_code::ok)
785  {
786  multi_result out;
787  out.reserve(raw_results.size());
788  for (std::size_t idx = 0; idx < source_txn.size(); ++idx)
789  {
790  const auto& raw_res = raw_results[idx];
791 
792  switch (source_txn[idx].type())
793  {
794  case op_type::create:
795  out.emplace_back(create_result(std::string(raw_res.value)));
796  break;
797  case op_type::set:
798  out.emplace_back(set_result(stat_from_raw(*raw_res.stat)));
799  break;
800  default:
801  out.emplace_back(source_txn[idx].type(), nullptr);
802  break;
803  }
804  }
805 
806  prom.set_value(std::move(out));
807  }
808  else if (is_api_error(rc))
809  {
810  // All results until the failure are 0, equal to rc where we care, and runtime_inconsistency after that.
811  auto iter = std::partition_point(raw_results.begin(), raw_results.end(),
812  [] (auto res) { return res.err == 0; }
813  );
814  throw transaction_failed(rc, std::size_t(std::distance(raw_results.begin(), iter)));
815  }
816  else
817  {
818  throw_error(rc);
819  }
820  }
821  catch (...)
822  {
823  prom.set_exception(std::current_exception());
824  }
825  }
826 };
827 
828 future<multi_result> connection_zk::commit(multi_op&& txn_in)
829 {
830  ::void_completion_t callback =
831  [] (int rc_in, ptr<const void> completer_in)
832  {
833  std::unique_ptr<connection_zk_commit_completer>
834  completer((ptr<connection_zk_commit_completer>) completer_in);
835  completer->deliver(error_code_from_raw(rc_in));
836  };
837 
838  auto pcompleter = std::make_unique<connection_zk_commit_completer>(std::move(txn_in));
839  auto& txn = pcompleter->source_txn;
840  try
841  {
842  ::zoo_op raw_ops[txn.size()];
843  std::size_t create_op_count = 0;
844  std::size_t acl_piece_count = 0;
845  for (const auto& tx : txn)
846  {
847  if (tx.type() == op_type::create)
848  {
849  ++create_op_count;
850  acl_piece_count += tx.as_create().rules.size();
851  }
852  }
853  ACL_vector encoded_acls[create_op_count];
854  ACL acl_pieces[acl_piece_count];
855  ptr<ACL_vector> encoded_acl_iter = encoded_acls;
856  ptr<ACL> acl_piece_iter = acl_pieces;
857 
858  for (std::size_t idx = 0; idx < txn.size(); ++idx)
859  {
860  auto& raw_op = raw_ops[idx];
861  auto& src_op = txn[idx];
862  switch (src_op.type())
863  {
864  case op_type::check:
865  zoo_check_op_init(&raw_op, src_op.as_check().path.c_str(), src_op.as_check().check.value);
866  break;
867  case op_type::create:
868  {
869  const auto& cdata = src_op.as_create();
870  encoded_acl_iter->count = int(cdata.rules.size());
871  encoded_acl_iter->data = acl_piece_iter;
872  for (const auto& acl : cdata.rules)
873  {
874  *acl_piece_iter = encode_acl_part(acl);
875  ++acl_piece_iter;
876  }
877 
878  auto path_buf_ref = pcompleter->path_buffer_for(idx, cdata.path, cdata.mode);
879  zoo_create_op_init(&raw_op,
880  cdata.path.c_str(),
881  cdata.data.data(),
882  int(cdata.data.size()),
883  encoded_acl_iter,
884  static_cast<int>(cdata.mode),
885  path_buf_ref->data(),
886  int(path_buf_ref->size())
887  );
888  ++encoded_acl_iter;
889  break;
890  }
891  case op_type::erase:
892  zoo_delete_op_init(&raw_op, src_op.as_erase().path.c_str(), src_op.as_erase().check.value);
893  break;
894  case op_type::set:
895  {
896  const auto& setting = src_op.as_set();
897  zoo_set_op_init(&raw_op,
898  setting.path.c_str(),
899  setting.data.data(),
900  int(setting.data.size()),
901  setting.check.value,
902  pcompleter->raw_stat_at(idx)
903  );
904  break;
905  }
906  default:
907  {
908  using std::to_string;
909  throw std::invalid_argument("Invalid op_type at index=" + to_string(idx) + ": "
910  + to_string(src_op.type())
911  );
912  }
913  }
914  }
915  auto rc = error_code_from_raw(::zoo_amulti(_handle,
916  int(txn.size()),
917  raw_ops,
918  pcompleter->raw_results.data(),
919  callback,
920  pcompleter.get()
921  )
922  );
923  if (rc == error_code::ok)
924  {
925  auto f = pcompleter->prom.get_future();
926  pcompleter.release();
927  return f;
928  }
929  else
930  {
931  pcompleter->prom.set_exception(get_exception_ptr_of(rc));
932  return pcompleter->prom.get_future();
933  }
934  }
935  catch (...)
936  {
937  pcompleter->prom.set_exception(std::current_exception());
938  return pcompleter->prom.get_future();
939  }
940 }
941 
942 future<void> connection_zk::load_fence()
943 {
944  ::string_completion_t callback =
945  [] (int rc_in, ptr<const char>, ptr<const void> prom_in)
946  {
947  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
948  auto rc = error_code_from_raw(rc_in);
949  if (rc == error_code::ok)
950  prom->set_value();
951  else
952  prom->set_exception(get_exception_ptr_of(rc));
953  };
954 
955  auto ppromise = std::make_unique<std::promise<void>>();
956  auto rc = error_code_from_raw(::zoo_async(_handle, "/", callback, ppromise.get()));
957  if (rc == error_code::ok)
958  {
959  auto f = ppromise->get_future();
960  ppromise.release();
961  return f;
962  }
963  else
964  {
965  ppromise->set_exception(get_exception_ptr_of(rc));
966  return ppromise->get_future();
967  }
968 }
969 
970 void connection_zk::on_session_event_raw(ptr<zhandle_t> handle [[gnu::unused]],
971  int ev_type,
972  int state,
973  ptr<const char> path_ptr,
974  ptr<void> watcher_ctx
975  ) noexcept
976 {
977  auto self = static_cast<ptr<connection_zk>>(watcher_ctx);
978  // Most of the time, self's _handle will be the same thing that ZK provides to us. However, if we connect very
979  // quickly, a session event will happen trigger *before* we set the _handle. This isn't a problem, just something to
980  // be aware of.
981  assert(self->_handle == nullptr || self->_handle == handle);
982  auto ev = event_from_raw(ev_type);
983  auto st = state_from_raw(state);
984  auto path = string_view(path_ptr);
985 
986  if (ev != event_type::session)
987  {
988  // TODO: Remove this usage of std::cerr
989  std::cerr << "WARNING: Got unexpected event " << ev << " in state=" << st << " with path=" << path << std::endl;
990  return;
991  }
992  self->on_session_event(st);
993 }
994 
995 }
Data delivered when a watched event triggers.
Definition: results.hpp:171
T * ptr
A simple, unowned pointer.
Definition: config.hpp:75
This value is issued as part of an event when the state changes.
Describes the various result types of client operations.
constexpr bool is_set(create_mode self, create_mode flags)
Check that self has flags set.
Definition: types.hpp:288
The client is not connected to any server in the ensemble.
state
Enumeration of states the client may be at when a watch triggers.
Definition: types.hpp:320
The result type of client::watch_exists.
Definition: results.hpp:252
The result type of client::create.
Definition: results.hpp:99
The result type of client::exists.
Definition: results.hpp:77
The result type of client::watch.
Definition: results.hpp:194
Thrown from client::commit when a transaction cannot be committed to the system.
Definition: error.hpp:370
create_mode
When used in client::set, this value determines how the znode is created on the server.
Definition: types.hpp:252
The result type of client::watch_children.
Definition: results.hpp:223
The result type of client::set.
Definition: results.hpp:119
event_type
Enumeration of types of events that may occur.
Definition: types.hpp:298
The name of the znode will be appended with a monotonically increasing number.
The result type of client::get_children.
Definition: results.hpp:50
ZKPP_BUFFER_TYPE buffer
The buffer type.
Definition: buffer.hpp:67
The result type of client::get.
Definition: results.hpp:26
zk::acl_version acl_version
The number of changes to the ACL of the znode.
Definition: types.hpp:227