Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8a98113
build(deps): bump linkerd/dev from v44 to v45
olix0r Jan 5, 2025
5229b3c
wip: update kube and hyper
olix0r Jan 5, 2025
347be22
build(deps): bump hyper from 0.3 to 1.0
olix0r Jan 6, 2025
b059d62
chore: bump kubert v0.23.0-alpha6
olix0r Jan 13, 2025
69f3406
Merge branch 'main' into ver/kube-alpha
olix0r Jan 13, 2025
9982985
Merge branch 'main' into ver/kube-alpha
olix0r Jan 25, 2025
e3d2641
Merge branch 'main' into ver/kube-alpha
olix0r Feb 12, 2025
198790d
Merge branch 'main' into ver/kube-alpha
olix0r Feb 17, 2025
989ac54
build(deps): bump kubert to v0.23.0-alpha8
olix0r Feb 17, 2025
2267a80
feat(policy): instrument runtime metrics
olix0r Feb 17, 2025
c3914ba
build(deps): bump kubert to v0.23.0-alpha9
olix0r Feb 17, 2025
f86886a
build(deps): bump kubert to v0.23.0-alpha10
olix0r Feb 17, 2025
47f8b85
Merge branch 'main' into ver/kube-alpha
olix0r Feb 17, 2025
983ce06
build(deps): address dependency auditing lints
olix0r Feb 17, 2025
b542e14
chore(policy): fix lint
olix0r Feb 17, 2025
a3dbbf6
lessen diff
olix0r Feb 17, 2025
eb9c78a
Merge branch 'main' into ver/kube-alpha
olix0r Feb 17, 2025
c389c0b
refactor(policy): extract lease init into a separate module
olix0r Feb 17, 2025
24146ce
Merge branch 'ver/lease-prep' into ver/kube-alpha
olix0r Feb 17, 2025
013f1e3
Merge branch 'main' into ver/kube-alpha
olix0r Feb 18, 2025
f3e09a9
chore(policy): disable runtime-diagnostics initially
olix0r Feb 18, 2025
11586a7
chore(policy): lessen diff
olix0r Feb 18, 2025
6a880fc
chore(cargo): make tower a workspace dep
olix0r Feb 18, 2025
b812771
build(deps): bump kubert to v0.23.0
olix0r Feb 18, 2025
e9d2983
build(deps): bump kubert to v0.23.0
olix0r Feb 18, 2025
992e2c0
Merge branch 'main' into ver/kube-alpha
olix0r Feb 18, 2025
406f49b
Merge branch 'main' into ver/kube-alpha
olix0r Feb 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
984 changes: 440 additions & 544 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 12 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@ members = [
lto = "thin"

[workspace.dependencies]
http = "0.2"
hyper = { version = "0.14" }
k8s-openapi = { version = "0.20", features = ["v1_22"] }
kube = { version = "0.87.1", default-features = false }
kubert = { version = "0.22", default-features = false }
prometheus-client = { version = "0.22", default-features = false }
tonic = { version = "0.10", default-features = false }
http = "1"
hyper = "1"
k8s-openapi = { version = "0.24", features = ["v1_31"] }
kube = { version = "0.98", default-features = false }
kubert = { version = "0.23", default-features = false }
prometheus-client = { version = "0.23", default-features = false }
tonic = { version = "0.12", default-features = false }
tower = { version = "0.5", default-features = false }

[workspace.dependencies.k8s-gateway-api]
version = "0.16"
# TODO(ver): Remove this once we update to a proper generated version of the gateway api bindings.
git = "https://github.com/linkerd/k8s-gateway-api-rs"
features = ["experimental"]

[workspace.dependencies.linkerd2-proxy-api]
version = "0.15"
git = "https://github.com/linkerd/linkerd2-proxy-api"
branch = "main"
features = [
"inbound",
"outbound",
Expand Down
20 changes: 7 additions & 13 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ targets = [
db-path = "~/.cargo/advisory-db"
db-urls = ["https://github.com/rustsec/advisory-db"]
ignore = [
# Pending kube update
# instant is unmaintained, but pulled in via kube
"RUSTSEC-2024-0384",
"RUSTSEC-2024-0388",
]

[licenses]
Expand All @@ -22,6 +21,7 @@ allow = [
"ISC",
"MIT",
"Unicode-3.0",
"Zlib",
]
confidence-threshold = 0.8
exceptions = [
Expand All @@ -30,10 +30,6 @@ exceptions = [
"MIT",
"OpenSSL",
], name = "ring", version = "*" },

{ allow = [
"Zlib",
], name = "adler32" },
]

[[licenses.clarify]]
Expand All @@ -48,12 +44,6 @@ multiple-versions = "deny"
wildcards = "allow"
highlight = "all"
skip = [
# `rustls-pemfile` and `k8s-openapi` depend on versions of `base64` that
# have diverged significantly.
{ name = "base64" },
# `tower-http` (a transitive dep via `kubert`) depends on v2.x of `bitflags`,
# while pretty much the entire rest of the world is still on v1.x
{ name = "bitflags", version = "1.0" },
# https://github.com/hawkw/matchers/pull/4
{ name = "regex-automata", version = "0.1" },
{ name = "regex-syntax", version = "0.6" },
Expand All @@ -66,6 +56,8 @@ skip-tree = [
{ name = "thiserror", version = "1" },
# rand v0.9 is still making its way through the ecosystem
{ name = "rand", version = "0.8" },
# https://github.com/hyperium/tonic/issues/2135
{ name = "tonic", version = "0.12" },
]

[sources]
Expand All @@ -75,4 +67,6 @@ allow-registry = ["https://github.com/rust-lang/crates.io-index"]
allow-git = []

[sources.allow-org]
github = []
github = [
"linkerd",
]
4 changes: 2 additions & 2 deletions policy-controller/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ async-trait = "0.1"
http = { workspace = true }
drain = "0.1"
futures = { version = "0.3", default-features = false }
hyper = { workspace = true, features = ["http2", "server", "tcp"] }
hyper = { workspace = true, features = ["http2", "server"] }
linkerd-policy-controller-core = { path = "../core" }
maplit = "1"
prost-types = "0.12.6"
prost-types = "0.13"
tokio = { version = "1", features = ["macros"] }
tonic = { workspace = true }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/grpc/src/outbound/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub(crate) fn protocol(
}),
http1: Some(outbound::proxy_protocol::Http1 {
routes: routes.clone(),
failure_accrual: accrual.clone(),
failure_accrual: accrual,
}),
http2: Some(outbound::proxy_protocol::Http2 {
routes,
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/status/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ kubert = { workspace = true, default-features = false, features = [
linkerd-policy-controller-core = { path = "../../core" }
linkerd-policy-controller-k8s-api = { path = "../api" }
parking_lot = "0.12"
prometheus-client = { workspace = true, default-features = false }
prometheus-client = { workspace = true }
serde = "1"
serde_json = "1.0.138"
thiserror = "2"
Expand Down
3 changes: 1 addition & 2 deletions policy-controller/k8s/status/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ impl ControllerMetrics {
patch_timeout.clone(),
);

let patch_duration =
Histogram::new([0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0].into_iter());
let patch_duration = Histogram::new([0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]);
prom.register_with_unit(
"patch_duration",
"Histogram of time taken to apply patch operations",
Expand Down
9 changes: 7 additions & 2 deletions policy-controller/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ rustls-tls = ["kube/rustls-tls"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
bytes = "1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
k8s-openapi = { workspace = true }
hyper = { workspace = true, features = ["http1", "http2", "runtime", "server"] }
http-body-util = "0.1"
hyper = { workspace = true, features = ["http1", "http2", "server"] }
ipnet = { version = "2", default-features = false }
openssl = { version = "0.10.71", optional = true }
parking_lot = "0.12"
prometheus-client = { workspace = true, default-features = false }
prometheus-client = { workspace = true }
serde = "1"
serde_json = "1"
thiserror = "2"
tokio-stream = { version = "0.1", features = ["sync"] }
tower = { workspace = true }
tracing = "0.1"
regex = "1"

Expand Down Expand Up @@ -56,6 +59,8 @@ features = [
"lease",
"prometheus-client",
"runtime",
# TODO(ver): enable runtime diagnostics
#"runtime-diagnostics",
"server",
"rustls-tls",
]
Expand Down
12 changes: 8 additions & 4 deletions policy-controller/runtime/src/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::k8s::policy::{
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use futures::future;
use hyper::{body::Buf, http, Body, Request, Response};
use http_body_util::BodyExt;
use hyper::{http, Request, Response};
use k8s_openapi::api::core::v1::{Namespace, ServiceAccount};
use kube::{core::DynamicObject, Resource, ResourceExt};
use linkerd_policy_controller_k8s_api::gateway;
Expand Down Expand Up @@ -48,9 +49,11 @@ trait Validate<T> {
) -> Result<()>;
}

type Body = http_body_util::Full<bytes::Bytes>;

// === impl AdmissionService ===

impl hyper::service::Service<Request<Body>> for Admission {
impl tower::Service<Request<hyper::body::Incoming>> for Admission {
type Response = Response<Body>;
type Error = Error;
type Future = future::BoxFuture<'static, Result<Response<Body>, Error>>;
Expand All @@ -62,7 +65,7 @@ impl hyper::service::Service<Request<Body>> for Admission {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<hyper::body::Incoming>) -> Self::Future {
trace!(?req);
if req.method() != http::Method::POST || req.uri().path() != "/" {
return Box::pin(future::ok(
Expand All @@ -75,7 +78,8 @@ impl hyper::service::Service<Request<Body>> for Admission {

let admission = self.clone();
Box::pin(async move {
let bytes = hyper::body::aggregate(req.into_body()).await?;
use bytes::Buf;
let bytes = req.into_body().collect().await?.to_bytes();
let review: Review = match serde_json::from_reader(bytes.reader()) {
Ok(review) => review,
Err(error) => {
Expand Down
44 changes: 23 additions & 21 deletions policy-controller/runtime/src/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,32 @@ use tokio::{sync::watch, time};
const LEASE_DURATION: time::Duration = time::Duration::from_secs(30);
const LEASE_NAME: &str = "policy-controller-write";
const RENEW_GRACE_PERIOD: time::Duration = time::Duration::from_secs(1);
const FIELD_MANAGER: &str = "policy-controller";

pub async fn init<T>(
runtime: &kubert::Runtime<T>,
ns: &str,
namespace: &str,
deployment_name: &str,
hostname: &str,
claimant: &str,
) -> Result<watch::Receiver<Arc<kubert::lease::Claim>>> {
let params = kubert::LeaseParams {
name: LEASE_NAME.to_string(),
namespace: namespace.to_string(),
claimant: claimant.to_string(),
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
field_manager: Some(FIELD_MANAGER.into()),
};

// Fetch the policy-controller deployment so that we can use it as an owner
// reference of the Lease.
let api = k8s::Api::<Deployment>::namespaced(runtime.client(), ns);
let api = k8s::Api::<Deployment>::namespaced(runtime.client(), &params.namespace);
let deployment = api.get(deployment_name).await?;

let lease = coordv1::Lease {
metadata: ObjectMeta {
name: Some(LEASE_NAME.to_string()),
namespace: Some(ns.to_string()),
name: Some(params.name.clone()),
namespace: Some(params.namespace.clone()),
// Specifying a resource version of "0" means that we will
// only create the Lease if it does not already exist.
resource_version: Some("0".to_string()),
Expand All @@ -34,7 +44,10 @@ pub async fn init<T>(
"linkerd.io/control-plane-component".to_string(),
"destination".to_string(),
),
("linkerd.io/control-plane-ns".to_string(), ns.to_string()),
(
"linkerd.io/control-plane-ns".to_string(),
params.namespace.clone(),
),
]
.into_iter()
.collect(),
Expand All @@ -43,12 +56,11 @@ pub async fn init<T>(
},
spec: None,
};
let api = k8s::Api::<coordv1::Lease>::namespaced(runtime.client(), ns);
match api
match k8s::Api::<coordv1::Lease>::namespaced(runtime.client(), &params.namespace)
.patch(
LEASE_NAME,
&PatchParams {
field_manager: Some("policy-controller".to_string()),
field_manager: params.field_manager.clone().map(Into::into),
..Default::default()
},
&kube::api::Patch::Apply(lease),
Expand All @@ -62,16 +74,6 @@ pub async fn init<T>(
}
};

// Create the lease manager used for trying to claim the policy
// controller write lease.
// todo: Do we need to use LeaseManager::field_manager here?
let params = kubert::lease::ClaimParams {
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
};
let (claims, _task) = kubert::lease::LeaseManager::init(api, LEASE_NAME)
.await?
.spawn(hostname, params)
.await?;
Ok(claims)
let (claim, _task) = runtime.spawn_lease(params).await?;
Ok(claim)
}
4 changes: 4 additions & 0 deletions policy-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ publish = false

[dependencies]
anyhow = "1"
bytes = "1"
http-body-util = "0.1"
hyper = { workspace = true, features = ["client", "http2"] }
hyper-util = { version = "0.1" }
futures = { version = "0.3", default-features = false }
ipnet = "2"
k8s-gateway-api = { workspace = true }
Expand All @@ -22,6 +25,7 @@ serde_json = "1"
schemars = "0.8"
tonic = { workspace = true }
tokio = { version = "1", features = ["macros", "rt"] }
tower = { workspace = true }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand Down
21 changes: 12 additions & 9 deletions policy-test/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! forwarding to connect to a running instance.

use anyhow::Result;
use futures::{future, prelude::*};
pub use linkerd2_proxy_api::*;
use linkerd2_proxy_api::{
inbound::inbound_server_policies_client::InboundServerPoliciesClient,
Expand Down Expand Up @@ -105,7 +106,7 @@ pub struct OutboundPolicyClient {

#[derive(Debug)]
struct GrpcHttp {
tx: hyper::client::conn::SendRequest<tonic::body::BoxBody>,
tx: hyper::client::conn::http2::SendRequest<tonic::body::BoxBody>,
}

async fn get_policy_controller_pod(client: &kube::Client) -> Result<String> {
Expand Down Expand Up @@ -322,19 +323,19 @@ impl GrpcHttp {
where
I: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static,
{
let (tx, conn) = hyper::client::conn::Builder::new()
.http2_only(true)
.handshake(io)
.await?;
let (tx, conn) =
hyper::client::conn::http2::Builder::new(hyper_util::rt::TokioExecutor::new())
.handshake(hyper_util::rt::TokioIo::new(io))
.await?;
tokio::spawn(conn);
Ok(Self { tx })
}
}

impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp {
type Response = hyper::Response<hyper::Body>;
impl tower::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp {
type Response = hyper::Response<hyper::body::Incoming>;
type Error = hyper::Error;
type Future = hyper::client::conn::ResponseFuture;
type Future = future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
Expand All @@ -355,7 +356,9 @@ impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp
);
parts.uri = hyper::Uri::from_parts(uri).unwrap();

self.tx.call(hyper::Request::from_parts(parts, body))
self.tx
.send_request(hyper::Request::from_parts(parts, body))
.boxed()
}
}

Expand Down
16 changes: 6 additions & 10 deletions policy-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub async fn create_ready_pod(client: &kube::Client, pod: k8s::Pod) -> k8s::Pod
ip = %pod
.status.as_ref().expect("pod must have a status")
.pod_ips.as_ref().unwrap()[0]
.ip.as_deref().expect("pod ip must be set"),
.ip,
containers = ?pod
.spec.as_ref().expect("pod must have a spec")
.containers.iter().map(|c| &*c.name).collect::<Vec<_>>(),
Expand Down Expand Up @@ -769,15 +769,11 @@ pub async fn await_service_account(client: &kube::Client, ns: &str, name: &str)
.expect("serviceaccounts watch must not fail");
tracing::info!(?ev);
match ev {
kube::runtime::watcher::Event::Restarted(sas) => {
if sas.iter().any(|sa| sa.name_unchecked() == name) {
return;
}
}
kube::runtime::watcher::Event::Applied(sa) => {
if sa.name_unchecked() == name {
return;
}
kube::runtime::watcher::Event::InitApply(sa)
| kube::runtime::watcher::Event::Apply(sa)
if sa.name_unchecked() == name =>
{
return
}
_ => {}
}
Expand Down
Loading