zookeeper-cpp
ZooKeeper Client for C++
connection_zk.cpp
1 #include "connection_zk.hpp"
2 
3 #include <algorithm>
4 #include <cassert>
5 #include <cerrno>
6 #include <cstring>
7 #include <iostream>
8 #include <map>
9 #include <memory>
10 #include <stdexcept>
11 #include <string>
12 #include <system_error>
13 
14 #include <zookeeper/zookeeper.h>
15 
16 #include "acl.hpp"
17 #include "error.hpp"
18 #include "multi.hpp"
19 #include "results.hpp"
20 #include "types.hpp"
21 
22 namespace zk
23 {
24 
26 // Utility Functions //
28 
29 template <typename FAction>
30 auto with_str(string_view src, FAction&& action)
31  -> decltype(std::forward<FAction>(action)(ptr<const char>()))
32 {
33  char buffer[src.size() + 1];
34  buffer[src.size()] = '\0';
35  std::memcpy(buffer, src.data(), src.size());
36  return std::forward<FAction>(action)(buffer);
37 }
38 
39 static ACL encode_acl_part(const acl_rule& src)
40 {
41  ACL out;
42  out.perms = static_cast<int>(src.permissions());
43  out.id.scheme = const_cast<ptr<char>>(src.scheme().c_str());
44  out.id.id = const_cast<ptr<char>>(src.id().c_str());
45  return out;
46 }
47 
48 template <typename FAction>
49 auto with_acl(const acl& rules, FAction&& action)
50  -> decltype(std::forward<FAction>(action)(ptr<ACL_vector>()))
51 {
52  ACL parts[rules.size()];
53  for (std::size_t idx = 0; idx < rules.size(); ++idx)
54  parts[idx] = encode_acl_part(rules[idx]);
55 
56  ACL_vector vec;
57  vec.count = int(rules.size());
58  vec.data = parts;
59  return std::forward<FAction>(action)(&vec);
60 }
61 
63 // Native Adaptors //
65 
66 static error_code error_code_from_raw(int raw)
67 {
68  return static_cast<error_code>(raw);
69 }
70 
71 static event_type event_from_raw(int raw)
72 {
73  return static_cast<event_type>(raw);
74 }
75 
76 static state state_from_raw(int raw)
77 {
78  return static_cast<state>(raw);
79 }
80 
81 static stat stat_from_raw(const struct Stat& raw)
82 {
83  stat out;
84  out.acl_version = acl_version(raw.aversion);
85  out.child_modified_transaction = transaction_id(raw.pzxid);
86  out.child_version = child_version(raw.cversion);
87  out.children_count = raw.numChildren;
88  out.create_time = stat::time_point() + std::chrono::milliseconds(raw.ctime);
89  out.create_transaction = transaction_id(raw.czxid);
90  out.data_size = raw.dataLength;
91  out.data_version = version(raw.version);
92  out.ephemeral_owner = raw.ephemeralOwner;
93  out.modified_time = stat::time_point() + std::chrono::milliseconds(raw.mtime);
94  out.modified_transaction = transaction_id(raw.mzxid);
95  return out;
96 }
97 
98 static std::vector<std::string> string_vector_from_raw(const struct String_vector& raw)
99 {
100  std::vector<std::string> out;
101  out.reserve(raw.count);
102  for (std::int32_t idx = 0; idx < raw.count; ++idx)
103  out.emplace_back(raw.data[idx]);
104  return out;
105 }
106 
107 static acl acl_from_raw(const struct ACL_vector& raw)
108 {
109  auto sz = std::size_t(raw.count);
110 
111  acl out;
112  out.reserve(sz);
113  for (std::size_t idx = 0; idx < sz; ++idx)
114  {
115  const auto& item = raw.data[idx];
116  out.emplace_back(item.id.scheme, item.id.id, static_cast<permission>(item.perms));
117  }
118  return out;
119 }
120 
122 // connection_zk //
124 
125 connection_zk::connection_zk(string_view conn_string, std::chrono::milliseconds recv_timeout) :
126  _handle(nullptr)
127 {
128  if (conn_string.find("zk://") != 0U)
129  throw std::invalid_argument(std::string("Invalid connection string \"") + std::string(conn_string) + "\"");
130  conn_string.remove_prefix(5);
131 
132  _handle = with_str
133  (
134  conn_string,
135  [&] (ptr<const char> conn_c_string)
136  {
137  return ::zookeeper_init(conn_c_string,
138  on_session_event_raw,
139  static_cast<int>(recv_timeout.count()),
140  nullptr,
141  this,
142  0
143  );
144  }
145  );
146  if (!_handle)
147  std::system_error(errno, std::system_category(), "Failed to create ZooKeeper client");
148 }
149 
150 connection_zk::~connection_zk() noexcept
151 {
152  close();
153 }
154 
155 void connection_zk::close()
156 {
157  if (_handle)
158  {
159  auto err = error_code_from_raw(::zookeeper_close(_handle));
160  if (err != error_code::ok)
161  throw_error(err);
162 
163  _handle = nullptr;
164  }
165 }
166 
167 zk::state connection_zk::state() const
168 {
169  if (_handle)
170  return state_from_raw(::zoo_state(_handle));
171  else
172  return zk::state::closed;
173 }
174 
175 future<get_result> connection_zk::get(string_view path)
176 {
177  ::data_completion_t callback =
178  [] (int rc_in, ptr<const char> data, int data_sz, ptr<const struct Stat> pstat, ptr<const void> prom_in) noexcept
179  {
180  std::unique_ptr<promise<get_result>> prom((ptr<promise<get_result>>) prom_in);
181  auto rc = error_code_from_raw(rc_in);
182  if (rc == error_code::ok)
183  prom->set_value(get_result(buffer(data, data + data_sz), stat_from_raw(*pstat)));
184  else
185  prom->set_exception(get_exception_ptr_of(rc));
186  };
187 
188  return with_str(path, [&] (ptr<const char> path)
189  {
190  auto ppromise = std::make_unique<promise<get_result>>();
191  auto rc = error_code_from_raw(::zoo_aget(_handle, path, 0, callback, ppromise.get()));
192  if (rc == error_code::ok)
193  {
194  auto f = ppromise->get_future();
195  ppromise.release();
196  return f;
197  }
198  else
199  {
200  ppromise->set_exception(get_exception_ptr_of(rc));
201  return ppromise->get_future();
202  }
203  });
204 }
205 
206 future<watch_result> connection_zk::watch(string_view path)
207 {
208  using watch_promises = std::pair<std::promise<watch_result>, std::promise<event>>;
209 
210  ::data_completion_t data_callback =
211  [] (int rc_in, ptr<const char> data, int data_sz, ptr<const struct Stat> pstat, ptr<const void> prom_in) noexcept
212  {
213  std::unique_ptr<watch_promises> prom((ptr<watch_promises>) prom_in);
214  auto rc = error_code_from_raw(rc_in);
215  if (rc == error_code::ok)
216  {
217  prom->first.set_value(watch_result(get_result(buffer(data, data + data_sz), stat_from_raw(*pstat)),
218  prom->second.get_future()
219  )
220  );
221  // Since there was no error, we know the watch will be triggered
222  prom.release();
223  }
224  else
225  {
226  prom->first.set_exception(get_exception_ptr_of(rc));
227  }
228  };
229 
230  ::watcher_fn watch_callback =
231  [] (ptr<zhandle_t>, int type_in, int state_in, ptr<const char>, ptr<void> proms_in)
232  {
233  std::unique_ptr<watch_promises> prom(static_cast<ptr<watch_promises>>(proms_in));
234  prom->second.set_value(event(event_from_raw(type_in), state_from_raw(state_in)));
235  };
236 
237  return with_str(path, [&] (ptr<const char> path)
238  {
239  auto ppromises = std::make_unique<watch_promises>();
240  auto rc = error_code_from_raw(::zoo_awget(_handle,
241  path,
242  watch_callback,
243  ppromises.get(),
244  data_callback,
245  ppromises.get()
246  )
247  );
248  if (rc == error_code::ok)
249  {
250  auto f = ppromises->first.get_future();
251  ppromises.release();
252  return f;
253  }
254  else
255  {
256  ppromises->first.set_exception(get_exception_ptr_of(rc));
257  return ppromises->first.get_future();
258  }
259  });
260 }
261 
262 future<get_children_result> connection_zk::get_children(string_view path)
263 {
264  ::strings_stat_completion_t callback =
265  [] (int rc_in,
266  ptr<const struct String_vector> strings_in,
267  ptr<const struct Stat> stat_in,
268  ptr<const void> prom_in
269  )
270  {
271  std::unique_ptr<promise<get_children_result>> prom((ptr<promise<get_children_result>>) prom_in);
272  auto rc = error_code_from_raw(rc_in);
273  try
274  {
275  if (rc != error_code::ok)
276  throw_error(rc);
277 
278  prom->set_value(get_children_result(string_vector_from_raw(*strings_in), stat_from_raw(*stat_in)));
279  }
280  catch (...)
281  {
282  prom->set_exception(std::current_exception());
283  }
284  };
285 
286  return with_str(path, [&] (ptr<const char> path)
287  {
288  auto ppromise = std::make_unique<promise<get_children_result>>();
289  auto rc = error_code_from_raw(::zoo_aget_children2(_handle,
290  path,
291  0,
292  callback,
293  ppromise.get()
294  )
295  );
296  if (rc == error_code::ok)
297  {
298  auto f = ppromise->get_future();
299  ppromise.release();
300  return f;
301  }
302  else
303  {
304  ppromise->set_exception(get_exception_ptr_of(rc));
305  return ppromise->get_future();
306  }
307  });
308 }
309 
310 future<watch_children_result> connection_zk::watch_children(string_view path)
311 {
312  using watch_promises = std::pair<std::promise<watch_children_result>, std::promise<event>>;
313 
314  ::strings_stat_completion_t data_callback =
315  [] (int rc_in,
316  ptr<const struct String_vector> strings_in,
317  ptr<const struct Stat> stat_in,
318  ptr<const void> prom_in
319  )
320  {
321  std::unique_ptr<watch_promises> prom((ptr<watch_promises>) prom_in);
322  auto rc = error_code_from_raw(rc_in);
323  try
324  {
325  if (rc != error_code::ok)
326  throw_error(rc);
327 
328  prom->first.set_value(watch_children_result(get_children_result(string_vector_from_raw(*strings_in),
329  stat_from_raw(*stat_in)
330  ),
331  prom->second.get_future()
332  )
333  );
334  prom.release();
335  }
336  catch (...)
337  {
338  prom->first.set_exception(std::current_exception());
339  }
340  };
341 
342  ::watcher_fn watch_callback =
343  [] (ptr<zhandle_t>, int type_in, int state_in, ptr<const char>, ptr<void> proms_in)
344  {
345  std::unique_ptr<watch_promises> proms(static_cast<ptr<watch_promises>>(proms_in));
346  proms->second.set_value(event(event_from_raw(type_in), state_from_raw(state_in)));
347  };
348 
349  return with_str(path, [&] (ptr<const char> path)
350  {
351  auto ppromises = std::make_unique<watch_promises>();
352  auto rc = error_code_from_raw(::zoo_awget_children2(_handle,
353  path,
354  watch_callback,
355  ppromises.get(),
356  data_callback,
357  ppromises.get()
358  )
359  );
360  if (rc == error_code::ok)
361  {
362  auto f = ppromises->first.get_future();
363  ppromises.release();
364  return f;
365  }
366  else
367  {
368  ppromises->first.set_exception(get_exception_ptr_of(rc));
369  return ppromises->first.get_future();
370  }
371  });
372 }
373 
374 future<exists_result> connection_zk::exists(string_view path)
375 {
376  ::stat_completion_t callback =
377  [] (int rc_in, ptr<const struct Stat> stat_in, ptr<const void> prom_in)
378  {
379  std::unique_ptr<promise<exists_result>> prom((ptr<promise<exists_result>>) prom_in);
380  auto rc = error_code_from_raw(rc_in);
381  if (rc == error_code::ok)
382  prom->set_value(exists_result(stat_from_raw(*stat_in)));
383  else if (rc == error_code::no_node)
384  prom->set_value(exists_result(nullopt));
385  else
386  prom->set_exception(get_exception_ptr_of(rc));
387  };
388 
389  return with_str(path, [&] (ptr<const char> path)
390  {
391  auto ppromise = std::make_unique<promise<exists_result>>();
392  auto rc = error_code_from_raw(::zoo_aexists(_handle, path, 0, callback, ppromise.get()));
393  if (rc == error_code::ok)
394  {
395  auto f = ppromise->get_future();
396  ppromise.release();
397  return f;
398  }
399  else
400  {
401  ppromise->set_exception(get_exception_ptr_of(rc));
402  return ppromise->get_future();
403  }
404  });
405 }
406 
407 future<watch_exists_result> connection_zk::watch_exists(string_view path)
408 {
409  using watch_promises = std::pair<std::promise<watch_exists_result>, std::promise<event>>;
410 
411  ::stat_completion_t data_callback =
412  [] (int rc_in, ptr<const struct Stat> stat_in, ptr<const void> proms_in)
413  {
414  std::unique_ptr<watch_promises> proms((ptr<watch_promises>) proms_in);
415  auto rc = error_code_from_raw(rc_in);
416  if (rc == error_code::ok)
417  {
418  proms->first.set_value(watch_exists_result(exists_result(stat_from_raw(*stat_in)),
419  proms->second.get_future()
420  )
421  );
422  proms.release();
423  }
424  else if (rc == error_code::no_node)
425  {
426  proms->first.set_value(watch_exists_result(exists_result(nullopt),
427  proms->second.get_future()
428  )
429  );
430  proms.release();
431  }
432  else
433  {
434  proms->first.set_exception(get_exception_ptr_of(rc));
435  }
436  };
437 
438  ::watcher_fn watch_callback =
439  [] (ptr<zhandle_t>, int type_in, int state_in, ptr<const char>, ptr<void> proms_in)
440  {
441  std::unique_ptr<watch_promises> proms(static_cast<ptr<watch_promises>>(proms_in));
442  proms->second.set_value(event(event_from_raw(type_in), state_from_raw(state_in)));
443  };
444 
445  return with_str(path, [&] (ptr<const char> path)
446  {
447  auto ppromises = std::make_unique<watch_promises>();
448  auto rc = error_code_from_raw(::zoo_awexists(_handle,
449  path,
450  watch_callback,
451  ppromises.get(),
452  data_callback,
453  ppromises.get()
454  )
455  );
456  if (rc == error_code::ok)
457  {
458  auto f = ppromises->first.get_future();
459  ppromises.release();
460  return f;
461  }
462  else
463  {
464  ppromises->first.set_exception(get_exception_ptr_of(rc));
465  return ppromises->first.get_future();
466  }
467  });
468 }
469 
470 future<create_result> connection_zk::create(string_view path,
471  const buffer& data,
472  const acl& rules,
473  create_mode mode
474  )
475 {
476  ::string_completion_t callback =
477  [] (int rc_in, ptr<const char> name_in, ptr<const void> prom_in)
478  {
479  std::unique_ptr<promise<create_result>> prom((ptr<promise<create_result>>) prom_in);
480  auto rc = error_code_from_raw(rc_in);
481  if (rc == error_code::ok)
482  prom->set_value(create_result(std::string(name_in)));
483  else
484  prom->set_exception(get_exception_ptr_of(rc));
485  };
486 
487  return with_str(path, [&] (ptr<const char> path)
488  {
489  auto ppromise = std::make_unique<promise<create_result>>();
490  auto rc = with_acl(rules, [&] (ptr<const ACL_vector> rules)
491  {
492  return error_code_from_raw(::zoo_acreate(_handle,
493  path,
494  data.data(),
495  int(data.size()),
496  rules,
497  static_cast<int>(mode),
498  callback,
499  ppromise.get()
500  )
501  );
502  });
503  if (rc == error_code::ok)
504  {
505  auto f = ppromise->get_future();
506  ppromise.release();
507  return f;
508  }
509  else
510  {
511  ppromise->set_exception(get_exception_ptr_of(rc));
512  return ppromise->get_future();
513  }
514  });
515 }
516 
517 future<set_result> connection_zk::set(string_view path, const buffer& data, version check)
518 {
519  ::stat_completion_t callback =
520  [] (int rc_in, ptr<const struct Stat> stat_raw, ptr<const void> prom_in)
521  {
522  std::unique_ptr<promise<set_result>> prom((ptr<promise<set_result>>) prom_in);
523  auto rc = error_code_from_raw(rc_in);
524  if (rc == error_code::ok)
525  prom->set_value(set_result(stat_from_raw(*stat_raw)));
526  else
527  prom->set_exception(get_exception_ptr_of(rc));
528  };
529 
530  return with_str(path, [&] (ptr<const char> path)
531  {
532  auto ppromise = std::make_unique<promise<set_result>>();
533  auto rc = error_code_from_raw(::zoo_aset(_handle,
534  path,
535  data.data(),
536  int(data.size()),
537  check.value,
538  callback,
539  ppromise.get()
540  ));
541  if (rc == error_code::ok)
542  {
543  auto f = ppromise->get_future();
544  ppromise.release();
545  return f;
546  }
547  else
548  {
549  ppromise->set_exception(get_exception_ptr_of(rc));
550  return ppromise->get_future();
551  }
552  });
553 }
554 
555 future<void> connection_zk::erase(string_view path, version check)
556 {
557  ::void_completion_t callback =
558  [] (int rc_in, ptr<const void> prom_in)
559  {
560  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
561  auto rc = error_code_from_raw(rc_in);
562  if (rc == error_code::ok)
563  prom->set_value();
564  else
565  prom->set_exception(get_exception_ptr_of(rc));
566  };
567 
568  return with_str(path, [&] (ptr<const char> path)
569  {
570  auto ppromise = std::make_unique<promise<void>>();
571  auto rc = error_code_from_raw(::zoo_adelete(_handle, path, check.value, callback, ppromise.get()));
572  if (rc == error_code::ok)
573  {
574  auto f = ppromise->get_future();
575  ppromise.release();
576  return f;
577  }
578  else
579  {
580  ppromise->set_exception(get_exception_ptr_of(rc));
581  return ppromise->get_future();
582  }
583  });
584 }
585 
586 future<get_acl_result> connection_zk::get_acl(string_view path) const
587 {
588  ::acl_completion_t callback =
589  [] (int rc_in, ptr<struct ACL_vector> acl_raw, ptr<struct Stat> stat_raw, ptr<const void> prom_in) noexcept
590  {
591  std::unique_ptr<promise<get_acl_result>> prom((ptr<promise<get_acl_result>>) prom_in);
592  auto rc = error_code_from_raw(rc_in);
593  if (rc == error_code::ok)
594  prom->set_value(get_acl_result(acl_from_raw(*acl_raw), stat_from_raw(*stat_raw)));
595  else
596  prom->set_exception(get_exception_ptr_of(rc));
597  };
598 
599  return with_str(path, [&] (ptr<const char> path)
600  {
601  auto ppromise = std::make_unique<promise<get_acl_result>>();
602  auto rc = error_code_from_raw(::zoo_aget_acl(_handle, path, callback, ppromise.get()));
603  if (rc == error_code::ok)
604  {
605  auto f = ppromise->get_future();
606  ppromise.release();
607  return f;
608  }
609  else
610  {
611  ppromise->set_exception(get_exception_ptr_of(rc));
612  return ppromise->get_future();
613  }
614  });
615 }
616 
617 future<void> connection_zk::set_acl(string_view path, const acl& rules, acl_version check)
618 {
619  ::void_completion_t callback =
620  [] (int rc_in, ptr<const void> prom_in)
621  {
622  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
623  auto rc = error_code_from_raw(rc_in);
624  if (rc == error_code::ok)
625  prom->set_value();
626  else
627  prom->set_exception(get_exception_ptr_of(rc));
628  };
629 
630  return with_str(path, [&] (ptr<const char> path)
631  {
632  return with_acl(rules, [&] (ptr<struct ACL_vector> rules)
633  {
634  auto ppromise = std::make_unique<promise<void>>();
635  auto rc = error_code_from_raw(::zoo_aset_acl(_handle,
636  path,
637  check.value,
638  rules,
639  callback,
640  ppromise.get()
641  )
642  );
643  if (rc == error_code::ok)
644  {
645  auto f = ppromise->get_future();
646  ppromise.release();
647  return f;
648  }
649  else
650  {
651  ppromise->set_exception(get_exception_ptr_of(rc));
652  return ppromise->get_future();
653  }
654  });
655  });
656 }
657 
659 {
660  multi_op source_txn;
661  promise<multi_result> prom;
662  std::vector<zoo_op_result_t> raw_results;
663  std::map<std::size_t, Stat> raw_stats;
664  std::map<std::size_t, std::vector<char>> path_buffers;
665 
666  explicit connection_zk_commit_completer(multi_op&& src) :
667  source_txn(std::move(src)),
668  raw_results(source_txn.size())
669  {
670  for (zoo_op_result_t& x : raw_results)
671  x.err = -42;
672  }
673 
674  ptr<Stat> raw_stat_at(std::size_t idx)
675  {
676  return &raw_stats[idx];
677  }
678 
679  ptr<std::vector<char>> path_buffer_for(std::size_t idx, const std::string& path, create_mode mode)
680  {
681  // If the creation is sequential, append 12 extra characters to store the digits
682  auto sz = path.size() + (is_set(mode, create_mode::sequential) ? 12 : 1);
683  path_buffers[idx] = std::vector<char>(sz);
684  return &path_buffers[idx];
685  }
686 
687  void deliver(error_code rc)
688  {
689  try
690  {
691  if (rc == error_code::ok)
692  {
693  multi_result out;
694  out.reserve(raw_results.size());
695  for (std::size_t idx = 0; idx < source_txn.size(); ++idx)
696  {
697  const auto& raw_res = raw_results[idx];
698 
699  switch (source_txn[idx].type())
700  {
701  case op_type::create:
702  out.emplace_back(create_result(std::string(raw_res.value)));
703  break;
704  case op_type::set:
705  out.emplace_back(set_result(stat_from_raw(*raw_res.stat)));
706  break;
707  default:
708  out.emplace_back(source_txn[idx].type(), nullptr);
709  break;
710  }
711  }
712 
713  prom.set_value(std::move(out));
714  }
715  else if (is_api_error(rc))
716  {
717  // All results until the failure are 0, equal to rc where we care, and runtime_inconsistency after that.
718  auto iter = std::partition_point(raw_results.begin(), raw_results.end(),
719  [] (auto res) { return res.err == 0; }
720  );
721  throw transaction_failed(rc, std::size_t(std::distance(raw_results.begin(), iter)));
722  }
723  else
724  {
725  throw_error(rc);
726  }
727  }
728  catch (...)
729  {
730  prom.set_exception(std::current_exception());
731  }
732  }
733 };
734 
735 future<multi_result> connection_zk::commit(multi_op&& txn_in)
736 {
737  ::void_completion_t callback =
738  [] (int rc_in, ptr<const void> completer_in)
739  {
740  std::unique_ptr<connection_zk_commit_completer>
741  completer((ptr<connection_zk_commit_completer>) completer_in);
742  completer->deliver(error_code_from_raw(rc_in));
743  };
744 
745  auto pcompleter = std::make_unique<connection_zk_commit_completer>(std::move(txn_in));
746  auto& txn = pcompleter->source_txn;
747  try
748  {
749  ::zoo_op raw_ops[txn.size()];
750  std::size_t create_op_count = 0;
751  std::size_t acl_piece_count = 0;
752  for (const auto& tx : txn)
753  {
754  if (tx.type() == op_type::create)
755  {
756  ++create_op_count;
757  acl_piece_count += tx.as_create().rules.size();
758  }
759  }
760  ACL_vector encoded_acls[create_op_count];
761  ACL acl_pieces[acl_piece_count];
762  ptr<ACL_vector> encoded_acl_iter = encoded_acls;
763  ptr<ACL> acl_piece_iter = acl_pieces;
764 
765  for (std::size_t idx = 0; idx < txn.size(); ++idx)
766  {
767  auto& raw_op = raw_ops[idx];
768  auto& src_op = txn[idx];
769  switch (src_op.type())
770  {
771  case op_type::check:
772  zoo_check_op_init(&raw_op, src_op.as_check().path.c_str(), src_op.as_check().check.value);
773  break;
774  case op_type::create:
775  {
776  const auto& cdata = src_op.as_create();
777  encoded_acl_iter->count = int(cdata.rules.size());
778  encoded_acl_iter->data = acl_piece_iter;
779  for (const auto& acl : cdata.rules)
780  {
781  *acl_piece_iter = encode_acl_part(acl);
782  ++acl_piece_iter;
783  }
784 
785  auto path_buf_ref = pcompleter->path_buffer_for(idx, cdata.path, cdata.mode);
786  zoo_create_op_init(&raw_op,
787  cdata.path.c_str(),
788  cdata.data.data(),
789  int(cdata.data.size()),
790  encoded_acl_iter,
791  static_cast<int>(cdata.mode),
792  path_buf_ref->data(),
793  int(path_buf_ref->size())
794  );
795  ++encoded_acl_iter;
796  break;
797  }
798  case op_type::erase:
799  zoo_delete_op_init(&raw_op, src_op.as_erase().path.c_str(), src_op.as_erase().check.value);
800  break;
801  case op_type::set:
802  {
803  const auto& setting = src_op.as_set();
804  zoo_set_op_init(&raw_op,
805  setting.path.c_str(),
806  setting.data.data(),
807  int(setting.data.size()),
808  setting.check.value,
809  pcompleter->raw_stat_at(idx)
810  );
811  break;
812  }
813  default:
814  {
815  using std::to_string;
816  throw std::invalid_argument("Invalid op_type at index=" + to_string(idx) + ": "
817  + to_string(src_op.type())
818  );
819  }
820  }
821  }
822  auto rc = error_code_from_raw(::zoo_amulti(_handle,
823  int(txn.size()),
824  raw_ops,
825  pcompleter->raw_results.data(),
826  callback,
827  pcompleter.get()
828  )
829  );
830  if (rc == error_code::ok)
831  {
832  auto f = pcompleter->prom.get_future();
833  pcompleter.release();
834  return f;
835  }
836  else
837  {
838  pcompleter->prom.set_exception(get_exception_ptr_of(rc));
839  return pcompleter->prom.get_future();
840  }
841  }
842  catch (...)
843  {
844  pcompleter->prom.set_exception(std::current_exception());
845  return pcompleter->prom.get_future();
846  }
847 }
848 
849 future<void> connection_zk::load_fence()
850 {
851  ::string_completion_t callback =
852  [] (int rc_in, ptr<const char>, ptr<const void> prom_in)
853  {
854  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
855  auto rc = error_code_from_raw(rc_in);
856  if (rc == error_code::ok)
857  prom->set_value();
858  else
859  prom->set_exception(get_exception_ptr_of(rc));
860  };
861 
862  auto ppromise = std::make_unique<std::promise<void>>();
863  auto rc = error_code_from_raw(::zoo_async(_handle, "/", callback, ppromise.get()));
864  if (rc == error_code::ok)
865  {
866  auto f = ppromise->get_future();
867  ppromise.release();
868  return f;
869  }
870  else
871  {
872  ppromise->set_exception(get_exception_ptr_of(rc));
873  return ppromise->get_future();
874  }
875 }
876 
877 void connection_zk::on_session_event_raw(ptr<zhandle_t> handle [[gnu::unused]],
878  int ev_type,
879  int state,
880  ptr<const char> path_ptr,
881  ptr<void> watcher_ctx
882  ) noexcept
883 {
884  auto self = static_cast<ptr<connection_zk>>(watcher_ctx);
885  // Most of the time, self's _handle will be the same thing that ZK provides to us. However, if we connect very
886  // quickly, a session event will happen trigger *before* we set the _handle. This isn't a problem, just something to
887  // be aware of.
888  assert(self->_handle == nullptr || self->_handle == handle);
889  auto ev = event_from_raw(ev_type);
890  auto st = state_from_raw(state);
891  auto path = string_view(path_ptr);
892  if (ev != event_type::session)
893  {
894  // TODO: Remove this usage of std::cerr
895  std::cerr << "WARNING: Got unexpected event " << ev << " in state=" << st << " with path=" << path << std::endl;
896  return;
897  }
898  self->on_session_event(st);
899 }
900 
901 }
T * ptr
A simple, unowned pointer.
Definition: config.hpp:75
This value is issued as part of an event when the state changes.
Describes the various result types of client operations.
constexpr bool is_set(create_mode self, create_mode flags)
Check that self has flags set.
Definition: types.hpp:288
The client is not connected to any server in the ensemble.
state
Enumeration of states the client may be at when a watch triggers.
Definition: types.hpp:320
Definition: acl.cpp:8
The result type of client::create.
Definition: results.hpp:101
Thrown from client::commit when a transaction cannot be committed to the system.
Definition: error.hpp:370
create_mode
When used in client::create, this value determines how the znode is created on the server.
Definition: types.hpp:252
The result type of client::set.
Definition: results.hpp:123
event_type
Enumeration of types of events that may occur.
Definition: types.hpp:298
An access control list is a wrapper around acl_rule instances.
Definition: acl.hpp:124
The name of the znode will be appended with a monotonically increasing number.
ZKPP_BUFFER_TYPE buffer
The buffer type.
Definition: buffer.hpp:67
zk::acl_version acl_version
The number of changes to the ACL of the znode.
Definition: types.hpp:227