Skip to content

Commit 1212c2c

Browse files
authored
Implement IPNS validation (#16)
* chore: minor clean-up * Better exception errors: add print statement for error message and type * remove duplicate codecs * correct errors in testing_data * Implement IPNS validation * Add support for IPNS subdomain and path gateway * Add support for IPNS paths * Add testing data * Add tests for new implementation * Bump package version
1 parent 770bc30 commit 1212c2c

File tree

9 files changed

+270
-44
lines changed

9 files changed

+270
-44
lines changed

README.MD

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ print(Validator("QmYjtig7VJQ6XsnUjqqJvj7QaMcCAwtrgNdahSiFofrE7o").is_ipfs())
2121
* [x] CID
2222
* [x] v0
2323
* [x] v1
24-
* [ ] URL
25-
* [x] IPFS
26-
* [ ] IPNS
27-
* [ ] Subdomain
28-
* [x] IPFS
29-
* [ ] IPNS
30-
* [ ] Path
24+
* [x] URL
25+
* [x] Path Gateway
26+
* [x] IPFS
27+
* [x] IPNS
28+
* [x] Subdomain Gateway
29+
* [x] IPFS
30+
* [x] IPNS
31+
* [x] Path
3132
* [x] IPFS
33+
* [x] IPNS
3234
* [ ] ...
3335

3436
## License

is_ipfs/is_ipfs.py

Lines changed: 140 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import re
22
import typing
3+
from urllib.parse import urlparse
34

45
import cid
56
from multibase import decode
@@ -13,21 +14,27 @@ class Validator:
1314

1415
def __init__(self, input: typing.Any):
1516
self.input = input
16-
self.pathGatewayPattern = re.compile(
17+
self.path_gateway_pattern = re.compile(
1718
r"^https?://[^/]+/(?P<protocol>ip[fn]s)/(?P<hash>[^/?#]+)"
1819
)
19-
self.subdomainGatewayPattern = re.compile(
20+
self.subdomain_gateway_pattern = re.compile(
2021
r"^https?://(?P<hash>[^/]+)\.(?P<protocol>ip[fn]s)\.[^/?]+"
2122
)
22-
self.pathPattern = re.compile(r"^/(?P<protocol>ip[fn]s)/(?P<hash>[^/?#]+)")
23+
self.path_pattern = re.compile(r"^/(?P<protocol>ip[fn]s)/(?P<hash>[^/?#]+)")
2324

2425
def is_ipfs(self) -> bool:
2526
"""
2627
Returns True if the provided input is a valid IPFS resource/object or False otherwise.
2728
"""
28-
return self._is_CID() or self._is_ipfs_url() or self._is_ipfs_path()
29+
return (
30+
self._is_cid()
31+
or self._is_ipfs_url()
32+
or self._is_ipns_url()
33+
or self._is_ipfs_path()
34+
or self._is_ipns_path()
35+
)
2936

30-
def _is_CID(self) -> bool:
37+
def _is_cid(self) -> bool:
3138
"""
3239
Returns True if the provided string or CID object represents a valid CID or False otherwise.
3340
"""
@@ -36,27 +43,25 @@ def _is_CID(self) -> bool:
3643
elif type(self.input) == bytes:
3744
try:
3845
return cid.is_cid(decode(self.input))
39-
except:
46+
except Exception as error:
47+
print(f"Unexpected {type(error)}, {error}")
4048
return False
4149
else:
4250
try:
4351
if isinstance(self.input, (cid.CIDv0, cid.CIDv1)):
4452
return cid.is_cid(str(self.input))
45-
except:
53+
except Exception as error:
54+
print(f"Unexpected {type(error)}, {error}")
4655
return False
4756
return False
4857

49-
def _is_ipfs_url(self) -> bool:
50-
"""
51-
Returns True if the provided string is a valid IPFS url or False otherwise.
52-
"""
53-
return self._ipfs_path_url() or self._ipfs_subdomain_url()
54-
5558
def _is_integral_ipfs_url(
5659
self,
5760
pattern: re.Pattern,
5861
) -> bool:
59-
62+
"""
63+
Main logic for IPFS URL validation.
64+
"""
6065
formatted = str(self.input)
6166
if not formatted:
6267
return False
@@ -70,14 +75,15 @@ def _is_integral_ipfs_url(
7075

7176
_hash = match["hash"]
7277

73-
if pattern == self.subdomainGatewayPattern:
78+
if pattern == self.subdomain_gateway_pattern:
7479
_hash = _hash.lower()
7580
try:
7681
if get_codec(_hash).encoding not in ["base32", "base36"]:
7782
return False
78-
except:
83+
except Exception as error:
84+
print(f"Unexpected {type(error)}, {error}")
7985
return False
80-
elif pattern == self.pathGatewayPattern:
86+
elif pattern == self.path_gateway_pattern:
8187
if not str(_hash).startswith("Qm"):
8288
try:
8389
if get_codec(_hash).encoding not in [
@@ -90,34 +96,141 @@ def _is_integral_ipfs_url(
9096
"base58flickr",
9197
"base58btc",
9298
"base64url",
93-
"base32",
94-
"base36",
9599
]:
96100
return False
97-
except:
98-
pass
101+
except Exception as error:
102+
print(f"Unexpected {type(error)}, {error}")
103+
return False
99104

100-
return Validator(_hash)._is_CID()
105+
return Validator(_hash)._is_cid()
101106

102107
def _ipfs_subdomain_url(self) -> bool:
103108
"""
104109
Returns True if the provided url string includes a valid IPFS subdomain (case-insensitive CIDv1) or False otherwise.
105110
"""
106111
return self._is_integral_ipfs_url(
107-
self.subdomainGatewayPattern,
112+
self.subdomain_gateway_pattern,
108113
)
109114

110115
def _ipfs_path_url(self) -> bool:
111116
"""
112117
Returns True if the provided url string is a valid IPFS URL or False otherwise.
113118
"""
114-
115119
return self._is_integral_ipfs_url(
116-
self.pathGatewayPattern,
120+
self.path_gateway_pattern,
117121
)
118122

123+
def _is_ipfs_url(self) -> bool:
124+
"""
125+
Returns True if the provided string is a valid IPFS url or False otherwise.
126+
"""
127+
return self._ipfs_path_url() or self._ipfs_subdomain_url()
128+
119129
def _is_ipfs_path(self) -> bool:
120130
"""
121-
Returns true if the provided string is a valid IPFS path or false otherwise.
131+
Returns True if the provided string is a valid IPFS path or False otherwise.
122132
"""
123-
return self._is_integral_ipfs_url(self.pathPattern)
133+
return self._is_integral_ipfs_url(self.path_pattern)
134+
135+
def _is_integral_ipns_url(
136+
self,
137+
pattern: re.Pattern,
138+
) -> bool:
139+
"""
140+
Main logic for IPNS URL validation.
141+
"""
142+
formatted = str(self.input)
143+
if not formatted:
144+
return False
145+
146+
match = re.match(pattern, formatted)
147+
if not match:
148+
return False
149+
150+
if match["protocol"] != "ipns":
151+
return False
152+
153+
ipns_id = match["hash"]
154+
155+
if ipns_id and pattern == self.subdomain_gateway_pattern:
156+
ipns_id = ipns_id.lower()
157+
158+
if Validator(ipns_id)._is_cid():
159+
try:
160+
if get_codec(ipns_id).encoding == "base36":
161+
return True
162+
except Exception as error:
163+
print(f"Unexpected {type(error)}, {error}")
164+
return False
165+
try:
166+
if "." not in ipns_id and "-" in ipns_id:
167+
ipns_id = (
168+
ipns_id.replace("--", "@").replace("-", ".").replace("@", "-")
169+
)
170+
171+
return self._id_is_explicit_tld(ipns_id)
172+
except Exception as error:
173+
print(f"Unexpected {type(error)}, {error}")
174+
return False
175+
176+
elif pattern == self.path_gateway_pattern or pattern == self.path_pattern:
177+
if not str(ipns_id).startswith("Qm"):
178+
if self._id_is_explicit_tld(ipns_id):
179+
return True
180+
181+
try:
182+
if get_codec(ipns_id).encoding not in [
183+
"base2",
184+
"base16",
185+
"base32",
186+
"base32hex",
187+
"base36",
188+
"base36upper",
189+
"base58flickr",
190+
"base58btc",
191+
"base64url",
192+
]:
193+
return False
194+
except Exception as error:
195+
print(f"Unexpected {type(error)}, {error}")
196+
return False
197+
198+
return Validator(ipns_id)._is_cid()
199+
200+
def _ipns_subdomain_url(self) -> bool:
201+
"""
202+
Returns True if the provided url string includes a valid IPFS subdomain (case-insensitive CIDv1) or False otherwise.
203+
"""
204+
return self._is_integral_ipns_url(
205+
self.subdomain_gateway_pattern,
206+
)
207+
208+
def _ipns_path_url(self) -> bool:
209+
"""
210+
Returns True if the provided url string is a valid IPFS URL or False otherwise.
211+
"""
212+
return self._is_integral_ipns_url(
213+
self.path_gateway_pattern,
214+
)
215+
216+
def _is_ipns_url(self) -> bool:
217+
"""
218+
Returns True if the provided string is a valid IPFS url or False otherwise.
219+
"""
220+
return self._ipns_path_url() or self._ipns_subdomain_url()
221+
222+
def _is_ipns_path(self) -> bool:
223+
"""
224+
Returns True if the provided string is a valid IPNS path or False otherwise.
225+
"""
226+
return self._is_integral_ipns_url(self.path_pattern)
227+
228+
def _id_is_explicit_tld(self, input_string: str) -> bool:
229+
"""
230+
Returns True if the provided url string has an explicit TLD, False otherwise.
231+
"""
232+
fqdn_with_tld = re.compile(
233+
r"(?=^.{4,253}\.?$)(^((?!-)[a-zA-Z0-9-]{1,63}(?<!-)\.)+[a-zA-Z]{2,63}$)"
234+
)
235+
hostname = urlparse(f"http://{input_string}").hostname
236+
return bool(re.search(fqdn_with_tld, hostname))

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
setuptools.setup(
88
name="py-is_ipfs",
9-
version="0.0.6",
9+
version="0.1.0",
1010
description="Python library to identify valid IPFS resources",
1111
long_description=long_description,
1212
long_description_content_type="text/markdown",
@@ -15,7 +15,7 @@
1515
packages=setuptools.find_packages(exclude=["tests", "tests.*"]),
1616
license="MIT",
1717
classifiers=[
18-
"Development Status :: 3 - Alpha",
18+
"Development Status :: 4 - Beta",
1919
"Intended Audience :: Developers",
2020
"License :: OSI Approved :: MIT License",
2121
"Natural Language :: English",

tests/integration/test_is_ipfs.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,30 @@ def test_all(self):
4141
for entry in testing_data.invalid_entries["path"]["ipfs"]:
4242
self.assertFalse(Validator(entry).is_ipfs())
4343

44+
with self.subTest("Test valid IPNS URL entries from fixtures"):
45+
for entry in testing_data.valid_entries["url"]["ipns"]:
46+
self.assertTrue(Validator(entry).is_ipfs())
47+
48+
with self.subTest("Test invalid IPNS URL entries from fixtures"):
49+
for entry in testing_data.invalid_entries["url"]["ipns"]:
50+
self.assertFalse(Validator(entry).is_ipfs())
51+
52+
with self.subTest("Test valid IPNS subdomain entries from fixtures"):
53+
for entry in testing_data.valid_entries["subdomain"]["ipns"]:
54+
self.assertTrue(Validator(entry).is_ipfs())
55+
56+
with self.subTest("Test invalid IPNS subdomain entries from fixtures"):
57+
for entry in testing_data.invalid_entries["subdomain"]["ipns"]:
58+
self.assertFalse(Validator(entry).is_ipfs())
59+
60+
with self.subTest("Test valid IPNS path entries from fixtures"):
61+
for entry in testing_data.valid_entries["path"]["ipns"]:
62+
self.assertTrue(Validator(entry).is_ipfs())
63+
64+
with self.subTest("Test invalid IPNS path entries from fixtures"):
65+
for entry in testing_data.invalid_entries["path"]["ipns"]:
66+
self.assertFalse(Validator(entry).is_ipfs())
67+
4468

4569
if __name__ == "__main__":
4670
unittest.main()

0 commit comments

Comments
 (0)