From bca28aad68c576b548488f3a37bd1e30a66c5a66 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Thu, 19 Dec 2024 14:35:45 -0800 Subject: [PATCH 1/4] add nostrdb_net, split into crates This is just enostr from notedeck! Fixes: https://github.com/damus-io/nostrdb-rs/issues/29 --- .gitignore | 1 + .gitmodules | 2 +- Cargo.toml | 47 ++-- nostrdb_net/Cargo.toml | 19 ++ nostrdb_net/src/client/message.rs | 59 +++++ nostrdb_net/src/client/mod.rs | 3 + nostrdb_net/src/error.rs | 55 ++++ nostrdb_net/src/filter.rs | 1 + nostrdb_net/src/keypair.rs | 139 +++++++++++ nostrdb_net/src/lib.rs | 23 ++ nostrdb_net/src/note.rs | 135 ++++++++++ nostrdb_net/src/profile.rs | 38 +++ nostrdb_net/src/pubkey.rs | 124 +++++++++ nostrdb_net/src/relay/.mod.rs.swp | Bin 0 -> 12288 bytes nostrdb_net/src/relay/message.rs | 290 ++++++++++++++++++++++ nostrdb_net/src/relay/mod.rs | 100 ++++++++ nostrdb_net/src/relay/pool.rs | 254 +++++++++++++++++++ nostrdb_rs/Cargo.toml | 31 +++ build.rs => nostrdb_rs/build.rs | 0 nostrdb => nostrdb_rs/nostrdb | 0 {src => nostrdb_rs/src}/bindings.rs | 0 {src => nostrdb_rs/src}/bindings_posix.rs | 0 {src => nostrdb_rs/src}/bindings_win.rs | 0 {src => nostrdb_rs/src}/block.rs | 0 {src => nostrdb_rs/src}/config.rs | 0 {src => nostrdb_rs/src}/error.rs | 0 {src => nostrdb_rs/src}/filter.rs | 0 {src => nostrdb_rs/src}/future.rs | 0 {src => nostrdb_rs/src}/lib.rs | 0 {src => nostrdb_rs/src}/ndb.rs | 0 {src => nostrdb_rs/src}/ndb_profile.rs | 0 {src => nostrdb_rs/src}/ndb_str.rs | 0 {src => nostrdb_rs/src}/note.rs | 0 {src => nostrdb_rs/src}/profile.rs | 0 {src => nostrdb_rs/src}/query.rs | 0 {src => nostrdb_rs/src}/result.rs | 0 {src => nostrdb_rs/src}/subscription.rs | 0 {src => nostrdb_rs/src}/tags.rs | 0 {src => nostrdb_rs/src}/test_util.rs | 0 {src => nostrdb_rs/src}/transaction.rs | 0 {src => nostrdb_rs/src}/util/mod.rs | 0 {src => nostrdb_rs/src}/util/nip10.rs | 0 42 files changed, 1295 insertions(+), 26 deletions(-) create mode 100644 nostrdb_net/Cargo.toml create mode 100644 nostrdb_net/src/client/message.rs create mode 100644 nostrdb_net/src/client/mod.rs create mode 100644 nostrdb_net/src/error.rs create mode 100644 nostrdb_net/src/filter.rs create mode 100644 nostrdb_net/src/keypair.rs create mode 100644 nostrdb_net/src/lib.rs create mode 100644 nostrdb_net/src/note.rs create mode 100644 nostrdb_net/src/profile.rs create mode 100644 nostrdb_net/src/pubkey.rs create mode 100644 nostrdb_net/src/relay/.mod.rs.swp create mode 100644 nostrdb_net/src/relay/message.rs create mode 100644 nostrdb_net/src/relay/mod.rs create mode 100644 nostrdb_net/src/relay/pool.rs create mode 100644 nostrdb_rs/Cargo.toml rename build.rs => nostrdb_rs/build.rs (100%) rename nostrdb => nostrdb_rs/nostrdb (100%) rename {src => nostrdb_rs/src}/bindings.rs (100%) rename {src => nostrdb_rs/src}/bindings_posix.rs (100%) rename {src => nostrdb_rs/src}/bindings_win.rs (100%) rename {src => nostrdb_rs/src}/block.rs (100%) rename {src => nostrdb_rs/src}/config.rs (100%) rename {src => nostrdb_rs/src}/error.rs (100%) rename {src => nostrdb_rs/src}/filter.rs (100%) rename {src => nostrdb_rs/src}/future.rs (100%) rename {src => nostrdb_rs/src}/lib.rs (100%) rename {src => nostrdb_rs/src}/ndb.rs (100%) rename {src => nostrdb_rs/src}/ndb_profile.rs (100%) rename {src => nostrdb_rs/src}/ndb_str.rs (100%) rename {src => nostrdb_rs/src}/note.rs (100%) rename {src => nostrdb_rs/src}/profile.rs (100%) rename {src => nostrdb_rs/src}/query.rs (100%) rename {src => nostrdb_rs/src}/result.rs (100%) rename {src => nostrdb_rs/src}/subscription.rs (100%) rename {src => nostrdb_rs/src}/tags.rs (100%) rename {src => nostrdb_rs/src}/test_util.rs (100%) rename {src => nostrdb_rs/src}/transaction.rs (100%) rename {src => nostrdb_rs/src}/util/mod.rs (100%) rename {src => nostrdb_rs/src}/util/nip10.rs (100%) diff --git a/.gitignore b/.gitignore index aaf9402..f171580 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ tags target/ .build-result .buildcmd +build.log diff --git a/.gitmodules b/.gitmodules index 4d0892c..6628fbf 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "nostrdb"] - path = nostrdb + path = nostrdb_rs/nostrdb url = https://github.com/damus-io/nostrdb diff --git a/Cargo.toml b/Cargo.toml index b4c01d0..847311b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,30 +1,27 @@ -[package] -name = "nostrdb" -authors = ["William Casarin "] -description = "An unfairly fast embedded nostr database backed by lmdb" -readme = "README.md" -version = "0.5.1" -edition = "2021" -build = "build.rs" -license = "GPL-3.0-or-later" -homepage = "https://github.com/damus-io/nostrdb-rs/" -repository = "https://github.com/damus-io/nostrdb-rs/" +[workspace] +resolver = "2" +members = [ + "nostrdb_rs", + "nostrdb_net", +] -[build-dependencies] -cc = "1.0" -bindgen = "0.69.1" - -[features] -bindgen = [] - -[dependencies] -flatbuffers = "23.5.26" +[workspace.dependencies] +bech32 = { version = "0.11", default-features = false } +dirs = "5.0.1" +hex = "0.4.3" libc = "0.2.151" +nostrdb_net = { path = "nostrdb_net" } +nostrdb = { path = "nostrdb_rs" } +nostr = { version = "0.37.0", default-features = false, features = ["std", "nip49"] } +serde_derive = "1" +serde_json = "1.0.89" +serde = { version = "1", features = ["derive"] } +tempfile = "3.13.0" thiserror = "2.0.7" -futures = "0.3.31" -tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1.16", features = ["macros", "rt-multi-thread"] } +#tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" - -[dev-dependencies] -hex = "0.4.3" +url = "2.5.2" +urlencoding = "2.1.3" +uuid = { version = "1.10.0", features = ["v4"] } diff --git a/nostrdb_net/Cargo.toml b/nostrdb_net/Cargo.toml new file mode 100644 index 0000000..d30ac0b --- /dev/null +++ b/nostrdb_net/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "nostrdb_net" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ewebsock = { version = "0.8.0", features = ["tls"] } +serde_derive = "1" +serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence +serde_json = { workspace = true } +nostr = { workspace = true } +bech32 = { workspace = true } +nostrdb = { workspace = true } +hex = { workspace = true } +tracing = { workspace = true } +thiserror = { workspace = true } +url = { workspace = true } diff --git a/nostrdb_net/src/client/message.rs b/nostrdb_net/src/client/message.rs new file mode 100644 index 0000000..e912cdf --- /dev/null +++ b/nostrdb_net/src/client/message.rs @@ -0,0 +1,59 @@ +use crate::{Error, Note}; +use nostrdb::Filter; +use serde_json::json; + +/// Messages sent by clients, received by relays +#[derive(Debug)] +pub enum ClientMessage { + Event { + note: Note, + }, + Req { + sub_id: String, + filters: Vec, + }, + Close { + sub_id: String, + }, + Raw(String), +} + +impl ClientMessage { + pub fn event(note: Note) -> Self { + ClientMessage::Event { note } + } + + pub fn raw(raw: String) -> Self { + ClientMessage::Raw(raw) + } + + pub fn req(sub_id: String, filters: Vec) -> Self { + ClientMessage::Req { sub_id, filters } + } + + pub fn close(sub_id: String) -> Self { + ClientMessage::Close { sub_id } + } + + pub fn to_json(&self) -> Result { + Ok(match self { + Self::Event { note } => json!(["EVENT", note]).to_string(), + Self::Raw(raw) => raw.clone(), + Self::Req { sub_id, filters } => { + if filters.is_empty() { + format!("[\"REQ\",\"{}\",{{ }}]", sub_id) + } else if filters.len() == 1 { + let filters_json_str = filters[0].json()?; + format!("[\"REQ\",\"{}\",{}]", sub_id, filters_json_str) + } else { + let filters_json_str: Result, Error> = filters + .iter() + .map(|f| f.json().map_err(Into::::into)) + .collect(); + format!("[\"REQ\",\"{}\",{}]", sub_id, filters_json_str?.join(",")) + } + } + Self::Close { sub_id } => json!(["CLOSE", sub_id]).to_string(), + }) + } +} diff --git a/nostrdb_net/src/client/mod.rs b/nostrdb_net/src/client/mod.rs new file mode 100644 index 0000000..0ab326f --- /dev/null +++ b/nostrdb_net/src/client/mod.rs @@ -0,0 +1,3 @@ +mod message; + +pub use message::ClientMessage; diff --git a/nostrdb_net/src/error.rs b/nostrdb_net/src/error.rs new file mode 100644 index 0000000..e8e8a5a --- /dev/null +++ b/nostrdb_net/src/error.rs @@ -0,0 +1,55 @@ +//use nostr::prelude::secp256k1; +use std::array::TryFromSliceError; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("message is empty")] + Empty, + + #[error("decoding failed")] + DecodeFailed, + + #[error("hex decoding failed")] + HexDecodeFailed, + + #[error("invalid bech32")] + InvalidBech32, + + #[error("invalid byte size")] + InvalidByteSize, + + #[error("invalid signature")] + InvalidSignature, + + #[error("invalid public key")] + InvalidPublicKey, + + // Secp(secp256k1::Error), + #[error("json error: {0}")] + Json(#[from] serde_json::Error), + + #[error("nostrdb error: {0}")] + Nostrdb(#[from] nostrdb::Error), + + #[error("{0}")] + Generic(String), +} + +impl From for Error { + fn from(s: String) -> Self { + Error::Generic(s) + } +} + +impl From for Error { + fn from(_e: TryFromSliceError) -> Self { + Error::InvalidByteSize + } +} + +impl From for Error { + fn from(_e: hex::FromHexError) -> Self { + Error::HexDecodeFailed + } +} diff --git a/nostrdb_net/src/filter.rs b/nostrdb_net/src/filter.rs new file mode 100644 index 0000000..7555b27 --- /dev/null +++ b/nostrdb_net/src/filter.rs @@ -0,0 +1 @@ +pub type Filter = nostrdb::Filter; diff --git a/nostrdb_net/src/keypair.rs b/nostrdb_net/src/keypair.rs new file mode 100644 index 0000000..94ba1dd --- /dev/null +++ b/nostrdb_net/src/keypair.rs @@ -0,0 +1,139 @@ +use nostr::nips::nip49::EncryptedSecretKey; +use serde::Deserialize; +use serde::Serialize; + +use crate::Pubkey; +use crate::SecretKey; + +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct Keypair { + pub pubkey: Pubkey, + pub secret_key: Option, +} + +impl Keypair { + pub fn from_secret(secret_key: SecretKey) -> Self { + let cloned_secret_key = secret_key.clone(); + let nostr_keys = nostr::Keys::new(secret_key); + Keypair { + pubkey: Pubkey::new(nostr_keys.public_key().to_bytes()), + secret_key: Some(cloned_secret_key), + } + } + + pub fn new(pubkey: Pubkey, secret_key: Option) -> Self { + Keypair { pubkey, secret_key } + } + + pub fn only_pubkey(pubkey: Pubkey) -> Self { + Keypair { + pubkey, + secret_key: None, + } + } + + pub fn to_full(&self) -> Option> { + self.secret_key.as_ref().map(|secret_key| FilledKeypair { + pubkey: &self.pubkey, + secret_key, + }) + } +} + +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct FullKeypair { + pub pubkey: Pubkey, + pub secret_key: SecretKey, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub struct FilledKeypair<'a> { + pub pubkey: &'a Pubkey, + pub secret_key: &'a SecretKey, +} + +impl<'a> FilledKeypair<'a> { + pub fn new(pubkey: &'a Pubkey, secret_key: &'a SecretKey) -> Self { + FilledKeypair { pubkey, secret_key } + } + + pub fn to_full(&self) -> FullKeypair { + FullKeypair { + pubkey: self.pubkey.to_owned(), + secret_key: self.secret_key.to_owned(), + } + } +} + +impl FullKeypair { + pub fn new(pubkey: Pubkey, secret_key: SecretKey) -> Self { + FullKeypair { pubkey, secret_key } + } + + pub fn to_filled(&self) -> FilledKeypair<'_> { + FilledKeypair::new(&self.pubkey, &self.secret_key) + } + + pub fn generate() -> Self { + let mut rng = nostr::secp256k1::rand::rngs::OsRng; + let (secret_key, _) = &nostr::SECP256K1.generate_keypair(&mut rng); + let (xopk, _) = secret_key.x_only_public_key(&nostr::SECP256K1); + let secret_key = nostr::SecretKey::from(*secret_key); + FullKeypair { + pubkey: Pubkey::new(xopk.serialize()), + secret_key, + } + } + + pub fn to_keypair(self) -> Keypair { + Keypair { + pubkey: self.pubkey, + secret_key: Some(self.secret_key), + } + } +} + +impl std::fmt::Display for Keypair { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Keypair:\n\tpublic: {}\n\tsecret: {}", + self.pubkey, + match self.secret_key { + Some(_) => "Some()", + None => "None", + } + ) + } +} + +impl std::fmt::Display for FullKeypair { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Keypair:\n\tpublic: {}\n\tsecret: ", self.pubkey) + } +} + +#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct SerializableKeypair { + pub pubkey: Pubkey, + pub encrypted_secret_key: Option, +} + +impl SerializableKeypair { + pub fn from_keypair(kp: &Keypair, pass: &str, log_n: u8) -> Self { + Self { + pubkey: kp.pubkey, + encrypted_secret_key: kp.secret_key.clone().and_then(|s| { + EncryptedSecretKey::new(&s, pass, log_n, nostr::nips::nip49::KeySecurity::Weak).ok() + }), + } + } + + pub fn to_keypair(&self, pass: &str) -> Keypair { + Keypair::new( + self.pubkey, + self.encrypted_secret_key + .and_then(|e| e.to_secret_key(pass).ok()), + ) + } +} diff --git a/nostrdb_net/src/lib.rs b/nostrdb_net/src/lib.rs new file mode 100644 index 0000000..e33e563 --- /dev/null +++ b/nostrdb_net/src/lib.rs @@ -0,0 +1,23 @@ +mod client; +mod error; +mod filter; +mod keypair; +mod note; +mod profile; +mod pubkey; +mod relay; + +pub use client::ClientMessage; +pub use error::Error; +pub use ewebsock; +pub use filter::Filter; +pub use keypair::{FilledKeypair, FullKeypair, Keypair, SerializableKeypair}; +pub use nostr::SecretKey; +pub use note::{Note, NoteId}; +pub use profile::Profile; +pub use pubkey::Pubkey; +pub use relay::message::{RelayEvent, RelayMessage}; +pub use relay::pool::{PoolEvent, RelayPool}; +pub use relay::{Relay, RelayStatus}; + +pub type Result = std::result::Result; diff --git a/nostrdb_net/src/note.rs b/nostrdb_net/src/note.rs new file mode 100644 index 0000000..389f083 --- /dev/null +++ b/nostrdb_net/src/note.rs @@ -0,0 +1,135 @@ +use crate::{Error, Pubkey}; + +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt; +use std::hash::{Hash, Hasher}; + +#[derive(Clone, Copy, Eq, PartialEq, Hash)] +pub struct NoteId([u8; 32]); + +impl fmt::Debug for NoteId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.hex()) + } +} + +static HRP_NOTE: bech32::Hrp = bech32::Hrp::parse_unchecked("note"); + +impl NoteId { + pub fn new(bytes: [u8; 32]) -> Self { + NoteId(bytes) + } + + pub fn bytes(&self) -> &[u8; 32] { + &self.0 + } + + pub fn hex(&self) -> String { + hex::encode(self.bytes()) + } + + pub fn from_hex(hex_str: &str) -> Result { + let evid = NoteId(hex::decode(hex_str)?.as_slice().try_into().unwrap()); + Ok(evid) + } + + pub fn to_bech(&self) -> Option { + bech32::encode::(HRP_NOTE, &self.0).ok() + } +} + +/// Event is the struct used to represent a Nostr event +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Note { + /// 32-bytes sha256 of the the serialized event data + pub id: NoteId, + /// 32-bytes hex-encoded public key of the event creator + pub pubkey: Pubkey, + /// unix timestamp in seconds + pub created_at: u64, + /// integer + /// 0: NostrEvent + pub kind: u64, + /// Tags + pub tags: Vec>, + /// arbitrary string + pub content: String, + /// 64-bytes signature of the sha256 hash of the serialized event data, which is the same as the "id" field + pub sig: String, +} + +// Implement Hash trait +impl Hash for Note { + fn hash(&self, state: &mut H) { + self.id.0.hash(state); + } +} + +impl PartialEq for Note { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for Note {} + +impl Note { + pub fn from_json(s: &str) -> Result { + serde_json::from_str(s).map_err(Into::into) + } + + pub fn verify(&self) -> Result { + Err(Error::InvalidSignature) + } + + /// This is just for serde sanity checking + #[allow(dead_code)] + pub(crate) fn new_dummy( + id: &str, + pubkey: &str, + created_at: u64, + kind: u64, + tags: Vec>, + content: &str, + sig: &str, + ) -> Result { + Ok(Note { + id: NoteId::from_hex(id)?, + pubkey: Pubkey::from_hex(pubkey)?, + created_at, + kind, + tags, + content: content.to_string(), + sig: sig.to_string(), + }) + } +} + +impl std::str::FromStr for Note { + type Err = Error; + + fn from_str(s: &str) -> Result { + Note::from_json(s) + } +} + +// Custom serialize function for Pubkey +impl Serialize for NoteId { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.hex()) + } +} + +// Custom deserialize function for Pubkey +impl<'de> Deserialize<'de> for NoteId { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + NoteId::from_hex(&s).map_err(serde::de::Error::custom) + } +} diff --git a/nostrdb_net/src/profile.rs b/nostrdb_net/src/profile.rs new file mode 100644 index 0000000..98144ef --- /dev/null +++ b/nostrdb_net/src/profile.rs @@ -0,0 +1,38 @@ +use serde_json::Value; + +#[derive(Debug, Clone)] +pub struct Profile(Value); + +impl Profile { + pub fn new(value: Value) -> Profile { + Profile(value) + } + + pub fn name(&self) -> Option<&str> { + self.0["name"].as_str() + } + + pub fn display_name(&self) -> Option<&str> { + self.0["display_name"].as_str() + } + + pub fn lud06(&self) -> Option<&str> { + self.0["lud06"].as_str() + } + + pub fn lud16(&self) -> Option<&str> { + self.0["lud16"].as_str() + } + + pub fn about(&self) -> Option<&str> { + self.0["about"].as_str() + } + + pub fn picture(&self) -> Option<&str> { + self.0["picture"].as_str() + } + + pub fn website(&self) -> Option<&str> { + self.0["website"].as_str() + } +} diff --git a/nostrdb_net/src/pubkey.rs b/nostrdb_net/src/pubkey.rs new file mode 100644 index 0000000..5dfb62c --- /dev/null +++ b/nostrdb_net/src/pubkey.rs @@ -0,0 +1,124 @@ +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use crate::Error; +use std::fmt; +use std::ops::Deref; +use tracing::debug; + +#[derive(Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)] +pub struct Pubkey([u8; 32]); + +static HRP_NPUB: bech32::Hrp = bech32::Hrp::parse_unchecked("npub"); + +impl Deref for Pubkey { + type Target = [u8; 32]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Pubkey { + pub fn new(data: [u8; 32]) -> Self { + Self(data) + } + + pub fn hex(&self) -> String { + hex::encode(self.bytes()) + } + + pub fn bytes(&self) -> &[u8; 32] { + &self.0 + } + + pub fn parse(s: &str) -> Result { + match Pubkey::from_hex(s) { + Ok(pk) => Ok(pk), + Err(_) => Pubkey::try_from_bech32_string(s, false), + } + } + + pub fn from_hex(hex_str: &str) -> Result { + Ok(Pubkey(hex::decode(hex_str)?.as_slice().try_into()?)) + } + + pub fn try_from_hex_str_with_verify(hex_str: &str) -> Result { + let vec: Vec = hex::decode(hex_str)?; + if vec.len() != 32 { + Err(Error::HexDecodeFailed) + } else { + let _ = match nostr::secp256k1::XOnlyPublicKey::from_slice(&vec) { + Ok(r) => Ok(r), + Err(_) => Err(Error::InvalidPublicKey), + }?; + + Ok(Pubkey(vec.try_into().unwrap())) + } + } + + pub fn try_from_bech32_string(s: &str, verify: bool) -> Result { + let data = match bech32::decode(s) { + Ok(res) => Ok(res), + Err(_) => Err(Error::InvalidBech32), + }?; + + if data.0 != HRP_NPUB { + Err(Error::InvalidBech32) + } else if data.1.len() != 32 { + Err(Error::InvalidByteSize) + } else { + if verify { + let _ = match nostr::secp256k1::XOnlyPublicKey::from_slice(&data.1) { + Ok(r) => Ok(r), + Err(_) => Err(Error::InvalidPublicKey), + }?; + } + Ok(Pubkey(data.1.try_into().unwrap())) + } + } + + pub fn to_bech(&self) -> Option { + bech32::encode::(HRP_NPUB, &self.0).ok() + } +} + +impl fmt::Display for Pubkey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.hex()) + } +} + +impl fmt::Debug for Pubkey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.hex()) + } +} + +impl From for String { + fn from(pk: Pubkey) -> Self { + pk.hex() + } +} + +// Custom serialize function for Pubkey +impl Serialize for Pubkey { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.hex()) + } +} + +// Custom deserialize function for Pubkey +impl<'de> Deserialize<'de> for Pubkey { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + debug!("decoding pubkey start"); + let s = String::deserialize(deserializer)?; + debug!("decoding pubkey {}", &s); + Pubkey::from_hex(&s).map_err(serde::de::Error::custom) + } +} diff --git a/nostrdb_net/src/relay/.mod.rs.swp b/nostrdb_net/src/relay/.mod.rs.swp new file mode 100644 index 0000000000000000000000000000000000000000..503cabaa2112dcf7486803e9e3f8c52d261b2f23 GIT binary patch literal 12288 zcmeI2ONw?7D~{FwFGS>^3{oJycb* zz+~`b;=#KY6ZD`b4|>)3h$tt;#K+wSM{j!a-(yD zLpOGxc=;ILSJ}gG?P6@|Y;$?*nLpX1Kki`X8hiIr^=8-kG`b+yI0c*nx1+#L_QLafc2m!Z(l~$Q z;icPo$n|jwI0c*nP64NYQ@|Zn2) zb#;`)Tqj)8_hr1BB<+Crg1LUlXZeFs+)BnhQ|zV<-iq^w8^W9Hq}tf-+o=P-s3j&D z*c$K?BAh;A44*CY9?k_^8~81xb-$yQhkR-a%~8W=_-ksQ$UG+SE{Jga)ak5iW>=RQ z1}1GV+woP;E0>K4WQGx4icrwkdYB{sS;#wB7Sm1&3c0>j*LAA!^a4q^Xi7nFFo|Ok zY8X^>rzypnXs9Gy4T8}2sI5hMrM6aI6=~O-H1nmh?epViX;ct2MXR2+wO97J?Ny#D zh9rGFYnDGSzszSC&6TN=O;iYz4;1jI({1x3u~+8L@}{-{zNpu&A8JO|?^SV8d*@UVqoXmDjO`arnc|>7#*tM9$`Bnn7VE1>#W(I3og$J1oP0O{4gzw4ykIVWTmL zTG4JIrA%xhk?f{uiyL=^fP;ScU36+yOoG<%6?GbFuVhxGWZR*3tD|#2TA2^13W6hv zME>EZot~=U(83mIs_j0Q`)H~TQT}h_DTP~}NdXS8h3u5E2}7+mqo{4s94kXD#`9qK zKq(rBgO{0cjBnZcAqs7z9g0-8?~A5=xHyqU%G6B5h&b&G&j>dsPT{n}(dZd3%cq&? zh;Pu%d&2%fsbgPNu$iYya4qX0;wK1tMtk3G&_1Wv2yF7yMd@Y`P(Yy*g^Hg=)o?T9 zwVlm3@XZER#k@q=O7np{o*sEu@?k%nR2argQ=GNE>@@X@*9iJKSd_sc;L(2p{D>uo literal 0 HcmV?d00001 diff --git a/nostrdb_net/src/relay/message.rs b/nostrdb_net/src/relay/message.rs new file mode 100644 index 0000000..a7dd12e --- /dev/null +++ b/nostrdb_net/src/relay/message.rs @@ -0,0 +1,290 @@ +use crate::{Error, Result}; +use ewebsock::{WsEvent, WsMessage}; + +#[derive(Debug, Eq, PartialEq)] +pub struct CommandResult<'a> { + event_id: &'a str, + status: bool, + message: &'a str, +} + +#[derive(Debug, Eq, PartialEq)] +pub enum RelayMessage<'a> { + OK(CommandResult<'a>), + Eose(&'a str), + Event(&'a str, &'a str), + Notice(&'a str), +} + +#[derive(Debug)] +pub enum RelayEvent<'a> { + Opened, + Closed, + Other(&'a WsMessage), + Error(Error), + Message(RelayMessage<'a>), +} + +impl<'a> From<&'a WsEvent> for RelayEvent<'a> { + fn from(event: &'a WsEvent) -> RelayEvent<'a> { + match event { + WsEvent::Opened => RelayEvent::Opened, + WsEvent::Closed => RelayEvent::Closed, + WsEvent::Message(ref ws_msg) => ws_msg.into(), + WsEvent::Error(s) => RelayEvent::Error(Error::Generic(s.to_owned())), + } + } +} + +impl<'a> From<&'a WsMessage> for RelayEvent<'a> { + fn from(wsmsg: &'a WsMessage) -> RelayEvent<'a> { + match wsmsg { + WsMessage::Text(s) => match RelayMessage::from_json(s).map(RelayEvent::Message) { + Ok(msg) => msg, + Err(err) => RelayEvent::Error(err), + }, + wsmsg => RelayEvent::Other(wsmsg), + } + } +} + +impl<'a> RelayMessage<'a> { + pub fn eose(subid: &'a str) -> Self { + RelayMessage::Eose(subid) + } + + pub fn notice(msg: &'a str) -> Self { + RelayMessage::Notice(msg) + } + + pub fn ok(event_id: &'a str, status: bool, message: &'a str) -> Self { + RelayMessage::OK(CommandResult { + event_id, + status, + message, + }) + } + + pub fn event(ev: &'a str, sub_id: &'a str) -> Self { + RelayMessage::Event(sub_id, ev) + } + + pub fn from_json(msg: &'a str) -> Result> { + if msg.is_empty() { + return Err(Error::Empty); + } + + // Notice + // Relay response format: ["NOTICE", ] + if msg.len() >= 12 && &msg[0..=9] == "[\"NOTICE\"," { + // TODO: there could be more than one space, whatever + let start = if msg.as_bytes().get(10).copied() == Some(b' ') { + 12 + } else { + 11 + }; + let end = msg.len() - 2; + return Ok(Self::notice(&msg[start..end])); + } + + // Event + // Relay response format: ["EVENT", , ] + if &msg[0..=7] == "[\"EVENT\"" { + let mut start = 9; + while let Some(&b' ') = msg.as_bytes().get(start) { + start += 1; // Move past optional spaces + } + if let Some(comma_index) = msg[start..].find(',') { + let subid_end = start + comma_index; + let subid = &msg[start..subid_end].trim().trim_matches('"'); + return Ok(Self::event(msg, subid)); + } else { + return Ok(Self::event(msg, "fixme")); + } + } + + // EOSE (NIP-15) + // Relay response format: ["EOSE", ] + if &msg[0..=7] == "[\"EOSE\"," { + let start = if msg.as_bytes().get(8).copied() == Some(b' ') { + 10 + } else { + 9 + }; + let end = msg.len() - 2; + return Ok(Self::eose(&msg[start..end])); + } + + // OK (NIP-20) + // Relay response format: ["OK",, , ] + if &msg[0..=5] == "[\"OK\"," && msg.len() >= 78 { + // TODO: fix this + let event_id = &msg[7..71]; + let booly = &msg[73..77]; + let status: bool = if booly == "true" { + true + } else if booly == "false" { + false + } else { + return Err(Error::DecodeFailed); + }; + + return Ok(Self::ok(event_id, status, "fixme")); + } + + Err(Error::DecodeFailed) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_handle_valid_notice() -> Result<()> { + let valid_notice_msg = r#"["NOTICE","Invalid event format!"]"#; + let handled_valid_notice_msg = RelayMessage::notice("Invalid event format!"); + + assert_eq!( + RelayMessage::from_json(valid_notice_msg)?, + handled_valid_notice_msg + ); + + Ok(()) + } + #[test] + fn test_handle_invalid_notice() { + //Missing content + let invalid_notice_msg = r#"["NOTICE"]"#; + //The content is not string + let invalid_notice_msg_content = r#"["NOTICE": 404]"#; + + assert!(matches!( + RelayMessage::from_json(invalid_notice_msg).unwrap_err(), + Error::DecodeFailed + )); + assert!(matches!( + RelayMessage::from_json(invalid_notice_msg_content).unwrap_err(), + Error::DecodeFailed + )); + } + + /* + #[test] + fn test_handle_valid_event() -> Result<()> { + use tracing::debug; + + let valid_event_msg = r#"["EVENT", "random_string", {"id":"70b10f70c1318967eddf12527799411b1a9780ad9c43858f5e5fcd45486a13a5","pubkey":"379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe","created_at":1612809991,"kind":1,"tags":[],"content":"test","sig":"273a9cd5d11455590f4359500bccb7a89428262b96b3ea87a756b770964472f8c3e87f5d5e64d8d2e859a71462a3f477b554565c4f2f326cb01dd7620db71502"}]"#; + + let id = "70b10f70c1318967eddf12527799411b1a9780ad9c43858f5e5fcd45486a13a5"; + let pubkey = "379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"; + let created_at = 1612809991; + let kind = 1; + let tags = vec![]; + let content = "test"; + let sig = "273a9cd5d11455590f4359500bccb7a89428262b96b3ea87a756b770964472f8c3e87f5d5e64d8d2e859a71462a3f477b554565c4f2f326cb01dd7620db71502"; + + let handled_event = Note::new_dummy(id, pubkey, created_at, kind, tags, content, sig).expect("ev"); + debug!("event {:?}", handled_event); + + let msg = RelayMessage::from_json(valid_event_msg).expect("valid json"); + debug!("msg {:?}", msg); + + let note_json = serde_json::to_string(&handled_event).expect("json ev"); + + assert_eq!( + msg, + RelayMessage::event(¬e_json, "random_string") + ); + + Ok(()) + } + + #[test] + fn test_handle_invalid_event() { + //Mising Event field + let invalid_event_msg = r#"["EVENT","random_string"]"#; + //Event JSON with incomplete content + let invalid_event_msg_content = r#"["EVENT","random_string",{"id":"70b10f70c1318967eddf12527799411b1a9780ad9c43858f5e5fcd45486a13a5","pubkey":"379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"}]"#; + + assert!(matches!( + RelayMessage::from_json(invalid_event_msg).unwrap_err(), + Error::DecodeFailed + )); + + assert!(matches!( + RelayMessage::from_json(invalid_event_msg_content).unwrap_err(), + Error::DecodeFailed + )); + } + */ + + #[test] + fn test_handle_valid_eose() -> Result<()> { + let valid_eose_msg = r#"["EOSE","random-subscription-id"]"#; + let handled_valid_eose_msg = RelayMessage::eose("random-subscription-id"); + + assert_eq!( + RelayMessage::from_json(valid_eose_msg)?, + handled_valid_eose_msg + ); + + Ok(()) + } + + // TODO: fix these tests + /* + #[test] + fn test_handle_invalid_eose() { + // Missing subscription ID + assert!(matches!( + RelayMessage::from_json(r#"["EOSE"]"#).unwrap_err(), + Error::DecodeFailed + )); + + // The subscription ID is not string + assert!(matches!( + RelayMessage::from_json(r#"["EOSE",404]"#).unwrap_err(), + Error::DecodeFailed + )); + } + + #[test] + fn test_handle_valid_ok() -> Result<()> { + let valid_ok_msg = r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",true,"pow: difficulty 25>=24"]"#; + let handled_valid_ok_msg = RelayMessage::ok( + "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30", + true, + "pow: difficulty 25>=24".into(), + ); + + assert_eq!(RelayMessage::from_json(valid_ok_msg)?, handled_valid_ok_msg); + + Ok(()) + } + */ + + #[test] + fn test_handle_invalid_ok() { + // Missing params + assert!(matches!( + RelayMessage::from_json( + r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30"]"# + ) + .unwrap_err(), + Error::DecodeFailed + )); + + // Invalid status + assert!( + matches!(RelayMessage::from_json(r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",hello,""]"#).unwrap_err(), + Error::DecodeFailed) + ); + + // Invalid message + assert!( + matches!(RelayMessage::from_json(r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",hello,404]"#).unwrap_err(), + Error::DecodeFailed) + ); + } +} diff --git a/nostrdb_net/src/relay/mod.rs b/nostrdb_net/src/relay/mod.rs new file mode 100644 index 0000000..532252b --- /dev/null +++ b/nostrdb_net/src/relay/mod.rs @@ -0,0 +1,100 @@ +use ewebsock::{Options, WsMessage, WsReceiver, WsSender}; + +use crate::{ClientMessage, Result}; +use nostrdb::Filter; +use std::fmt; +use std::hash::{Hash, Hasher}; +use tracing::{debug, error, info}; + +pub mod message; +pub mod pool; + +#[derive(Debug)] +pub enum RelayStatus { + Connected, + Connecting, + Disconnected, +} + +pub struct Relay { + pub url: String, + pub status: RelayStatus, + pub sender: WsSender, + pub receiver: WsReceiver, +} + +impl fmt::Debug for Relay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Relay") + .field("url", &self.url) + .field("status", &self.status) + .finish() + } +} + +impl Hash for Relay { + fn hash(&self, state: &mut H) { + // Hashes the Relay by hashing the URL + self.url.hash(state); + } +} + +impl PartialEq for Relay { + fn eq(&self, other: &Self) -> bool { + self.url == other.url + } +} + +impl Eq for Relay {} + +impl Relay { + pub fn new(url: String, wakeup: impl Fn() + Send + Sync + 'static) -> Result { + let status = RelayStatus::Connecting; + let (sender, receiver) = ewebsock::connect_with_wakeup(&url, Options::default(), wakeup)?; + + Ok(Self { + url, + sender, + receiver, + status, + }) + } + + pub fn send(&mut self, msg: &ClientMessage) { + let json = match msg.to_json() { + Ok(json) => { + debug!("sending {} to {}", json, self.url); + json + } + Err(e) => { + error!("error serializing json for filter: {e}"); + return; + } + }; + + let txt = WsMessage::Text(json); + self.sender.send(txt); + } + + pub fn connect(&mut self, wakeup: impl Fn() + Send + Sync + 'static) -> Result<()> { + let (sender, receiver) = + ewebsock::connect_with_wakeup(&self.url, Options::default(), wakeup)?; + self.status = RelayStatus::Connecting; + self.sender = sender; + self.receiver = receiver; + Ok(()) + } + + pub fn ping(&mut self) { + let msg = WsMessage::Ping(vec![]); + self.sender.send(msg); + } + + pub fn subscribe(&mut self, subid: String, filters: Vec) { + info!( + "sending '{}' subscription to relay pool: {:?}", + subid, filters + ); + self.send(&ClientMessage::req(subid, filters)); + } +} diff --git a/nostrdb_net/src/relay/pool.rs b/nostrdb_net/src/relay/pool.rs new file mode 100644 index 0000000..4d2fe8f --- /dev/null +++ b/nostrdb_net/src/relay/pool.rs @@ -0,0 +1,254 @@ +use crate::relay::{Relay, RelayStatus}; +use crate::{ClientMessage, Result}; +use nostrdb::Filter; + +use std::collections::BTreeSet; +use std::time::{Duration, Instant}; + +use url::Url; + +#[cfg(not(target_arch = "wasm32"))] +use ewebsock::{WsEvent, WsMessage}; + +#[cfg(not(target_arch = "wasm32"))] +use tracing::{debug, error}; + +#[derive(Debug)] +pub struct PoolEvent<'a> { + pub relay: &'a str, + pub event: ewebsock::WsEvent, +} + +impl PoolEvent<'_> { + pub fn into_owned(self) -> PoolEventBuf { + PoolEventBuf { + relay: self.relay.to_owned(), + event: self.event, + } + } +} + +pub struct PoolEventBuf { + pub relay: String, + pub event: ewebsock::WsEvent, +} + +pub struct PoolRelay { + pub relay: Relay, + pub last_ping: Instant, + pub last_connect_attempt: Instant, + pub retry_connect_after: Duration, +} + +impl PoolRelay { + pub fn new(relay: Relay) -> PoolRelay { + PoolRelay { + relay, + last_ping: Instant::now(), + last_connect_attempt: Instant::now(), + retry_connect_after: Self::initial_reconnect_duration(), + } + } + + pub fn initial_reconnect_duration() -> Duration { + Duration::from_secs(5) + } +} + +pub struct RelayPool { + pub relays: Vec, + pub ping_rate: Duration, +} + +impl Default for RelayPool { + fn default() -> Self { + RelayPool::new() + } +} + +impl RelayPool { + // Constructs a new, empty RelayPool. + pub fn new() -> RelayPool { + RelayPool { + relays: vec![], + ping_rate: Duration::from_secs(25), + } + } + + pub fn ping_rate(&mut self, duration: Duration) -> &mut Self { + self.ping_rate = duration; + self + } + + pub fn has(&self, url: &str) -> bool { + for relay in &self.relays { + if relay.relay.url == url { + return true; + } + } + + false + } + + pub fn urls(&self) -> BTreeSet { + self.relays + .iter() + .map(|pool_relay| pool_relay.relay.url.clone()) + .collect() + } + + pub fn send(&mut self, cmd: &ClientMessage) { + for relay in &mut self.relays { + relay.relay.send(cmd); + } + } + + pub fn unsubscribe(&mut self, subid: String) { + for relay in &mut self.relays { + relay.relay.send(&ClientMessage::close(subid.clone())); + } + } + + pub fn subscribe(&mut self, subid: String, filter: Vec) { + for relay in &mut self.relays { + relay.relay.subscribe(subid.clone(), filter.clone()); + } + } + + /// Keep relay connectiongs alive by pinging relays that haven't been + /// pinged in awhile. Adjust ping rate with [`ping_rate`]. + pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { + for relay in &mut self.relays { + let now = std::time::Instant::now(); + + match relay.relay.status { + RelayStatus::Disconnected => { + let reconnect_at = relay.last_connect_attempt + relay.retry_connect_after; + if now > reconnect_at { + relay.last_connect_attempt = now; + let next_duration = Duration::from_millis( + ((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64, + ); + debug!( + "bumping reconnect duration from {:?} to {:?} and retrying connect", + relay.retry_connect_after, next_duration + ); + relay.retry_connect_after = next_duration; + if let Err(err) = relay.relay.connect(wakeup.clone()) { + error!("error connecting to relay: {}", err); + } + } else { + // let's wait a bit before we try again + } + } + + RelayStatus::Connected => { + relay.retry_connect_after = PoolRelay::initial_reconnect_duration(); + + let should_ping = now - relay.last_ping > self.ping_rate; + if should_ping { + debug!("pinging {}", relay.relay.url); + relay.relay.ping(); + relay.last_ping = Instant::now(); + } + } + + RelayStatus::Connecting => { + // cool story bro + } + } + } + } + + pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) { + for relay in &mut self.relays { + let relay = &mut relay.relay; + if relay.url == relay_url { + relay.send(cmd); + return; + } + } + } + + // Adds a websocket url to the RelayPool. + pub fn add_url( + &mut self, + url: String, + wakeup: impl Fn() + Send + Sync + Clone + 'static, + ) -> Result<()> { + let url = Self::canonicalize_url(url); + // Check if the URL already exists in the pool. + if self.has(&url) { + return Ok(()); + } + let relay = Relay::new(url, wakeup)?; + let pool_relay = PoolRelay::new(relay); + + self.relays.push(pool_relay); + + Ok(()) + } + + pub fn add_urls( + &mut self, + urls: BTreeSet, + wakeup: impl Fn() + Send + Sync + Clone + 'static, + ) -> Result<()> { + for url in urls { + self.add_url(url, wakeup.clone())?; + } + Ok(()) + } + + pub fn remove_urls(&mut self, urls: &BTreeSet) { + self.relays + .retain(|pool_relay| !urls.contains(&pool_relay.relay.url)); + } + + // standardize the format (ie, trailing slashes) + fn canonicalize_url(url: String) -> String { + match Url::parse(&url) { + Ok(parsed_url) => parsed_url.to_string(), + Err(_) => url, // If parsing fails, return the original URL. + } + } + + /// Attempts to receive a pool event from a list of relays. The + /// function searches each relay in the list in order, attempting to + /// receive a message from each. If a message is received, return it. + /// If no message is received from any relays, None is returned. + pub fn try_recv(&mut self) -> Option> { + for relay in &mut self.relays { + let relay = &mut relay.relay; + if let Some(event) = relay.receiver.try_recv() { + match &event { + WsEvent::Opened => { + relay.status = RelayStatus::Connected; + } + WsEvent::Closed => { + relay.status = RelayStatus::Disconnected; + } + WsEvent::Error(err) => { + error!("{:?}", err); + relay.status = RelayStatus::Disconnected; + } + WsEvent::Message(ev) => { + // let's just handle pongs here. + // We only need to do this natively. + #[cfg(not(target_arch = "wasm32"))] + if let WsMessage::Ping(ref bs) = ev { + debug!("pong {}", &relay.url); + relay.sender.send(WsMessage::Pong(bs.to_owned())); + } + } + } + return Some(PoolEvent { + event, + relay: &relay.url, + }); + } + } + + None + } +} diff --git a/nostrdb_rs/Cargo.toml b/nostrdb_rs/Cargo.toml new file mode 100644 index 0000000..d4a268e --- /dev/null +++ b/nostrdb_rs/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "nostrdb" +authors = ["William Casarin "] +description = "An unfairly fast embedded nostr database backed by lmdb" +readme = "README.md" +version = "0.5.1" +edition = "2021" +build = "build.rs" +license = "GPL-3.0-or-later" +homepage = "https://github.com/damus-io/nostrdb-rs/" +repository = "https://github.com/damus-io/nostrdb-rs/" + +[build-dependencies] +cc = "1.0" +bindgen = "0.69.1" + +[features] +bindgen = [] + +[dependencies] +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } + +flatbuffers = "23.5.26" +libc = "0.2.151" +futures = "0.3.31" + +[dev-dependencies] +hex = "0.4.3" diff --git a/build.rs b/nostrdb_rs/build.rs similarity index 100% rename from build.rs rename to nostrdb_rs/build.rs diff --git a/nostrdb b/nostrdb_rs/nostrdb similarity index 100% rename from nostrdb rename to nostrdb_rs/nostrdb diff --git a/src/bindings.rs b/nostrdb_rs/src/bindings.rs similarity index 100% rename from src/bindings.rs rename to nostrdb_rs/src/bindings.rs diff --git a/src/bindings_posix.rs b/nostrdb_rs/src/bindings_posix.rs similarity index 100% rename from src/bindings_posix.rs rename to nostrdb_rs/src/bindings_posix.rs diff --git a/src/bindings_win.rs b/nostrdb_rs/src/bindings_win.rs similarity index 100% rename from src/bindings_win.rs rename to nostrdb_rs/src/bindings_win.rs diff --git a/src/block.rs b/nostrdb_rs/src/block.rs similarity index 100% rename from src/block.rs rename to nostrdb_rs/src/block.rs diff --git a/src/config.rs b/nostrdb_rs/src/config.rs similarity index 100% rename from src/config.rs rename to nostrdb_rs/src/config.rs diff --git a/src/error.rs b/nostrdb_rs/src/error.rs similarity index 100% rename from src/error.rs rename to nostrdb_rs/src/error.rs diff --git a/src/filter.rs b/nostrdb_rs/src/filter.rs similarity index 100% rename from src/filter.rs rename to nostrdb_rs/src/filter.rs diff --git a/src/future.rs b/nostrdb_rs/src/future.rs similarity index 100% rename from src/future.rs rename to nostrdb_rs/src/future.rs diff --git a/src/lib.rs b/nostrdb_rs/src/lib.rs similarity index 100% rename from src/lib.rs rename to nostrdb_rs/src/lib.rs diff --git a/src/ndb.rs b/nostrdb_rs/src/ndb.rs similarity index 100% rename from src/ndb.rs rename to nostrdb_rs/src/ndb.rs diff --git a/src/ndb_profile.rs b/nostrdb_rs/src/ndb_profile.rs similarity index 100% rename from src/ndb_profile.rs rename to nostrdb_rs/src/ndb_profile.rs diff --git a/src/ndb_str.rs b/nostrdb_rs/src/ndb_str.rs similarity index 100% rename from src/ndb_str.rs rename to nostrdb_rs/src/ndb_str.rs diff --git a/src/note.rs b/nostrdb_rs/src/note.rs similarity index 100% rename from src/note.rs rename to nostrdb_rs/src/note.rs diff --git a/src/profile.rs b/nostrdb_rs/src/profile.rs similarity index 100% rename from src/profile.rs rename to nostrdb_rs/src/profile.rs diff --git a/src/query.rs b/nostrdb_rs/src/query.rs similarity index 100% rename from src/query.rs rename to nostrdb_rs/src/query.rs diff --git a/src/result.rs b/nostrdb_rs/src/result.rs similarity index 100% rename from src/result.rs rename to nostrdb_rs/src/result.rs diff --git a/src/subscription.rs b/nostrdb_rs/src/subscription.rs similarity index 100% rename from src/subscription.rs rename to nostrdb_rs/src/subscription.rs diff --git a/src/tags.rs b/nostrdb_rs/src/tags.rs similarity index 100% rename from src/tags.rs rename to nostrdb_rs/src/tags.rs diff --git a/src/test_util.rs b/nostrdb_rs/src/test_util.rs similarity index 100% rename from src/test_util.rs rename to nostrdb_rs/src/test_util.rs diff --git a/src/transaction.rs b/nostrdb_rs/src/transaction.rs similarity index 100% rename from src/transaction.rs rename to nostrdb_rs/src/transaction.rs diff --git a/src/util/mod.rs b/nostrdb_rs/src/util/mod.rs similarity index 100% rename from src/util/mod.rs rename to nostrdb_rs/src/util/mod.rs diff --git a/src/util/nip10.rs b/nostrdb_rs/src/util/nip10.rs similarity index 100% rename from src/util/nip10.rs rename to nostrdb_rs/src/util/nip10.rs From a2aa0261f4f7c29e67dcc06a944bc8f2e955d0e8 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Thu, 19 Dec 2024 14:38:49 -0800 Subject: [PATCH 2/4] nostrdb_net: add tokio feature --- nostrdb_net/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nostrdb_net/Cargo.toml b/nostrdb_net/Cargo.toml index d30ac0b..785f7d0 100644 --- a/nostrdb_net/Cargo.toml +++ b/nostrdb_net/Cargo.toml @@ -17,3 +17,6 @@ hex = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } url = { workspace = true } + +[features] +tokio = ["ewebsock/tokio"] From 7731587ab7bd55e7eb30b1ff18bbbe1cb11662ae Mon Sep 17 00:00:00 2001 From: William Casarin Date: Fri, 20 Dec 2024 13:12:17 -0800 Subject: [PATCH 3/4] fixup! add nostrdb_net, split into crates --- nostrdb_net/src/relay/.mod.rs.swp | Bin 12288 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 nostrdb_net/src/relay/.mod.rs.swp diff --git a/nostrdb_net/src/relay/.mod.rs.swp b/nostrdb_net/src/relay/.mod.rs.swp deleted file mode 100644 index 503cabaa2112dcf7486803e9e3f8c52d261b2f23..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12288 zcmeI2ONw?7D~{FwFGS>^3{oJycb* zz+~`b;=#KY6ZD`b4|>)3h$tt;#K+wSM{j!a-(yD zLpOGxc=;ILSJ}gG?P6@|Y;$?*nLpX1Kki`X8hiIr^=8-kG`b+yI0c*nx1+#L_QLafc2m!Z(l~$Q z;icPo$n|jwI0c*nP64NYQ@|Zn2) zb#;`)Tqj)8_hr1BB<+Crg1LUlXZeFs+)BnhQ|zV<-iq^w8^W9Hq}tf-+o=P-s3j&D z*c$K?BAh;A44*CY9?k_^8~81xb-$yQhkR-a%~8W=_-ksQ$UG+SE{Jga)ak5iW>=RQ z1}1GV+woP;E0>K4WQGx4icrwkdYB{sS;#wB7Sm1&3c0>j*LAA!^a4q^Xi7nFFo|Ok zY8X^>rzypnXs9Gy4T8}2sI5hMrM6aI6=~O-H1nmh?epViX;ct2MXR2+wO97J?Ny#D zh9rGFYnDGSzszSC&6TN=O;iYz4;1jI({1x3u~+8L@}{-{zNpu&A8JO|?^SV8d*@UVqoXmDjO`arnc|>7#*tM9$`Bnn7VE1>#W(I3og$J1oP0O{4gzw4ykIVWTmL zTG4JIrA%xhk?f{uiyL=^fP;ScU36+yOoG<%6?GbFuVhxGWZR*3tD|#2TA2^13W6hv zME>EZot~=U(83mIs_j0Q`)H~TQT}h_DTP~}NdXS8h3u5E2}7+mqo{4s94kXD#`9qK zKq(rBgO{0cjBnZcAqs7z9g0-8?~A5=xHyqU%G6B5h&b&G&j>dsPT{n}(dZd3%cq&? zh;Pu%d&2%fsbgPNu$iYya4qX0;wK1tMtk3G&_1Wv2yF7yMd@Y`P(Yy*g^Hg=)o?T9 zwVlm3@XZER#k@q=O7np{o*sEu@?k%nR2argQ=GNE>@@X@*9iJKSd_sc;L(2p{D>uo From fb663bb01f42d4f419018ebaf7d5519a39152fa7 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Fri, 20 Dec 2024 13:12:28 -0800 Subject: [PATCH 4/4] wip tokio_tungstenite Signed-off-by: William Casarin --- Makefile | 2 +- nostrdb_net/Cargo.toml | 4 ++- nostrdb_net/src/relay/mod.rs | 21 ++++++++++++- nostrdb_net/src/relay/pool.rs | 57 +++++++++++++++++++++++++++++++++-- 4 files changed, 78 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index d6c4269..1fbaa8d 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ tags: fake - find src -name '*.rs' | xargs ctags + find . -name '*.rs' | xargs ctags .PHONY: fake diff --git a/nostrdb_net/Cargo.toml b/nostrdb_net/Cargo.toml index 785f7d0..4605e65 100644 --- a/nostrdb_net/Cargo.toml +++ b/nostrdb_net/Cargo.toml @@ -17,6 +17,8 @@ hex = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } url = { workspace = true } +tokio-tungstenite = { version = "0.26.1", optional = true } +tungstenite = { version = "0.26.1", optional = true } [features] -tokio = ["ewebsock/tokio"] +tokio = ["ewebsock/tokio", "tokio-tungstenite", "tungstenite"] diff --git a/nostrdb_net/src/relay/mod.rs b/nostrdb_net/src/relay/mod.rs index 532252b..b8469cf 100644 --- a/nostrdb_net/src/relay/mod.rs +++ b/nostrdb_net/src/relay/mod.rs @@ -19,7 +19,13 @@ pub enum RelayStatus { pub struct Relay { pub url: String, pub status: RelayStatus, + + #[cfg(feature = "tokio")] + pub stream: tokio_tungstenite::WebSocketStream>, + + #[cfg(not(feature = "tokio"))] pub sender: WsSender, + #[cfg(not(feature = "tokio"))] pub receiver: WsReceiver, } @@ -45,7 +51,7 @@ impl PartialEq for Relay { } } -impl Eq for Relay {} +impl Eq for Relay { } impl Relay { pub fn new(url: String, wakeup: impl Fn() + Send + Sync + 'static) -> Result { @@ -76,6 +82,19 @@ impl Relay { self.sender.send(txt); } + #[cfg(feature = "tokio")] + pub async fn connect(&mut self) -> Result<()> { + use tokio_tungstenite::connect_async; + + let request = self.url.into_client_request()?; + let (stream, response) = connect_async(request).await?; + + self.stream = stream; + + Ok(()) + } + + #[cfg(not(feature = "tokio"))] pub fn connect(&mut self, wakeup: impl Fn() + Send + Sync + 'static) -> Result<()> { let (sender, receiver) = ewebsock::connect_with_wakeup(&self.url, Options::default(), wakeup)?; diff --git a/nostrdb_net/src/relay/pool.rs b/nostrdb_net/src/relay/pool.rs index 4d2fe8f..59266ad 100644 --- a/nostrdb_net/src/relay/pool.rs +++ b/nostrdb_net/src/relay/pool.rs @@ -7,16 +7,23 @@ use std::time::{Duration, Instant}; use url::Url; -#[cfg(not(target_arch = "wasm32"))] -use ewebsock::{WsEvent, WsMessage}; +#[cfg(not(feature = "tokio"))] +use ewebsock::{WsMessage}; #[cfg(not(target_arch = "wasm32"))] use tracing::{debug, error}; +#[cfg(not(feature = "tokio"))] +pub type WebsockEvent = ewebsock::WsEvent; + +#[cfg(feature = "tokio")] +pub type WebsockEvent = tungstenite::protocol::Message; + #[derive(Debug)] pub struct PoolEvent<'a> { pub relay: &'a str, - pub event: ewebsock::WsEvent, + + pub event: WebsockEvent, } impl PoolEvent<'_> { @@ -30,6 +37,11 @@ impl PoolEvent<'_> { pub struct PoolEventBuf { pub relay: String, + + #[cfg(feature = "tokio")] + pub event: tungstenite::protocol::Message, + + #[cfg(not(feature = "tokio"))] pub event: ewebsock::WsEvent, } @@ -213,11 +225,50 @@ impl RelayPool { } } + #[cfg(feature = "tokio")] + pub async fn recv(&mut self) -> Result> { + for relay in &mut self.relays { + let relay = &mut relay.relay; + if let Some(event) = await relay.receiver.recv() { + match &event { + Err(WebsockEvent::Opened) => { + relay.status = RelayStatus::Connected; + } + WebsockEvent::Closed => { + relay.status = RelayStatus::Disconnected; + } + WebsockEvent::Error(err) => { + error!("{:?}", err); + relay.status = RelayStatus::Disconnected; + } + WebsockEvent::Message(ev) => { + // let's just handle pongs here. + // We only need to do this natively. + #[cfg(not(target_arch = "wasm32"))] + if let WsMessage::Ping(ref bs) = ev { + debug!("pong {}", &relay.url); + relay.sender.send(WsMessage::Pong(bs.to_owned())); + } + } + } + return Some(PoolEvent { + event, + relay: &relay.url, + }); + } + } + + None + } + /// Attempts to receive a pool event from a list of relays. The /// function searches each relay in the list in order, attempting to /// receive a message from each. If a message is received, return it. /// If no message is received from any relays, None is returned. + #[cfg(not(feature = "tokio"))] pub fn try_recv(&mut self) -> Option> { + use ewebsock::WsEvent; + for relay in &mut self.relays { let relay = &mut relay.relay; if let Some(event) = relay.receiver.try_recv() {