-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Open
Description
I'm working on a small HTTP proxy using Seastar. It adds CORS headers to all OPTIONS requests and, most importantly, randomly drops a percentage of the traffic to avoid overloading the backend. I ended up using raw TCP sockets because that's the way I did it without Seastar, so I was wondering: is there a more correct way of doing this? Or a faster way?
Here's my proxy:
#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/distributed.hh>
#include <seastar/http/httpd.hh>
#include <seastar/http/handlers.hh>
#include <seastar/http/reply.hh>
#include <seastar/net/inet_address.hh>
#include <seastar/util/log.hh>
#include <random>
#include <iostream>
using namespace seastar;

// Address and port of the backend that non-dropped requests are relayed to.
const sstring BACKEND_HOST_STR = "127.0.0.1";
constexpr uint16_t BACKEND_PORT = 80;

// A request is dropped when a uniform draw from [0, 1) falls below this value.
constexpr double DROP_PROBABILITY = 0.6;

// One generator and one distribution per thread (thread_local), each generator
// seeded from std::random_device when its thread first uses it.
static thread_local std::mt19937 rng(std::random_device{}());
static thread_local std::uniform_real_distribution<double> dist(0.0, 1.0);
// Attaches the fixed set of CORS response headers to `r`: any origin, the
// GET/HEAD/POST/OPTIONS methods, and the Content-Type request header.
void add_cors(http::reply& r) {
    static const char* const cors_headers[][2] = {
        {"Access-Control-Allow-Origin", "*"},
        {"Access-Control-Allow-Methods", "GET,HEAD,POST,OPTIONS"},
        {"Access-Control-Allow-Headers", "Content-Type"},
    };
    for (const auto& name_and_value : cors_headers) {
        r.add_header(name_and_value[0], name_and_value[1]);
    }
}
class proxy_handler : public http::handler_base {
public:
virtual future<std::unique_ptr<http::reply>> handle(const sstring& path,
std::unique_ptr<http::request> req, std::unique_ptr<http::reply> rep) override {
// Always add CORS
if (req->_method == "OPTIONS") {
rep->set_status(http::reply::status_type::ok);
add_cors(*rep);
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
// Drop a percentage of traffic
if (dist(rng) < DROP_PROBABILITY) {
rep->set_status(http::reply::status_type::no_content);
add_cors(*rep);
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
// Proxy to the backend
return forward_request(std::move(req), std::move(rep));
}
private:
future<std::unique_ptr<http::reply>> forward_request(std::unique_ptr<http::request> req, std::unique_ptr<http::reply> rep) {
socket_address backend_addr(ipv4_addr{BACKEND_HOST_STR, BACKEND_PORT});
return connect(make_ipv4_address(backend_addr)).then([req = std::move(req), rep = std::move(rep)](connected_socket s) mutable {
auto out = s.output();
auto in = s.input();
sstring http_req = req->_method + " " + req->_url + " HTTP/1.1\r\n";
bool host_set = false;
for (auto& h : req->_headers) {
http_req += h.first + ": " + h.second + "\r\n";
if (h.first == "Host") host_set = true;
}
if (!host_set) {
http_req += "Host: " + BACKEND_HOST_STR + "\r\n";
}
http_req += "Connection: close\r\n";
http_req += "\r\n";
if (!req->content.empty()) {
http_req += req->content;
}
return do_with(std::move(s), std::move(out), std::move(in), std::move(rep), std::move(http_req),
[](connected_socket& s, output_stream<char>& out, input_stream<char>& in, std::unique_ptr<http::reply>& rep, sstring& http_req_str) {
return out.write(http_req_str).then([&out] {
return out.flush();
}).then([&in, &rep] {
return in.read().then([&rep](temporary_buffer<char> buf) {
sstring raw_resp(buf.get(), buf.size());
size_t first_line_end = raw_resp.find("\r\n");
if (first_line_end != sstring::npos) {
sstring status_line = raw_resp.substr(0, first_line_end);
size_t space1 = status_line.find(' ');
size_t space2 = status_line.find(' ', space1 + 1);
if (space1 != sstring::npos) {
sstring code_str = status_line.substr(space1 + 1, space2 - space1 - 1);
try {
rep->set_status((http::reply::status_type)std::stoi(code_str));
} catch(...) { rep->set_status(http::reply::status_type::bad_gateway); }
}
size_t body_start = raw_resp.find("\r\n\r\n");
if (body_start != sstring::npos) {
rep->_content = raw_resp.substr(body_start + 4);
}
} else {
rep->set_status(http::reply::status_type::bad_gateway);
}
return make_ready_future<>();
});
}).then([&out] {
return out.close();
}).then([&in] {
return in.close();
}).then([&rep] {
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
});
});
});
}
};
// Starts the HTTP server, routes GET/POST/OPTIONS on every path to a
// proxy_handler, listens on port 8181, and then keeps the startup chain
// pending forever so the server object stays alive.
int main(int argc, char** argv) {
    app_template app;
    return app.run_deprecated(argc, argv, [] {
        return seastar::do_with(seastar::httpd::http_server_control(), [](seastar::httpd::http_server_control& server) {
            // The chain must be returned: do_with keeps `server` alive only
            // until the future returned here resolves. Without the return,
            // `server` is destroyed while start()/listen() still reference it.
            return server.start().then([&server] {
                return server.set_routes([](seastar::httpd::routes& r) {
                    // Each registration gets its own handler instance.
                    r.add(seastar::httpd::operation_type::GET,
                          seastar::httpd::url("").remaining("path"),
                          new proxy_handler());
                    r.add(seastar::httpd::operation_type::POST,
                          seastar::httpd::url("").remaining("path"),
                          new proxy_handler());
                    r.add(seastar::httpd::operation_type::OPTIONS,
                          seastar::httpd::url("").remaining("path"),
                          new proxy_handler());
                });
            }).then([&server] {
                return server.listen(ipv4_addr{8181});
            }).then([] {
                std::cout << "Seastar Proxy running on port 8181" << std::endl;
                // Never resolves, which keeps the do_with frame (and thus the
                // server) alive for the lifetime of the process.
                return seastar::keep_doing([] {
                    return seastar::sleep(std::chrono::seconds(1));
                });
            });
        });
    });
}
Metadata
Assignees
Labels
No labels