zookeeper-cpp
ZooKeeper Client for C++
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Groups
configuration.cpp
1 #include "configuration.hpp"
2 
3 #include <cctype>
4 #include <cstdlib>
5 #include <fstream>
6 #include <istream>
7 #include <ostream>
8 #include <regex>
9 #include <sstream>
10 #include <stdexcept>
11 
12 namespace zk::server
13 {
14 
16 {
// NOTE(review): the signature line for this body is absent from this listing.
// Accept values strictly between 0 and 256 and return without doing anything.
17  if (0U < value && value < 256U)
18  return;
19 
// Otherwise build a message containing the offending value and throw std::out_of_range.
20  std::ostringstream os;
21  os << "Server ID value " << value << " is not in the valid range [1 .. 255]";
22  throw std::out_of_range(os.str());
23 }
24 
25 std::ostream& operator<<(std::ostream& os, const server_id& self)
26 {
27  return os << self.value;
28 }
29 
namespace
{

/// Sentinel line index meaning "this setting is not backed by any line".
/// Written as the maximum std::size_t so it does not depend on the width of unsigned long.
constexpr std::size_t not_a_line = static_cast<std::size_t>(-1);

/// Stream buffer whose get area is the memory viewed by a string_view, so the text can be read
/// through a std::istream without copying it. The viewed memory must outlive the buffer.
class zero_copy_streambuf final :
        public std::streambuf
{
public:
    /// @param input Text to expose for reading. Only the get area is set up.
    explicit zero_copy_streambuf(string_view input)
    {
        // We are just going to read from it, so this const_cast is okay
        char* const first = const_cast<char*>(input.data());
        setg(first, first, first + input.size());
    }
};

}
48 
// Default port numbers: client connections, peer connections and leader election.
49 const std::uint16_t configuration::default_client_port = std::uint16_t(2181);
50 const std::uint16_t configuration::default_peer_port = std::uint16_t(2888);
51 const std::uint16_t configuration::default_leader_port = std::uint16_t(3888);
52 
// Default tick time, used by tick_time() when no value has been set.
53 const configuration::duration_type configuration::default_tick_time = std::chrono::milliseconds(2000);
54 
// Default init and sync limits, used by init_limit() / sync_limit() when no value has been set.
55 const std::size_t configuration::default_init_limit = 10U;
56 const std::size_t configuration::default_sync_limit = 5U;
57 
// Whitelists of four-letter-word commands: the default set, and the "*" set that enables all commands.
58 const std::set<std::string> configuration::default_four_letter_word_whitelist = { "srvr" };
59 const std::set<std::string> configuration::all_four_letter_word_whitelist = { "*" };
60 
// Construct an unset setting: no value, and the `not_a_line` sentinel as its line index.
61 template <typename T>
62 configuration::setting<T>::setting() noexcept :
63  value(nullopt),
64  line(not_a_line)
65 { }
66 
// Construct a setting holding `value`, recorded as coming from line index `line`.
67 template <typename T>
68 configuration::setting<T>::setting(T value, std::size_t line) noexcept :
69  value(std::move(value)),
70  line(line)
71 { }
72 
// Default constructor and destructor are compiler-generated; they are defined here, out of line.
73 configuration::configuration() = default;
74 
75 configuration::~configuration() noexcept = default;
76 
77 configuration configuration::make_minimal(std::string data_directory, std::uint16_t client_port)
78 {
79  configuration out;
80  out.data_directory(std::move(data_directory))
81  .client_port(client_port)
82  .init_limit(default_init_limit)
83  .sync_limit(default_sync_limit)
84  ;
85  return out;
86 }
87 
/// Split a comma-separated list into a set of words.
///
/// Each word has leading and trailing whitespace removed. Empty entries -- including entries that
/// consist only of whitespace -- are dropped rather than stored as an empty word.
///
/// @param source Comma-separated text, for example "srvr, stat".
/// @return The distinct, trimmed, non-empty words.
static std::set<std::string> parse_whitelist(string_view source)
{
    // std::isspace is undefined for negative char values, so classify through unsigned char.
    const auto is_space = [] (char c) { return std::isspace(static_cast<unsigned char>(c)) != 0; };

    std::set<std::string> out;

    while (!source.empty())
    {
        auto idx = source.find_first_of(',');
        if (idx == string_view::npos)
            idx = source.size();

        auto sub = source.substr(0, idx);

        // Consume the entry together with its terminating comma, when there is one.
        source.remove_prefix(idx < source.size() ? idx + 1 : idx);

        while (!sub.empty() && is_space(sub.front()))
            sub.remove_prefix(1);

        while (!sub.empty() && is_space(sub.back()))
            sub.remove_suffix(1);

        if (!sub.empty())
            out.insert(std::string(sub));
    }

    return out;
}
117 
118 configuration configuration::from_lines(std::vector<std::string> lines)
119 {
120  static const std::regex line_expr(R"(^([^=]+)=([^ #]+)[ #]*$)",
121  std::regex_constants::ECMAScript | std::regex_constants::optimize
122  );
123  constexpr auto name_idx = 1U;
124  constexpr auto data_idx = 2U;
125 
126  configuration out;
127 
128  for (std::size_t line_no = 0U; line_no < lines.size(); ++line_no)
129  {
130  const auto& line = lines[line_no];
131 
132  if (line.empty() || line[0] == '#')
133  continue;
134 
135  std::smatch match;
136  if (std::regex_match(line, match, line_expr))
137  {
138  auto name = match[name_idx].str();
139  auto data = match[data_idx].str();
140 
141  if (name == "clientPort")
142  {
143  out._client_port = { std::uint16_t(std::atoi(data.c_str())), line_no };
144  }
145  else if (name == "dataDir")
146  {
147  out._data_directory = { std::move(data), line_no };
148  }
149  else if (name == "tickTime")
150  {
151  out._tick_time = { std::chrono::milliseconds(std::atol(data.c_str())), line_no };
152  }
153  else if (name == "initLimit")
154  {
155  out._init_limit = { std::size_t(std::atol(data.c_str())), line_no };
156  }
157  else if (name == "syncLimit")
158  {
159  out._sync_limit = { std::size_t(std::atol(data.c_str())), line_no };
160  }
161  else if (name == "leaderServes")
162  {
163  out._leader_serves = { (data == "yes"), line_no };
164  }
165  else if (name == "4lw.commands.whitelist")
166  {
167  out._four_letter_word_whitelist = { parse_whitelist(data), line_no };
168  }
169  else if (name.find("server.") == 0U)
170  {
171  auto id = std::size_t(std::atol(name.c_str() + 7));
172  out._server_paths.insert({ server_id(id), { std::move(data), line_no } });
173  }
174  else
175  {
176  out._unknown_settings.insert({ std::move(name), { std::move(data), line_no } });
177  }
178  }
179  }
180 
181  out._lines = std::move(lines);
182  return out;
183 }
184 
186 {
// NOTE(review): the signature line for this body is absent from this listing.
// Read the stream line by line, collecting every line read.
187  std::vector<std::string> lines;
188  std::string line;
189 
190  while(std::getline(stream, line))
191  {
192  lines.emplace_back(std::move(line));
193  }
194 
// Parse the collected lines only if reading stopped at end-of-file; otherwise throw std::runtime_error.
195  if (stream.eof())
196  return from_lines(std::move(lines));
197  else
198  throw std::runtime_error("Loading configuration did not reach EOF");
199 }
200 
202 {
// NOTE(review): the signature line for this body is absent from this listing.
// Open the named file, parse it with from_stream, then record the file name as the source file.
203  std::ifstream inf(filename.c_str());
204  auto out = from_stream(inf);
205  out._source_file = std::move(filename);
206  return out;
207 }
208 
210 {
// NOTE(review): the signature line for this body is absent from this listing.
// Wrap the in-memory text in a stream buffer and an istream, then parse it with from_stream.
211  zero_copy_streambuf buff(value);
212  std::istream stream(&buff);
213  return from_stream(stream);
214 }
215 
217 {
// NOTE(review): the signature line for this body is absent from this listing.
// True only when the data directory and client port are set, the init and sync limits are set
// and equal to their defaults, and the configuration holds exactly four lines.
218  return _data_directory.value
219  && _client_port.value
220  && _init_limit.value && init_limit() == default_init_limit
221  && _sync_limit.value && sync_limit() == default_sync_limit
222  && _lines.size() == 4U;
223 }
224 
225 template <typename T, typename FEncode>
226 void configuration::set(setting<T>& target, optional<T> value, string_view key, const FEncode& encode)
227 {
228  std::string target_line;
229  if (value)
230  {
231  std::ostringstream os;
232  os << key << '=' << encode(*value);
233  target_line = os.str();
234  }
235 
236  if (target.line == not_a_line && value)
237  {
238  target.value = std::move(value);
239  target.line = _lines.size();
240  _lines.emplace_back(std::move(target_line));
241  }
242  else if (target.line == not_a_line && !value)
243  {
244  // do nothing -- no line means no value
245  }
246  else
247  {
248  target.value = std::move(value);
249  _lines[target.line] = std::move(target_line);
250  }
251 }
252 
253 template <typename T>
254 void configuration::set(setting<T>& target, optional<T> value, string_view key)
255 {
256  return set(target, std::move(value), key, [] (const T& x) -> const T& { return x; });
257 }
258 
259 std::uint16_t configuration::client_port() const
260 {
261  return _client_port.value.value_or(default_client_port);
262 }
263 
264 configuration& configuration::client_port(optional<std::uint16_t> port)
265 {
266  set(_client_port, port, "clientPort");
267  return *this;
268 }
269 
270 const optional<std::string>& configuration::data_directory() const
271 {
272  return _data_directory.value;
273 }
274 
275 configuration& configuration::data_directory(optional<std::string> path)
276 {
277  set(_data_directory, std::move(path), "dataDir");
278  return *this;
279 }
280 
281 configuration::duration_type configuration::tick_time() const
282 {
283  return _tick_time.value.value_or(default_tick_time);
284 }
285 
286 configuration& configuration::tick_time(optional<duration_type> tick_time)
287 {
288  set(_tick_time, tick_time, "tickTime", [] (duration_type x) { return x.count(); });
289  return *this;
290 }
291 
292 std::size_t configuration::init_limit() const
293 {
294  return _init_limit.value.value_or(default_init_limit);
295 }
296 
297 configuration& configuration::init_limit(optional<std::size_t> limit)
298 {
299  set(_init_limit, limit, "initLimit");
300  return *this;
301 }
302 
303 std::size_t configuration::sync_limit() const
304 {
305  return _sync_limit.value.value_or(default_sync_limit);
306 }
307 
308 configuration& configuration::sync_limit(optional<std::size_t> limit)
309 {
310  set(_sync_limit, limit, "syncLimit");
311  return *this;
312 }
313 
315 {
// NOTE(review): the signature line for this body is absent from this listing.
// Return the stored leader-serves flag, falling back to true when none is stored.
316  return _leader_serves.value.value_or(true);
317 }
318 
319 configuration& configuration::leader_serves(optional<bool> serve)
320 {
321  set(_leader_serves, serve, "leaderServes", [] (bool x) { return x ? "yes" : "no"; });
322  return *this;
323 }
324 
// Return the stored four-letter-word whitelist when one has been set.
325 const std::set<std::string>& configuration::four_letter_word_whitelist() const
326 {
327  if (_four_letter_word_whitelist.value)
328  return *_four_letter_word_whitelist.value;
329  else
// NOTE(review): this listing skips original line 330, so the statement belonging to this `else`
// is not visible here; presumably it returns a default whitelist -- confirm against the real file.
331 }
332 
333 configuration& configuration::four_letter_word_whitelist(optional<std::set<std::string>> words)
334 {
335  if (words && words->size() > 1U && words->count("*"))
336  throw std::invalid_argument("");
337 
338  set(_four_letter_word_whitelist,
339  std::move(words),
340  "4lw.commands.whitelist",
341  [] (const std::set<std::string>& words)
342  {
343  bool first = true;
344  std::ostringstream os;
345 
346  for (const auto& word : words)
347  {
348  if (!std::exchange(first, false))
349  os << ',';
350  os << word;
351  }
352 
353  return os.str();
354  }
355  );
356  return *this;
357 }
358 
359 std::map<server_id, std::string> configuration::servers() const
360 {
361  std::map<server_id, std::string> out;
362  for (const auto& entry : _server_paths)
363  out.insert({ entry.first, *entry.second.value });
364 
365  return out;
366 }
367 
369  std::string hostname,
370  std::uint16_t peer_port,
371  std::uint16_t leader_port
372  )
373 {
// NOTE(review): the first line of this signature is absent from this listing.
// Validate the ID first, then refuse an ID that already has an entry.
374  id.ensure_valid();
375 
376  if (_server_paths.count(id))
377  throw std::runtime_error(std::string("Already a server with ID ") + std::to_string(id.value));
378 
// Build the stored value in the form "hostname:peer_port:leader_port".
379  hostname += ":";
380  hostname += std::to_string(peer_port);
381  hostname += ":";
382  hostname += std::to_string(leader_port);
383 
// Insert an empty setting for the ID, then fill it through set() under the key "server.<id>".
384  auto iter = _server_paths.emplace(id, setting<std::string>()).first;
385  set(iter->second, some(std::move(hostname)), std::string("server.") + std::to_string(iter->first.value));
386  return *this;
387 }
388 
389 std::map<std::string, std::string> configuration::unknown_settings() const
390 {
391  std::map<std::string, std::string> out;
392  for (const auto& entry : _unknown_settings)
393  out.insert({ entry.first, *entry.second.value });
394 
395  return out;
396 }
397 
398 configuration& configuration::add_setting(std::string key, std::string value)
399 {
400  // This process is really inefficient, but people should not be using this very often. This is done this way because
401  // it is possible to specify a key that has a known setting (such as "dataDir"), which needs to be picked up
402  // correctly. `from_lines` has this logic, so just use it. Taking that matching logic out would be the best approach
403  // to take, but since this shouldn't be used, I haven't bothered.
404  auto source_file = _source_file;
405  auto lines = _lines;
406  lines.emplace_back(key + "=" + value);
407 
408  *this = configuration::from_lines(std::move(lines));
409  _source_file = std::move(source_file);
410  return *this;
411 }
412 
413 void configuration::save(std::ostream& os) const
414 {
415  for (const auto& line : _lines)
416  os << line << std::endl;
417 
418  os.flush();
419 }
420 
421 void configuration::save_file(std::string filename)
422 {
423  std::ofstream ofs(filename.c_str());
424  save(ofs);
425  if (ofs)
426  _source_file = std::move(filename);
427  else
428  throw std::runtime_error("Error saving file");
429 }
430 
// Compare two configurations by their effective settings.
431 bool operator==(const configuration& lhs, const configuration& rhs)
432 {
// An object always equals itself.
433  if (&lhs == &rhs)
434  return true;
435 
// Element comparison for the two maps: same key and same stored value.
// The line index of each setting is not compared.
436  auto same_items = [] (const auto& a, const auto& b)
437  {
438  return a.first == b.first
439  && a.second.value == b.second.value;
440  };
441 
// The scalar settings are compared through their getters.
442  return lhs.client_port() == rhs.client_port()
443  && lhs.data_directory() == rhs.data_directory()
444  && lhs.tick_time() == rhs.tick_time()
445  && lhs.init_limit() == rhs.init_limit()
446  && lhs.sync_limit() == rhs.sync_limit()
447  && lhs.leader_serves() == rhs.leader_serves()
// NOTE(review): this listing skips original line 448, so one term of this expression is not
// visible here -- confirm against the real file.
// Each map must have the same size and no mismatching element.
449  && lhs._server_paths.size() == rhs._server_paths.size()
450  && lhs._server_paths.end() == std::mismatch(lhs._server_paths.begin(), lhs._server_paths.end(),
451  rhs._server_paths.begin(), rhs._server_paths.end(),
452  same_items
453  ).first
454  && lhs._unknown_settings.size() == rhs._unknown_settings.size()
455  && lhs._unknown_settings.end() == std::mismatch(lhs._unknown_settings.begin(), lhs._unknown_settings.end(),
456  rhs._unknown_settings.begin(), rhs._unknown_settings.end(),
457  same_items
458  ).first
459  ;
460 }
461 
462 bool operator!=(const configuration& lhs, const configuration& rhs)
463 {
464  return !(lhs == rhs);
465 }
466 
467 }
T * ptr
A simple, unowned pointer.
Definition: config.hpp:37
std::map< server_id, std::string > servers() const
Get the servers which are part of the ZooKeeper ensemble.
Represents the ID of a server in the ensemble.
static const std::size_t default_sync_limit
The default value for sync_limit.
const optional< std::string > & source_file() const
Get the source file. This will only have a value if this was created by from_file.
bool is_minimal() const
Check if this is a "minimal" configuration – meaning it only has a data_directory and client_port set, with init_limit and sync_limit at their default values.
const optional< std::string > & data_directory() const
static configuration from_lines(std::vector< std::string > lines)
Load configuration from the provided lines.
static const std::set< std::string > all_four_letter_word_whitelist
A value for four_letter_word_whitelist that enables all commands.
const std::set< std::string > & four_letter_word_whitelist() const
std::map< std::string, std::string > unknown_settings() const
Get settings that were in the configuration file (or manually added with add_setting) but unknown to this library.
static configuration from_file(std::string filename)
Load the configuration from a file.
static configuration from_stream(std::istream &stream)
Load configuration from the provided stream.
static const std::uint16_t default_client_port
The default value for client_port.
static const std::uint16_t default_leader_port
The default value for leader_port.
duration_type tick_time() const
static const std::size_t default_init_limit
The default value for init_limit.
static const std::uint16_t default_peer_port
The default value for peer_port.
static configuration from_string(string_view value)
Load configuration directly from the in-memory value.
std::size_t init_limit() const
void ensure_valid() const
Check that this ID is a valid one.
Represents a configuration which should be run by server instance.
void save(std::ostream &stream) const
Write this configuration to the provided stream.
friend bool operator==(const configuration &lhs, const configuration &rhs)
static const duration_type default_tick_time
The default value for tick_time.
void save_file(std::string filename)
Save this configuration to filename.
std::size_t sync_limit() const
std::uint16_t client_port() const
configuration & add_server(server_id id, std::string hostname, std::uint16_t peer_port=default_peer_port, std::uint16_t leader_port=default_leader_port)
Add a new server to the configuration.
value_type value
Underlying value of this ID.
Definition: types.hpp:34
static const std::set< std::string > default_four_letter_word_whitelist
The default value for four_letter_word_whitelist.
configuration & add_setting(std::string key, std::string value)
Add an arbitrary setting with the key and value.