-
Notifications
You must be signed in to change notification settings - Fork 82
Example notebook for Gaming industry #362
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
0e05c80
Add files via upload
ManishNamburi 5717660
Update VideoGameLoginSyntheticDataGeneration.py
ManishNamburi c263500
Update and rename VideoGameLoginSyntheticDataGeneration.py to gaming_…
ManishNamburi 8e1e391
Merge branch 'master' into feat_gaming_demo
ManishNamburi File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
271 changes: 271 additions & 0 deletions
271
examples/notebooks/VideoGameLoginSyntheticDataGeneration.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,271 @@ | ||
| # Databricks notebook source | ||
| # MAGIC %md | ||
| # MAGIC | ||
| # MAGIC # Getting Started with the Databricks Labs Data Generator | ||
| # MAGIC | ||
| # MAGIC This notebook provides an introduction to synthetic data generation using the [Databricks Labs Data Generator (`dbldatagen`)](https://databrickslabs.github.io/dbldatagen/public_docs/index.html). This data generator is useful for generating large synthetic datasets for development, testing, benchmarking, proofs-of-concept, and other use-cases. | ||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| # DBTITLE 1,Install dbldatagen | ||
| # MAGIC %pip install dbldatagen | ||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| # DBTITLE 1,Import Modules | ||
| import dbldatagen as dg | ||
| import dbldatagen.distributions as dist | ||
|
|
||
| from pyspark.sql.types import IntegerType, FloatType, StringType, TimestampType, BooleanType, LongType, ArrayType | ||
| from pyspark.sql.functions import current_timestamp | ||
|
|
||
| import random | ||
| import string | ||
| from datetime import datetime | ||
| from pyspark.sql.functions import col, format_string, expr, sha2 | ||
ManishNamburi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| # DBTITLE 1,Set up Parameters | ||
| # Set up how many rows we want along with how many users, devices and IPs we want | ||
| ROW_COUNT=4144000 | ||
| NUMBER_OF_USERS=200000 | ||
| NUMBER_OF_DEVICES= NUMBER_OF_USERS+50000 | ||
| NUMBER_OF_IPS=40000 | ||
| # ROW_COUNT=1000000000 | ||
| # NUMBER_OF_USERS=2000000 | ||
| # NUMBER_OF_DEVICES= NUMBER_OF_USERS+500000 | ||
| # NUMBER_OF_IPS=400000 | ||
| START_TIMESTAMP="2025-03-01 00:00:00" | ||
| END_TIMESTAMP="2025-03-30 00:00:00" | ||
ManishNamburi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| # MAGIC %md | ||
| # MAGIC ## Data Generation Specifications | ||
| # MAGIC | ||
| # MAGIC Let's start by generating a DataFrame with rows representing unique login information. Data generation is controlled by a `DataGenerator` object. Each `DataGenerator` can be extended with rules specifying the output schema and value generation. | ||
ManishNamburi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| # DBTITLE 1,Generate a DataFrame | ||
| default_annotations_spec = ( | ||
| dg.DataGenerator( | ||
| spark, | ||
| name="default_annotations_spec", | ||
| rows=ROW_COUNT | ||
| ) | ||
| .withIdOutput() # Add a unique id column for each row | ||
| .withColumn( | ||
| "EVENT_TIMESTAMP", | ||
| TimestampType(), | ||
| data_range=dg.DateRange(START_TIMESTAMP, END_TIMESTAMP, "seconds=1"), | ||
| random=True, | ||
| ) # Random event timestamp within the specified range | ||
| .withColumn( | ||
| "internal_ACCOUNTID", | ||
| LongType(), | ||
| minValue=0x1000000000000, | ||
| uniqueValues=NUMBER_OF_USERS, | ||
| omit=True, | ||
| baseColumnType="hash", | ||
| ) # Internal unique account id, omitted from output, used for deterministic hashing | ||
| .withColumn( | ||
| "ACCOUNTID", StringType(), format="0x%032x", baseColumn="internal_ACCOUNTID" | ||
| ) # Public account id as hex string | ||
| .withColumn( | ||
| "internal_DEVICEID", | ||
| LongType(), | ||
| minValue=0x1000000000000, | ||
| uniqueValues=NUMBER_OF_DEVICES, | ||
| omit=True, | ||
| baseColumnType="hash", | ||
| baseColumn="internal_ACCOUNTID" | ||
| ) # Internal device id, unique per account, omitted from output | ||
| .withColumn( | ||
| "DEVICEID", StringType(), format="0x%032x", baseColumn="internal_DEVICEID" | ||
| ) # Public device id as hex string | ||
| .withColumn("app_version", StringType(), values=["current"]) # Static app version | ||
| .withColumn("authMethod", StringType(), values=["OAuth", "password"]) # Auth method, random selection | ||
| # Assign clientName based on DEVICEID deterministically | ||
| .withColumn( | ||
| "clientName", | ||
| StringType(), | ||
| expr=""" | ||
| element_at( | ||
| array('SwitchGameClient','XboxGameClient','PlaystationGameClient','PCGameClient'), | ||
| (pmod(abs(hash(DEVICEID)), 4) + 1) | ||
| ) | ||
| """ | ||
| ) | ||
| .withColumn( | ||
| "clientId", | ||
| StringType(), | ||
| expr="sha2(concat(ACCOUNTID, clientName), 256)", | ||
| baseColumn=["ACCOUNTID", "clientName"] | ||
| ) # Deterministic clientId based on ACCOUNTID and clientName | ||
| .withColumn( | ||
| "correlationId", | ||
| StringType(), | ||
| expr="sha2(concat(ACCOUNTID, clientId), 256)", | ||
| ) # Session correlation id, deterministic hash | ||
| .withColumn("country", StringType(), values=["USA", "UK", "AUS"], weights=[0.6, 0.2, 0.2], baseColumn="ACCOUNTID", random=True) # Assign country with 60% USA, 20% UK, 20% AUS | ||
| .withColumn("environment", StringType(), values=["prod"]) # Static environment value | ||
| .withColumn("EVENT_TYPE", StringType(), values=["account_login_success"]) # Static event type | ||
| # Assign geoip_city_name based on country and ACCOUNTID | ||
| .withColumn( | ||
| "CITY", | ||
| StringType(), | ||
| expr=""" | ||
| CASE | ||
| WHEN country = 'USA' THEN element_at(array('New York', 'San Francisco', 'Chicago'), pmod(abs(hash(ACCOUNTID)), 3) + 1) | ||
| WHEN country = 'UK' THEN 'London' | ||
| WHEN country = 'AUS' THEN 'Sydney' | ||
| END | ||
| """, | ||
| baseColumn=["country", "ACCOUNTID"] | ||
| ) | ||
| .withColumn("countrycode2", StringType(), expr="CASE WHEN country = 'USA' THEN 'US' WHEN country = 'UK' THEN 'UK' WHEN country = 'AUS' THEN 'AU' END", baseColumn=["country"]) # Country code | ||
| # Assign ISP based on country and ACCOUNTID | ||
| .withColumn( | ||
| "ISP", | ||
| StringType(), | ||
| expr=""" | ||
| CASE | ||
| WHEN country = 'USA' THEN element_at(array('Comcast', 'AT&T', 'Verizon', 'Spectrum', 'Cox'), pmod(abs(hash(ACCOUNTID)), 5) + 1) | ||
| WHEN country = 'UK' THEN element_at(array('BT', 'Sky', 'Virgin Media', 'TalkTalk', 'EE'), pmod(abs(hash(ACCOUNTID)), 5) + 1) | ||
| WHEN country = 'AUS' THEN element_at(array('Telstra', 'Optus', 'TPG', 'Aussie Broadband', 'iiNet'), pmod(abs(hash(ACCOUNTID)), 5) + 1) | ||
| ELSE 'Unknown ISP' | ||
| END | ||
| """, | ||
| baseColumn=["country", "ACCOUNTID"] | ||
| ) | ||
| # Assign latitude based on city | ||
| .withColumn( | ||
| "latitude", | ||
| FloatType(), | ||
| expr=""" | ||
| CASE | ||
| WHEN CITY = 'New York' THEN 40.7128 | ||
| WHEN CITY = 'San Francisco' THEN 37.7749 | ||
| WHEN CITY = 'Chicago' THEN 41.8781 | ||
| WHEN CITY = 'London' THEN 51.5074 | ||
| WHEN CITY = 'Sydney' THEN -33.8688 | ||
| ELSE 0.0 | ||
| END | ||
| """, | ||
| baseColumn="CITY" | ||
| ) | ||
| # Assign longitude based on city | ||
| .withColumn( | ||
| "longitude", | ||
| FloatType(), | ||
| expr=""" | ||
| CASE | ||
| WHEN CITY = 'New York' THEN -74.0060 | ||
| WHEN CITY = 'San Francisco' THEN -122.4194 | ||
| WHEN CITY = 'Chicago' THEN -87.6298 | ||
| WHEN CITY = 'London' THEN -0.1278 | ||
| WHEN CITY = 'Sydney' THEN 151.2093 | ||
| ELSE 0.0 | ||
| END | ||
| """, | ||
| baseColumn="CITY" | ||
| ) | ||
| # Assign region name based on country and city | ||
| .withColumn( | ||
| "region_name", | ||
| StringType(), | ||
| expr=""" | ||
| CASE | ||
| WHEN country = 'USA' THEN | ||
| CASE | ||
| WHEN CITY = 'New York' THEN 'New York' | ||
| WHEN CITY = 'San Francisco' THEN 'California' | ||
| WHEN CITY = 'Chicago' THEN 'Illinois' | ||
| ELSE 'Unknown' | ||
| END | ||
| WHEN country = 'UK' THEN 'England' | ||
| WHEN country = 'AUS' THEN 'New South Wales' | ||
| ELSE 'Unknown' | ||
| END | ||
| """, | ||
| baseColumn=["country", "CITY"] | ||
| ) | ||
| # Internal IP address as integer, unique per device, omitted from output | ||
| .withColumn( | ||
| "internal_REQUESTIPADDRESS", | ||
| LongType(), | ||
| minValue=0x1000000000000, | ||
| uniqueValues=NUMBER_OF_IPS, | ||
| omit=True, | ||
| baseColumnType="hash", | ||
| baseColumn="internal_DEVICEID" | ||
| ) | ||
| # Convert internal IP integer to dotted quad string | ||
| .withColumn( | ||
| "REQUESTIPADDRESS", | ||
| StringType(), | ||
| expr=""" | ||
| concat( | ||
| cast((internal_REQUESTIPADDRESS >> 24) & 255 as string), '.', | ||
| cast((internal_REQUESTIPADDRESS >> 16) & 255 as string), '.', | ||
| cast((internal_REQUESTIPADDRESS >> 8) & 255 as string), '.', | ||
| cast(internal_REQUESTIPADDRESS & 255 as string) | ||
| ) | ||
| """, | ||
| baseColumn="internal_REQUESTIPADDRESS", | ||
| ) | ||
| # Generate user agent string using clientName and correlationId | ||
| .withColumn( | ||
| "userAgent", | ||
| StringType(), | ||
| expr="concat('Launch/1.0+', clientName, '(', clientName, '/)/', correlationId)", | ||
| baseColumn=["clientName", "correlationId"] | ||
| ) | ||
| ) | ||
| default_annotations = default_annotations_spec.build() | ||
ManishNamburi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| # DBTITLE 1,Transform the Dataframe | ||
| from pyspark.sql.functions import col, to_date, hour | ||
| # Transform the DataFrame | ||
| transformed_df = default_annotations.select( | ||
| col("EVENT_TYPE").alias("EVENT_TYPE"), | ||
| col("EVENT_TIMESTAMP").alias("EVENT_TIMESTAMP"), | ||
| hour(col("EVENT_TIMESTAMP")).alias("EVENT_HOUR"), | ||
| to_date(col("EVENT_TIMESTAMP")).alias("EVENT_DATE"), | ||
| col("ACCOUNTID").alias("ACCOUNTID"), | ||
| col("environment").alias("APPENV"), | ||
| col("app_version").alias("APP_VERSION"), | ||
| col("authMethod").alias("AUTHMETHOD"), | ||
| col("clientId").alias("CLIENTID"), | ||
| col("clientName").alias("CLIENTNAME"), | ||
| col("DEVICEID").alias("DEVICEID"), | ||
| col("environment").alias("ENVIRONMENT"), | ||
| col("CITY").alias("CITY"), | ||
| col("countrycode2").alias("COUNTRY_CODE2"), | ||
| col("ISP").alias("ISP"), | ||
| col("latitude").cast("double").alias("LATITUDE"), | ||
| col("longitude").cast("double").alias("LONGITUDE"), | ||
| col("region_name").alias("REGION_NAME"), | ||
| col("REQUESTIPADDRESS").alias("REQUESTIPADDRESS"), | ||
| col("correlationId").alias("SESSION_ID"), | ||
| col("correlationId").alias("TRACKINGUUID"), | ||
| col("userAgent").alias("USERAGENT"), | ||
| ) | ||
ManishNamburi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| # DBTITLE 1,Look at the Data | ||
| display(transformed_df) | ||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| # MAGIC %md | ||
| # MAGIC # Write Data | ||
|
|
||
| # COMMAND ---------- | ||
|
|
||
| transformed_df.write.mode("overwrite").saveAsTable("main.test.EVENT_ACCOUNT_LOGIN_SUCCESS") | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.