-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtasks.py
More file actions
500 lines (389 loc) · 14 KB
/
tasks.py
File metadata and controls
500 lines (389 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
import datetime
import os
import shutil
import sys
from pathlib import Path
from textwrap import dedent
import docutils.frontend
import docutils.nodes
import docutils.parsers.rst
import docutils.utils
import yaml
from invoke.main import program
from invoke.tasks import call, task
from livereload.server import shlex
from pelican import main as pelican_main
from pelican.server import ComplexHTTPRequestHandler, RootedHTTPServer
from pelican.settings import DEFAULT_CONFIG, get_settings_from_file
from rich.console import Console
from rich.prompt import Prompt
from rich.table import Table
from slugify import slugify
def parse_rst(text: str) -> docutils.nodes.document:
    """Parse reStructuredText source into a docutils document tree.

    Args:
        text: The raw reStructuredText to parse.

    Returns:
        The ``docutils.nodes.document`` populated by the RST parser.
    """
    components = (docutils.parsers.rst.Parser,)
    # ``OptionParser`` is deprecated in recent docutils releases. Use the
    # ``get_default_settings`` helper when the installed docutils has it and
    # fall back to the legacy API on older versions.
    get_defaults = getattr(docutils.frontend, "get_default_settings", None)
    if get_defaults is not None:
        settings = get_defaults(*components)
    else:
        settings = docutils.frontend.OptionParser(
            components=components
        ).get_default_values()
    document = docutils.utils.new_document("<rst-doc>", settings=settings)
    docutils.parsers.rst.Parser().parse(text, document)
    return document
# When true, `livereload` opens the site in the default browser on startup.
OPEN_BROWSER_ON_SERVE = True

SETTINGS_FILE_BASE = "pelicanconf.py"
LOCAL_SETTINGS = get_settings_from_file(SETTINGS_FILE_BASE)
# Pelican's defaults first, then the project's own settings layered on top.
SETTINGS = {**DEFAULT_CONFIG, **LOCAL_SETTINGS}

CONFIG = {
    "settings_base": SETTINGS_FILE_BASE,
    "settings_publish": "publishconf.py",
    # Output path. Can be absolute or relative to tasks.py. Default: 'output'
    "deploy_path": SETTINGS["OUTPUT_PATH"],
    # Host and port for `serve`
    "host": "localhost",
    "port": 8000,
    "s3_bucket": "ideas.offby1.net",
    "cloudfrount_distribution_id": "E3HG7SIR4ZZAS1",
}

# Shared rich console used by the tasks below for formatted output.
console = Console()
@task
def clean(c):
    """Remove generated files

    If the deploy directory exists it is deleted and recreated empty;
    otherwise nothing happens.
    """
    deploy_dir = CONFIG["deploy_path"]
    if not os.path.isdir(deploy_dir):
        return
    shutil.rmtree(deploy_dir)
    os.makedirs(deploy_dir)
@task
def build(c, production=True, delete=True, output_path=None):
    """Build the site

    Args:
        c: Invoke context used to run the ``pelican`` command.
        production: Use the publish settings file when true, otherwise the
            base settings file.
        delete: Pass the ``-d`` flag to ``pelican`` when true.
        output_path: Optional output directory, passed to ``pelican`` via
            ``-o``.
    """
    settings = CONFIG["settings_publish" if production else "settings_base"]
    args = []
    if delete:
        args.append("-d")
    if output_path:
        args.extend(["-o", str(output_path)])
    args.extend(["-s", settings])
    # shlex.join quotes each argument, so an output path containing spaces or
    # shell metacharacters reaches pelican as a single argument.
    c.run(f"pelican {shlex.join(args)}")
@task(pre=[call(build, production=False, delete=True)])
def rebuild(c):
    """`build` with the delete switch"""
    # The `build` pre-task does all of the work; nothing is left to do here.
    pass
@task
def regenerate(c):
    """Automatically regenerate site upon file modification"""
    settings_file = CONFIG["settings_base"]
    c.run(f"pelican -r -s {settings_file}")
@task(call(build, production=False))
def serve(c):
    """Serve site at http://localhost:$PORT/ (default port is 8000)"""

    class ReusableAddressServer(RootedHTTPServer):
        allow_reuse_address = True

    port = CONFIG["port"]
    httpd = ReusableAddressServer(
        CONFIG["deploy_path"], ("", port), ComplexHTTPRequestHandler
    )
    sys.stderr.write(f"Serving on port {port} ...\n")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit quietly.
        pass
@task
def devserver(c):
    """Run `pelican -D -l -r` on the configured port"""
    port = CONFIG["port"]
    c.run(f"pelican -D -l -r --port {port}")
@task(build)
def preview(c):
    """Build production version of site"""
    # `build` runs as a pre-task with its defaults (production=True).
    pass
@task
def livereload(c):
    """Automatically reload browser tab upon file modification."""
    from livereload import Server

    def cached_build():
        # Rebuild through pelican_run with the content-cache settings on.
        pelican_run(
            f"-s {CONFIG['settings_base']} "
            "-e CACHE_CONTENT=true LOAD_CONTENT_CACHE=true"
        )

    cached_build()
    reload_server = Server()
    theme_dir = SETTINGS["THEME"]
    content_dir = SETTINGS["PATH"]

    # Settings file, theme templates, then content sources.
    patterns = [
        CONFIG["settings_base"],
        *(f"{theme_dir}/templates/**/*{ext}" for ext in (".html", ".j2")),
        *(f"{content_dir}/**/*{ext}" for ext in (".md", ".rst")),
    ]
    # Static assets, both in the theme and under the content `extra` folder.
    for ext in (".css", ".js"):
        patterns.append(f"{theme_dir}/static/**/*{ext}")
        patterns.append(f"{content_dir}/extra/**/*{ext}")
    patterns.append(f"{theme_dir}/scss/**/*.scss")

    for pattern in patterns:
        reload_server.watch(pattern, cached_build)

    if OPEN_BROWSER_ON_SERVE:
        # Open site in default browser
        import webbrowser

        serve_host = os.environ.get("SERVE_HOST", "ideas.ngrok.dev")
        webbrowser.open(f"https://{serve_host}/")

    reload_server.serve(
        host=CONFIG["host"], port=CONFIG["port"], root=CONFIG["deploy_path"]
    )
@task
def prepare_fonts(c):
    """Rsync the Font Awesome webfonts from node_modules into the theme"""
    source = "node_modules/@fortawesome/fontawesome-free/webfonts/"
    target = "themes/offby1/static/webfonts/"
    c.run(f"rsync -pthrvz {source} {target}")
@task(pre=[prepare_fonts])
def site(c):
    """generate using production settings"""
    content_dir = SETTINGS["PATH"]
    output_dir = CONFIG["deploy_path"]
    publish_settings = CONFIG["settings_publish"]
    c.run(f"pelican {content_dir} -o {output_dir} -s {publish_settings}")
@task
def upload(c):
    """Upload the site to the production S3 bucket"""
    source_dir = CONFIG["deploy_path"]
    bucket = CONFIG["s3_bucket"]
    c.run(f"aws s3 sync --delete {source_dir} s3://{bucket}")
@task
def invalidate(c):
    """Create a CloudFront invalidation for every path ('/*')"""
    distribution_id = CONFIG["cloudfrount_distribution_id"]
    c.run(
        "aws cloudfront create-invalidation "
        f"--distribution-id {distribution_id} --paths '/*'"
    )
@task(pre=[site, upload], post=[invalidate])
def publish(c):
    """Runs the `site` and `upload` tasks, and then invalidates the CF distribution so the site goes live."""
    # Everything happens in the pre- and post-tasks declared above.
    pass
@task
def compile_deps(c, upgrade=False):
    """Compile the pip deps to lock them

    Args:
        c: Invoke context used to run ``pip-compile``.
        upgrade: When true, always recompile with ``-U``. Otherwise
            recompile only when ``requirements.txt`` is missing or older
            than ``requirements.in``.
    """
    if upgrade:
        c.run(
            "pip-compile -U --no-emit-trusted-host --no-emit-index-url requirements.in"
        )
        return

    lock_file = Path("requirements.txt")
    # A lock file that does not exist yet is as stale as it gets; compile it
    # instead of failing on stat().
    is_stale = (
        not lock_file.exists()
        or lock_file.stat().st_mtime < Path("requirements.in").stat().st_mtime
    )
    if is_stale:
        c.run("pip-compile --no-emit-trusted-host --no-emit-index-url requirements.in")
@task(compile_deps)
def deps(c):
    """Sync the dependencies into the working virtualenv"""
    lock_file = "requirements.txt"
    c.run(f"pip-sync {lock_file}")
@task(pre=[call(compile_deps, upgrade=True)], post=[deps])
def upgrade(c):
    """Upgrade and sync all dependencies"""
    # `compile_deps` (with upgrade=True) runs before and `deps` after.
    pass
@task
def new_post(c, title=None, post_type="md"):
    """Create a blank new post in SETTINGS['PATH']"""
    if title is None:
        console.print("[bold magenta]Please enter the title:[/bold magenta]")
        title = Prompt.ask("Title", default="New Post")

    slug = slugify(title)
    today = datetime.date.today().isoformat()
    new_post_path = Path(SETTINGS["PATH"]) / "posts" / f"{today}-{slug}.{post_type}"
    timestamp = datetime.datetime.now().isoformat()

    # NOTE(review): the template text below follows the reviewed source line
    # for line; confirm blank lines / nested indentation against a rendered post.
    if post_type == "rst":
        title_bar = "#" * len(title)
        body = dedent(
            f"""\
            {title}
            {title_bar}
            .. role:: raw-html(raw)
            :format: html
            :slug: {slug}
            :date: {timestamp}
            :category: CATEGORY
            :tags:
            :author: Chris Rose
            :email: offline@offby1.net
            :summary:
            A New Post
            """
        )
    else:
        body = dedent(
            f"""\
            Title: {title}
            Slug: {slug}
            Date: {timestamp}
            Tags:
            Category: CATEGORY
            Author: Chris Rose
            Email: offline@offby1.net
            Status: draft
            Summary: Summarize this
            """
        )

    new_post_path.write_text(body)
    # Stage the new file so it shows up in the next commit.
    c.run(f"git add '{new_post_path}'")
@task
def new_post_from_template(c, template, title=None, category=None, summary=None):
    """Create a new post from a template (photo-essay, til, hero-image, plain-text)

    Args:
        c: Invoke context used to run ``git add``.
        template: One of the template names listed above.
        title: Post title; prompted for when omitted.
        category: Post category; prompted for when omitted.
        summary: Post summary; prompted for when omitted.
    """
    import re

    template_map = {
        "photo-essay": "photo-essay.md",
        "til": "til.md",
        "hero-image": "hero-image.md",
        "plain-text": "plain-text.md",
    }

    if template not in template_map:
        console.print(f"[bold red]Error:[/bold red] Unknown template '{template}'")
        console.print(f"Available templates: {', '.join(template_map.keys())}")
        return

    # Get title if not provided
    if title is None:
        console.print("[bold magenta]Please enter the title:[/bold magenta]")
        title = Prompt.ask("Title", default="New Post")

    # Get category if not provided
    if category is None:
        console.print("[bold magenta]Please enter the category:[/bold magenta]")
        category = Prompt.ask("Category", default="general-thoughts")

    # Get summary if not provided
    if summary is None:
        console.print("[bold magenta]Please enter a summary:[/bold magenta]")
        summary = Prompt.ask("Summary", default="A new post")

    # Read template
    template_path = (
        Path(__file__).parent / "content" / "templates" / template_map[template]
    )
    content = template_path.read_text()

    # Generate new values
    slug = slugify(title)
    date = datetime.datetime.now().isoformat()

    # Rewrite the frontmatter lines in the same order as before.
    # NOTE(review): values are written unquoted; a title or summary containing
    # ": " may not be valid YAML — confirm against the site's reader.
    frontmatter_values = {
        "title": title,
        "slug": slug,
        "date": date,
        "category": category,
        "summary": summary,
    }
    for field, value in frontmatter_values.items():
        new_line = f"{field}: {value}"
        # A callable replacement inserts the text verbatim. A plain string
        # would have backslashes and group references (``\1``, ``\g<..>``)
        # in user input interpreted by ``re.sub`` or rejected with re.error.
        content = re.sub(
            rf"^{field}:.*$",
            lambda _match, new_line=new_line: new_line,
            content,
            flags=re.MULTILINE,
        )

    # Remove 'templates' tag from the tags list
    content = re.sub(r"^ - templates\n", "", content, flags=re.MULTILINE)

    # Create new post file
    new_post_path = (
        Path(SETTINGS["PATH"])
        / "posts"
        / f"{datetime.date.today().isoformat()}-{slug}.md"
    )
    new_post_path.write_text(content)
    console.print(f"[bold green]Created:[/bold green] {new_post_path}")
    c.run(f"git add '{new_post_path}'")
class Visitor(docutils.nodes.NodeVisitor):
    """Node visitor that records each visited field as a name -> text entry."""

    def __init__(self, doc):
        super().__init__(doc)
        # Maps the text of a field's first child to the text of its second.
        self.fields = {}

    def visit_field(self, field: docutils.nodes.Node) -> None:
        """Store the field's first child text as key, second as value."""
        key_node = field.children[0]
        value_node = field.children[1]
        self.fields[key_node.astext()] = value_node.astext()

    def unknown_visit(self, node: docutils.nodes.Node) -> None:
        """Ignore every node type that has no dedicated visit method."""
        return None
def get_tags(doc: docutils.nodes.document) -> list[str]:
    """Return the non-empty, stripped entries of the document's ``tags`` field.

    The field text is split on commas. Raises ``KeyError`` when the document
    has no ``tags`` field.
    """
    collector = Visitor(doc)
    doc.walk(collector)
    pieces = (piece.strip() for piece in collector.fields["tags"].split(","))
    return [tag for tag in pieces if tag]
def get_category(doc: docutils.nodes.document) -> str:
    """Return the stripped text of the document's ``category`` field.

    Raises ``KeyError`` when the document has no ``category`` field.
    """
    collector = Visitor(doc)
    doc.walk(collector)
    raw_category = collector.fields["category"]
    return raw_category.strip()
def content_paths(relative="content/posts", extensions=(".rst", ".md")):
    """Yield every file under ``relative`` whose suffix is in ``extensions``.

    The tree is walked recursively with ``os.walk``; each result is a
    ``Path`` built from the walked directory and the file name.
    """
    for dirpath, _subdirs, filenames in os.walk(relative):
        candidates = (Path(dirpath) / name for name in filenames)
        yield from (path for path in candidates if path.suffix in extensions)
def parse_yaml_frontmatter(text: str) -> dict:
    """Parse YAML frontmatter from a Markdown file.

    The frontmatter must open with a ``---`` line at the very top of the
    text and close with another ``---`` line. Delimiters are matched as
    whole lines, so a ``---`` inside a value (for example
    ``title: a --- b``) does not cut the block short.

    Args:
        text: Full contents of the Markdown file.

    Returns:
        The parsed mapping, or ``{}`` when there is no frontmatter, no
        closing delimiter, or the YAML does not parse to a mapping.
    """
    lines = text.splitlines()
    if not lines or lines[0].rstrip() != "---":
        return {}
    for index, line in enumerate(lines[1:], start=1):
        if line.rstrip() == "---":
            data = yaml.safe_load("\n".join(lines[1:index]))
            # Callers use ``.get`` on the result, so anything other than a
            # mapping (empty block, scalar, list) is reported as "nothing".
            return data if isinstance(data, dict) else {}
    return {}
def get_metadata(path: Path) -> dict:
    """Extract tags and category from a content file, handling both RST and Markdown.

    Args:
        path: Content file; ``.md`` files are read via their YAML
            frontmatter, everything else is parsed as reStructuredText.

    Returns:
        ``{"tags": list[str], "category": str}``.
    """
    # Decode explicitly as UTF-8 rather than the platform's default encoding.
    text = path.read_text(encoding="utf-8")

    if path.suffix != ".md":
        doc = parse_rst(text)
        return {"tags": get_tags(doc), "category": get_category(doc)}

    fm = parse_yaml_frontmatter(text)

    # An empty ``tags:`` / ``category:`` entry parses to None; treat it as
    # missing so it does not become the literal string "None".
    tags_raw = fm.get("tags")
    if tags_raw is None:
        tags_raw = []
    if isinstance(tags_raw, list):
        tags = [str(t).strip() for t in tags_raw if t is not None and str(t).strip()]
    else:
        tags = [t.strip() for t in str(tags_raw).split(",") if t.strip()]

    category_raw = fm.get("category")
    category = "" if category_raw is None else str(category_raw).strip()
    return {"tags": tags, "category": category}
@task
def list_tags(c):
    """Print a table of every tag and the categories it is used in"""
    all_tags: dict[str, set[str]] = {}
    for p in content_paths():
        try:
            meta = get_metadata(p)
        except Exception:
            # Best effort: report the file and keep going.
            print(f"Unable to parse tags from {p}")
            continue
        for t in meta["tags"]:
            all_tags.setdefault(t, set()).add(meta["category"])

    table = Table(show_header=True, header_style="bold magenta")
    table.add_column("Tag")
    table.add_column("Categories")
    for tag, tag_categories in sorted(all_tags.items()):
        # Sets iterate in arbitrary order; sort so the output is stable
        # from one run to the next.
        table.add_row(tag, ",".join(sorted(tag_categories)))
    console.print(table)
@task
def list_categories(c):
    """Print a sorted table of every non-empty category used by the content"""
    categories: set[str] = set()
    for p in content_paths():
        try:
            meta = get_metadata(p)
        except Exception:
            # Best effort: report the file and keep going.
            print(f"Unable to parse categories from {p}")
            continue
        if meta["category"]:
            categories.add(meta["category"])

    table = Table(show_header=True, header_style="bold magenta")
    table.add_column("Category")
    # Use a distinct loop name so the Invoke context parameter `c` is not
    # rebound.
    for category in sorted(categories):
        table.add_row(category)
    console.print(table)
@task
def photo_gallery_gen(c, location):
    """Create gallery metadata files."""
    fmt_path = Path(__file__).parent / "config"
    location = Path(location)
    exif_cmd = "exiftool -if '$filename !~ /\\.txt$$/'"

    with c.cd(location):
        # Same recipe for both files; an existing file is never overwritten.
        for stem in ("exif", "captions"):
            target = f"{stem}.txt"
            if (location / target).exists():
                console.print(f"[red]Skipping already present file [bold]{target}")
                continue
            c.run(f"{exif_cmd} -f -p {fmt_path}/{stem}.fmt . | sort > {target}")
@task
def show_hcard(c, page="index.html"):
    """Show the current hcards for the index page"""
    import mf2py

    html_file = Path(__file__).parent / "output" / page
    with html_file.open(mode="r") as fh:
        parsed = mf2py.parse(doc=fh)
    console.print(parsed)
def pelican_run(cmd):
    """Run Pelican in-process with ``cmd`` plus the Invoke remainder args."""
    # The remainder is appended so extra arguments pass through to pelican.
    argv = shlex.split(cmd + " " + program.core.remainder)
    pelican_main(argv)