zookeeper-cpp
ZooKeeper Client for C++
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Groups
connection_zk.cpp
1 #include "connection_zk.hpp"
2 
3 #include <algorithm>
4 #include <cassert>
5 #include <cerrno>
6 #include <cstring>
7 #include <iostream>
8 #include <map>
9 #include <memory>
10 #include <sstream>
11 #include <stdexcept>
12 #include <string>
13 #include <system_error>
14 #include <tuple>
15 
16 #include <zookeeper/zookeeper.h>
17 
18 #include "acl.hpp"
19 #include "error.hpp"
20 #include "multi.hpp"
21 #include "results.hpp"
22 #include "types.hpp"
23 
24 namespace zk
25 {
26 
28 // Utility Functions //
30 
31 template <typename FAction>
32 auto with_str(string_view src, FAction&& action) noexcept(noexcept(std::forward<FAction>(action)(ptr<const char>())))
33  -> decltype(std::forward<FAction>(action)(ptr<const char>()))
34 {
35  char buffer[src.size() + 1];
36  buffer[src.size()] = '\0';
37  std::memcpy(buffer, src.data(), src.size());
38  return std::forward<FAction>(action)(buffer);
39 }
40 
41 static ACL encode_acl_part(const acl_rule& src)
42 {
43  ACL out;
44  out.perms = static_cast<int>(src.permissions());
45  out.id.scheme = const_cast<ptr<char>>(src.scheme().c_str());
46  out.id.id = const_cast<ptr<char>>(src.id().c_str());
47  return out;
48 }
49 
50 template <typename FAction>
51 auto with_acl(const acl& rules, FAction&& action) noexcept(noexcept(std::forward<FAction>(action)(ptr<ACL_vector>())))
52  -> decltype(std::forward<FAction>(action)(ptr<ACL_vector>()))
53 {
54  ACL parts[rules.size()];
55  for (std::size_t idx = 0; idx < rules.size(); ++idx)
56  parts[idx] = encode_acl_part(rules[idx]);
57 
58  ACL_vector vec;
59  vec.count = int(rules.size());
60  vec.data = parts;
61  return std::forward<FAction>(action)(&vec);
62 }
63 
65 // Native Adaptors //
67 
68 static error_code error_code_from_raw(int raw)
69 {
70  switch (raw)
71  {
72  case ZOPERATIONTIMEOUT:
73  raw = ZCONNECTIONLOSS;
74  break;
75  case ZINVALIDCALLBACK:
76  case ZINVALIDACL:
77  raw = ZBADARGUMENTS;
78  break;
79  case ZSESSIONMOVED:
80  raw = ZCONNECTIONLOSS;
81  break;
82  default:
83  break;
84  }
85  return static_cast<error_code>(raw);
86 }
87 
88 static event_type event_from_raw(int raw)
89 {
90  return static_cast<event_type>(raw);
91 }
92 
93 // ZooKeeper does not have this concept pre-3.5
94 #if ZOO_MAJOR_VERSION <= 3 && ZOO_MINOR_VERSION <= 4
95 static const int ZOO_NOTCONNECTED_STATE = 999;
96 #endif
97 
98 static state state_from_raw(int raw)
99 {
100  // The C client will put us into `ZOO_NOTCONNECTED_STATE` for two reasons:
101  //
102  // 1. This is the state of the initial connection (zookeeper_init_internal), which is then replaced when the adaptor
103  // threads first call the interest function.
104  // 2. During a reconfiguration, the client disconnects and transitions to this state (update_addrs), which is then
105  // updated the next time the I/O thread touches interest.
106  //
107  // In both cases, the state is still "connecting" from the point of view of a client.
108  if (raw == ZOO_NOTCONNECTED_STATE)
109  {
110  raw = ZOO_CONNECTING_STATE;
111  }
112  // `ZOO_ASSOCIATING_STATE` means we have connected to a server, but have not yet authenticated and created the
113  // session. We still can't perform any operations, so treat it as connecting -- the client does not care about the
114  // difference between establishing a TCP connection and negotiating credentials.
115  else if (raw == ZOO_ASSOCIATING_STATE)
116  {
117  raw = ZOO_CONNECTING_STATE;
118  }
119 
120  return static_cast<state>(raw);
121 }
122 
123 static stat stat_from_raw(const struct Stat& raw)
124 {
125  stat out;
126  out.acl_version = acl_version(raw.aversion);
127  out.child_modified_transaction = transaction_id(raw.pzxid);
128  out.child_version = child_version(raw.cversion);
129  out.children_count = raw.numChildren;
130  out.create_time = stat::time_point() + std::chrono::milliseconds(raw.ctime);
131  out.create_transaction = transaction_id(raw.czxid);
132  out.data_size = raw.dataLength;
133  out.data_version = version(raw.version);
134  out.ephemeral_owner = raw.ephemeralOwner;
135  out.modified_time = stat::time_point() + std::chrono::milliseconds(raw.mtime);
136  out.modified_transaction = transaction_id(raw.mzxid);
137  return out;
138 }
139 
140 static std::vector<std::string> string_vector_from_raw(const struct String_vector& raw)
141 {
142  std::vector<std::string> out;
143  out.reserve(raw.count);
144  for (std::int32_t idx = 0; idx < raw.count; ++idx)
145  out.emplace_back(raw.data[idx]);
146  return out;
147 }
148 
149 static acl acl_from_raw(const struct ACL_vector& raw)
150 {
151  auto sz = std::size_t(raw.count);
152 
153  acl out;
154  out.reserve(sz);
155  for (std::size_t idx = 0; idx < sz; ++idx)
156  {
157  const auto& item = raw.data[idx];
158  out.emplace_back(item.id.scheme, item.id.id, static_cast<permission>(item.perms));
159  }
160  return out;
161 }
162 
164 // connection_zk //
166 
167 connection_zk::connection_zk(const connection_params& params) :
168  _handle(nullptr)
169 {
170  if (params.connection_schema() != "zk")
171  throw std::invalid_argument(std::string("Invalid connection string \"") + to_string(params) + "\"");
172 
173  auto conn_string = [&] ()
174  {
175  std::ostringstream os;
176  bool first = true;
177  for (const auto& host : params.hosts())
178  {
179  if (first)
180  first = false;
181  else
182  os << ',';
183 
184  os << host;
185  }
186  return os.str();
187  }();
188 
189  _handle = ::zookeeper_init(conn_string.c_str(),
190  on_session_event_raw,
191  static_cast<int>(params.timeout().count()),
192  nullptr,
193  this,
194  0
195  );
196 
197  if (!_handle)
198  std::system_error(errno, std::system_category(), "Failed to create ZooKeeper client");
199 }
200 
201 connection_zk::~connection_zk() noexcept
202 {
203  close();
204 }
205 
207 {
208 public:
209  watcher() :
210  _event_delivered(false)
211  { }
212 
213  virtual ~watcher() noexcept {}
214 
215  virtual void deliver_event(event ev)
216  {
217  if (!_event_delivered.exchange(true, std::memory_order_relaxed))
218  {
219  _event_promise.set_value(std::move(ev));
220  }
221  }
222 
223  future<event> get_event_future()
224  {
225  return _event_promise.get_future();
226  }
227 
228 protected:
229  std::atomic<bool> _event_delivered;
230  promise<event> _event_promise;
231 };
232 
233 template <typename TResult>
236 {
237 public:
238  basic_watcher() :
239  _data_delivered(false)
240  { }
241 
242  future<TResult> get_data_future()
243  {
244  return _data_promise.get_future();
245  }
246 
247  virtual void deliver_event(event ev) override
248  {
249  if (!_data_delivered.load(std::memory_order_relaxed))
250  {
251  deliver_data(nullopt, get_exception_ptr_of(error_code::closed));
252  }
253 
254  watcher::deliver_event(std::move(ev));
255  }
256 
257  void deliver_data(optional<TResult> data, std::exception_ptr ex_ptr)
258  {
259  if (!_data_delivered.exchange(true, std::memory_order_relaxed))
260  {
261  if (ex_ptr)
262  {
263  _data_promise.set_exception(std::move(ex_ptr));
264  }
265  else
266  {
267  _data_promise.set_value(std::move(*data));
268  }
269  }
270  }
271 
272 private:
273  std::atomic<bool> _data_delivered;
274  promise<TResult> _data_promise;
275 };
276 
277 std::shared_ptr<connection_zk::watcher> connection_zk::try_extract_watch(ptr<const void> addr)
278 {
279  std::unique_lock<std::mutex> ax(_watches_protect);
280  auto iter = _watches.find(addr);
281  if (iter != _watches.end())
282  return _watches.extract(iter).mapped();
283  else
284  return nullptr;
285 }
286 
287 static ptr<connection_zk> connection_from_context(ptr<zhandle_t> zh)
288 {
289  return (ptr<connection_zk>) zoo_get_context(zh);
290 }
291 
292 void connection_zk::deliver_watch(ptr<zhandle_t> zh,
293  int type_in,
294  int state_in,
295  ptr<const char> path [[gnu::unused]],
296  ptr<void> proms_in
297  )
298 {
299  auto& self = *connection_from_context(zh);
300  if (auto watcher = self.try_extract_watch(proms_in))
301  watcher->deliver_event(event(event_from_raw(type_in), state_from_raw(state_in)));
302 }
303 
304 void connection_zk::close()
305 {
306  if (_handle)
307  {
308  auto err = error_code_from_raw(::zookeeper_close(_handle));
309  if (err != error_code::ok)
310  throw_error(err);
311 
312  _handle = nullptr;
313 
314  // Deliver a session event as if there was a close.
315  std::unique_lock<std::mutex> ax(_watches_protect);
316  auto l_watches = std::move(_watches);
317  ax.unlock();
318  for (const auto& pair : l_watches)
319  pair.second->deliver_event(event(event_type::session, zk::state::closed));
320  }
321 }
322 
323 zk::state connection_zk::state() const
324 {
325  if (_handle)
326  return state_from_raw(::zoo_state(_handle));
327  else
328  return zk::state::closed;
329 }
330 
331 future<get_result> connection_zk::get(string_view path)
332 {
333  ::data_completion_t callback =
334  [] (int rc_in, ptr<const char> data, int data_sz, ptr<const struct Stat> pstat, ptr<const void> prom_in) noexcept
335  {
336  std::unique_ptr<promise<get_result>> prom((ptr<promise<get_result>>) prom_in);
337  auto rc = error_code_from_raw(rc_in);
338  if (rc == error_code::ok)
339  prom->set_value(get_result(buffer(data, data + data_sz), stat_from_raw(*pstat)));
340  else
341  prom->set_exception(get_exception_ptr_of(rc));
342  };
343 
344  return with_str(path, [&] (ptr<const char> path) noexcept
345  {
346  auto ppromise = std::make_unique<promise<get_result>>();
347  auto fut = ppromise->get_future();
348  auto rc = error_code_from_raw(::zoo_aget(_handle, path, 0, callback, ppromise.get()));
349  if (rc == error_code::ok)
350  {
351  ppromise.release();
352  return fut;
353  }
354  else
355  {
356  ppromise->set_exception(get_exception_ptr_of(rc));
357  return fut;
358  }
359  });
360 }
361 
363  public connection_zk::basic_watcher<watch_result>
364 {
365 public:
366  static void deliver_raw(int rc_in,
367  ptr<const char> data,
368  int data_sz,
370  ptr<const void> self_in
371  ) noexcept
372  {
373  auto& self = *static_cast<ptr<data_watcher>>(const_cast<ptr<void>>(self_in));
374  auto rc = error_code_from_raw(rc_in);
375 
376  if (rc == error_code::ok)
377  {
378  self.deliver_data(watch_result(get_result(buffer(data, data + data_sz), stat_from_raw(*pstat)),
379  self.get_event_future()
380  ),
381  std::exception_ptr()
382  );
383  }
384  else
385  {
386  self.deliver_data(nullopt, get_exception_ptr_of(rc));
387  }
388  }
389 };
390 
391 future<watch_result> connection_zk::watch(string_view path)
392 {
393  return with_str(path, [&] (ptr<const char> path) noexcept
394  {
395  std::unique_lock<std::mutex> ax(_watches_protect);
396  auto watcher = std::make_shared<data_watcher>();
397  auto rc = error_code_from_raw(::zoo_awget(_handle,
398  path,
399  deliver_watch,
400  watcher.get(),
401  data_watcher::deliver_raw,
402  watcher.get()
403  )
404  );
405  if (rc == error_code::ok)
406  _watches.emplace(watcher.get(), watcher);
407  else
408  watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
409 
410  return watcher->get_data_future();
411  });
412 }
413 
414 future<get_children_result> connection_zk::get_children(string_view path)
415 {
416  ::strings_stat_completion_t callback =
417  [] (int rc_in,
418  ptr<const struct String_vector> strings_in,
419  ptr<const struct Stat> stat_in,
420  ptr<const void> prom_in
421  )
422  {
423  std::unique_ptr<promise<get_children_result>> prom((ptr<promise<get_children_result>>) prom_in);
424  auto rc = error_code_from_raw(rc_in);
425  try
426  {
427  if (rc != error_code::ok)
428  throw_error(rc);
429 
430  prom->set_value(get_children_result(string_vector_from_raw(*strings_in), stat_from_raw(*stat_in)));
431  }
432  catch (...)
433  {
434  prom->set_exception(std::current_exception());
435  }
436  };
437 
438  return with_str(path, [&] (ptr<const char> path) noexcept
439  {
440  auto ppromise = std::make_unique<promise<get_children_result>>();
441  auto fut = ppromise->get_future();
442  auto rc = error_code_from_raw(::zoo_aget_children2(_handle,
443  path,
444  0,
445  callback,
446  ppromise.get()
447  )
448  );
449  if (rc == error_code::ok)
450  {
451  ppromise.release();
452  return fut;
453  }
454  else
455  {
456  ppromise->set_exception(get_exception_ptr_of(rc));
457  return fut;
458  }
459  });
460 }
461 
463  public connection_zk::basic_watcher<watch_children_result>
464 {
465 public:
466  static void deliver_raw(int rc_in,
468  ptr<const struct Stat> stat_in,
469  ptr<const void> prom_in
470  ) noexcept
471  {
472  auto& self = *static_cast<ptr<child_watcher>>(const_cast<ptr<void>>(prom_in));
473  auto rc = error_code_from_raw(rc_in);
474 
475  try
476  {
477  if (rc != error_code::ok)
478  throw_error(rc);
479 
480  self.deliver_data(watch_children_result(get_children_result(string_vector_from_raw(*strings_in),
481  stat_from_raw(*stat_in)
482  ),
483  self.get_event_future()
484  ),
485  std::exception_ptr()
486  );
487  }
488  catch (...)
489  {
490  self.deliver_data(nullopt, std::current_exception());
491  }
492 
493  }
494 };
495 
496 future<watch_children_result> connection_zk::watch_children(string_view path)
497 {
498  return with_str(path, [&] (ptr<const char> path) noexcept
499  {
500  std::unique_lock<std::mutex> ax(_watches_protect);
501  auto watcher = std::make_shared<child_watcher>();
502  auto rc = error_code_from_raw(::zoo_awget_children2(_handle,
503  path,
504  deliver_watch,
505  watcher.get(),
506  child_watcher::deliver_raw,
507  watcher.get()
508  )
509  );
510  if (rc == error_code::ok)
511  _watches.emplace(watcher.get(), watcher);
512  else
513  watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
514 
515  return watcher->get_data_future();
516  });
517 }
518 
519 future<exists_result> connection_zk::exists(string_view path)
520 {
521  ::stat_completion_t callback =
522  [] (int rc_in, ptr<const struct Stat> stat_in, ptr<const void> prom_in)
523  {
524  std::unique_ptr<promise<exists_result>> prom((ptr<promise<exists_result>>) prom_in);
525  auto rc = error_code_from_raw(rc_in);
526  if (rc == error_code::ok)
527  prom->set_value(exists_result(stat_from_raw(*stat_in)));
528  else if (rc == error_code::no_entry)
529  prom->set_value(exists_result(nullopt));
530  else
531  prom->set_exception(get_exception_ptr_of(rc));
532  };
533 
534  return with_str(path, [&] (ptr<const char> path) noexcept
535  {
536  auto ppromise = std::make_unique<promise<exists_result>>();
537  auto fut = ppromise->get_future();
538  auto rc = error_code_from_raw(::zoo_aexists(_handle, path, 0, callback, ppromise.get()));
539  if (rc == error_code::ok)
540  {
541  ppromise.release();
542  return fut;
543  }
544  else
545  {
546  ppromise->set_exception(get_exception_ptr_of(rc));
547  return fut;
548  }
549  });
550 }
551 
553  public connection_zk::basic_watcher<watch_exists_result>
554 {
555 public:
556  static void deliver_raw(int rc_in, ptr<const struct Stat> stat_in, ptr<const void> self_in)
557  {
558  auto& self = *static_cast<ptr<exists_watcher>>(const_cast<ptr<void>>(self_in));
559  auto rc = error_code_from_raw(rc_in);
560 
561  if (rc == error_code::ok)
562  {
563  self.deliver_data(watch_exists_result(exists_result(stat_from_raw(*stat_in)), self.get_event_future()),
564  std::exception_ptr()
565  );
566  }
567  else if (rc == error_code::no_entry)
568  {
569  self.deliver_data(watch_exists_result(exists_result(nullopt), self.get_event_future()),
570  std::exception_ptr()
571  );
572  }
573  else
574  {
575  self.deliver_data(nullopt, get_exception_ptr_of(rc));
576  }
577  }
578 };
579 
580 future<watch_exists_result> connection_zk::watch_exists(string_view path)
581 {
582  return with_str(path, [&] (ptr<const char> path) noexcept
583  {
584  std::unique_lock<std::mutex> ax(_watches_protect);
585  auto watcher = std::make_shared<exists_watcher>();
586  auto rc = error_code_from_raw(::zoo_awexists(_handle,
587  path,
588  deliver_watch,
589  watcher.get(),
590  exists_watcher::deliver_raw,
591  watcher.get()
592  )
593  );
594  if (rc == error_code::ok)
595  _watches.emplace(watcher.get(), watcher);
596  else
597  watcher->deliver_data(nullopt, get_exception_ptr_of(rc));
598 
599  return watcher->get_data_future();
600  });
601 }
602 
603 future<create_result> connection_zk::create(string_view path,
604  const buffer& data,
605  const acl& rules,
606  create_mode mode
607  )
608 {
609  ::string_completion_t callback =
610  [] (int rc_in, ptr<const char> name_in, ptr<const void> prom_in)
611  {
612  std::unique_ptr<promise<create_result>> prom((ptr<promise<create_result>>) prom_in);
613  auto rc = error_code_from_raw(rc_in);
614  if (rc == error_code::ok)
615  prom->set_value(create_result(std::string(name_in)));
616  else
617  prom->set_exception(get_exception_ptr_of(rc));
618  };
619 
620  return with_str(path, [&] (ptr<const char> path) noexcept
621  {
622  auto ppromise = std::make_unique<promise<create_result>>();
623  auto fut = ppromise->get_future();
624  auto rc = with_acl(rules, [&] (ptr<const ACL_vector> rules) noexcept
625  {
626  return error_code_from_raw(::zoo_acreate(_handle,
627  path,
628  data.data(),
629  int(data.size()),
630  rules,
631  static_cast<int>(mode),
632  callback,
633  ppromise.get()
634  )
635  );
636  });
637 
638  if (rc == error_code::ok)
639  {
640  ppromise.release();
641  return fut;
642  }
643  else
644  {
645  ppromise->set_exception(get_exception_ptr_of(rc));
646  return fut;
647  }
648  });
649 }
650 
651 future<set_result> connection_zk::set(string_view path, const buffer& data, version check)
652 {
653  ::stat_completion_t callback =
654  [] (int rc_in, ptr<const struct Stat> stat_raw, ptr<const void> prom_in)
655  {
656  std::unique_ptr<promise<set_result>> prom((ptr<promise<set_result>>) prom_in);
657  auto rc = error_code_from_raw(rc_in);
658  if (rc == error_code::ok)
659  prom->set_value(set_result(stat_from_raw(*stat_raw)));
660  else
661  prom->set_exception(get_exception_ptr_of(rc));
662  };
663 
664  return with_str(path, [&] (ptr<const char> path) noexcept
665  {
666  auto ppromise = std::make_unique<promise<set_result>>();
667  auto fut = ppromise->get_future();
668  auto rc = error_code_from_raw(::zoo_aset(_handle,
669  path,
670  data.data(),
671  int(data.size()),
672  check.value,
673  callback,
674  ppromise.get()
675  ));
676 
677  if (rc == error_code::ok)
678  {
679  ppromise.release();
680  return fut;
681  }
682  else
683  {
684  ppromise->set_exception(get_exception_ptr_of(rc));
685  return fut;
686  }
687  });
688 }
689 
690 future<void> connection_zk::erase(string_view path, version check)
691 {
692  ::void_completion_t callback =
693  [] (int rc_in, ptr<const void> prom_in)
694  {
695  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
696  auto rc = error_code_from_raw(rc_in);
697  if (rc == error_code::ok)
698  prom->set_value();
699  else
700  prom->set_exception(get_exception_ptr_of(rc));
701  };
702 
703  return with_str(path, [&] (ptr<const char> path) noexcept
704  {
705  auto ppromise = std::make_unique<promise<void>>();
706  auto fut = ppromise->get_future();
707  auto rc = error_code_from_raw(::zoo_adelete(_handle, path, check.value, callback, ppromise.get()));
708  if (rc == error_code::ok)
709  {
710  ppromise.release();
711  return fut;
712  }
713  else
714  {
715  ppromise->set_exception(get_exception_ptr_of(rc));
716  return fut;
717  }
718  });
719 }
720 
721 future<get_acl_result> connection_zk::get_acl(string_view path) const
722 {
723  ::acl_completion_t callback =
724  [] (int rc_in, ptr<struct ACL_vector> acl_raw, ptr<struct Stat> stat_raw, ptr<const void> prom_in) noexcept
725  {
726  std::unique_ptr<promise<get_acl_result>> prom((ptr<promise<get_acl_result>>) prom_in);
727  auto rc = error_code_from_raw(rc_in);
728  if (rc == error_code::ok)
729  prom->set_value(get_acl_result(acl_from_raw(*acl_raw), stat_from_raw(*stat_raw)));
730  else
731  prom->set_exception(get_exception_ptr_of(rc));
732  };
733 
734  return with_str(path, [&] (ptr<const char> path) noexcept
735  {
736  auto ppromise = std::make_unique<promise<get_acl_result>>();
737  auto fut = ppromise->get_future();
738  auto rc = error_code_from_raw(::zoo_aget_acl(_handle, path, callback, ppromise.get()));
739  if (rc == error_code::ok)
740  {
741  ppromise.release();
742  return fut;
743  }
744  else
745  {
746  ppromise->set_exception(get_exception_ptr_of(rc));
747  return fut;
748  }
749  });
750 }
751 
752 future<void> connection_zk::set_acl(string_view path, const acl& rules, acl_version check)
753 {
754  ::void_completion_t callback =
755  [] (int rc_in, ptr<const void> prom_in)
756  {
757  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
758  auto rc = error_code_from_raw(rc_in);
759  if (rc == error_code::ok)
760  prom->set_value();
761  else
762  prom->set_exception(get_exception_ptr_of(rc));
763  };
764 
765  return with_str(path, [&] (ptr<const char> path) noexcept
766  {
767  return with_acl(rules, [&] (ptr<struct ACL_vector> rules) noexcept
768  {
769  auto ppromise = std::make_unique<promise<void>>();
770  auto fut = ppromise->get_future();
771  auto rc = error_code_from_raw(::zoo_aset_acl(_handle,
772  path,
773  check.value,
774  rules,
775  callback,
776  ppromise.get()
777  )
778  );
779  if (rc == error_code::ok)
780  {
781  ppromise.release();
782  return fut;
783  }
784  else
785  {
786  ppromise->set_exception(get_exception_ptr_of(rc));
787  return fut;
788  }
789  });
790  });
791 }
792 
794 {
795  multi_op source_txn;
796  promise<multi_result> prom;
797  std::vector<zoo_op_result_t> raw_results;
798  std::map<std::size_t, Stat> raw_stats;
799  std::map<std::size_t, std::vector<char>> path_buffers;
800 
801  explicit connection_zk_commit_completer(multi_op&& src) :
802  source_txn(std::move(src)),
803  raw_results(source_txn.size())
804  {
805  for (zoo_op_result_t& x : raw_results)
806  x.err = -42;
807  }
808 
809  ptr<Stat> raw_stat_at(std::size_t idx)
810  {
811  return &raw_stats[idx];
812  }
813 
814  ptr<std::vector<char>> path_buffer_for(std::size_t idx, const std::string& path, create_mode mode)
815  {
816  // If the creation is sequential, append 12 extra characters to store the digits
817  auto sz = path.size() + (is_set(mode, create_mode::sequential) ? 12 : 1);
818  path_buffers[idx] = std::vector<char>(sz);
819  return &path_buffers[idx];
820  }
821 
822  void deliver(error_code rc)
823  {
824  try
825  {
826  if (rc == error_code::ok)
827  {
828  multi_result out;
829  out.reserve(raw_results.size());
830  for (std::size_t idx = 0; idx < source_txn.size(); ++idx)
831  {
832  const auto& raw_res = raw_results[idx];
833 
834  switch (source_txn[idx].type())
835  {
836  case op_type::create:
837  out.emplace_back(create_result(std::string(raw_res.value)));
838  break;
839  case op_type::set:
840  out.emplace_back(set_result(stat_from_raw(*raw_res.stat)));
841  break;
842  default:
843  out.emplace_back(source_txn[idx].type(), nullptr);
844  break;
845  }
846  }
847 
848  prom.set_value(std::move(out));
849  }
850  else
851  {
852  // All results until the failure are 0, equal to rc where we care, and runtime_inconsistency after that.
853  auto iter = std::partition_point(raw_results.begin(), raw_results.end(),
854  [] (auto res) { return res.err == 0; }
855  );
856  throw transaction_failed(rc, std::size_t(std::distance(raw_results.begin(), iter)));
857  }
858  }
859  catch (...)
860  {
861  prom.set_exception(std::current_exception());
862  }
863  }
864 };
865 
866 future<multi_result> connection_zk::commit(multi_op&& txn_in)
867 {
868  ::void_completion_t callback =
869  [] (int rc_in, ptr<const void> completer_in)
870  {
871  std::unique_ptr<connection_zk_commit_completer>
872  completer((ptr<connection_zk_commit_completer>) completer_in);
873  completer->deliver(error_code_from_raw(rc_in));
874  };
875 
876  auto pcompleter = std::make_unique<connection_zk_commit_completer>(std::move(txn_in));
877  auto& txn = pcompleter->source_txn;
878  try
879  {
880  ::zoo_op raw_ops[txn.size()];
881  std::size_t create_op_count = 0;
882  std::size_t acl_piece_count = 0;
883  for (const auto& tx : txn)
884  {
885  if (tx.type() == op_type::create)
886  {
887  ++create_op_count;
888  acl_piece_count += tx.as_create().rules.size();
889  }
890  }
891  ACL_vector encoded_acls[create_op_count];
892  ACL acl_pieces[acl_piece_count];
893  ptr<ACL_vector> encoded_acl_iter = encoded_acls;
894  ptr<ACL> acl_piece_iter = acl_pieces;
895 
896  for (std::size_t idx = 0; idx < txn.size(); ++idx)
897  {
898  auto& raw_op = raw_ops[idx];
899  auto& src_op = txn[idx];
900  switch (src_op.type())
901  {
902  case op_type::check:
903  zoo_check_op_init(&raw_op, src_op.as_check().path.c_str(), src_op.as_check().check.value);
904  break;
905  case op_type::create:
906  {
907  const auto& cdata = src_op.as_create();
908  encoded_acl_iter->count = int(cdata.rules.size());
909  encoded_acl_iter->data = acl_piece_iter;
910  for (const auto& acl : cdata.rules)
911  {
912  *acl_piece_iter = encode_acl_part(acl);
913  ++acl_piece_iter;
914  }
915 
916  auto path_buf_ref = pcompleter->path_buffer_for(idx, cdata.path, cdata.mode);
917  zoo_create_op_init(&raw_op,
918  cdata.path.c_str(),
919  cdata.data.data(),
920  int(cdata.data.size()),
921  encoded_acl_iter,
922  static_cast<int>(cdata.mode),
923  path_buf_ref->data(),
924  int(path_buf_ref->size())
925  );
926  ++encoded_acl_iter;
927  break;
928  }
929  case op_type::erase:
930  zoo_delete_op_init(&raw_op, src_op.as_erase().path.c_str(), src_op.as_erase().check.value);
931  break;
932  case op_type::set:
933  {
934  const auto& setting = src_op.as_set();
935  zoo_set_op_init(&raw_op,
936  setting.path.c_str(),
937  setting.data.data(),
938  int(setting.data.size()),
939  setting.check.value,
940  pcompleter->raw_stat_at(idx)
941  );
942  break;
943  }
944  default:
945  {
946  using std::to_string;
947  throw std::invalid_argument("Invalid op_type at index=" + to_string(idx) + ": "
948  + to_string(src_op.type())
949  );
950  }
951  }
952  }
953  auto fut = pcompleter->prom.get_future();
954  auto rc = error_code_from_raw(::zoo_amulti(_handle,
955  int(txn.size()),
956  raw_ops,
957  pcompleter->raw_results.data(),
958  callback,
959  pcompleter.get()
960  )
961  );
962  if (rc == error_code::ok)
963  {
964  pcompleter.release();
965  return fut;
966  }
967  else
968  {
969  pcompleter->prom.set_exception(get_exception_ptr_of(rc));
970  return pcompleter->prom.get_future();
971  }
972  }
973  catch (...)
974  {
975  pcompleter->prom.set_exception(std::current_exception());
976  return pcompleter->prom.get_future();
977  }
978 }
979 
980 future<void> connection_zk::load_fence()
981 {
982  ::string_completion_t callback =
983  [] (int rc_in, ptr<const char>, ptr<const void> prom_in)
984  {
985  std::unique_ptr<promise<void>> prom((ptr<promise<void>>) prom_in);
986  auto rc = error_code_from_raw(rc_in);
987  if (rc == error_code::ok)
988  prom->set_value();
989  else
990  prom->set_exception(get_exception_ptr_of(rc));
991  };
992 
993  auto ppromise = std::make_unique<std::promise<void>>();
994  auto rc = error_code_from_raw(::zoo_async(_handle, "/", callback, ppromise.get()));
995  if (rc == error_code::ok)
996  {
997  auto f = ppromise->get_future();
998  ppromise.release();
999  return f;
1000  }
1001  else
1002  {
1003  ppromise->set_exception(get_exception_ptr_of(rc));
1004  return ppromise->get_future();
1005  }
1006 }
1007 
1008 void connection_zk::on_session_event_raw(ptr<zhandle_t> handle [[gnu::unused]],
1009  int ev_type,
1010  int state,
1011  ptr<const char> path_ptr,
1012  ptr<void> watcher_ctx
1013  ) noexcept
1014 {
1015  auto self = static_cast<ptr<connection_zk>>(watcher_ctx);
1016  // Most of the time, self's _handle will be the same thing that ZK provides to us. However, if we connect very
1017  // quickly, a session event will happen trigger *before* we set the _handle. This isn't a problem, just something to
1018  // be aware of.
1019  assert(self->_handle == nullptr || self->_handle == handle);
1020  auto ev = event_from_raw(ev_type);
1021  auto st = state_from_raw(state);
1022  auto path = string_view(path_ptr);
1023 
1024  if (ev != event_type::session)
1025  {
1026  // TODO: Remove this usage of std::cerr
1027  std::cerr << "WARNING: Got unexpected event " << ev << " in state=" << st << " with path=" << path << std::endl;
1028  return;
1029  }
1030  self->on_session_event(st);
1031 }
1032 
1033 }
Data delivered when a watched event triggers.
Definition: results.hpp:202
T * ptr
A simple, unowned pointer.
Definition: config.hpp:37
This value is issued as part of an event when the state changes.
Describes the various result types of client operations.
Code for transaction_failed.
constexpr bool is_set(create_mode self, create_mode flags)
Check that self has flags set.
Definition: types.hpp:321
The client is not connected to any server in the ensemble.
state
Enumeration of states the client may be at when a watch triggers.
Definition: types.hpp:385
The result type of client::watch_exists.
Definition: results.hpp:290
size_type size() const
The number of operations in this transaction bundle.
Definition: multi.hpp:185
The result type of client::create.
Definition: results.hpp:116
The result type of client::exists.
Definition: results.hpp:88
std::exception_ptr get_exception_ptr_of(error_code code)
Get an std::exception_ptr containing an exception with the proper type for the given code...
Definition: error.cpp:58
The result type of client::watch.
Definition: results.hpp:224
create_mode
When used in client::set, this value determines how the entry is created on the server.
Definition: types.hpp:283
The result type of client::watch_children.
Definition: results.hpp:257
Never thrown.
void reserve(size_type capacity)
Increase the reserved memory block so it can store at least capacity results without reallocating...
Definition: multi.hpp:331
void throw_error(error_code code)
Throw an exception for the given code.
Definition: error.cpp:31
A collection of operations that will be performed atomically.
Definition: multi.hpp:162
Code for no_entry.
The result type of client::set.
Definition: results.hpp:141
event_type
Enumeration of types of events that may occur.
Definition: types.hpp:331
The name of the entry will be appended with a monotonically increasing number.
The result type of client::get_children.
Definition: results.hpp:55
The result of a successful client::commit operation.
Definition: multi.hpp:242
Code for closed.
ZKPP_BUFFER_TYPE buffer
The buffer type.
Definition: buffer.hpp:79
The result type of client::get.
Definition: results.hpp:24
void emplace_back(TArgs &&...args)
Construct a result emplace on the end of the list using args.
Definition: multi.hpp:337
zk::acl_version acl_version
The number of changes to the ACL of the entry.
Definition: types.hpp:256
error_code
Code for all error types thrown by the client library.
Definition: error.hpp:18