Skip to content

Commit 177799f

Browse files
committed
feat: add max request id handler
1 parent fb84484 commit 177799f

File tree

3 files changed

+51
-1
lines changed

3 files changed

+51
-1
lines changed

lib/transport/connection.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,19 @@ export class Connection {
105105
async #recv(msg: Control.MessageWithType) {
106106
if (msg.type === Control.ControlMessageType.GoAway) {
107107
await this.#handleGoAway(msg.message)
108+
} else if (msg.type === Control.ControlMessageType.MaxRequestId) {
109+
await this.#handleMaxRequestId(msg.message)
108110
} else if (Control.isPublisher(msg.type)) {
109111
await this.#subscriber.recv(msg)
110112
} else {
111113
await this.#publisher.recv(msg)
112114
}
113115
}
114116

117+
async #handleMaxRequestId(msg: Control.MaxRequestId) {
118+
this.#controlStream.setRemoteMaxRequestId(msg.id)
119+
}
120+
115121
async #handleGoAway(msg: Control.GoAway) {
116122
console.log("preparing for migration, got go_away message:", msg)
117123
if (this.#migrationState === "in_progress") {

lib/transport/control/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { FetchCancel } from "./fetch_cancel"
2121
import { GoAway } from "./go_away"
2222
import { ClientSetup } from "./client_setup"
2323
import { ServerSetup } from "./server_setup"
24+
import { MaxRequestId } from "./max_request_id"
2425

2526

2627
enum Version {
@@ -60,6 +61,7 @@ type MessageWithType =
6061
| { type: ControlMessageType.SubscribeNamespaceError; message: SubscribeNamespaceError }
6162
| { type: ControlMessageType.Unsubscribe; message: Unsubscribe }
6263
| { type: ControlMessageType.GoAway; message: GoAway }
64+
| { type: ControlMessageType.MaxRequestId; message: MaxRequestId }
6365

6466
type Message = Subscriber | Publisher
6567

@@ -185,6 +187,7 @@ export {
185187
FetchError,
186188
FetchCancel,
187189
GoAway,
190+
MaxRequestId,
188191
ClientSetup,
189192
ServerSetup,
190193

lib/transport/stream.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,18 @@ import {
99
Subscribe, SubscribeOk, SubscribeError,
1010
SubscribeUpdate, SubscribeNamespace,
1111
SubscribeNamespaceOk, SubscribeNamespaceError,
12-
GoAway,
12+
GoAway, MaxRequestId,
1313
} from "./control"
1414
import { debug } from "./utils"
1515
import { ImmutableBytesBuffer, ReadableWritableStreamBuffer, Reader, Writer } from "./buffer"
1616

1717
export class ControlStream {
1818
private decoder: Decoder
1919
private encoder: Encoder
20+
// Our next request ID to use when sending requests
2021
#nextRequestId = 0n
22+
// Remote's maximum request ID (first invalid ID). Requests we send must be < this value.
23+
#remoteMaxRequestId?: bigint
2124

2225
#mutex = Promise.resolve()
2326

@@ -61,11 +64,41 @@ export class ControlStream {
6164
return lock
6265
}
6366

67+
/**
68+
* Returns the next request ID or throws if max request ID is reached.
69+
* Per spec: "If a Request ID equal to or larger than this [MAX_REQUEST_ID] is received
70+
* by the endpoint that sent the MAX_REQUEST_ID in any request message, the endpoint
71+
* MUST close the session with an error of TOO_MANY_REQUESTS."
72+
* @param incr number at which the next request ID should be incremented (default 2 for client)
73+
* @returns next request ID
74+
*/
6475
nextRequestId(incr: bigint = 2n): bigint {
6576
const id = this.#nextRequestId
77+
78+
// Check if we're about to exceed the remote's max request ID
79+
if (this.#remoteMaxRequestId !== undefined && id >= this.#remoteMaxRequestId) {
80+
throw new Error(`TOO_MANY_REQUESTS: Request ID ${id} >= remote max ${this.#remoteMaxRequestId}`)
81+
}
82+
6683
this.#nextRequestId += incr
6784
return id
6885
}
86+
87+
/**
88+
* Sets the remote's maximum request ID. Per spec:
89+
* "The Maximum Request ID MUST only increase within a session, and receipt of a
90+
* MAX_REQUEST_ID message with an equal or smaller Request ID value is a PROTOCOL_VIOLATION."
91+
*
92+
* Note: The id parameter is the "Maximum Request ID + 1" (first invalid ID).
93+
* @param id The new maximum request ID for the session plus 1
94+
*/
95+
setRemoteMaxRequestId(id: bigint) {
96+
// Validate that MAX_REQUEST_ID only increases
97+
if (this.#remoteMaxRequestId !== undefined && id <= this.#remoteMaxRequestId) {
98+
throw new Error(`PROTOCOL_VIOLATION: MAX_REQUEST_ID must only increase (received ${id}, current ${this.#remoteMaxRequestId})`)
99+
}
100+
this.#remoteMaxRequestId = id
101+
}
69102
}
70103

71104
export class Decoder {
@@ -221,6 +254,12 @@ export class Decoder {
221254
message: SubscribeNamespaceError.deserialize(payload),
222255
}
223256
break
257+
case ControlMessageType.MaxRequestId:
258+
res = {
259+
type: t,
260+
message: MaxRequestId.deserialize(payload),
261+
}
262+
break
224263
default:
225264
throw new Error(`unknown message kind: ${t}`)
226265
}
@@ -280,6 +319,8 @@ export class Encoder {
280319
return FetchOk.serialize(message as FetchOk)
281320
case ControlMessageType.FetchError:
282321
return FetchError.serialize(message as FetchError)
322+
case ControlMessageType.MaxRequestId:
323+
return MaxRequestId.serialize(message as MaxRequestId)
283324
default:
284325
throw new Error(`unknown message kind in encoder`)
285326
}

0 commit comments

Comments
 (0)