Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ install:

.PHONY: run map
map:
python examples/map.py
python examples/tasks/map/map.py
run: map

.PHONY: benchmark_match, benchmark_mapper, test
Expand Down
1 change: 1 addition & 0 deletions examples/kafka/s2_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# User data to be sent
user_data = {"name": "John Doe", "age": 28}
print("User:", user_data)

# Produce message
producer.produce(topic='user-info', key=str(user_data['name']), value=user_data)
Expand Down
1 change: 1 addition & 0 deletions examples/kafka/s4_producer_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

# Updated user data to be sent with the new schema
user_data = {"name": "Jane Doe", "age": 27, "email": "janedoe@example.com"}
print("User:", user_data)

# Produce message
producer.produce(topic='user-info', key=str(user_data['name']), value=user_data)
Expand Down
34 changes: 33 additions & 1 deletion examples/tasks/assemble/assemble.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,40 @@
import pandas as pd

import llmint
from llmint.assemble.pandas import assemble


def main():
llmint.assemble()
source_schema = '''
{
"fields": [
{"name": "Fname", "type": "string"},
{"name": "Lname", "type": "string"},
{"name": "Age", "type": "int"},
{"name": "Email", "type": ["null", "string"], "default": null}
]
}
'''
target_schema = '''
{
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
'''


source_df = pd.DataFrame([{"Fname": "Josh", "Lname": "Doe", "Age": 31, "Email": "joshdoe@example.com"}])
dest_df = pd.DataFrame([{"name": "Jane Doe", "age": 27, "email": "janedoe@example.com"}])
print("Source:", source_df, sep="\n")
print("Dest:", dest_df, sep="\n")

mappings = llmint.map(source_df, dest_df)
output = assemble(source_df, mappings)
combined_df = pd.concat([dest_df, output], axis=0)
print("Combined:", combined_df, sep="\n")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very neat / nice example for schema alignment for static datasets (known records with fixed size).

Could we have perhaps another example demonstrating this in the message broker setting?

E.g., a continuously sending data source, with the assembled data flow (once "installed" in the data path) able to process the incoming data.



if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions llmint/assemble/pandas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from llmint.assemble.pandas.function import assemble
36 changes: 36 additions & 0 deletions llmint/assemble/pandas/function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import pandas as pd

from llmint.assemble.pandas.transform import (
add, cast, copy, default, delete, missing, rename, apply, link, scale, shift
)
from llmint.map.function import Map

def assemble(df: pd.DataFrame, mappings: list[Map]):
df_outputs = []

for mapping in mappings:
match mapping.transformation.split(' ')[0]:
case 'ADD':
df_outputs.append(add(df, mapping))
case 'CAST':
df_outputs.append(cast(df, mapping))
case 'COPY':
df_outputs.append(copy(df, mapping))
case 'DEFAULT':
df_outputs.append(default(df, mapping))
case 'DELETE':
df_outputs.append(delete(df, mapping))
case 'MISSING':
df_outputs.append(missing(df, mapping))
case 'RENAME':
df_outputs.append(rename(df, mapping))
case 'APPLY':
df_outputs.append(apply(df, mapping))
case 'LINK':
df_outputs.append(link(df, mapping))
case 'SCALE':
df_outputs.append(scale(df, mapping))
case 'SHIFT':
df_outputs.append(shift(df, mapping))

return pd.concat(df_outputs, axis=1)
12 changes: 12 additions & 0 deletions llmint/assemble/pandas/transform/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from llmint.assemble.pandas.transform.field.add import func as add
from llmint.assemble.pandas.transform.field.cast import func as cast
from llmint.assemble.pandas.transform.field.copy import func as copy
from llmint.assemble.pandas.transform.field.default import func as default
from llmint.assemble.pandas.transform.field.delete import func as delete
from llmint.assemble.pandas.transform.field.missing import func as missing
from llmint.assemble.pandas.transform.field.rename import func as rename

from llmint.assemble.pandas.transform.value.apply import func as apply
from llmint.assemble.pandas.transform.value.link import func as link
from llmint.assemble.pandas.transform.value.scale import func as scale
from llmint.assemble.pandas.transform.value.shift import func as shift
Empty file.
10 changes: 10 additions & 0 deletions llmint/assemble/pandas/transform/field/add.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import re
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Create the empty column described by an ``ADD ... TYPE <type>`` mapping.

    Args:
        df: Source frame (not read; present for the common handler signature).
        mapping: Mapping whose ``transformation`` contains ``TYPE <type>`` and
            whose ``target_field`` names the new column.

    Returns:
        An empty Series named ``mapping.target_field``. Its dtype is the
        parsed type when pandas accepts it, otherwise ``object``.

    Raises:
        ValueError: If the transformation has no ``TYPE <type>`` clause.
    """
    found = re.search(r'TYPE (\w+)', mapping.transformation)
    if found is None:
        raise ValueError(
            f'ADD transformation has no TYPE clause: {mapping.transformation!r}'
        )
    col_type = found.group(1)

    try:
        return Series([], name=mapping.target_field, dtype=col_type)
    except (TypeError, ValueError):
        # The type name comes from a schema and is not guaranteed to be a
        # dtype pandas understands; keep the column rather than fail.
        return Series([], name=mapping.target_field, dtype=object)
7 changes: 7 additions & 0 deletions llmint/assemble/pandas/transform/field/cast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Placeholder for the pandas CAST transformation.

    Nothing is implemented yet: both arguments are ignored and ``None`` is
    returned.
    """
    return None
8 changes: 8 additions & 0 deletions llmint/assemble/pandas/transform/field/copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import re
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Return the source column relabelled with the target field name.

    Args:
        df: Frame holding the column ``mapping.source_field``.
        mapping: Mapping providing ``source_field`` and ``target_field``.

    Returns:
        A Series built from the source column and named ``mapping.target_field``.
    """
    source_column = df[mapping.source_field]
    return Series(data=source_column, name=mapping.target_field)
10 changes: 10 additions & 0 deletions llmint/assemble/pandas/transform/field/default.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import re
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Fill the target column with the value of a ``DEFAULT TO <value>`` mapping.

    Args:
        df: Source frame; only its index is used.
        mapping: Mapping whose ``transformation`` contains ``DEFAULT TO <value>``
            and whose ``target_field`` names the produced column.

    Returns:
        A Series named ``mapping.target_field`` holding the default value
        (as the string captured from the transformation) for every row of ``df``.

    Raises:
        ValueError: If the transformation has no ``DEFAULT TO`` clause.
    """
    found = re.search(r'DEFAULT TO (.*)', mapping.transformation)
    if found is None:
        raise ValueError(
            f'DEFAULT transformation has no value: {mapping.transformation!r}'
        )
    default_val = found.group(1)

    # Reuse df's own index so the column stays aligned with the other
    # per-row results when they are concatenated side by side.
    return Series(default_val, index=df.index, name=mapping.target_field)
7 changes: 7 additions & 0 deletions llmint/assemble/pandas/transform/field/delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Placeholder for the pandas DELETE transformation.

    Nothing is implemented yet: both arguments are ignored and ``None`` is
    returned.
    """
    return None
7 changes: 7 additions & 0 deletions llmint/assemble/pandas/transform/field/missing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Placeholder for the pandas MISSING transformation.

    Nothing is implemented yet: both arguments are ignored and ``None`` is
    returned.
    """
    return None
7 changes: 7 additions & 0 deletions llmint/assemble/pandas/transform/field/rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Placeholder for the pandas RENAME transformation.

    Nothing is implemented yet: both arguments are ignored and ``None`` is
    returned.
    """
    return None
Empty file.
16 changes: 16 additions & 0 deletions llmint/assemble/pandas/transform/value/apply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import re
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Evaluate the expression of an ``APPLY <expression>`` mapping over ``df``.

    Every column of ``df`` is exposed to the expression under its own name,
    with spaces replaced by underscores. ``df``, ``Series`` and ``DataFrame``
    are available as well; a column with the same name takes precedence.

    Args:
        df: Source frame whose columns the expression may reference.
        mapping: Mapping whose ``transformation`` contains ``APPLY <expression>``
            and whose ``target_field`` names the produced column.

    Returns:
        A Series named ``mapping.target_field`` holding the expression result.

    Raises:
        ValueError: If the transformation has no ``APPLY`` expression.
    """
    found = re.search(r'APPLY (.*)', mapping.transformation)
    if found is None:
        raise ValueError(
            f'APPLY transformation has no expression: {mapping.transformation!r}'
        )
    expression = found.group(1)

    # Private namespace: nothing is written to module globals, so values
    # from one call cannot leak into the next.
    namespace = {'df': df, 'Series': Series, 'DataFrame': DataFrame}
    namespace.update({col.replace(' ', '_'): df[col] for col in df.columns})

    # NOTE(security): the expression is executed as Python code. Only run
    # mappings from a trusted source; this is not a sandbox.
    result = eval(expression, namespace)

    return Series(result, name=mapping.target_field)
7 changes: 7 additions & 0 deletions llmint/assemble/pandas/transform/value/link.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Placeholder for the pandas LINK transformation.

    Nothing is implemented yet: both arguments are ignored and ``None`` is
    returned.
    """
    return None
13 changes: 13 additions & 0 deletions llmint/assemble/pandas/transform/value/scale.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import re
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Multiply the source column by the factor of a ``SCALE BY <number>`` mapping.

    Args:
        df: Frame holding the column ``mapping.source_field``.
        mapping: Mapping providing ``source_field``, ``target_field`` and a
            ``transformation`` of the form ``SCALE BY <number>``.

    Returns:
        A Series named ``mapping.target_field``. When no numeric factor can
        be read from the transformation, the source values are returned
        unscaled (as a copy).
    """
    column = df[mapping.source_field]

    # Explicit numeric pattern: optional sign, decimal point and exponent.
    found = re.search(
        r'SCALE BY\s+([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)',
        mapping.transformation,
    )
    if found is None:
        # Best effort: without a usable factor the values pass through.
        return Series(column.copy(), name=mapping.target_field)

    scale = float(found.group(1))
    # Name the result after the target field, like the other handlers do.
    return Series(column * scale, name=mapping.target_field)
13 changes: 13 additions & 0 deletions llmint/assemble/pandas/transform/value/shift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import re
from pandas import Series, DataFrame

from llmint.map.function import Map


def func(df: DataFrame, mapping: Map):
    """Add the offset of a ``SHIFT BY <number>`` mapping to the source column.

    Args:
        df: Frame holding the column ``mapping.source_field``.
        mapping: Mapping providing ``source_field``, ``target_field`` and a
            ``transformation`` of the form ``SHIFT BY <number>``.

    Returns:
        A Series named ``mapping.target_field``. When no numeric offset can
        be read from the transformation, the source values are returned
        unshifted (as a copy).
    """
    column = df[mapping.source_field]

    # Explicit numeric pattern: optional sign, decimal point and exponent.
    found = re.search(
        r'SHIFT BY\s+([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)',
        mapping.transformation,
    )
    if found is None:
        # Best effort: without a usable offset the values pass through.
        return Series(column.copy(), name=mapping.target_field)

    shift = float(found.group(1))
    # Name the result after the target field, like the other handlers do.
    return Series(column + shift, name=mapping.target_field)
21 changes: 11 additions & 10 deletions llmint/core/eval.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from llmint.map.function import Map


class pcolors:
RIGHT = '\033[92m'
WRONG = '\033[91m'
Expand Down Expand Up @@ -43,13 +46,11 @@ def accuracy(output: list, test_example: list):
f1 = 0
return precision, recall, f1

def print_mappings(mappings: dict, include_reasoning=True):
for name, response in mappings.items():
mapping, reasoning = response
if include_reasoning:

print(pcolors.RIGHT + mapping + pcolors.ENDC + '\n',
reasoning, flush=True)
else:
print(pcolors.RIGHT + mapping + pcolors.ENDC,
flush=True)
def print_mappings(mappings: list[Map], include_reasoning=True):
    """Print each mapping in the ``pcolors.RIGHT`` colour.

    Args:
        mappings: Mappings to display.
        include_reasoning: When true, each mapping is followed by its
            ``reasoning`` text.
    """
    for mapping in mappings:
        # mapping.__dict__ is a dict; format it into text first, since
        # concatenating it directly to the colour codes raises TypeError.
        rendered = f'{pcolors.RIGHT}{mapping.__dict__}{pcolors.ENDC}'
        if include_reasoning:
            print(rendered + '\n', mapping.reasoning, flush=True)
        else:
            print(rendered, flush=True)
21 changes: 19 additions & 2 deletions llmint/map/function.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
from pydantic import BaseModel

from llmint.core import model
from llmint.map import prompt, parameter


class Map(BaseModel):
    """One schema-mapping step produced by a transformation function."""

    # Source field name; None when the step has no source (ADD passes None).
    source_field: str | None
    # Target field name; None when the step has no target (DELETE passes
    # None, which a plain ``str`` annotation would reject at validation).
    target_field: str | None
    # Operation keyword followed by its arguments, e.g. 'ADD <field> TYPE <type>'.
    transformation: str
    # Explanation accompanying the step, when one is provided.
    reasoning: str | None


def map(source_schema, target_schema):
mappings = model.call(
output = model.call(
prompt=[
{"role": "system", "content": prompt.system},
{"role": "user", "content": prompt.user.format(
Expand All @@ -15,5 +25,12 @@ def map(source_schema, target_schema):
temperature=parameter.temperature,
seed=parameter.seed,
max_model_call=1, # only one model call
)["tool_outputs"][0] # take the first tool output
)["tool_outputs"]

# process the mappings
mappings = []
for mapping in output:
for _, mapping in mapping.items():
mappings.append(mapping)

return mappings
9 changes: 7 additions & 2 deletions llmint/map/transform/field/add.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from llmint.map.function import Map


name = "ADD"
schema = {
"type": "function",
Expand Down Expand Up @@ -27,5 +30,7 @@


def func(target_field, field_type, reasoning):
return (f'{{from: None, to: {target_field}, '
f'transformation: ADD {target_field} TYPE {field_type}}}', reasoning)
return Map(source_field=None,
target_field=target_field,
transformation=f'ADD {target_field} TYPE {field_type}',
reasoning=reasoning)
9 changes: 7 additions & 2 deletions llmint/map/transform/field/cast.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from llmint.map.function import Map


name = "CAST"
schema = {
"type": "function",
Expand Down Expand Up @@ -35,5 +38,7 @@


def func(source_field, target_field, source_type, target_type, reasoning):
return (f'{{from: {source_field}, to: {target_field}, '
f'transformation: CAST {source_field} FROM {source_type} TO {target_type}}}', reasoning)
return Map(source_field=source_field,
target_field=target_field,
transformation=f'CAST FROM {source_type} TO {target_type}',
reasoning=reasoning)
9 changes: 7 additions & 2 deletions llmint/map/transform/field/copy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from llmint.map.function import Map


name = "COPY"
schema = {
"type": "function",
Expand Down Expand Up @@ -28,5 +31,7 @@


def func(source_field, target_field, reasoning):
return (f'{{from: {source_field}, to: {target_field}, '
f'transformation: COPY}}', reasoning)
return Map(source_field=source_field,
target_field=target_field,
transformation=f'COPY',
reasoning=reasoning)
10 changes: 7 additions & 3 deletions llmint/map/transform/field/default.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from llmint.map.function import Map


name = "DEFAULT"
schema = {
"type": "function",
Expand Down Expand Up @@ -31,6 +34,7 @@


def func(source_field, target_field, default_value, reasoning):
return (
f'{{from: {source_field}, to: {target_field}, '
f'transformation: DEFAULT {target_field} TO {default_value}}}', reasoning)
return Map(source_field=source_field,
target_field=target_field,
transformation=f'DEFAULT TO {default_value}',
reasoning=reasoning)
9 changes: 7 additions & 2 deletions llmint/map/transform/field/delete.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from llmint.map.function import Map


name = "DELETE"
schema = {
"type": "function",
Expand All @@ -23,5 +26,7 @@


def func(source_field, reasoning):
return (f'{{from: {source_field}, to: None, '
f'transformation: DELETE {source_field}}}', reasoning)
return Map(source_field=source_field,
target_field=None,
transformation=f'DELETE',
reasoning=reasoning)
Loading