pyspark-anonymizer
A Python library that dynamically masks/anonymizes data in a PySpark environment, using rules supplied as a JSON string or a Python dict.
Installing
pip install pyspark-anonymizer
Usage
Before Masking
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("your_app_name").getOrCreate()
df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")
df.limit(5).toPandas()
| | marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | US | 51163966 | R2RX7KLOQQ5VBG | B00000JBAT | 738692522 | Diamond Rio Digital Player | 3 | 0 | 0 | N | N | Why just 30 minutes? | RIO is really great, but Diamond should increa... | 1999-06-22 | 1999 |
1 | US | 30050581 | RPHMRNCGZF2HN | B001BRPLZU | 197287809 | NG 283220 AC Adapter Power Supply for HP Pavil... | 5 | 0 | 0 | N | Y | Five Stars | Great quality for the price!!!! | 2014-11-17 | 2014 |
2 | US | 52246039 | R3PD79H9CTER8U | B00000JBAT | 738692522 | Diamond Rio Digital Player | 5 | 1 | 2 | N | N | The digital audio "killer app" | One of several first-generation portable MP3 p... | 1999-06-30 | 1999 |
3 | US | 16186332 | R3U6UVNH7HGDMS | B009CY43DK | 856142222 | HDE Mini Portable Capsule Travel Mobile Pocket... | 5 | 0 | 0 | N | Y | Five Stars | I like it, got some for the Grandchilren | 2014-11-17 | 2014 |
4 | US | 53068431 | R3SP31LN235GV3 | B00000JBSN | 670078724 | JVC FS-7000 Executive MicroSystem (Discontinue... | 3 | 5 | 5 | N | N | Design flaws ruined the better functions | I returned mine for a couple of reasons: The ... | 1999-07-13 | 1999 |
After Masking
In this example we will add the following data anonymizers:
- drop_column on column "marketplace"
- replace on "customer_id" column, replacing all values with "*"
- replace_with_regex on "review_id" column, replacing every match of "R\d" ("R" followed by any digit) with "*"
- sha256 on "product_id" column
- filter_row with condition "product_parent != 738692522"
from pyspark.sql import SparkSession
import pyspark.sql.functions as spark_functions
import pyspark_anonymizer
spark = SparkSession.builder.appName("your_app_name").getOrCreate()
df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")
# Rules are applied to the dataframe by the Parser below.
dataframe_anonymizers = [
    {
        "method": "drop_column",
        "parameters": {
            "column_name": "marketplace"
        }
    },
    {
        "method": "replace",
        "parameters": {
            "column_name": "customer_id",
            "replace_to": "*"
        }
    },
    {
        "method": "replace_with_regex",
        "parameters": {
            "column_name": "review_id",
            # Raw string, so that "\d" is not treated as a string escape
            "replace_from_regex": r"R\d",
            "replace_to": "*"
        }
    },
    {
        "method": "sha256",
        "parameters": {
            "column_name": "product_id"
        }
    },
    {
        "method": "filter_row",
        "parameters": {
            "where": "product_parent != 738692522"
        }
    }
]
df_parsed = pyspark_anonymizer.Parser(df, dataframe_anonymizers, spark_functions).parse()
df_parsed.limit(5).toPandas()
| | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | * | RPHMRNCGZF2HN | 69031b13080f90ae3bbbb505f5f80716cd11c4eadd8d86... | 197287809 | NG 283220 AC Adapter Power Supply for HP Pavil... | 5 | 0 | 0 | N | Y | Five Stars | Great quality for the price!!!! | 2014-11-17 | 2014 |
1 | * | *U6UVNH7HGDMS | c99947c06f65c1398b39d092b50903986854c21fd1aeab... | 856142222 | HDE Mini Portable Capsule Travel Mobile Pocket... | 5 | 0 | 0 | N | Y | Five Stars | I like it, got some for the Grandchilren | 2014-11-17 | 2014 |
2 | * | *SP31LN235GV3 | eb6b489524a2fb1d2de5d2e869d600ee2663e952a4b252... | 670078724 | JVC FS-7000 Executive MicroSystem (Discontinue... | 3 | 5 | 5 | N | N | Design flaws ruined the better functions | I returned mine for a couple of reasons: The ... | 1999-07-13 | 1999 |
3 | * | *IYAZPPTRJF7E | 2a243d31915e78f260db520d9dcb9b16725191f55c54df... | 503838146 | BlueRigger High Speed HDMI Cable with Ethernet... | 3 | 0 | 0 | N | Y | Never got around to returning the 1 out of 2 ... | Never got around to returning the 1 out of 2 t... | 2014-11-17 | 2014 |
4 | * | *RDD9FILG1LSN | c1f5e54677bf48936fb1e9838869630e934d16ac653b15... | 587294791 | Brookstone 2.4GHz Wireless TV Headphones | 5 | 3 | 3 | N | Y | Saved my. marriage, I swear to god. | Saved my.marriage, I swear to god. | 2014-11-17 | 2014 |
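The first rows of this output can be checked by hand. The sketch below applies plain-Python stand-ins for the five rules to individual records taken from the "Before Masking" table. It only illustrates what each rule does to a value and is not how the library is implemented: the helper name `mask_record` is hypothetical, and the hashing is assumed to work on the UTF-8 bytes of the value.

```python
import hashlib
import re


def mask_record(record):
    """Apply plain-Python stand-ins for the five example rules to one row (a dict).

    Returns the masked row, or None when the filter_row condition rejects it.
    """
    masked = dict(record)
    # drop_column: remove "marketplace"
    masked.pop("marketplace", None)
    # replace: overwrite every customer_id with "*"
    masked["customer_id"] = "*"
    # replace_with_regex: every "R" followed by a digit becomes "*"
    masked["review_id"] = re.sub(r"R\d", "*", masked["review_id"])
    # sha256: hex digest of the UTF-8 encoded product_id (encoding is an assumption)
    masked["product_id"] = hashlib.sha256(masked["product_id"].encode("utf-8")).hexdigest()
    # filter_row: keep only rows where product_parent != 738692522
    if masked["product_parent"] == 738692522:
        return None
    return masked


# Three records copied from the "Before Masking" table (only the columns used here).
rows = [
    {"marketplace": "US", "customer_id": 51163966, "review_id": "R2RX7KLOQQ5VBG",
     "product_id": "B00000JBAT", "product_parent": 738692522},
    {"marketplace": "US", "customer_id": 30050581, "review_id": "RPHMRNCGZF2HN",
     "product_id": "B001BRPLZU", "product_parent": 197287809},
    {"marketplace": "US", "customer_id": 16186332, "review_id": "R3U6UVNH7HGDMS",
     "product_id": "B009CY43DK", "product_parent": 856142222},
]

masked_rows = [m for m in (mask_record(r) for r in rows) if m is not None]
for row in masked_rows:
    print(row["customer_id"], row["review_id"], row["product_id"][:16] + "...")
```

The first record is filtered out because its product_parent is 738692522. The review IDs of the other two come out as RPHMRNCGZF2HN (no "R" followed by a digit, so unchanged) and *U6UVNH7HGDMS, matching the table above.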
Anonymizers from DynamoDB
You can also store anonymizers in DynamoDB.
Creating DynamoDB table
To create the table, use either of the options below.
Using the example script:
- Run the examples/create_on_demand_table.py script to create the table.
On AWS console:
- DynamoDB > Tables > Create table
- Table name: "pyspark_anonymizer" (or any other name you prefer)
- Partition key: "dataframe_name"
- Customize the settings if you want
- Create table
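The console steps above can also be scripted. The sketch below is one possible equivalent using boto3's `create_table` call. It is not the contents of examples/create_on_demand_table.py. It assumes a string-typed partition key and on-demand (PAY_PER_REQUEST) billing, and the helper names `table_definition` and `create_anonymizer_table` are hypothetical.

```python
def table_definition(table_name="pyspark_anonymizer"):
    """Build the keyword arguments for DynamoDB's create_table call."""
    return {
        "TableName": table_name,
        # Partition key named as in the console steps; the string type is an assumption.
        "KeySchema": [{"AttributeName": "dataframe_name", "KeyType": "HASH"}],
        "AttributeDefinitions": [
            {"AttributeName": "dataframe_name", "AttributeType": "S"}
        ],
        # On-demand capacity, so no throughput has to be provisioned (assumption).
        "BillingMode": "PAY_PER_REQUEST",
    }


def create_anonymizer_table(client, table_name="pyspark_anonymizer"):
    """Create the table with a DynamoDB client such as boto3.client("dynamodb")."""
    return client.create_table(**table_definition(table_name))


# Usage (needs AWS credentials and a default region):
#   import boto3
#   create_anonymizer_table(boto3.client("dynamodb"))
```

Keeping the table definition in its own function makes it easy to inspect or adjust the settings before anything is created in AWS.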
Writing Anonymizer on DynamoDB
You can run the example script as-is, then edit the stored settings to suit your data.
- Run the examples/insert_anonymizer.py script.
- A new entry will be added to DynamoDB. The example dataframe name is "table_x".
Parse from DynamoDB
from pyspark.sql import SparkSession
import pyspark.sql.functions as spark_functions
import pyspark_anonymizer
import boto3
from botocore.exceptions import ClientError as client_error
dynamo_table_name = "pyspark_anonymizer"
dataframe_name = "table_x"
# Table resource that holds the anonymizer settings
dynamo_table = boto3.resource('dynamodb').Table(dynamo_table_name)
spark = SparkSession.builder.appName("your_app_name").getOrCreate()
df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")
df_parsed = pyspark_anonymizer.ParserFromDynamoDB(df, dataframe_name, dynamo_table, spark_functions, client_error).parse()
df_parsed.limit(5).toPandas()
The output will be the same as in the previous example. The only difference is that the anonymization settings are read from DynamoDB.
Currently supported data masking/anonymization methods
- Methods
- drop_column - Drop a column.
- replace - Replace all values in a column with a string.
- replace_with_regex - Replace the parts of a column's contents that match a regex with a string.
- sha256 - Apply the SHA-256 hash function to a column.
- filter_row - Apply a filter to the dataframe.
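Since rules can be supplied as a JSON string, it can help to check them before handing them to the parser. The sketch below decodes a JSON rule list with the standard `json` module and rejects method names that are not in the list above. The helper `load_rules` is hypothetical and not part of pyspark-anonymizer. It also assumes the JSON string is decoded to a list of dicts before use, which may not be necessary if the parser accepts the string directly.

```python
import json

# Method names taken from the list of supported methods above.
SUPPORTED_METHODS = {"drop_column", "replace", "replace_with_regex", "sha256", "filter_row"}


def load_rules(rules_json):
    """Decode a JSON rule list and check each rule's shape and method name."""
    rules = json.loads(rules_json)
    if not isinstance(rules, list):
        raise ValueError("rules must be a JSON array")
    for rule in rules:
        if not isinstance(rule, dict) or rule.get("method") not in SUPPORTED_METHODS:
            raise ValueError(f"unsupported rule: {rule!r}")
        if not isinstance(rule.get("parameters"), dict):
            raise ValueError(f"rule {rule['method']!r} needs a 'parameters' object")
    return rules


# Inside JSON the backslash of the regex must be doubled: "R\\d".
rules_json = r"""
[
  {"method": "drop_column", "parameters": {"column_name": "marketplace"}},
  {"method": "replace_with_regex",
   "parameters": {"column_name": "review_id", "replace_from_regex": "R\\d", "replace_to": "*"}}
]
"""

rules = load_rules(rules_json)
print(rules[1]["parameters"]["replace_from_regex"])  # the decoded pattern: R\d
```

Failing early on an unknown method name avoids starting a Spark job with a rule set that cannot be applied.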