Commit 719863d

Merge pull request #16 from pyper-dev/dev
improve generator behaviour and update docs/examples
2 parents 144517c + a285ed6 commit 719863d

9 files changed: +290 −54 lines changed

docs/src/docs/ApiReference/task.md

Lines changed: 20 additions & 19 deletions
@@ -47,14 +47,20 @@ Pipelines created this way can be [composed](../UserGuide/ComposingPipelines) in
* **type:** `Optional[Callable]`
* **default:** `None`

-The function or callable object defining the logic of the task. Does not need to be passed explicitly if using `@task` as a decorator.
+The function or callable object defining the logic of the task. This is a positional-only parameter.

```python
from pyper import task

@task
def add_one(x: int):
    return x + 1
+
+# OR
+def add_one(x: int):
+    return x + 1
+
+pipeline = task(add_one)
```

{: .text-beta}
@@ -76,16 +82,14 @@ if __name__ == "__main__":
    pipeline1 = task(create_data)
    for output in pipeline1(0):
        print(output)
-        # Prints:
-        # [1, 2, 3]
+        #> [1, 2, 3]

    pipeline2 = task(create_data, branch=True)
    for output in pipeline2(0):
        print(output)
-        # Prints:
-        # 1
-        # 2
-        # 3
+        #> 1
+        #> 2
+        #> 3
```

This can be applied to generator functions (or async generator functions) to submit outputs lazily:
@@ -102,10 +106,9 @@ if __name__ == "__main__":
    pipeline = task(create_data, branch=True)
    for output in pipeline(0):
        print(output)
-        # Prints:
-        # 1
-        # 2
-        # 3
+        #> 1
+        #> 2
+        #> 3
```
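
For context, a minimal runnable version of this example might be the following, with the body of `create_data` assumed from the printed outputs:

```python
from pyper import task


def create_data(x: int):
    # A generator: outputs are submitted lazily, one at a time
    yield x + 1
    yield x + 2
    yield x + 3


if __name__ == "__main__":
    pipeline = task(create_data, branch=True)
    for output in pipeline(0):
        print(output)
        #> 1
        #> 2
        #> 3
```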

{: .text-beta}
@@ -135,10 +138,9 @@ if __name__ == "__main__":
    pipeline = create_data | running_total
    for output in pipeline(0):
        print(output)
-        # Prints:
-        # 1
-        # 3
-        # 6
+        #> 1
+        #> 3
+        #> 6
```
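
The definitions of `create_data` and `running_total` are not shown in this hunk. One assumed implementation that reproduces these outputs uses a stateful callable (a hypothetical sketch, not the documented source):

```python
from pyper import task


@task(branch=True)
def create_data(x: int):
    yield x + 1
    yield x + 2
    yield x + 3


class RunningTotal:
    """Stateful callable: accumulates a total across successive inputs."""
    def __init__(self):
        self.total = 0

    def __call__(self, n: int) -> int:
        self.total += n
        return self.total


if __name__ == "__main__":
    pipeline = create_data | task(RunningTotal())
    for output in pipeline(0):
        print(output)
        #> 1
        #> 3
        #> 6
```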

{: .warning}
@@ -259,10 +261,9 @@ if __name__ == "__main__":
    )
    for output in pipeline(1, 4):
        print(output)
-        # Prints:
-        # 10
-        # 20
-        # 30
+        #> 10
+        #> 20
+        #> 30
```

Given that each producer-consumer expects to be given one input argument, the purpose of the `bind` parameter is to allow functions to be defined flexibly in terms of the inputs they take, as well as allowing tasks to access external state, such as contexts.
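
As an illustration of `bind` (a hypothetical sketch; `multiply` and `factor` are invented names, though `task.bind` itself is part of the documented API):

```python
from pyper import task


def multiply(x: int, factor: int):
    return x * factor


# task.bind fixes the extra keyword argument, so the task still
# receives one pipeline input at a time
pipeline = task(multiply, bind=task.bind(factor=10))

if __name__ == "__main__":
    for output in pipeline(3):
        print(output)
        #> 30
```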

docs/src/docs/Examples/ChessDataAnalysis.md

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
---
title: Chess Data Analysis
parent: Examples
layout: default
nav_order: 1
permalink: /docs/Examples/ChessDataAnalysis
---

# Chess Data Analysis
{: .no_toc }

* TOC
{:toc}

## Problem Statement

Let's look at a simple example of collecting some data and doing something with it. We will:

* Build a pipeline to download a player's game data for the past few months from the [chess.com API](https://www.chess.com/news/view/published-data-api)
* Use the `python-chess` package to parse the PGN game data
* Use `pandas` to do some basic opening win-rate analysis

## Setup

This is a standalone script. Python package requirements are specified in `requirements.txt`.

**See the [source code](https://github.com/pyper-dev/pyper/tree/main/examples/ChessDataAnalysis) for this example** _(always review code before running it on your own machine)_

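The linked `requirements.txt` holds the authoritative list; a plausible sketch of the dependencies used below (package names assumed) is:

```
python-pyper
requests
python-chess
pandas
python-dateutil
```
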
## Implementation

To collect the data we need, we will use the chess.com API's monthly multigame PGN download endpoint, which has the URL format:

```
https://api.chess.com/pub/player/player-name/games/YYYY/MM/pgn
```

Firstly, we define a helper function to generate these URLs for the most recent months:

```python
import datetime

from dateutil.relativedelta import relativedelta


def generate_urls_by_month(player: str, num_months: int):
    """Define a series of pgn game resource urls for a player, for num_months recent months."""
    today = datetime.date.today()
    for i in range(num_months):
        d = today - relativedelta(months=i)
        yield f"https://api.chess.com/pub/player/{player}/games/{d.year}/{d.month:02}/pgn"
```

We also need a function to fetch the raw data from each URL.

```python
import requests


def fetch_text_data(url: str, session: requests.Session):
    """Fetch text data from a url."""
    r = session.get(url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"})
    return r.text
```

Each PGN dataset consists of data for multiple games. We'll create a function called `read_game_data` to extract individual game details as dictionaries.

```python
import io

import chess.pgn


def _clean_opening_name(eco_url: str):
    """Get a rough opening name from the chess.com ECO url."""
    name = eco_url.removeprefix("https://www.chess.com/openings/")
    return " ".join(name.split("-")[:2])


def read_game_data(pgn_text: str, player: str):
    """Read PGN data and generate game details (each PGN contains details for multiple games)."""
    pgn = io.StringIO(pgn_text)
    while (headers := chess.pgn.read_headers(pgn)) is not None:
        color = 'W' if headers["White"].lower() == player else 'B'

        if headers["Result"] == "1/2-1/2":
            score = 0.5
        elif (color == 'W' and headers["Result"] == "1-0") or (color == 'B' and headers["Result"] == "0-1"):
            score = 1
        else:
            score = 0

        yield {
            "color": color,
            "score": score,
            "opening": _clean_opening_name(headers["ECOUrl"])
        }
```

Finally, we need some logic to handle the data analysis (which we're keeping very bare-bones).
Let's dump the data into a pandas DataFrame and print a table showing:

* average score, grouped by chess opening
* where the player plays the white pieces
* ordered by total games

```python
import typing

import pandas as pd


def build_df(data: typing.Iterable[dict]) -> pd.DataFrame:
    df = pd.DataFrame(data)
    df = df[df["color"] == 'W']
    df = df.groupby("opening").agg(total_games=("score", "count"), average_score=("score", "mean"))
    df = df.sort_values(by="total_games", ascending=False)
    return df
```

All that's left is to piece everything together.

Note that the Pyper framework hasn't placed any particular restrictions on the way our 'business logic' is implemented. We can simply use Pyper to compose these logical functions into a concurrent pipeline, with minimal code coupling.

In the pipeline, we will:

1. Set `branch=True` for `generate_urls_by_month`, to allow this task to generate multiple outputs
2. Create 3 workers for `fetch_text_data`, so that we can wait on requests concurrently
3. Set `branch=True` for `read_game_data` also, as this generates multiple dictionaries
4. Let the `build_df` function consume all output generated by this pipeline

```python
from pyper import task


def main():
    player = "hikaru"
    num_months = 6  # Keep this number low, or add sleeps for etiquette

    with requests.Session() as session:
        run = (
            task(generate_urls_by_month, branch=True)
            | task(
                fetch_text_data,
                workers=3,
                bind=task.bind(session=session))
            | task(
                read_game_data,
                branch=True,
                bind=task.bind(player=player))
            > build_df
        )
        df = run(player, num_months)
        print(df.head(10))
```
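
To run the example end to end, a minimal entry point would be:

```python
if __name__ == "__main__":
    main()
```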

With no more lines of code than it would have taken to define a series of sequential for-loops, we've defined a concurrently executable data flow!

We can now run everything to see the result of our analysis:

```
                      total_games  average_score
opening
Nimzowitsch Larsen            244       0.879098
Closed Sicilian               205       0.924390
Caro Kann                     157       0.882166
Bishops Opening               156       0.900641
French Defense                140       0.846429
Sicilian Defense              127       0.877953
Reti Opening                   97       0.819588
Vienna Game                    71       0.929577
English Opening                61       0.868852
Scandinavian Defense           51       0.862745
```

docs/src/docs/UserGuide/AdvancedConcepts.md

Lines changed: 18 additions & 16 deletions
@@ -39,7 +39,7 @@ IO-bound tasks benefit from both concurrent and parallel execution.
However, to avoid the overhead costs of creating processes, it is generally preferable to use either threading or async code.

{: .info}
-Threads incur a higher overhead cost compared to async coroutines, but are suitable if the function / application prefers or requires a synchronous implementation
+Threads incur a higher overhead cost compared to async coroutines, but are suitable if the task prefers or requires a synchronous implementation

Note that asynchronous functions need to `await` or `yield` something in order to benefit from concurrency.
Any long-running call in an async task which does not yield execution will prevent other tasks from making progress:
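
As an assumed illustration of this point (not part of the diff):

```python
import asyncio
import time


async def bad_task(data):
    time.sleep(1)  # synchronous sleep: blocks the entire event loop
    return data


async def good_task(data):
    await asyncio.sleep(1)  # awaited sleep: other tasks progress meanwhile
    return data
```
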
@@ -115,30 +115,32 @@ In Pyper, it is especially important to separate out different types of work int

```python
# Bad -- functions not separated
-@task(workers=20)
+@task(branch=True, workers=20)
def get_data(endpoint: str):
    # IO-bound work
    r = requests.get(endpoint)
    data = r.json()

    # CPU-bound work
-    return process_data(data)
+    for item in data["results"]:
+        yield process_data(item)
```

Whilst it makes sense to handle the network request concurrently, the call to `process_data` within the same task is blocking and will harm concurrency.
-Instead, `process_data` can be implemented as a separate task:
+Instead, `process_data` should be implemented as a separate function:

```python
-@task(workers=20)
+@task(branch=True, workers=20)
def get_data(endpoint: str):
    # IO-bound work
    r = requests.get(endpoint)
-    return r.json()
+    data = r.json()
+    return data["results"]

@task(workers=10, multiprocess=True)
def process_data(data):
    # CPU-bound work
-    ...
+    return ...
```

### Resource Management
@@ -254,16 +256,14 @@ if __name__ == "__main__":
    branched_pipeline = task(get_data, branch=True)
    for output in branched_pipeline():
        print(output)
-        # Prints:
-        # 1
-        # 2
-        # 3
+        #> 1
+        #> 2
+        #> 3

    non_branched_pipeline = task(get_data)
    for output in non_branched_pipeline():
        print(output)
-        # Prints:
-        # <generator object get_data at ...>
+        #> <generator object get_data at ...>
```
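
For reference, the elided `get_data` here is presumably a simple generator along these lines (assumed from the printed outputs):

```python
def get_data():
    yield 1
    yield 2
    yield 3
```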

### Limitations
@@ -281,11 +281,13 @@ Generator functions, which return _immediately_, do most of their work outside o

The alternatives are to:

-1. Use a synchronous generator anyway (if its performance is unlikely to be a bottleneck)
+1. Refactor your functions. If you find that one function is repeating a computation multiple times, it may be possible to [separate out responsibilities](#logical-separation) into separate functions
+
+2. Use a synchronous generator anyway (if its performance is unlikely to be a bottleneck)

-2. Use a normal synchronous function, and return an iterable data structure (if memory is unlikely to be a bottleneck)
+3. Use a normal synchronous function, and return an iterable data structure (if memory is unlikely to be a bottleneck)

-3. Use an async generator (if an async implementation of the function is appropriate)
+4. Use an async generator (if an async implementation of the function is appropriate)
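
A hypothetical sketch of alternatives 3 and 4 (function names and bodies assumed):

```python
from pyper import task


@task(branch=True)
def make_data_sync(x: int):
    # Alternative 3: a normal function returning an iterable data structure
    return [x + 1, x + 2, x + 3]


@task(branch=True)
async def make_data_async(x: int):
    # Alternative 4: an async generator
    yield x + 1
    yield x + 2
    yield x + 3
```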

{: .text-green-200}
**Multiprocessing and Pickling**

docs/src/docs/UserGuide/ComposingPipelines.md

Lines changed: 1 addition & 2 deletions
@@ -40,8 +40,7 @@ This represents defining a new function that:
if __name__ == "__main__":
    for output in new_pipeline(4):
        print(output)
-        # Prints:
-        # 9
+        #> 9
```

## Consumer Functions and the `>` Operator

docs/src/docs/UserGuide/CreatingPipelines.md

Lines changed: 5 additions & 8 deletions
@@ -70,8 +70,7 @@ if __name__ == "__main__":
    pipeline = task(func)
    for output in pipeline(x=0):
        print(output)
-        # Prints:
-        # 1
+        #> 1
```

{: .info}
@@ -91,10 +90,9 @@ if __name__ == "__main__":
    pipeline = task(func, branch=True)
    for output in pipeline(x=0):
        print(output)
-        # Prints:
-        # 1
-        # 2
-        # 3
+        #> 1
+        #> 2
+        #> 3
```

## Asynchronous Code
@@ -112,8 +110,7 @@ async def main():
    pipeline = task(func)
    async for output in pipeline(x=0):
        print(output)
-        # Prints:
-        # 1
+        #> 1

if __name__ == "__main__":
    asyncio.run(main())
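
For context, a complete version of this snippet might look like the following, assuming `func` is a simple async function:

```python
import asyncio

from pyper import task


async def func(x: int):
    return x + 1


async def main():
    pipeline = task(func)
    async for output in pipeline(x=0):
        print(output)
        #> 1


if __name__ == "__main__":
    asyncio.run(main())
```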
