A Pythonic Data Catalog powered by Ray that brings exabyte-level scalability and fast, ACID-compliant change-data-capture to your big data workloads.

Overview

DeltaCAT

DeltaCAT is a Pythonic Data Catalog powered by Ray.

Its data storage model allows you to define and manage fast, scalable, ACID-compliant data catalogs through git-like stage/commit APIs, and has been used to successfully host exabyte-scale enterprise data lakes.

DeltaCAT uses the Ray distributed compute framework together with Apache Arrow for common table management tasks, including petabyte-scale change-data-capture, data consistency checks, and table repair.

Comments
  • Add ability to collect stats for specific dataset columns

    Add ability to collect stats for specific dataset columns

    First draft of adding the ability to collect stats for specific columns in a dataset.

    • Update caching/fetching of stats to be at the column level
    • Improve column level performance of stats collection by loading specific columns of PyArrow tables into memory rather than the entire table
    • Additional API to collect aggregation of all stats from a list of deltas (used for the benchmarks below)
    • Allow file system storage path to be optional (to test cache lookup performance vs. no-cache performance)
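    The column-level loading described above can be sketched with PyArrow's Parquet reader, which materializes only the requested columns; the table contents and column names below are purely illustrative:

    ```python
    import io

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Stand-in for one manifest entry file (written to an in-memory buffer).
    table = pa.table({"id": [1, 2, 3], "price": [9.5, 3.25, 7.0], "note": ["a", "b", "c"]})
    buf = io.BytesIO()
    pq.write_table(table, buf)
    buf.seek(0)

    # Reading only the needed columns keeps memory proportional to those columns,
    # not to the whole table.
    subset = pq.read_table(buf, columns=["id", "price"])
    print(subset.num_rows, subset.column_names)  # 3 ['id', 'price']
    ```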

    Linked Issues:

    • #5
    • #4
    • #6

    Quick benchmarks from local testing on a large data catalog table of roughly 226 TB and 679 billion rows.

    {'rowCount': 679582576542, 'pyarrowTableBytes': 226288499853521}
    709.9742422103882 seconds taken for all column stats collection (Using S3 cache)
    
    {'rowCount': 679582576542, 'pyarrowTableBytes': 226288499853521}
    3005.063000679016 seconds taken for all column stats collection (No cache)
    
    {'rowCount': 679582576542, 'pyarrowTableBytes': 13761547608634}
    10.410633325576782 seconds taken for two column stats collection (Using S3 cache)
    
    {'rowCount': 679582576542, 'pyarrowTableBytes': 13761547608634}
    1646.9808332920074 seconds taken for two column stats collection (No cache)
    
    opened by rkenmi 2
  • Add job event state listener and dispatcher

    Add job event state listener and dispatcher

    This PR introduces general purpose utilities (event state listener and dispatcher) to transition Ray jobs from one state to another. It also includes an example driver script for deltacat compaction runs.

    Concepts

    Event State Listener: Latest events are polled from the Event Summary DynamoDB table. Callbacks can be defined for an event's job state, with granularity down to the job state sequence level. In its simplest form, these callbacks can be used to invoke calls with the Event Dispatcher to transition from a previous job state to another job state.

    Event Dispatcher: Publishes new events via SNS with the job state name and job state sequence.
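    As a rough in-memory sketch of how these two pieces interact (plain Python stands in for the DynamoDB polling and SNS publishing, and all names here are illustrative rather than the PR's actual API):

    ```python
    from collections import defaultdict

    class EventStateListener:
        """Invokes callbacks registered per (job state, job state sequence)."""
        def __init__(self):
            self.callbacks = defaultdict(list)

        def on(self, state, sequence, callback):
            self.callbacks[(state, sequence)].append(callback)

        def receive(self, event):
            for cb in self.callbacks[(event["state"], event["sequence"])]:
                cb(event)

    class EventDispatcher:
        """Publishes new events; stands in for the SNS publisher."""
        def __init__(self, listener):
            self.listener = listener

        def dispatch(self, state, sequence):
            self.listener.receive({"state": state, "sequence": sequence})

    listener = EventStateListener()
    dispatcher = EventDispatcher(listener)
    log = []
    # Transition: when stats collection completes, kick off compaction.
    listener.on("STATS_COMPLETED", 0,
                lambda e: dispatcher.dispatch("COMPACTION_STARTED", 0))
    listener.on("COMPACTION_STARTED", 0, lambda e: log.append(e["state"]))
    dispatcher.dispatch("STATS_COMPLETED", 0)
    print(log)  # ['COMPACTION_STARTED']
    ```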

    --

    Prerequisites

    Tested with

    • Ray 1.11.0
    opened by rkenmi 1
  • Multi partition compaction

    Multi partition compaction

    Removed files under autoscaler/ and created a new module under utils/: placement.py

    Example of using a placement group:

    from deltacat.utils.placement import PlacementGroupManager as pgm
    # Request 32 CPUs total, with 8 CPUs per instance and a 60-second timeout.
    pgs = pgm(32, 8, 60).pgs

    pgs[0][0]
    {"scheduling_strategy": PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_capture_child_tasks=True)}
    pgs[0][1]
    {"CPU": 32, "memory": 256G, "object_store_memory": 256G, "node_id": ['nid0', 'nid1', 'nid2', 'nid3']}
    
    opened by valiantljk 0
  • Scaling test

    Scaling test

    • added node group and placement group manager
    • hardcoded num_cpus to 0.5 for most of the remote tasks (we ran into resource-unavailable issues during scaling tests when the default request of 1 CPU per task was used)
    opened by valiantljk 0
  • No Cairns shard found for stream

    No Cairns shard found for stream

    It looks like when no Cairns shard is found, the code doesn't handle it very well. For example:

    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)   File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/deltacat/compute/compactor/compaction_session.py", line 102, in compact_partition
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)     parallelism=parallelism
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)   File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/deltacat/compute/compactor/compaction_session.py", line 258, in _execute_compaction_round
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)     deltacat_storage,
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)   File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/deltacat/compute/compactor/utils/io.py", line 36, in discover_deltas
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)     True,
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)   File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/sungate/storage/andes/client.py", line 208, in list_deltas
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)     return _list_result_first_page(next_page_provider)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)   File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/sungate/storage/andes/client.py", line 51, in _list_result_first_page
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)     list_result = next_page_provider()
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)   File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/sungate/storage/andes/client.py", line 233, in _list_deltas
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)     equivalent_table_types,
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)   File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1310, in __call__
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)     answer, self.gateway_client, self.target_id, self.name)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)   File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33)     format(target_id, ".", name), value)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) py4j.protocol.Py4JJavaError: An error occurred while calling o3.listDeltas.
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) : java.lang.IllegalArgumentException: No Cairns shard found for stream 83928554-6bea-4315-9e42-f2e05341d3af and partition key values [2, 2021-10-05T00:00:00.000Z].
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at com.amazon.sungate.interop.bdt.unified.AndesPortableUnifiedClientImpl.findLatestActiveCairnsShardUuidNonNull(AndesPortableUnifiedClientImpl.java:479)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at com.amazon.sungate.interop.bdt.unified.AndesPortableUnifiedClientImpl.listCairnsDeltas(AndesPortableUnifiedClientImpl.java:784)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at com.amazon.sungate.interop.bdt.unified.AndesPortableUnifiedClientImpl.listDeltas(AndesPortableUnifiedClientImpl.java:652)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at java.lang.reflect.Method.invoke(Method.java:498)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at py4j.Gateway.invoke(Gateway.java:282)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at py4j.GatewayConnection.run(GatewayConnection.java:238)
    (invoke_parallel_compact_partition pid=15634, ip=172.31.46.33) 	at java.lang.Thread.run(Thread.java:750)
    
    opened by valiantljk 0
  • [Storage] API Updates to add Ray Options Provider

    [Storage] API Updates to add Ray Options Provider

    This PR changes existing APIs to inject ray.remote options providers when invoking storage APIs that will subsequently invoke concurrent Ray tasks for all blocks of a dataset.

    opened by pdames 0
  • PyArrow kwargs not setting columns in s3_file_to_table

    PyArrow kwargs not setting columns in s3_file_to_table

    https://github.com/ray-project/deltacat/blob/fb210380ac9e3cfcfa728e822ab838567d5f4d99/deltacat/utils/pyarrow.py#L155

    I think this was unintended? It looks like a no-op that doesn't throw any errors due to type hinting.

    A quick fix might just be: kwargs["columns"] = include_columns
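    To illustrate the suggested fix in isolation (this is a hypothetical reduction of s3_file_to_table, not the actual function body): unless include_columns is copied into the reader kwargs, PyArrow never receives a columns argument and reads every column.

    ```python
    import io

    import pyarrow as pa
    import pyarrow.parquet as pq

    def file_to_table(source, include_columns=None, **kwargs):
        # The suggested one-line fix: forward include_columns to the reader kwargs.
        if include_columns is not None:
            kwargs["columns"] = include_columns
        return pq.read_table(source, **kwargs)

    buf = io.BytesIO()
    pq.write_table(pa.table({"a": [1], "b": [2], "c": [3]}), buf)
    buf.seek(0)
    result = file_to_table(buf, include_columns=["a", "b"])
    print(result.column_names)  # ['a', 'b']
    ```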

    opened by rkenmi 0
  • Retrieving specific columns from tables and caching

    Retrieving specific columns from tables and caching

    https://github.com/ray-project/deltacat/blob/18c6c5f93a1e9c77bd3f794502a3dedbc179a0f1/deltacat/compute/stats/utils/io.py#L104

    The current implementation of the stats collector downloads the entire table into local memory. For many use cases we might not need the entire table; instead, having the option to fetch specific columns may offer a performance boost in the stats collection process.

    enhancement 
    opened by rkenmi 0
  • Further optimizations for get_delta_stats with Ray

    Further optimizations for get_delta_stats with Ray

    https://github.com/ray-project/deltacat/blob/18c6c5f93a1e9c77bd3f794502a3dedbc179a0f1/deltacat/compute/stats/utils/io.py#L103-L104

    A single Ray task that sequentially downloads delta manifests with a large number of manifest entries may take a long time to complete. Instead, each Ray task could handle the download of one specific manifest entry, addressed by file index.
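    The proposed optimization amounts to fanning out one Ray task per manifest entry file index instead of one task walking all entries sequentially. A minimal sketch, with the actual download and stats logic replaced by a stand-in:

    ```python
    import ray

    ray.init(num_cpus=2, ignore_reinit_error=True)

    @ray.remote
    def stats_for_entry(file_index):
        # Stand-in for downloading one manifest entry and computing its stats.
        return {"file_index": file_index, "row_count": 100 * (file_index + 1)}

    # One task per entry file index, executed in parallel.
    entry_stats = ray.get([stats_for_entry.remote(i) for i in range(4)])
    total_rows = sum(s["row_count"] for s in entry_stats)
    print(total_rows)  # 1000

    ray.shutdown()
    ```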

    enhancement 
    opened by rkenmi 0
  • Add infinity range support for stats collection range intervals

    Add infinity range support for stats collection range intervals

    https://github.com/ray-project/deltacat/blob/18c6c5f93a1e9c77bd3f794502a3dedbc179a0f1/deltacat/compute/stats/utils/intervals.py#L5

    Currently the delta stats collector does not have any support for ranges with infinity. For example if we want to grab stream positions from (-inf, inf), then ideally the range would encompass all stream positions for this particular delta.

    Ideas:

    • Allow None as an input to represent the infinity range. For example (None, None) can represent (-inf, inf), and (5, None) can represent (5, inf).
    • Document the behavior of how None is interpreted across deltacat_storage implementations
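    A minimal sketch of the first idea (the helper name is hypothetical): map None endpoints onto unbounded floats so that (None, None) behaves as (-inf, inf) in range checks.

    ```python
    import math

    def normalize_interval(start, end):
        """Interpret None as an unbounded endpoint of a stream-position range."""
        lo = -math.inf if start is None else start
        hi = math.inf if end is None else end
        if lo > hi:
            raise ValueError(f"invalid interval: ({start}, {end})")
        return lo, hi

    print(normalize_interval(None, None))  # (-inf, inf)
    print(normalize_interval(5, None))     # (5, inf)
    ```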
    enhancement 
    opened by rkenmi 0
  • Add delta stats collector functionality

    Add delta stats collector functionality

    When a compaction job run starts, the first things it needs are the (1) row count and (2) in-memory pyarrow table bytes for each new partition delta to be compacted. This change introduces an API to fetch these values given a range-set of partition delta stream positions.
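    Both values map directly onto PyArrow table properties; a tiny sketch with the delta-fetching plumbing omitted and a toy table standing in for a real partition delta:

    ```python
    import pyarrow as pa

    # Stand-in for a downloaded partition delta.
    delta_table = pa.table({"key": [1, 2, 3], "value": ["x", "y", "z"]})

    # The two inputs a compaction run needs up front:
    row_count = delta_table.num_rows     # (1) row count
    table_bytes = delta_table.nbytes     # (2) in-memory Arrow table bytes
    print(row_count, table_bytes > 0)  # 3 True
    ```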

    opened by rkenmi 0
  • Add first iteration of Github Actions workflow to build and publish to Python Package Index (PyPI)

    Add first iteration of Github Actions workflow to build and publish to Python Package Index (PyPI)

    This PR introduces a GitHub Actions workflow that publishes tagged commits to PyPI and untagged commits to TestPyPI whenever the package version changes.

    Usage

    git add README.md
    git commit -m "Added foo section to the README"
    git tag 0.1.5
    git push origin 0.1.5
    

    Prerequisites

    • PyPI and TestPyPI API tokens generated and then set as PYPI_API_TOKEN and TEST_PYPI_API_TOKEN, respectively (PyPI currently set, TestPyPI pending)

    Related links / References

    • https://packaging.python.org/en/latest/guides/publishing-package-distribution-releases-using-github-actions-ci-cd-workflows/

    Testing

    Identical workflow executed on my fork of Deltacat - https://github.com/pfaraone/deltacat/actions

    Discussion

    • Dependency installation from requirements.txt takes 55 seconds (45% of total execution time) because we use dependency ranges (e.g., "pyarrow >= 8.0.0") instead of pinned versions
    enhancement 
    opened by pfaraone 0
  • Add placement group support to constrain random task scheduling

    Add placement group support to constrain random task scheduling

    The current scaling approach leverages the Ray cluster's node groups to shard the cluster into several logical groups. In code we have

    task.options(resources={'node_group1':1}).remote()
    

    to assign the compaction task to a pre-allocated node group. But this doesn't stop nested child tasks from landing on other nodes at random. The proposal is to combine placement groups with node groups (custom resources), so that all child tasks inherit the parent task's placement group. For example, the code should look like this:

    from ray.util.placement_group import placement_group
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

    bundle = {"node_group": 1}
    pg = placement_group([bundle], strategy="SPREAD")
    ray.get(pg.ready())
    task.options(scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg, placement_group_capture_child_tasks=True)).remote()
    
    opened by valiantljk 0
  • Add metastats collector and refactor stats collector

    Add metastats collector and refactor stats collector

    This PR adds support for a metastats collector. The metastats collector estimates the cluster CPUs needed for stats collection and allocates an appropriate number of manifest files per CPU.
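    The estimation step can be sketched as dividing the manifest file count across CPUs; the function name and files-per-CPU ratio below are illustrative assumptions, not the PR's actual heuristic:

    ```python
    import math

    def estimate_cluster_cpus(num_manifest_files, files_per_cpu=20, max_cpus=256):
        """Estimate CPUs needed so each CPU handles roughly files_per_cpu files."""
        cpus = math.ceil(num_manifest_files / files_per_cpu)
        return max(1, min(cpus, max_cpus))

    print(estimate_cluster_cpus(500))  # 25
    print(estimate_cluster_cpus(5))    # 1
    ```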

    opened by Zyiqin-Miranda 0
  •  Proper documentation reference and how to run?

    Proper documentation reference and how to run?

    Hi all, can anyone in the project share some documentation with me? All I could find is a presentation of DeltaCAT on the internet (DeltaCat Presentation Link). Also, how should I go about running and testing the existing code? It does not seem to have any default implementation of the storage and catalog layers.

    Thanks and regards, Vikram Ahuja

    opened by vikramahuja1001 0