source of geminispace.info - the search provider for gemini space
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

429 lines
17 KiB

import re
from urllib.parse import (
quote,
unquote,
urljoin,
urlparse,
urlsplit,
urlunparse,
urlunsplit,
uses_relative,
uses_netloc,
)
from urllib.robotparser import RobotFileParser
import gusmobile
from gus import constants
from gus.lib.domain import is_domain
# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")

# Regex patterns below are raw strings so that `\d`, `\s`, `\w` etc. reach the
# regex engine verbatim instead of relying on Python passing unknown string
# escapes through (which warns on modern CPython).

# Path ends in a gemlog-style directory name, e.g. ".../gemlog/".
LOG_ROOT_LIKE_PATTERN = re.compile(
    r".*/(gemlog|glog|journal|twinlog|posts|post|tangents|phlog|starlog|pikkulog|blog|log)/?$",
    flags=re.IGNORECASE,
)
# Path looks like an individual log post: inside a gemlog-style directory, or
# carrying a YYYY-MM-DD / YYYYMMDD-style date stamp.
LOG_POST_LIKE_PATTERN = re.compile(
    r".*/((gemlog|glog|journal|twinlog|posts|post|tangents|phlog|starlog|pikkulog|blog|log)/.+$|.*/\d{4}[-_]\d{2}[-_]\d{2}.*$|.*/(19|20)\d{6}.*$)",
    flags=re.IGNORECASE,
)
# Paths that would otherwise match LOG_POST_LIKE_PATTERN but are known
# non-posts (archives, feeds, index pages, ...).
LOG_POST_LIKE_EXCLUSION_PATTERN = re.compile(
    r".*/(games|archive|archives|rss|handlers|diagnostics)/.*|.*atom.xml$|.*gemlog.gmi$|.*index.gmi$|.*index.gemini$",
    flags=re.IGNORECASE,
)
# Site-specific post layouts: gemlog.blue user posts ...
LOG_POST_GEMLOGBLUE_LIKE_PATTERN = re.compile(
    r"^/users/[a-z][-a-z0-9]*/\d+\.gmi?", flags=re.IGNORECASE
)
# ... and the "boston" diary on gemini.conman.org.
LOG_POST_BOSTON_LIKE_PATTERN = re.compile(
    r"^/boston/\d{4}/\d{2}/\d+\.\d+", flags=re.IGNORECASE
)
# Path is exactly a user-capsule root (/~user/, /users/user/, /users/).
ROOT_LIKE_ONLY_PATTERN = re.compile(
    r"^/(~[a-z][-a-z0-9]*|users/[a-z][-a-z0-9]*|users)/?$", flags=re.IGNORECASE
)
# Path begins with a user-capsule root (prefix form of the above).
ROOT_LIKE_PATTERN = re.compile(
    r"^/(~[a-z][-a-z0-9]*|users/[a-z][-a-z0-9]*|users)/?", flags=re.IGNORECASE
)
# Extracts the username from /~user/... (group 1) or /users/user (group 2).
AUTHOR_URL_PATTERN = re.compile(
    r"^/~([a-z][-a-z0-9]*)/|^/users/~?([a-z][-a-z0-9]*)", flags=re.IGNORECASE
)
# "by: Name" / "author: Name" lines in page content; the name is group 2.
AUTHOR_CONTENT_PATTERN = re.compile(
    r".*(by|author): ([\w\s\d]+)", flags=re.IGNORECASE | re.MULTILINE
)
# Gemtext level-1 heading; the title text is group 1.
TITLE_CONTENT_PATTERN = re.compile(r"^#\s(.*)$", flags=re.IGNORECASE | re.MULTILINE)
# File-name slug with optional leading date stamp; the slug is group 2.
TITLE_URL_PATTERN = re.compile(
    r".*/(\d{8}[-_]|\d{4}[-_]\d{2}[-_]\d{2}[-_])?([a-z0-9-_]+)(\.[a-z0-9]+)$",
    flags=re.IGNORECASE,
)
class GeminiRobotFileParser(RobotFileParser):
    """RobotFileParser subclass that fetches robots.txt over Gemini.

    The stdlib implementation fetches over HTTP; this one fetches through a
    GeminiResource and adds a prioritized multi-user-agent check.
    """

    def set_url(self, url):
        """Sets the URL referring to a robots.txt file."""
        self.url = url
        # urlsplit_featureful returns (SplitResult, is_relative); indices
        # 1 and 2 of the split are netloc and path.
        u, _ = GeminiResource.urlsplit_featureful(url)
        self.host, self.path = u[1:3]

    def read(self):
        """Reads the robots.txt URL and feeds it to the parser."""
        gr = GeminiResource(self.url)
        response = gr.fetch()
        if response is None:
            # fetch failed entirely: fail open (no robots.txt == allow all)
            self.allow_all = True
            return
        if not response.status.startswith("2") or not response.content_type == "text/plain":
            # non-success Gemini status or wrong media type: treat as absent
            self.allow_all = True
        else:
            self.parse(response.content.splitlines())

    def read_from_string(self, robots_txt):
        """A utility method for writing tests"""
        self.parse(robots_txt.splitlines())

    def can_fetch_prioritized(self, useragents, url):
        """Given a url and prioritized list of user-agents, is fetching allowed?

        Priority is with the highest priority first; eg. ["ThisIndexerBot", "generic-indexer", "generic-bot", "*"].
        """
        if self.allow_all:
            return True
        if self.disallow_all:
            return False
        if not self.last_checked:
            # robots.txt never successfully parsed: be conservative
            return False
        # Reduce the URL to a quoted path+params+query (no scheme/netloc)
        # for rule matching, mirroring the stdlib can_fetch implementation.
        parsed_url = urlparse(unquote(url))
        url = urlunparse(('','',parsed_url.path, parsed_url.params,parsed_url.query, parsed_url.fragment))
        url = quote(url) or "/"

        def useragent_allowed(useragent):
            # First entry applying to this agent decides; None = no entry.
            for entry in self.entries:
                if entry.applies_to(useragent):
                    return entry.allowance(url)
            return None

        # map user-agents to allowances; the first non-None will be the prioritized allowance
        for ua in useragents:
            allowed = useragent_allowed(ua)
            if allowed is not None:
                return allowed
        # if none of the user-agents match, check default entry
        if self.default_entry:
            return self.default_entry.allowance(url)
        # if nothing matches, crawling is allowed
        return True
class GeminiResource:
    """A URL in Gemini space plus lazily-computed derived metadata.

    Wraps a raw (possibly relative, possibly schemeless) URL and exposes a
    fetchable absolute form, normalized forms, and the indexer's heuristics
    ("root-like", "log-post-like", friendly author/title, change frequency).
    """

    def __init__(self, url, fully_qualified_parent_url=None, parent_hostname=None):
        """Parse `url`, resolving relative links against the parent page.

        `fully_qualified_parent_url` / `parent_hostname` describe the page
        the link was found on; they are required to resolve relative URLs.
        """
        self.raw_url = url
        self.urlsplit, self.is_relative = GeminiResource.urlsplit_featureful(
            url,
            fully_qualified_parent_url=fully_qualified_parent_url,
            parent_hostname=parent_hostname,
        )
        # urlsplit_featureful returns (None, None) for non-gemini or
        # unresolvable URLs
        self.is_valid = self.urlsplit is not None
        self.fully_qualified_parent_url = fully_qualified_parent_url
        # caches backing the lazily-computed properties defined below
        self._normalized_url = None
        self._normalized_host = None
        self._normalized_host_like = None
        self._fetchable_url = None
        self._is_root_like = None
        self._is_log_root_like = None
        self._is_log_post_like = None
        self._default_change_frequency = None
        self.contained_resources = None

    @staticmethod
    def urlsplit_featureful(url, fully_qualified_parent_url=None, parent_hostname=None):
        """Split `url`, returning `(SplitResult, is_relative)` or `(None, None)`.

        The point of this relatively complex function is to allow for
        protocol-less, double-slash-prepended-less URLs that still get
        treated as absolute (i.e., non-relative) URLs and thus get their
        hosts parsed correctly by `urlsplit`. This is important because the
        host is used for a number of things behind the scenes.
        """
        is_relative = False
        u = urlsplit(url, "gemini")
        if u.scheme != "gemini":
            return None, None
        if u.hostname is None:
            if url.startswith("/"):
                # process relative link (host-absolute path)
                if parent_hostname is None:
                    return None, None
                joined = urljoin("gemini://{}".format(parent_hostname), url)
                u = urlsplit(joined, "gemini")
                is_relative = True
            else:  # url does not start with /
                # could be: blah.com/test
                # could be: test
                url_split = url.split("/")
                if is_domain(url_split[0]):
                    # treat schemeless uris as non-gemini as announced in
                    # https://lists.orbitalfox.eu/archives/gemini/2020/003646.html
                    return None, None
                else:
                    # process relative link
                    if fully_qualified_parent_url is None:
                        return None, None
                    joined = urljoin(fully_qualified_parent_url, url)
                    u = urlsplit(joined, "gemini")
                    is_relative = True
        return u, is_relative

    def _get_normalized_url(self):
        """Lazily compute and cache the normalized URL (None if invalid)."""
        if not self.is_valid:
            return None
        if self._normalized_url is None:
            (
                self._normalized_url,
                self._normalized_host,
            ) = self._get_normalized_url_and_host()
        return self._normalized_url

    def _get_normalized_host(self):
        """Lazily compute and cache the normalized host (None if invalid)."""
        if not self.is_valid:
            return None
        if self._normalized_host is None:
            (
                self._normalized_url,
                self._normalized_host,
            ) = self._get_normalized_url_and_host()
        return self._normalized_host

    def _get_normalized_host_like(self):
        """Host plus any user-capsule prefix, e.g. "example.com/~alice"."""
        if not self.is_valid:
            return None
        if self._normalized_host_like is None:
            normalized_host_like = self.normalized_host
            m = ROOT_LIKE_PATTERN.match(self.urlsplit.path)
            if m:
                normalized_host_like += m[0].rstrip("/")
            self._normalized_host_like = normalized_host_like
        return self._normalized_host_like

    def _get_fetchable_url(self):
        """Absolute, fragment-free gemini:// URL suitable for fetching."""
        if not self.is_valid:
            return None
        if self._fetchable_url is None:
            if self.is_relative:
                # leave off fragment portion of urlsplit at [4]
                urlsplit_parts = list(self.urlsplit[:4])
                urlsplit_parts.append("")
                url = urlunsplit(urlsplit_parts)
            else:
                raw_url_lower = self.raw_url.lower()
                if raw_url_lower.startswith("gemini://"):
                    url = self.raw_url
                elif raw_url_lower.startswith("//"):
                    url = "gemini:{}".format(self.raw_url)
                else:
                    url = "gemini://{}".format(self.raw_url)
                # leave off fragment portion of urlsplit at [4]
                if self.urlsplit[4] != "":
                    url = url.replace("#{}".format(self.urlsplit[4]), "")
            self._fetchable_url = url
        return self._fetchable_url

    def _get_is_root_like(self):
        """True for capsule roots: "", "/", or /~user//users/user paths."""
        if self._is_root_like is None:
            is_root_like = False
            if (
                self.urlsplit.path == ""
                or self.urlsplit.path == "/"
                or ROOT_LIKE_ONLY_PATTERN.match(self.urlsplit.path)
            ):
                is_root_like = True
            self._is_root_like = is_root_like
        return self._is_root_like

    def _get_is_log_root_like(self):
        """True for gemlog roots: "", "/", or a gemlog-style directory."""
        if self._is_log_root_like is None:
            is_log_root_like = False
            if (
                self.urlsplit.path == ""
                or self.urlsplit.path == "/"
                or LOG_ROOT_LIKE_PATTERN.match(self.urlsplit.path)
            ):
                is_log_root_like = True
            self._is_log_root_like = is_log_root_like
        return self._is_log_root_like

    def _get_is_log_post_like(self):
        """True when the path looks like an individual gemlog post."""
        if self._is_log_post_like is None:
            is_log_post_like = False
            post_like_match = LOG_POST_LIKE_PATTERN.match(self.urlsplit.path)
            post_like_exclusion_match = LOG_POST_LIKE_EXCLUSION_PATTERN.match(
                self.urlsplit.path
            )
            # site-specific layouts only apply on their respective hosts
            post_gemlogblue_match = LOG_POST_GEMLOGBLUE_LIKE_PATTERN.match(
                self.urlsplit.path
            )
            post_boston_match = LOG_POST_BOSTON_LIKE_PATTERN.match(self.urlsplit.path)
            if (
                (post_like_match and not post_like_exclusion_match)
                or (self.normalized_host == "gemlog.blue" and post_gemlogblue_match)
                or (self.normalized_host == "gemini.conman.org" and post_boston_match)
            ):
                is_log_post_like = True
            self._is_log_post_like = is_log_post_like
        return self._is_log_post_like

    def get_friendly_author(self, content):
        """Best-effort author name: URL pattern → page content → host."""
        if not self.is_valid:
            return None
        friendly_author = None
        author_url_match = AUTHOR_URL_PATTERN.match(self.urlsplit.path)
        if author_url_match:
            # first check url: /~user/ captures the name in group 1,
            # /users/user in group 2
            if author_url_match[1]:
                friendly_author = author_url_match[1]
            elif author_url_match[2]:
                friendly_author = author_url_match[2]
        if friendly_author is None:
            # if no URL match, try looking in page content
            if isinstance(content, str):
                # NOTE(review): `match` only inspects the start of content,
                # so a "by:"/"author:" line past line one is never found —
                # `search` may have been intended; confirm before changing.
                author_content_match = AUTHOR_CONTENT_PATTERN.match(content)
                if author_content_match:
                    # group 1 is the "by"/"author" keyword; the name is
                    # group 2 (the original code returned the keyword)
                    friendly_author = author_content_match[2]
        if friendly_author is None:
            # if still no match, use normalized host
            friendly_author = self.normalized_host
        return friendly_author

    def get_friendly_title(self, content):
        """Best-effort title: first-line heading → URL slug → URL path."""
        if not self.is_valid:
            return None
        friendly_title = None
        if isinstance(content, str):
            # first try page content; `match` means only a heading on the
            # very first line is considered
            title_content_match = TITLE_CONTENT_PATTERN.match(content)
            if title_content_match:
                friendly_title = title_content_match[1]
        if friendly_title is None:
            # if no content match, try looking in URL: group 2 is the slug
            # with any leading date stamp already stripped by group 1
            title_url_match = TITLE_URL_PATTERN.match(self.urlsplit.path)
            if title_url_match:
                friendly_title = (
                    title_url_match[2]
                    .replace("-", " ")
                    .replace("_", " ")
                    .strip()
                    .title()
                )
        if friendly_title is None:
            # if still no match, use URL path
            friendly_title = self.urlsplit.path.lstrip("/")
        return friendly_title

    def get_default_change_frequency(self, category):
        """Initial crawl change frequency for this resource under `category`.

        Raises ValueError for an unrecognized category. (The original raised
        via the nonexistent `Exception.NameError`, which itself crashed with
        AttributeError.)
        """
        if not self.is_valid:
            return None
        if self._default_change_frequency is None:
            if category == "content":
                # roots are expected to change more often than leaf pages
                if self.is_root_like or self.is_log_root_like:
                    change_frequency = constants.ROOT_CHANGE_FREQUENCY_DEFAULT
                else:
                    change_frequency = constants.NON_ROOT_CHANGE_FREQUENCY_DEFAULT
            elif category == "binary":
                change_frequency = constants.BINARY_CHANGE_FREQUENCY_DEFAULT
            elif category == "redirect":
                change_frequency = constants.REDIRECT_CHANGE_FREQUENCY_DEFAULT
            elif category == "temp_error":
                change_frequency = constants.TEMP_ERROR_CHANGE_FREQUENCY_DEFAULT
            elif category == "perm_error":
                change_frequency = constants.PERM_ERROR_CHANGE_FREQUENCY_DEFAULT
            elif category == "prompt":
                change_frequency = constants.PROMPT_CHANGE_FREQUENCY_DEFAULT
            else:
                raise ValueError("Unrecognized resource category")
            self._default_change_frequency = change_frequency
        return self._default_change_frequency

    def increment_change_frequency(self, existing_change_frequency, category):
        """Return `existing_change_frequency` plus the category's increment.

        Raises ValueError for an unrecognized category (see
        get_default_change_frequency).
        """
        if category == "content":
            if self.is_root_like or self.is_log_root_like:
                return existing_change_frequency + constants.ROOT_CHANGE_FREQUENCY_INCREMENT
            else:
                return existing_change_frequency + constants.NON_ROOT_CHANGE_FREQUENCY_INCREMENT
        elif category == "binary":
            return existing_change_frequency + constants.BINARY_CHANGE_FREQUENCY_INCREMENT
        elif category == "redirect":
            return existing_change_frequency + constants.REDIRECT_CHANGE_FREQUENCY_INCREMENT
        elif category == "temp_error":
            return existing_change_frequency + constants.TEMP_ERROR_CHANGE_FREQUENCY_INCREMENT
        elif category == "perm_error":
            return existing_change_frequency + constants.PERM_ERROR_CHANGE_FREQUENCY_INCREMENT
        elif category == "prompt":
            return existing_change_frequency + constants.PROMPT_CHANGE_FREQUENCY_INCREMENT
        else:
            raise ValueError("Unrecognized resource category")

    # constructed from fetchable_url
    # does not matter if quoted or unquoted so I choose arbitrarily to
    # standardize on unquoting it.
    normalized_url = property(_get_normalized_url)
    normalized_host = property(_get_normalized_host)
    # constructed from urlsplit or raw_url
    # should be quoted.
    fetchable_url = property(_get_fetchable_url)
    # constructed from fetchable_url
    # should be unquoted.
    is_root_like = property(_get_is_root_like)
    is_log_root_like = property(_get_is_log_root_like)
    is_log_post_like = property(_get_is_log_post_like)
    normalized_host_like = property(_get_normalized_host_like)

    def fetch(self):
        """Fetch this resource over Gemini and return the response."""
        # NB: this intentionally does NOT fetch the normalized URL, because that could
        # cause an infinite loop with, e.g., normalization stripping a trailing slash
        # and a server redirecting to the same URL _with_ a trailing slash.
        return gusmobile.fetch(self.fetchable_url)

    def _get_normalized_url_and_host(self):
        """Compute (lowercased, unquoted, de-slashed URL, lowercased host)."""
        url_normalized = unquote(self.fetchable_url.lower().rstrip("/"))
        if self.urlsplit.port == 1965:
            # 1965 is Gemini's default port: drop the explicit ":1965"
            url_normalized = url_normalized.replace(
                self.urlsplit.hostname.lower() + ":1965",
                self.urlsplit.hostname.lower(),
                1,
            )
        host_normalized = self.urlsplit.hostname.lower()
        return url_normalized, host_normalized

    def extract_contained_resources(self, content):
        # this finds all gemini URLs within the content of a given GeminiResource and
        # returns them as a list of new GeminiResources; results are memoized
        # on the instance (explicit None check so an empty result also caches)
        if self.contained_resources is not None:
            return self.contained_resources
        link_pattern = r"^=>\s*(\S+)"
        # strip ``` preformatted blocks so link-looking lines inside them
        # are ignored
        preformat_pattern = r"^```.*?^```"
        content_without_preformat = re.sub(
            preformat_pattern, "", content, flags=re.DOTALL | re.MULTILINE
        )
        probable_urls = re.findall(
            link_pattern, content_without_preformat, re.MULTILINE
        )
        resources = []
        for url in probable_urls:
            resource = GeminiResource(
                url,
                fully_qualified_parent_url=self.fetchable_url,
                parent_hostname=self.urlsplit.hostname,
            )
            if resource.is_valid:
                resources.append(resource)
        self.contained_resources = resources
        return self.contained_resources