Skip to content

Seastar-way of proxying HTTP requests? #3112

@VinnyVicious

Description

@VinnyVicious

I'm working on a small HTTP proxy using Seastar. It adds CORS headers to all OPTIONS requests and, most importantly, randomly drops a percentage of the traffic to avoid overloading the backend. I ended up using raw TCP sockets because that's the way I did it without Seastar, so I was wondering: is there a more correct way of doing this? Or a faster way?

Here's my proxy:

#include <seastar/core/app-template.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/http/handlers.hh>
#include <seastar/http/httpd.hh>
#include <seastar/http/reply.hh>
#include <seastar/net/inet_address.hh>
#include <seastar/util/log.hh>
#include <iostream>
#include <random>
#include <string>

using namespace seastar;

// Host of the backend that forwarded requests are sent to; also used as the
// Host header value when the client did not supply one.
const sstring BACKEND_HOST_STR = "127.0.0.1";
// TCP port of the backend.
const uint16_t BACKEND_PORT = 80;
// Probability (0.0 - 1.0) that a non-OPTIONS request is answered locally
// instead of being forwarded to the backend.
const double DROP_PROBABILITY = 0.6;

// Per-thread random engine and uniform [0, 1) distribution used for the drop
// decision; thread_local so that threads do not share generator state.
static thread_local std::mt19937 rng(std::random_device{}());
static thread_local std::uniform_real_distribution<double> dist(0.0, 1.0);

// Attaches the proxy's fixed set of CORS response headers to `r`:
// any origin, the GET/HEAD/POST/OPTIONS methods and the Content-Type header.
void add_cors(http::reply& r) {
    // {name, value} pairs, added in this order.
    static constexpr const char* cors_headers[][2] = {
        {"Access-Control-Allow-Origin", "*"},
        {"Access-Control-Allow-Methods", "GET,HEAD,POST,OPTIONS"},
        {"Access-Control-Allow-Headers", "Content-Type"},
    };

    for (const auto& header : cors_headers) {
        r.add_header(header[0], header[1]);
    }
}

class proxy_handler : public http::handler_base {
public:
    virtual future<std::unique_ptr<http::reply>> handle(const sstring& path,
            std::unique_ptr<http::request> req, std::unique_ptr<http::reply> rep) override {
        
        // Always add CORS
        if (req->_method == "OPTIONS") {
            rep->set_status(http::reply::status_type::ok);
            add_cors(*rep);
            return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
        }

        // Drop a percentage of traffic
        if (dist(rng) < DROP_PROBABILITY) {
            rep->set_status(http::reply::status_type::no_content);
            add_cors(*rep);
            return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
        }

        // Proxy to the backend
        return forward_request(std::move(req), std::move(rep));
    }

private:
    future<std::unique_ptr<http::reply>> forward_request(std::unique_ptr<http::request> req, std::unique_ptr<http::reply> rep) {        
        socket_address backend_addr(ipv4_addr{BACKEND_HOST_STR, BACKEND_PORT});

        return connect(make_ipv4_address(backend_addr)).then([req = std::move(req), rep = std::move(rep)](connected_socket s) mutable {
            auto out = s.output();
            auto in = s.input();
            sstring http_req = req->_method + " " + req->_url + " HTTP/1.1\r\n";
            
            bool host_set = false;
            for (auto& h : req->_headers) {
                http_req += h.first + ": " + h.second + "\r\n";
                if (h.first == "Host") host_set = true;
            }
            if (!host_set) {
                http_req += "Host: " + BACKEND_HOST_STR + "\r\n";
            }
            http_req += "Connection: close\r\n";
            http_req += "\r\n";
            
            if (!req->content.empty()) {
                http_req += req->content;
            }

            return do_with(std::move(s), std::move(out), std::move(in), std::move(rep), std::move(http_req),
                [](connected_socket& s, output_stream<char>& out, input_stream<char>& in, std::unique_ptr<http::reply>& rep, sstring& http_req_str) {
                    
                return out.write(http_req_str).then([&out] {
                    return out.flush();
                }).then([&in, &rep] {
                    return in.read().then([&rep](temporary_buffer<char> buf) {
                        sstring raw_resp(buf.get(), buf.size());
                        
                        size_t first_line_end = raw_resp.find("\r\n");
                        if (first_line_end != sstring::npos) {
                            sstring status_line = raw_resp.substr(0, first_line_end);
                            size_t space1 = status_line.find(' ');
                            size_t space2 = status_line.find(' ', space1 + 1);
                            if (space1 != sstring::npos) {
                                sstring code_str = status_line.substr(space1 + 1, space2 - space1 - 1);
                                try {
                                    rep->set_status((http::reply::status_type)std::stoi(code_str));
                                } catch(...) { rep->set_status(http::reply::status_type::bad_gateway); }
                            }

                            size_t body_start = raw_resp.find("\r\n\r\n");
                            if (body_start != sstring::npos) {
                                rep->_content = raw_resp.substr(body_start + 4);
                            }
                        } else {
                             rep->set_status(http::reply::status_type::bad_gateway);
                        }
                        
                        return make_ready_future<>();
                    });
                }).then([&out] {
                    return out.close();
                }).then([&in] {
                    return in.close();
                }).then([&rep] {
                    return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
                });
            });
        });
    }
};

int main(int argc, char** argv) {
    app_template app;

    return app.run_deprecated(argc, argv, [] {
        return seastar::do_with(seastar::httpd::http_server_control(), [](seastar::httpd::http_server_control& server) {
            server.start().then([&server] {
                return server.set_routes([](seastar::httpd::routes& r) {
                    r.add(seastar::httpd::operation_type::GET,
                        seastar::httpd::url("").remaining("path"),
                        new proxy_handler());
                    r.add(seastar::httpd::operation_type::POST,
                        seastar::httpd::url("").remaining("path"),
                        new proxy_handler());
                    r.add(seastar::httpd::operation_type::OPTIONS,
                        seastar::httpd::url("").remaining("path"),
                        new proxy_handler());
                });
            }).then([&server] {
                return server.listen(ipv4_addr{8181});
            }).then([] {
                std::cout << "Seastar Proxy running on port 8181" << std::endl;
                return seastar::keep_doing([] {
                    return seastar::sleep(std::chrono::seconds(1));
                });
            });
        });
    });
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions