A new version of the CIDACS-RL linkage tool suitable for a cluster computing environment.

Overview

Fully Distributed CIDACS-RL

CIDACS-RL is a Brazilian record linkage tool suitable for integrating large amounts of data with high accuracy. However, its current implementation relies on an Elasticsearch cluster to distribute the queries and on a single node to perform them through the Python multiprocessing library. This implementation of the CIDACS-RL tool can be deployed in a Spark cluster, using all the resources available to the Jupyter kernel while still using the Elasticsearch cluster, becoming a fully distributed, cluster-based solution. It can outperform the legacy version of CIDACS-RL in either a multi-node or a single-node Spark environment.

config.json

Almost all aspects of the linkage can be controlled through the config.json file.

Section | Sub-section | Field (datatype) | Field description
General info | - | index_data (str<'yes', 'no'>) | Whether the linkage process includes indexing a dataset into Elasticsearch. Constraints: string, either "yes" or "no".
General info | - | es_index_name (str<ES_VALID_INDEX>) | The name of an existing Elasticsearch index (if index_data is 'no') or of a new one (if index_data is 'yes'). Constraints: string, valid Elasticsearch index name.
General info | - | es_connect_string (str<ES_URL:ES_PORT>) | Elasticsearch API address. Constraints: string, URL format.
General info | - | query_size (int) | Number of candidates returned by each Elasticsearch query. Constraints: int.
General info | - | cutoff_exact_match (str<0:1 number>) | Cutoff point that determines whether a pair is an exact match or not. Constraints: str, number between 0 and 1.
General info | - | null_value (str) | Value that replaces missing values in both datasets involved. Constraints: string.
General info | - | temp_dir (str) | Directory used to write checkpoints for the exact match and non-exact match phases. Constraints: string, fully qualified path.
General info | - | debug (str<'true', 'false'>) | If set to "true", all records found in the exact match phase are queried again in the non-exact match phase.
Datasets info | Indexed dataset | path (str) | Path of the csv or parquet folder of the dataset to index.
Datasets info | Indexed dataset | extension (str<'csv', 'parquet'>) | String that determines how Spark reads the data.
Datasets info | Indexed dataset | columns (list) | Python list with the names of the columns involved in the linkage.
Datasets info | Indexed dataset | id_column_name (str) | Name of the id column.
Datasets info | Indexed dataset | storage_level (str<'MEMORY_AND_DISK', 'MEMORY_ONLY'>) | Directive for memory allocation on Spark.
Datasets info | Indexed dataset | default_paralelism (str<4*N_OF_AVAILABLE_CORES>) | Number of partitions of a given Spark dataframe.
Datasets info | tolink dataset | path (str) | Path of the csv or parquet folder of the dataset to link.
Datasets info | tolink dataset | extension (str<'csv', 'parquet'>) | String that determines how Spark reads the data.
Datasets info | tolink dataset | columns (list) | Python list with the names of the columns involved in the linkage.
Datasets info | tolink dataset | id_column_name (str) | Name of the id column.
Datasets info | tolink dataset | storage_level (str<'MEMORY_AND_DISK', 'MEMORY_ONLY'>) | Directive for memory allocation on Spark.
Datasets info | tolink dataset | default_paralelism (str<4*N_OF_AVAILABLE_CORES>) | Number of partitions of a given Spark dataframe.
Datasets info | result dataset | path (str) | Path of the csv or parquet folder of the result dataset.
Comparisons | label1 | indexed_col (str) | Name of the first column to be compared on the indexed dataset.
Comparisons | label1 | tolink_col (str) | Name of the first column to be compared on the tolink dataset.
Comparisons | label1 | must_match (str<'true', 'false'>) | Whether this pair of columns is included in the exact match phase.
Comparisons | label1 | should_match (str<'true', 'false'>) | Whether this pair of columns is included in the non-exact match phase.
Comparisons | label1 | is_fuzzy (str<'true', 'false'>) | Whether this pair of columns is included in fuzzy queries in the non-exact match phase.
Comparisons | label1 | boost (str) | The boost/weight of this pair of columns in queries.
Comparisons | label1 | query_type (str<'match', 'term'>) | The type of matching used for this pair of columns in the non-exact match phase.
Comparisons | label1 | similarity (str<'jaro_winkler', 'overlap', 'hamming'>) | The similarity to be calculated between the values of this pair of columns.
Comparisons | label1 | weight (str) | The weight of this pair of columns.
Comparisons | label1 | penalty (str) | The penalty applied to the overall similarity in case of missing value(s).
Comparisons | label2 | ... | ...
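
The constraints listed in the table can be checked before a linkage run starts. The sketch below is only an illustration: `validate_config` is a hypothetical helper that is not part of CIDACS-RL, and it covers just the fields whose allowed values are spelled out above.

```python
def validate_config(config):
    """Return a list of problems found in a config dict (empty if none).

    Hypothetical helper: it only checks the constraints documented in
    the field table, not everything the tool itself may require.
    """
    problems = []

    if config.get("index_data") not in ("yes", "no"):
        problems.append("index_data must be 'yes' or 'no'")

    if not isinstance(config.get("query_size"), int):
        problems.append("query_size must be an int")

    # cutoff_exact_match is stored as a string holding a number in [0, 1].
    try:
        cutoff = float(config.get("cutoff_exact_match"))
        if not 0.0 <= cutoff <= 1.0:
            problems.append("cutoff_exact_match must be between 0 and 1")
    except (TypeError, ValueError):
        problems.append("cutoff_exact_match must be a numeric string")

    if config.get("debug") not in ("true", "false"):
        problems.append("debug must be 'true' or 'false'")

    for label, comp in config.get("comparisons", {}).items():
        for flag in ("must_match", "should_match", "is_fuzzy"):
            if comp.get(flag) not in ("true", "false"):
                problems.append(
                    f"comparisons.{label}.{flag} must be 'true' or 'false'")
        if comp.get("query_type") not in ("match", "term"):
            problems.append(
                f"comparisons.{label}.query_type must be 'match' or 'term'")
        if comp.get("similarity") not in ("jaro_winkler", "overlap", "hamming"):
            problems.append(
                f"comparisons.{label}.similarity is not a known similarity")

    return problems
```

Calling `validate_config(config)` on a loaded configuration and printing the returned list is one way to catch typos such as `'True'` instead of `'true'` before any Spark job is submitted.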

config.json example


{
  "index_data": "no",
  "es_index_name": "fd-cidacs-rl",
  "es_connect_string": "http://localhost:9200",
  "query_size": 100,
  "cutoff_exact_match": "0.95",
  "null_value": "99",
  "temp_dir": "../../../0_global_data/fd-cidacs-rl/temp_dataframe/",
  "debug": "false",

  "datasets_info": {
    "indexed_dataset": {
      "path": "../../../0_global_data/fd-cidacs-rl/sinthetic-dataset-A.parquet",
      "extension": "parquet",
      "columns": ["id_cidacs_a", "nome_a", "nome_mae_a", "dt_nasc_a", "sexo_a"],
      "id_column_name": "id_cidacs_a",
      "storage_level": "MEMORY_ONLY",
      "default_paralelism": "16"
    },
    "tolink_dataset": {
      "path": "../../../0_global_data/fd-cidacs-rl/sinthetic-datasets-b/sinthetic-datasets-b-500000.parquet",
      "extension": "parquet",
      "columns": ["id_cidacs_b", "nome_b", "nome_mae_b", "dt_nasc_b", "sexo_b"],
      "id_column_name": "id_cidacs_b",
      "storage_level": "MEMORY_ONLY",
      "default_paralelism": "16"
    },
    "result_dataset": {
      "path": "../0_global_data/result/500000/"
    }
  },

  "comparisons": {
    "name": {
      "indexed_col": "nome_a",
      "tolink_col": "nome_b",
      "must_match": "true",
      "should_match": "true",
      "is_fuzzy": "true",
      "boost": "3.0",
      "query_type": "match",
      "similarity": "jaro_winkler",
      "weight": 5.0,
      "penalty": 0.02
    },
    "mothers_name": {
      "indexed_col": "nome_mae_a",
      "tolink_col": "nome_mae_b",
      "must_match": "true",
      "should_match": "true",
      "is_fuzzy": "true",
      "boost": "2.0",
      "query_type": "match",
      "similarity": "jaro_winkler",
      "weight": 5.0,
      "penalty": 0.02
    },
    "birthdate": {
      "indexed_col": "dt_nasc_a",
      "tolink_col": "dt_nasc_b",
      "must_match": "false",
      "should_match": "true",
      "is_fuzzy": "false",
      "boost": "",
      "query_type": "term",
      "similarity": "hamming",
      "weight": 1.0,
      "penalty": 0.02
    },
    "sex": {
      "indexed_col": "sexo_a",
      "tolink_col": "sexo_b",
      "must_match": "true",
      "should_match": "true",
      "is_fuzzy": "false",
      "boost": "",
      "query_type": "term",
      "similarity": "overlap",
      "weight": 3.0,
      "penalty": 0.02
    }
  }
}
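
To make the role of the `comparisons` fields more concrete, here is a minimal sketch of how they could be turned into Elasticsearch query bodies for one record of the tolink dataset. It is an assumption about the mapping, not the tool's actual code: `build_queries` is a hypothetical function, the use of `match` clauses in the exact phase and of `"AUTO"` fuzziness are illustrative choices, and only standard query DSL keys (`bool`, `must`, `should`, `match`, `term`, `boost`, `fuzziness`, `size`) are used.

```python
def build_queries(comparisons, record, query_size=100):
    """Build (exact, non_exact) query bodies for one tolink record.

    `comparisons` is the "comparisons" section of the config, already
    loaded as a dict; `record` maps tolink column names to values.
    Hypothetical sketch: the real tool may build its queries differently.
    """
    must, should = [], []

    for comp in comparisons.values():
        field = comp["indexed_col"]
        value = record[comp["tolink_col"]]

        # Exact match phase: every flagged column has to match.
        if comp["must_match"] == "true":
            must.append({"match": {field: {"query": value}}})

        # Non-exact phase: columns contribute optional, weighted clauses.
        if comp["should_match"] == "true":
            qtype = "term" if comp["query_type"] == "term" else "match"
            params = {"value": value} if qtype == "term" else {"query": value}
            if qtype == "match" and comp["is_fuzzy"] == "true":
                params["fuzziness"] = "AUTO"  # assumed fuzziness setting
            if comp["boost"]:  # an empty string means "no boost"
                params["boost"] = float(comp["boost"])
            should.append({qtype: {field: params}})

    exact = {"size": query_size, "query": {"bool": {"must": must}}}
    non_exact = {"size": query_size, "query": {"bool": {"should": should}}}
    return exact, non_exact
```

With the example configuration, a record such as `{"nome_b": "MARIA", "nome_mae_b": "ANA", "dt_nasc_b": "20200131", "sexo_b": "2"}` would yield an exact query over name, mother's name and sex, and a non-exact query in which the two name columns are fuzzy and boosted while birthdate and sex are plain term clauses.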

Running in a Standalone Spark Cluster

Read more:
https://github.com/elastic/elasticsearch-hadoop
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
https://search.maven.org/artifact/org.elasticsearch/elasticsearch-spark-30_2.12

If you intend to run this tool in a single-node Spark environment, consider including this in your spark-submit or spark-shell command line:


pyspark --packages org.elasticsearch:elasticsearch-spark-30_2.12:7.14.0 --conf spark.es.nodes="localhost" --conf spark.es.port="9200"

If you are running on a Spark cluster through JupyterHub kernels, try adding this kernel or editing an existing one:


{
  "display_name": "Spark3.3",
  "language": "python",
  "argv": [
    "/opt/bigdata/anaconda3/bin/python",
    "-m",
    "ipykernel",
    "-f",
    "{connection_file}"
  ],
  "env": {
    "SPARK_HOME": "/opt/bigdata/spark",
    "PYTHONPATH": "/opt/bigdata/spark/python:/opt/bigdata/spark/python/lib/py4j-0.10.9.2-src.zip",
    "PYTHONSTARTUP": "/opt/bigdata/spark/python/pyspark/python/pyspark/shell.py",
    "PYSPARK_PYTHON": "/opt/bigdata/anaconda3/bin/python",
    "PYSPARK_SUBMIT_ARGS": "--master spark://node1.sparkcluster:7077 --packages org.elasticsearch:elasticsearch-spark-30_2.12:7.14.0 --conf spark.es.nodes=['node1','node2'] --conf spark.es.port='9200' pyspark-shell"
  }
}
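
When editing a kernel file is not an option, the same submit arguments can be set from Python before PySpark is first imported. The sketch below is a hedged illustration: `build_submit_args` is a hypothetical helper, and it joins the node names with commas, which is an assumption about the format accepted by `spark.es.nodes` and differs from the bracketed list shown in the kernel above.

```python
import os


def build_submit_args(master, es_nodes, es_port="9200",
                      package="org.elasticsearch:elasticsearch-spark-30_2.12:7.14.0"):
    """Assemble a PYSPARK_SUBMIT_ARGS string (hypothetical helper).

    `master` may be None for a local, single-node session.
    `es_nodes` is a list of Elasticsearch host names.
    """
    parts = []
    if master:
        parts += ["--master", master]
    parts += [
        "--packages", package,
        # Assumption: nodes are passed as a comma-separated list.
        "--conf", "spark.es.nodes=" + ",".join(es_nodes),
        "--conf", "spark.es.port=" + str(es_port),
        # PySpark expects the argument string to end with "pyspark-shell".
        "pyspark-shell",
    ]
    return " ".join(parts)


# Must be set before the first SparkContext/SparkSession is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(
    "spark://node1.sparkcluster:7077", ["node1", "node2"])
```

The master URL and node names are the ones from the kernel example; replace them with the values of your own cluster.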

Some advice for indexed data and queries

  • Every column should be cast to string (df.withColumn('column', F.col('column').cast('string')))
  • Date columns will not be properly indexed as strings unless a preprocessing step transforms them from yyyy-MM-dd to yyyyMMdd.
  • All the nodes of the Elasticsearch cluster must be listed in the spark.es.nodes setting passed alongside --packages.
  • Term queries are well suited to well-structured variables, such as CPF, dates, CNPJ, etc.
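
The first two points can be sketched as a small preprocessing step. This is an illustration under assumptions, not code from the tool: `compact_date` and `prepare_for_indexing` are hypothetical names, the `null_value` default of "99" is taken from the example config, and the Spark function imports PySpark lazily so the pure-Python helper can be used on its own.

```python
def compact_date(value, null_value="99"):
    """Turn 'yyyy-MM-dd' into 'yyyyMMdd'; map missing values to null_value."""
    if value is None or str(value).strip() == "":
        return null_value
    return str(value).strip().replace("-", "")


def prepare_for_indexing(df, columns, date_columns=(), null_value="99"):
    """Cast the linkage columns of a Spark DataFrame to string.

    Columns named in `date_columns` also lose their dashes, so that
    '2020-01-31' becomes '20200131'. Missing values are replaced by
    `null_value`. Hypothetical helper; requires PySpark at call time.
    """
    from pyspark.sql import functions as F  # lazy import

    for name in columns:
        col = F.col(name).cast("string")
        if name in date_columns:
            col = F.regexp_replace(col, "-", "")
        df = df.withColumn(name, col)

    # Keep only the linkage columns and fill the remaining nulls.
    return df.select(*columns).fillna(null_value, subset=list(columns))
```

For the example configuration, this would be called with the `columns` list of a dataset and `date_columns=["dt_nasc_a"]` (or `["dt_nasc_b"]`) before indexing or querying.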

Owner
Robespierre Pita
AI Researcher