From 60d20e686ce85b2ba1e9ce355c8740ead5d8748f Mon Sep 17 00:00:00 2001 From: Pavel Odintsov Date: Tue, 22 Dec 2020 22:49:00 +0000 Subject: [PATCH] Added functions for InfluxDB operations (#902) --- src/fast_library.cpp | 395 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 395 insertions(+) diff --git a/src/fast_library.cpp b/src/fast_library.cpp index 30a167fc..dfcdca04 100644 --- a/src/fast_library.cpp +++ b/src/fast_library.cpp @@ -17,6 +17,16 @@ #include +#include +#include + +#include +#include +#include +#include +#include + + #include "simple_packet_capnp/simple_packet.capnp.h" #include #include @@ -1488,3 +1498,388 @@ bool validate_ipv6_or_ipv4_host(const std::string host) { return true; } +// We expect something like: 122.33.11.22:8080/somepath here +// And return: 122.33.11.22, 8080 and "/somepath" as separate parts +bool split_full_url(std::string full_url, std::string& host, std::string& port, std::string& path) { + auto delimiter_position = full_url.find("/"); + + if (delimiter_position == std::string::npos) { + host = full_url; + path = ""; + } else { + host = full_url.substr(0, delimiter_position); + // Add all symbols until the end of line to the path + path = full_url.substr(delimiter_position, std::string::npos); + } + + auto port_delimiter_position = host.find(":"); + + // Let's try to extract port if we have ":" delimiter in host + if (port_delimiter_position != std::string::npos) { + std::vector splitted_host; + + split(splitted_host, host, boost::is_any_of(":"), boost::token_compress_on); + + if (splitted_host.size() != 2) { + return false; + } + + host = splitted_host[0]; + port = splitted_host[1]; + } + + return true; +} + + +// Encrypted version of execute_web_request +bool execute_web_request_secure(std::string address, + std::string request_type, + std::string post_data, + uint32_t& response_code, + std::string& response_body, + std::map& headers) { + + extern log4cpp::Category& logger; + + std::string host; + std::string path; + std::string port = "443"; + + if (address.find("https://") == std::string::npos) { + logger << log4cpp::Priority::ERROR << "URL has not supported protocol prefix: " << address; + logger << log4cpp::Priority::ERROR << "We have support only for https"; + + return false; + } + + // Remove URL prefix + boost::replace_all(address, "https://", ""); + + bool split_result = split_full_url(address, host, port, path); + + if (!split_result) { + logger << log4cpp::Priority::ERROR << "Could not split URL into components"; + return false; + } + + if (request_type != "post" && request_type != "get") { + logger << log4cpp::Priority::ERROR << "execute_web_request has support only for post and get requests"; + return false; + } + + // If customer uses address like: 11.22.33.44:8080 without any path we should add it manually to comply with http protocol + if (path == "") { + path = "/"; + } + + try { + boost::system::error_code ec; + + boost::asio::io_context ioc; + + // The SSL context is required, and holds certificates + boost::asio::ssl::context ctx{ boost::asio::ssl::context::sslv23_client }; + + // Load default CA certificates + ctx.set_default_verify_paths(); + + boost::asio::ip::tcp::resolver r(ioc); + + boost::asio::ip::tcp::resolver resolver{ ioc }; + boost::asio::ssl::stream stream{ ioc, ctx }; + + // Set SNI Hostname + if (!SSL_set_tlsext_host_name(stream.native_handle(), host.c_str())) { + boost::system::error_code ec{ static_cast(::ERR_get_error()), boost::asio::error::get_ssl_category() }; + logger << log4cpp::Priority::ERROR << "Can't set SNI hostname: " << ec.message(); + return false; + } + + auto end_point = r.resolve(boost::asio::ip::tcp::resolver::query{ host, port }, ec); + + if (ec) { + logger << log4cpp::Priority::ERROR << "Could not resolve peer address in execute_web_request " << ec; + return false; + } + + logger << log4cpp::Priority::INFO << "Resolved domain to " << end_point.size() << " IP addresses"; + + boost::asio::connect(stream.next_layer(), end_point.begin(), end_point.end(), ec); + + if (ec) { + logger << log4cpp::Priority::ERROR << "Could not connect to peer in execute_web_request " << ec.message(); + return false; + } + + stream.handshake(boost::asio::ssl::stream_base::client, ec); + + if (ec) { + logger << log4cpp::Priority::ERROR << "SSL handshake failed " << ec.message(); + return false; + } + + // logger << log4cpp::Priority::INFO << "SSL connection established"; + + // Send HTTP request using beast + boost::beast::http::request req; + + if (request_type == "post") { + req.method(boost::beast::http::verb::post); + } else if (request_type == "get") { + req.method(boost::beast::http::verb::get); + } + + for (const auto& [k, v] : headers) { + req.set(k, v); + } + + req.target(path); + req.version(11); + + // Pass data only for post request + if (request_type == "post") { + req.body() = post_data; + } + + req.set(boost::beast::http::field::content_type, "application/x-www-form-urlencoded"); + + // We must specify port explicitly if we use non standard one + std::string full_host = host + ":" + std::to_string(stream.next_layer().remote_endpoint().port()); + // logger << log4cpp::Priority::INFO << "I will use " << full_host << " as host"; + req.set(boost::beast::http::field::host, full_host.c_str()); + + // TBD: we also should add port number to host name if we use non standard one + // + ":" + std::to_string(end_point.port())); + req.set(boost::beast::http::field::user_agent, "FastNetMon"); + + req.prepare_payload(); + boost::beast::http::write(stream, req, ec); + + if (ec) { + logger << log4cpp::Priority::ERROR << "Could not write data to socket in execute_web_request: " << ec.message(); + return false; + } + + // Receive and print HTTP response using beast + // This buffer is used for reading and must be persisted + boost::beast::flat_buffer b; + + boost::beast::http::response resp; + boost::beast::http::read(stream, b, resp, ec); + + if (ec) { + logger << log4cpp::Priority::ERROR << "Could not read data inside execute_web_request: " << ec.message(); + return false; + } + + response_code = resp.result_int(); + + // Return response body to caller + response_body = resp.body(); + + // logger << log4cpp::Priority::INFO << "Response code: " << response_code; + + // Gracefully close the stream + stream.shutdown(ec); + if (ec == boost::asio::error::eof) { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec.assign(0, ec.category()); + } + + if (ec) { + logger << log4cpp::Priority::DEBUG << "Can't shutdown connection gracefully: " << ec.message(); + // But we should not return error to caller in this case because we pushed data properly + } + + return true; + } catch (std::exception& e) { + logger << log4cpp::Priority::ERROR << "execute_web_request failed with error: " << e.what(); + return false; + } + + return false; +} + + +bool execute_web_request(std::string address, + std::string request_type, + std::string post_data, + uint32_t& response_code, + std::string& response_body, + std::map& headers, + std::string& error_text) { + std::string host; + std::string path; + std::string port = "http"; + + if (address.find("https://") != std::string::npos) { + return execute_web_request_secure(address, request_type, post_data, response_code, response_body, headers); + } + + if (address.find("http://") == std::string::npos) { + error_text = "URL has not supported protocol prefix: " + address; + return false; + } + + // Remove URL prefix + boost::replace_all(address, "http://", ""); + + bool split_result = split_full_url(address, host, port, path); + + if (!split_result) { + error_text = "Could not split URL into components"; + return false; + } + + // If customer uses address like: 11.22.33.44:8080 without any path we should add it manually to comply with http protocol + if (path == "") { + path = "/"; + } + + if (request_type != "post" && request_type != "get") { + error_text = "execute_web_request has support only for post and get requests. Requested: "; + error_text += request_type; + + return false; + } + + try { + boost::system::error_code ec; + + // Normal boost::asio setup + // std::string const host = "178.62.227.110"; + boost::asio::io_service ios; + boost::asio::ip::tcp::resolver r(ios); + boost::asio::ip::tcp::socket sock(ios); + + auto end_point = r.resolve(boost::asio::ip::tcp::resolver::query{ host, port }, ec); + + if (ec) { + error_text = "Could not resolve peer address in execute_web_request " + ec.message(); + return false; + } + + boost::asio::connect(sock, end_point, ec); + + if (ec) { + error_text = "Could not connect to peer in execute_web_request " + ec.message(); + return false; + } + + // Send HTTP request using beast + boost::beast::http::request req; + + if (request_type == "post") { + req.method(boost::beast::http::verb::post); + } else if (request_type == "get") { + req.method(boost::beast::http::verb::get); + } + + for (const auto& [k, v] : headers) { + req.set(k, v); + } + + req.target(path); + req.version(11); + + // Pass data only for post request + if (request_type == "post") { + req.body() = post_data; + } + + req.set(boost::beast::http::field::content_type, "application/x-www-form-urlencoded"); + req.set(boost::beast::http::field::host, host + ":" + std::to_string(sock.remote_endpoint().port())); + req.set(boost::beast::http::field::user_agent, "FastNetMon"); + + req.prepare_payload(); + boost::beast::http::write(sock, req, ec); + + if (ec) { + error_text = "Could not write data to socket in execute_web_request: " + ec.message(); + return false; + } + + // Receive and print HTTP response using beast + // This buffer is used for reading and must be persisted + boost::beast::flat_buffer b; + + boost::beast::http::response resp; + boost::beast::http::read(sock, b, resp, ec); + + if (ec) { + error_text = "Could not read data inside execute_web_request: "; + error_text += ec.message(); + return false; + } + + response_code = resp.result_int(); + + response_body = resp.body(); + + using tcp = boost::asio::ip::tcp; + // Gracefully close the socket + sock.shutdown(tcp::socket::shutdown_both, ec); + + // We ignore ec error here from shutdown + + return true; + } catch (std::exception& e) { + error_text = "execute_web_request failed with error: "; + error_text += e.what(); + return false; + } + + return false; +} + +// Write data to influxdb +bool write_data_to_influxdb(std::string database, + std::string host, + std::string port, + bool enable_auth, + std::string influx_user, + std::string influx_password, + std::string query) { + uint32_t response_code = 0; + + std::string address = host + ":" + port; + + std::string influxdb_query_string = std::string("http://") + address + "/write?db=" + database; + + // Add auth credentials + if (enable_auth) { + influxdb_query_string += "&u=" + influx_user + "&p=" + influx_password; + } + + // TODO: I have an idea to reduce number of active TIME_WAIT connections and we have function + // execute_web_request_connection_close + // But I suppose issues on InfluxDB side and raised ticket about it + // https://github.com/influxdata/influxdb/issues/8525 + // And we could not switch to it yet + + // We do not need it here but function requires this option + std::string response_body; + + std::map headers; + std::string error_text; + bool result = execute_web_request(influxdb_query_string, "post", query, response_code, response_body, headers, error_text); + + if (!result) { + return false; + } + + if (response_code != 204) { + return false; + } + + return true; +} + +uint64_t get_current_unix_time_in_nanoseconds() { + auto unix_timestamp = std::chrono::seconds(std::time(NULL)); + uint64_t unix_timestamp_nanoseconds = std::chrono::milliseconds(unix_timestamp).count() * 1000 * 1000; + return unix_timestamp_nanoseconds; +} +