Added functions for InfluxDB operations (#902)

This commit is contained in:
Pavel Odintsov 2020-12-22 22:49:00 +00:00 committed by GitHub
parent 11244a4926
commit 60d20e686c
Signed by: GitHub
GPG Key ID: 4AEE18F83AFDEB23

@ -17,6 +17,16 @@
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include "simple_packet_capnp/simple_packet.capnp.h" #include "simple_packet_capnp/simple_packet.capnp.h"
#include <capnp/message.h> #include <capnp/message.h>
#include <capnp/serialize-packed.h> #include <capnp/serialize-packed.h>
@ -1488,3 +1498,388 @@ bool validate_ipv6_or_ipv4_host(const std::string host) {
return true; return true;
} }
// We expect something like: 122.33.11.22:8080/somepath here
// And return: 122.33.11.22, 8080 and "/somepath" as separate parts
bool split_full_url(std::string full_url, std::string& host, std::string& port, std::string& path) {
auto delimiter_position = full_url.find("/");
if (delimiter_position == std::string::npos) {
host = full_url;
path = "";
} else {
host = full_url.substr(0, delimiter_position);
// Add all symbols until the end of line to the path
path = full_url.substr(delimiter_position, std::string::npos);
}
auto port_delimiter_position = host.find(":");
// Let's try to extract port if we have ":" delimiter in host
if (port_delimiter_position != std::string::npos) {
std::vector<std::string> splitted_host;
split(splitted_host, host, boost::is_any_of(":"), boost::token_compress_on);
if (splitted_host.size() != 2) {
return false;
}
host = splitted_host[0];
port = splitted_host[1];
}
return true;
}
// Encrypted version of execute_web_request
bool execute_web_request_secure(std::string address,
std::string request_type,
std::string post_data,
uint32_t& response_code,
std::string& response_body,
std::map<std::string, std::string>& headers) {
extern log4cpp::Category& logger;
std::string host;
std::string path;
std::string port = "443";
if (address.find("https://") == std::string::npos) {
logger << log4cpp::Priority::ERROR << "URL has not supported protocol prefix: " << address;
logger << log4cpp::Priority::ERROR << "We have support only for https";
return false;
}
// Remove URL prefix
boost::replace_all(address, "https://", "");
bool split_result = split_full_url(address, host, port, path);
if (!split_result) {
logger << log4cpp::Priority::ERROR << "Could not split URL into components";
return false;
}
if (request_type != "post" && request_type != "get") {
logger << log4cpp::Priority::ERROR << "execute_web_request has support only for post and get requests";
return false;
}
// If customer uses address like: 11.22.33.44:8080 without any path we should add it manually to comply with http protocol
if (path == "") {
path = "/";
}
try {
boost::system::error_code ec;
boost::asio::io_context ioc;
// The SSL context is required, and holds certificates
boost::asio::ssl::context ctx{ boost::asio::ssl::context::sslv23_client };
// Load default CA certificates
ctx.set_default_verify_paths();
boost::asio::ip::tcp::resolver r(ioc);
boost::asio::ip::tcp::resolver resolver{ ioc };
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> stream{ ioc, ctx };
// Set SNI Hostname
if (!SSL_set_tlsext_host_name(stream.native_handle(), host.c_str())) {
boost::system::error_code ec{ static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category() };
logger << log4cpp::Priority::ERROR << "Can't set SNI hostname: " << ec.message();
return false;
}
auto end_point = r.resolve(boost::asio::ip::tcp::resolver::query{ host, port }, ec);
if (ec) {
logger << log4cpp::Priority::ERROR << "Could not resolve peer address in execute_web_request " << ec;
return false;
}
logger << log4cpp::Priority::INFO << "Resolved domain to " << end_point.size() << " IP addresses";
boost::asio::connect(stream.next_layer(), end_point.begin(), end_point.end(), ec);
if (ec) {
logger << log4cpp::Priority::ERROR << "Could not connect to peer in execute_web_request " << ec.message();
return false;
}
stream.handshake(boost::asio::ssl::stream_base::client, ec);
if (ec) {
logger << log4cpp::Priority::ERROR << "SSL handshake failed " << ec.message();
return false;
}
// logger << log4cpp::Priority::INFO << "SSL connection established";
// Send HTTP request using beast
boost::beast::http::request<boost::beast::http::string_body> req;
if (request_type == "post") {
req.method(boost::beast::http::verb::post);
} else if (request_type == "get") {
req.method(boost::beast::http::verb::get);
}
for (const auto& [k, v] : headers) {
req.set(k, v);
}
req.target(path);
req.version(11);
// Pass data only for post request
if (request_type == "post") {
req.body() = post_data;
}
req.set(boost::beast::http::field::content_type, "application/x-www-form-urlencoded");
// We must specify port explicitly if we use non standard one
std::string full_host = host + ":" + std::to_string(stream.next_layer().remote_endpoint().port());
// logger << log4cpp::Priority::INFO << "I will use " << full_host << " as host";
req.set(boost::beast::http::field::host, full_host.c_str());
// TBD: we also should add port number to host name if we use non standard one
// + ":" + std::to_string(end_point.port()));
req.set(boost::beast::http::field::user_agent, "FastNetMon");
req.prepare_payload();
boost::beast::http::write(stream, req, ec);
if (ec) {
logger << log4cpp::Priority::ERROR << "Could not write data to socket in execute_web_request: " << ec.message();
return false;
}
// Receive and print HTTP response using beast
// This buffer is used for reading and must be persisted
boost::beast::flat_buffer b;
boost::beast::http::response<boost::beast::http::string_body> resp;
boost::beast::http::read(stream, b, resp, ec);
if (ec) {
logger << log4cpp::Priority::ERROR << "Could not read data inside execute_web_request: " << ec.message();
return false;
}
response_code = resp.result_int();
// Return response body to caller
response_body = resp.body();
// logger << log4cpp::Priority::INFO << "Response code: " << response_code;
// Gracefully close the stream
stream.shutdown(ec);
if (ec == boost::asio::error::eof) {
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
}
if (ec) {
logger << log4cpp::Priority::DEBUG << "Can't shutdown connection gracefully: " << ec.message();
// But we should not return error to caller in this case because we pushed data properly
}
return true;
} catch (std::exception& e) {
logger << log4cpp::Priority::ERROR << "execute_web_request failed with error: " << e.what();
return false;
}
return false;
}
bool execute_web_request(std::string address,
std::string request_type,
std::string post_data,
uint32_t& response_code,
std::string& response_body,
std::map<std::string, std::string>& headers,
std::string& error_text) {
std::string host;
std::string path;
std::string port = "http";
if (address.find("https://") != std::string::npos) {
return execute_web_request_secure(address, request_type, post_data, response_code, response_body, headers);
}
if (address.find("http://") == std::string::npos) {
error_text = "URL has not supported protocol prefix: " + address;
return false;
}
// Remove URL prefix
boost::replace_all(address, "http://", "");
bool split_result = split_full_url(address, host, port, path);
if (!split_result) {
error_text = "Could not split URL into components";
return false;
}
// If customer uses address like: 11.22.33.44:8080 without any path we should add it manually to comply with http protocol
if (path == "") {
path = "/";
}
if (request_type != "post" && request_type != "get") {
error_text = "execute_web_request has support only for post and get requests. Requested: ";
error_text += request_type;
return false;
}
try {
boost::system::error_code ec;
// Normal boost::asio setup
// std::string const host = "178.62.227.110";
boost::asio::io_service ios;
boost::asio::ip::tcp::resolver r(ios);
boost::asio::ip::tcp::socket sock(ios);
auto end_point = r.resolve(boost::asio::ip::tcp::resolver::query{ host, port }, ec);
if (ec) {
error_text = "Could not resolve peer address in execute_web_request " + ec.message();
return false;
}
boost::asio::connect(sock, end_point, ec);
if (ec) {
error_text = "Could not connect to peer in execute_web_request " + ec.message();
return false;
}
// Send HTTP request using beast
boost::beast::http::request<boost::beast::http::string_body> req;
if (request_type == "post") {
req.method(boost::beast::http::verb::post);
} else if (request_type == "get") {
req.method(boost::beast::http::verb::get);
}
for (const auto& [k, v] : headers) {
req.set(k, v);
}
req.target(path);
req.version(11);
// Pass data only for post request
if (request_type == "post") {
req.body() = post_data;
}
req.set(boost::beast::http::field::content_type, "application/x-www-form-urlencoded");
req.set(boost::beast::http::field::host, host + ":" + std::to_string(sock.remote_endpoint().port()));
req.set(boost::beast::http::field::user_agent, "FastNetMon");
req.prepare_payload();
boost::beast::http::write(sock, req, ec);
if (ec) {
error_text = "Could not write data to socket in execute_web_request: " + ec.message();
return false;
}
// Receive and print HTTP response using beast
// This buffer is used for reading and must be persisted
boost::beast::flat_buffer b;
boost::beast::http::response<boost::beast::http::string_body> resp;
boost::beast::http::read(sock, b, resp, ec);
if (ec) {
error_text = "Could not read data inside execute_web_request: ";
error_text += ec.message();
return false;
}
response_code = resp.result_int();
response_body = resp.body();
using tcp = boost::asio::ip::tcp;
// Gracefully close the socket
sock.shutdown(tcp::socket::shutdown_both, ec);
// We ignore ec error here from shutdown
return true;
} catch (std::exception& e) {
error_text = "execute_web_request failed with error: ";
error_text += e.what();
return false;
}
return false;
}
// Write data to influxdb
bool write_data_to_influxdb(std::string database,
std::string host,
std::string port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string query) {
uint32_t response_code = 0;
std::string address = host + ":" + port;
std::string influxdb_query_string = std::string("http://") + address + "/write?db=" + database;
// Add auth credentials
if (enable_auth) {
influxdb_query_string += "&u=" + influx_user + "&p=" + influx_password;
}
// TODO: I have an idea to reduce number of active TIME_WAIT connections and we have function
// execute_web_request_connection_close
// But I suppose issues on InfluxDB side and raised ticket about it
// https://github.com/influxdata/influxdb/issues/8525
// And we could not switch to it yet
// We do not need it here but function requires this option
std::string response_body;
std::map<std::string, std::string> headers;
std::string error_text;
bool result = execute_web_request(influxdb_query_string, "post", query, response_code, response_body, headers, error_text);
if (!result) {
return false;
}
if (response_code != 204) {
return false;
}
return true;
}
uint64_t get_current_unix_time_in_nanoseconds() {
auto unix_timestamp = std::chrono::seconds(std::time(NULL));
uint64_t unix_timestamp_nanoseconds = std::chrono::milliseconds(unix_timestamp).count() * 1000 * 1000;
return unix_timestamp_nanoseconds;
}