# source of geminispace.info - the search provider for gemini space
import argparse
import logging
import re
from datetime import datetime, timedelta
import os
import pathlib
import time
from urllib.parse import urljoin, uses_relative, uses_netloc
# TODO: this import breaks with Python 3.9, but all code that relies
# on this code path is currently dead code, so for now I'm just
# commenting out the import. It would be nice to make an actual
# decision soon about whether or not feed-based updating is even
# worth keeping around. If not, the dead code paths could simply be
# deleted, and GUS would get a bit simpler :)
# import feedparser
import peewee
from gus.excludes import EXCLUDED_URL_PREFIXES, EXCLUDED_URL_PATHS
from . import constants
from gus.lib.db_model import init_db, Page, Link, Crawl
from gus.lib.gemini import GeminiResource, GeminiRobotFileParser
import gus.lib.logging
# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")
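# Per-host minimum delays between requests, in milliseconds; hosts not
# listed here get the default 300ms delay applied in crawl_page below.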
CRAWL_DELAYS = {
"alexschroeder.ch": 5000,
"communitywiki.org": 5000,
}
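# Regex-based exclusions: six-digit .ch hosts, almpNNNN.app hosts, and
# wiki _revert/_history URLs are never crawled.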
EXCLUDED_URL_PATTERN = re.compile(
r"^gemini://(\d{6}\.ch|almp\d{4}\.app|.*/_(revert|history)/).*",
flags=re.IGNORECASE
)
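# The index_* functions below share the same upsert pattern: build a document
# dict for the resource, reuse the existing Page id and update its
# change_frequency if the URL is already known, then save the Page row.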
def index_binary(resource, response):
logging.debug(
"Indexing binary for: %s",
gus.lib.logging.strip_control_chars(resource.indexable_url),
)
doc = {
"url": resource.indexable_url,
"fetchable_url": resource.fetchable_url,
"normalized_url": resource.normalized_url,
"domain": resource.normalized_host,
"port": resource.urlsplit.port or 1965,
"content_type": response.content_type,
"charset": response.charset,
"size": response.num_bytes,
"change_frequency": resource.get_default_change_frequency("binary"),
}
existing_page = Page.get_or_none(url=resource.indexable_url)
if existing_page:
doc["id"] = existing_page.id
existing_change_frequency = (
existing_page.change_frequency
or resource.get_default_change_frequency("binary")
)
doc["change_frequency"] = resource.increment_change_frequency(
existing_change_frequency, "binary"
)
page = Page(**doc)
try:
page.save()
except Exception as e:
logging.error("Error adding page: %s: %s", gus.lib.logging.strip_control_chars(resource.indexable_url), e)
return page
def index_redirect(resource):
logging.debug(
"Indexing redirect for: %s",
gus.lib.logging.strip_control_chars(resource.indexable_url),
)
doc = {
"url": resource.indexable_url,
"fetchable_url": resource.fetchable_url,
"normalized_url": resource.normalized_url,
"domain": resource.normalized_host,
"port": resource.urlsplit.port or 1965,
"change_frequency": resource.get_default_change_frequency("redirect"),
}
existing_page = Page.get_or_none(url=resource.indexable_url)
if existing_page:
doc["id"] = existing_page.id
existing_change_frequency = (
existing_page.change_frequency
or resource.get_default_change_frequency("redirect")
)
doc["change_frequency"] = resource.increment_change_frequency(
existing_change_frequency, "redirect"
)
page = Page(**doc)
try:
page.save()
except Exception as e:
logging.error("Error adding page: %s: %s", gus.lib.logging.strip_control_chars(resource.indexable_url), e)
return page
def index_error(resource, is_temporary):
category = "temp_error" if is_temporary else "perm_error"
default_change_frequency = resource.get_default_change_frequency(category)
doc = {
"url": resource.indexable_url,
"fetchable_url": resource.fetchable_url,
"normalized_url": resource.normalized_url,
"domain": resource.normalized_host,
"port": resource.urlsplit.port or 1965,
"change_frequency": default_change_frequency,
}
existing_page = Page.get_or_none(url=resource.indexable_url)
if existing_page:
doc["id"] = existing_page.id
existing_change_frequency = (
existing_page.change_frequency or default_change_frequency
)
doc["change_frequency"] = resource.increment_change_frequency(
existing_change_frequency, category
)
page = Page(**doc)
try:
page.save()
except Exception as e:
logging.error("Error adding page: %s: %s", gus.lib.logging.strip_control_chars(resource.indexable_url), e)
return page
def index_prompt(resource, response):
logging.debug(
"Indexing prompt for: %s",
gus.lib.logging.strip_control_chars(resource.indexable_url),
)
doc = {
"url": resource.indexable_url,
"fetchable_url": resource.fetchable_url,
"normalized_url": resource.normalized_url,
"domain": resource.normalized_host,
"port": resource.urlsplit.port or 1965,
"content_type": "input",
"charset": response.charset,
"size": response.num_bytes,
"prompt": response.prompt,
"change_frequency": resource.get_default_change_frequency("prompt"),
}
existing_page = Page.get_or_none(url=resource.indexable_url)
if existing_page:
doc["id"] = existing_page.id
existing_change_frequency = (
existing_page.change_frequency
or resource.get_default_change_frequency("prompt")
)
doc["change_frequency"] = resource.increment_change_frequency(
existing_change_frequency, "prompt"
)
page = Page(**doc)
try:
page.save()
except Exception as e:
logging.error("Error adding page: %s: %s", gus.lib.logging.strip_control_chars(resource.indexable_url), e)
return page
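# index_content additionally stores the response body (only when it is at most
# MAXIMUM_TEXT_PAGE_SIZE bytes) and reports whether that content changed since
# the last crawl; a change resets the page's change_frequency to the default
# instead of incrementing it.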
def index_content(resource, response):
logging.debug(
"Indexing content for: %s",
gus.lib.logging.strip_control_chars(resource.indexable_url),
)
doc = {
"url": resource.indexable_url,
"fetchable_url": resource.fetchable_url,
"normalized_url": resource.normalized_url,
"domain": resource.normalized_host,
"port": resource.urlsplit.port or 1965,
"content_type": response.content_type,
"charset": response.charset,
"content": response.content if response.num_bytes <= constants.MAXIMUM_TEXT_PAGE_SIZE else None,
"size": response.num_bytes,
"change_frequency": resource.get_default_change_frequency("content"),
}
if response.content_type == "text/gemini":
doc["lang"] = (response.lang or "none",)
existing_page = Page.get_or_none(url=resource.indexable_url)
is_different = False
if existing_page:
doc["id"] = existing_page.id
if existing_page.content:
is_different = doc["content"] != existing_page.content
if is_different:
doc["change_frequency"] = resource.get_default_change_frequency(
"content"
)
else:
existing_change_frequency = (
existing_page.change_frequency
or resource.get_default_change_frequency("content")
)
doc["change_frequency"] = resource.increment_change_frequency(
existing_change_frequency, "content"
)
page = Page(**doc)
try:
page.save()
except Exception as e:
logging.error("Error adding page: %s: %s", gus.lib.logging.strip_control_chars(resource.indexable_url), e)
return page, is_different
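# A resource is skipped if its URL matches any excluded prefix, any excluded
# path suffix, or the EXCLUDED_URL_PATTERN regex above.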
def should_skip(resource):
should_skip = False
for excluded_prefix in EXCLUDED_URL_PREFIXES:
if resource.normalized_url.startswith(excluded_prefix):
should_skip = True
break
for excluded_path in EXCLUDED_URL_PATHS:
if resource.urlsplit.path.lower().endswith(excluded_path):
should_skip = True
break
m = EXCLUDED_URL_PATTERN.match(resource.normalized_url)
if m:
should_skip = True
return should_skip
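# Rebuild the outbound links of a page: drop its existing Link rows, create
# stub Page rows for link targets not seen before, then bulk insert the new
# Link rows.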
def index_links(from_resource, contained_resources):
from_page, created = Page.get_or_create(url=from_resource.indexable_url)
try:
Link.delete().where(Link.from_page == from_page).execute()
except Exception as e:
logging.error("Error deleting existing links for %s: %s", gus.lib.logging.strip_control_chars(from_resource.indexable_url), e)
data = []
for cr in contained_resources:
if should_skip(cr):
continue
to_page = Page.get_or_none(url=cr.indexable_url)
if not to_page:
to_page = Page.create(
url=cr.indexable_url,
fetchable_url=cr.fetchable_url,
domain=cr.normalized_host,
port=cr.urlsplit.port or 1965,
)
data.append(
{
"from_page": from_page,
"to_page": to_page,
"is_cross_host_like": Link.get_is_cross_host_like(from_resource, cr),
}
)
try:
Link.insert_many(data).execute()
except Exception as e:
logging.error("Error insert links: %s",e)
def fetch_robots_file(robot_host):
robot_url = urljoin("gemini://{}".format(robot_host), "/robots.txt")
logging.info(
"Fetching robots file: %s", gus.lib.logging.strip_control_chars(robot_url)
)
rp = GeminiRobotFileParser(robot_url)
rp.read()
return rp
def get_robots_file(robot_host):
if robot_host not in robot_file_map:
robot_file_map[robot_host] = fetch_robots_file(robot_host)
return robot_file_map[robot_host]
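# crawl_page drives the crawl: it enforces the per-host failure limit, the
# maximum crawl depth, URL exclusions, the per-page recrawl interval,
# robots.txt rules and per-host crawl delays, then fetches the resource and
# indexes it according to its Gemini status code (1x prompt, 2x content or
# binary, 3x redirect, 4x temporary error, 5x permanent error), recursing into
# extracted links for text/gemini responses.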
def crawl_page(
gemini_resource, current_depth, should_check_if_expired=True, redirect_chain=[]
):
gr = gemini_resource
url = gr.fetchable_url
if gr.normalized_host in failure_count and failure_count[gr.normalized_host] > constants.MAXIMUM_FAILED_REQUEST_COUNT:
logging.warning(
"Too many failed requests for host, skipping: %s", gus.lib.logging.strip_control_chars(url)
)
return
if max_crawl_depth >= 0 and current_depth > max_crawl_depth:
logging.warning(
"Going too deep, skipping: %s", gus.lib.logging.strip_control_chars(url)
)
return
if not gemini_resource.is_valid:
logging.warning(
"Not a valid gemini resource, skipping: %s",
gus.lib.logging.strip_control_chars(url),
)
return
if should_skip(gr):
logging.info(
"URL is excluded, skipping: %s",
gus.lib.logging.strip_control_chars(url),
)
return
if should_check_if_expired:
existing_page = Page.get_or_none(url=gr.indexable_url)
if existing_page and existing_page.change_frequency is not None:
most_recent_crawl = (
Crawl.select(peewee.fn.MAX(Crawl.timestamp))
.where(Crawl.page == existing_page)
.scalar()
)
if most_recent_crawl and datetime.utcnow() < most_recent_crawl + timedelta(
hours=existing_page.change_frequency
):
logging.debug(
"Too soon to recrawl, skipping: %s",
gus.lib.logging.strip_control_chars(gr.fetchable_url),
)
return
# ROBOTS
robots_file = get_robots_file(gr.normalized_host)
crawl_delay = None
if robots_file is not None:
logging.debug("Found robots.txt for %s", gr.normalized_url)
# only fetch if allowed for user-agents * and indexer
# RobotFileParser will return the higher level value (*) if
# no indexer section is found
can_fetch = robots_file.can_fetch("indexer", gr.normalized_url)
# same approach as above - last value wins
# crawl_delay = robots_file.crawl_delay("indexer")
if not can_fetch:
logging.info(
"Blocked by robots.txt, skipping: %s",
gus.lib.logging.strip_control_chars(url),
)
return
# Crawl delay
if gr.normalized_host in domain_hit_timings:
if gr.normalized_host in CRAWL_DELAYS:
next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
milliseconds=CRAWL_DELAYS[gr.normalized_host]
)
elif not crawl_delay:
next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
milliseconds=300
)
else:
next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
milliseconds=crawl_delay
)
sleep_duration = max((next_allowed_hit - datetime.now()).total_seconds(), 0)
time.sleep(sleep_duration)
domain_hit_timings[gr.normalized_host] = datetime.now()
# Actually fetch!
logging.info("Fetching resource: %s", gus.lib.logging.strip_control_chars(url))
if gr.fully_qualified_parent_url is not None:
logging.debug(
"with parent: %s",
gus.lib.logging.strip_control_chars(gr.fully_qualified_parent_url),
)
response = gr.fetch()
if response is None:
# problem before getting a response
logging.warn("Failed to fetch: %s", gus.lib.logging.strip_control_chars(url))
page = index_error(gr, True)
page_crawl = Crawl(
page=page, status=0, is_different=False, timestamp=datetime.utcnow()
)
try:
page_crawl.save()
except Exception as e:
logging.error("Error adding page_crawl: %s: %s", gus.lib.logging.strip_control_chars(page.url), e)
failure_count[gr.normalized_host] = failure_count[gr.normalized_host] + 1 if gr.normalized_host in failure_count else 1
logging.debug("Failed request count for host %s is %d", gr.normalized_host, failure_count[gr.normalized_host])
return
failure_count[gr.normalized_host] = 0
if response.status.startswith("4"):
# temporary error status
logging.debug(
"Got temporary error: %s: %s %s",
gus.lib.logging.strip_control_chars(url),
response.status,
response.error_message,
)
page = index_error(gr, True)
page_crawl = Crawl(
page=page,
status=response.status,
is_different=False,
error_message=response.error_message,
timestamp=datetime.utcnow(),
)
try:
page_crawl.save()
except Exception as e:
logging.error("Error adding page_crawl: %s: %s", gus.lib.logging.strip_control_chars(page.url), e)
elif response.status.startswith("5"):
# permanent error status
logging.debug(
"Got permanent error: %s: %s %s",
gus.lib.logging.strip_control_chars(url),
response.status,
response.error_message,
)
page = index_error(gr, False)
page_crawl = Crawl(
page=page,
status=response.status,
is_different=False,
error_message=response.error_message,
timestamp=datetime.utcnow(),
)
try:
page_crawl.save()
except Exception as e:
logging.error("Error adding page_crawl: %s: %s", gus.lib.logging.strip_control_chars(page.url), e)
elif response.status.startswith("3"):
# redirect status
logging.debug(
"Got redirected: %s: %s %s",
gus.lib.logging.strip_control_chars(url),
response.status,
response.url,
)
if len(redirect_chain) > constants.MAXIMUM_REDIRECT_CHAIN_LENGTH:
logging.info(
"Aborting, maximum redirect chain length reached: %s",
gus.lib.logging.strip_control_chars(url),
)
return
redirect_resource = GeminiResource(
response.url, gr.normalized_url, gr.normalized_host
)
if redirect_resource.fetchable_url == gr.fetchable_url:
logging.info(
"Aborting, redirecting to self: %s",
gus.lib.logging.strip_control_chars(url),
)
return
page = index_redirect(gr)
page_crawl = Crawl(
page=page,
status=response.status,
is_different=False,
timestamp=datetime.utcnow(),
)
try:
page_crawl.save()
except Exception as e:
logging.error("Error adding page_crawl: %s: %s", gus.lib.logging.strip_control_chars(page.url), e)
index_links(gr, [redirect_resource])
crawl_page(
redirect_resource,
current_depth,
should_check_if_expired=True,
redirect_chain=redirect_chain + [gr.fetchable_url],
)
elif response.status.startswith("1"):
# input status
logging.debug(
"Input requested at: %s: %s %s",
gus.lib.logging.strip_control_chars(url),
response.status,
response.prompt,
)
page = index_prompt(gr, response)
page_crawl = Crawl(
page=page,
status=response.status,
is_different=False,
timestamp=datetime.utcnow(),
)
try:
page_crawl.save()
except Exception as e:
logging.error("Error adding page_crawl: %s: %s", gus.lib.logging.strip_control_chars(page.url), e)
elif response.status.startswith("2"):
# success status
logging.debug(
"Successful request: %s: %s %s",
gus.lib.logging.strip_control_chars(url),
response.status,
response.content_type,
)
if response.content_type.startswith("text/"):
page, is_different = index_content(gr, response)
page_crawl = Crawl(
page=page,
status=response.status,
is_different=is_different,
timestamp=datetime.utcnow(),
)
try:
page_crawl.save()
except Exception as e:
logging.error("Error adding page_crawl: %s: %s", gus.lib.logging.strip_control_chars(page.url), e)
if response.content_type != "text/gemini":
logging.debug(
"Content is not gemini text: %s: %s",
gus.lib.logging.strip_control_chars(url),
response.content_type,
)
else:
logging.debug(
"Got gemini text, extracting and crawling links: %s",
gus.lib.logging.strip_control_chars(url),
)
contained_resources = gr.extract_contained_resources(response.content)
index_links(gr, contained_resources)
for resource in contained_resources:
crawl_page(
resource, current_depth + 1, should_check_if_expired=True
)
else:
page = index_binary(gr, response)
page_crawl = Crawl(
page=page,
status=response.status,
is_different=False,
timestamp=datetime.utcnow(),
)
try:
page_crawl.save()
except Exception as e:
logging.error("Error adding page_crawl: %s: %s", gus.lib.logging.strip_control_chars(page.url), e)
else:
logging.warning(
"Got unhandled status: %s: %s",
gus.lib.logging.strip_control_chars(url),
response.status,
)
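# A page is considered expired when its most recent crawl is older than its
# change_frequency (in hours). The REPLACE('fnord hours', 'fnord', ...) trick
# builds the SQLite datetime modifier string from that per-page column.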
def load_expired_urls():
expired_pages = Page.raw(
"""SELECT url
FROM (
SELECT p.url, p.normalized_url, p.change_frequency, MAX(c.timestamp) as timestamp
FROM page as p
JOIN crawl as c
ON p.id == c.page_id
GROUP BY p.url
)
WHERE datetime(timestamp, REPLACE('fnord hours', 'fnord', change_frequency)) < datetime('now')
GROUP BY normalized_url;"""
)
return [page.url for page in expired_pages.execute()]
def load_seed_request_urls():
with open("seed-requests.txt") as f:
content = f.readlines()
# remove whitespace characters like `\n` at the end of each line
content = [x.strip() for x in content]
return content
def load_feed_urls(filename):
feeds = []
with open(filename, "r") as fp:
for line in fp:
line = line.strip()
if not line or line.startswith("#"):
continue
feeds.append(line)
return feeds
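# NOTE: this relies on feedparser, whose import is commented out above, so it
# is only reachable through the --feeds code path that the TODO at the top
# describes as dead code.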
def items_from_feed_string(feed_str):
feed_obj = feedparser.parse(feed_str)
feed = feed_obj.feed
return [
(entry.updated_parsed, entry.link, entry.title, feed.title)
for entry in feed_obj.entries
]
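# Fetch every feed listed in the feed file, waiting at least 5 seconds between
# hits to the same host, and return the entry URLs found in them.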
def resolve_feed_content_urls(feed_file=constants.FEED_FILE):
# Load feed URLs to query
feed_urls = load_feed_urls(feed_file)
N = len(feed_urls)
# Prepare to extract feed items
last_accessed = {}
skips = 0
items = []
while feed_urls:
# Get a feed URL to fetch
feed_url = feed_urls.pop()
feed_resource = GeminiResource(feed_url)
# Don't hammer servers
last = last_accessed.get(feed_resource.normalized_host, 0)
now = time.time()
interval = int(now - last)
if interval < 5:
logging.warning(
"Declining to hit %s again after only %d seconds",
gus.lib.logging.strip_control_chars(feed_resource.normalized_host),
interval,
)
feed_urls.insert(0, feed_url)
skips += 1
if skips == len(feed_urls):
# We've hammered every server in the queue! Sleep a bit...
logging.warn("Sleeping to give all servers a rest!")
time.sleep(5)
continue
skips = 0
# Good to go
logging.info("Fetching feed: %s", gus.lib.logging.strip_control_chars(feed_url))
try:
resp = feed_resource.fetch()
except Exception as e:
logging.info(
"Error fetching feed, skipping: %s (%s)",
gus.lib.logging.strip_control_chars(feed_url),
e,
)
continue
if resp and resp.status == "20":
last_accessed[feed_resource.normalized_host] = time.time()
items.extend(items_from_feed_string(resp.content))
return [item[1] for item in items]
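# Entry point for --feeds: crawl only the URLs discovered in known feeds,
# without following their links (max_crawl_depth is set to 0).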
def recrawl_feeds():
content_urls = resolve_feed_content_urls()
global index_dir
index_dir = constants.INDEX_DIR
global db
db = init_db(f"{index_dir}/{constants.DB_FILENAME}")
global max_crawl_depth
max_crawl_depth = 0
global robot_file_map
robot_file_map = {}
global domain_hit_timings
domain_hit_timings = {}
global failure_count
failure_count = {}
seed_resources = [GeminiResource(url) for url in content_urls]
for resource in seed_resources:
crawl_page(resource, 0)
logging.debug(
"Recrawled feeds: %s", gus.lib.logging.strip_control_chars(content_urls)
)
logging.info("Finished!")
def run_crawl(should_run_destructive=False, seed_urls=[]):
global index_dir
index_dir = constants.INDEX_DIR_NEW if should_run_destructive else constants.INDEX_DIR
pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
global db
db = init_db(f"{index_dir}/{constants.DB_FILENAME}")
global robot_file_map
robot_file_map = {}
global domain_hit_timings
domain_hit_timings = {}
global max_crawl_depth
max_crawl_depth = 500
global failure_count
failure_count = {}
expired_resources = [GeminiResource(url) for url in load_expired_urls()]
for resource in expired_resources:
crawl_page(resource, 0, should_check_if_expired=False)
submitted_resources = [GeminiResource(url) for url in load_seed_request_urls()]
# include any extra seed URLs passed on the command line via --seeds
submitted_resources += [GeminiResource(url) for url in seed_urls]
for resource in submitted_resources:
crawl_page(resource, 0, should_check_if_expired=True)
logging.info("Finished!")
def main():
args = parse_args()
gus.lib.logging.handle_arguments(args)
if args.should_recrawl_feeds:
recrawl_feeds()
else:
run_crawl(args.should_run_destructive, seed_urls=args.seed_urls)
def parse_args():
parser = argparse.ArgumentParser(description="Crawl Geminispace.")
parser.add_argument(
"--destructive",
"-d",
dest="should_run_destructive",
action="store_true",
default=False,
help="create a fresh index and perform a full Geminispace crawl",
)
parser.add_argument(
"--feeds",
"-f",
dest="should_recrawl_feeds",
action="store_true",
default=False,
help="recrawl known atom feeds",
)
parser.add_argument(
"--seeds",
"-s",
metavar="URL",
dest="seed_urls",
nargs="+",
default=[],
help="one or more URLs with which to extend the seeds of the crawl",
)
gus.lib.logging.add_arguments(parser)
args = parser.parse_args()
return args
if __name__ == "__main__":
main()