The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.

Overview

Petastorm

Petastorm is an open source data access library developed at Uber ATG. This library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. Petastorm supports popular Python-based machine learning (ML) frameworks such as TensorFlow, PyTorch, and PySpark. It can also be used from pure Python code.

Documentation web site: https://petastorm.readthedocs.io

Installation

pip install petastorm

The petastorm package defines several extra dependencies that are not installed automatically. The extras are: tf, tf_gpu, torch, opencv, docs, test.

For example, to trigger installation of the GPU version of TensorFlow and opencv, use the following pip command:

pip install petastorm[opencv,tf_gpu]

Generating a dataset

A dataset created using Petastorm is stored in Apache Parquet format. On top of the Parquet schema, Petastorm also stores higher-level schema information that makes multidimensional arrays a native part of a Petastorm dataset.

Petastorm supports extensible data codecs. These enable a user to use one of the standard data compressions (jpeg, png) or implement their own.
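For illustration, here is a minimal sketch of what a user-defined codec could look like. It assumes that custom codecs derive from a DataframeColumnCodec base class in petastorm.codecs and implement encode, decode and spark_dtype; the base class name, the method signatures and the CompressedNpzCodec example are assumptions, not the documented extension API:

import io

import numpy as np
from pyspark.sql.types import BinaryType

from petastorm.codecs import DataframeColumnCodec  # assumed base class for custom codecs


class CompressedNpzCodec(DataframeColumnCodec):
    """Hypothetical codec storing a numpy array as a compressed .npz blob in a binary column."""

    def encode(self, unischema_field, value):
        # Serialize the array into an in-memory compressed .npz archive
        output = io.BytesIO()
        np.savez_compressed(output, data=value)
        return bytearray(output.getvalue())

    def decode(self, unischema_field, value):
        # Restore the original numpy array from the stored bytes
        return np.load(io.BytesIO(value))['data']

    def spark_dtype(self):
        # The encoded payload is stored as a binary Parquet column
        return BinaryType()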

Generating a dataset is done using PySpark. PySpark natively supports Parquet format, making it easy to run on a single machine or on a Spark compute cluster. Here is a minimalistic example writing out a table with some random data.

import numpy as np
from petastorm.codecs import CompressedImageCodec, NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType


HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('other_data', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'other_data': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_hello_world_dataset(output_url='file:///tmp/hello_world_dataset'):
    rows_count = 10
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap the dataset materialization portion. It takes care of setting up Spark environment variables
    # as well as saving petastorm-specific metadata.
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)

  • HelloWorldSchema is an instance of a Unischema object. Unischema is capable of rendering the types of its fields into different framework-specific formats, such as a Spark StructType, a TensorFlow tf.DType and a numpy numpy.dtype (see the short sketch after this list).
  • To define a dataset field, you need to specify a type, a shape, a codec instance and whether the field is nullable for each field of the Unischema.
  • We use PySpark for writing the output Parquet files. In this example, we launch PySpark on a local box (.master('local[2]')). Of course, for larger scale dataset generation we would need a real compute cluster.
  • We wrap the Spark dataset generation code with the materialize_dataset context manager. The context manager is responsible for configuring the row group size at the beginning and for writing out petastorm-specific metadata at the end.
  • The row generating code is expected to return a Python dictionary indexed by field name. We use the row_generator function for that.
  • dict_to_spark_row converts the dictionary into a pyspark.Row object while ensuring compliance with the HelloWorldSchema (shape, type and nullability are checked).
  • Once we have a pyspark.DataFrame, we write it out to Parquet storage. The Parquet schema is automatically derived from HelloWorldSchema.
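As a quick illustration of the first bullet above, here is a short sketch (assuming the HelloWorldSchema defined earlier; the fields attribute reflects our understanding of the Unischema API) that renders the same schema into Spark and numpy types:

from pyspark.sql.types import StructType

# Spark representation, as used when creating the dataframe above
spark_schema = HelloWorldSchema.as_spark_schema()
assert isinstance(spark_schema, StructType)

# numpy representation of individual fields
print(HelloWorldSchema.fields['id'].numpy_dtype)   # e.g. <class 'numpy.int32'>
print(HelloWorldSchema.fields['image1'].shape)     # (128, 256, 3)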

Plain Python API

The petastorm.reader.Reader class is the main entry point for user code that accesses the data from an ML framework such as TensorFlow or PyTorch. The reader has multiple features, such as:

  • Selective column readout
  • Multiple parallelism strategies: thread, process, single-threaded (for debug)
  • N-grams readout support
  • Row filtering (row predicates)
  • Shuffling
  • Partitioning for multi-GPU training
  • Local caching

Reading a dataset is simple using the petastorm.reader.Reader class, which can be created using the petastorm.make_reader factory method:

from petastorm import make_reader

with make_reader('hdfs://myhadoop/some_dataset') as reader:
    for row in reader:
        print(row)

hdfs://... and file://... are supported URL protocols.

Once a Reader is instantiated, you can use it as an iterator.
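Several of the features listed above are controlled through make_reader arguments. The snippet below is a hedged sketch of selective column readout and row predicates against the hello-world dataset generated earlier; the schema_fields and predicate argument names and the in_lambda helper reflect our understanding of the API and should be double-checked against the documentation:

from petastorm import make_reader
from petastorm.predicates import in_lambda

# Read only the `id` column and keep rows with an even id
with make_reader('file:///tmp/hello_world_dataset',
                 schema_fields=['id'],
                 predicate=in_lambda(['id'], lambda id_value: id_value % 2 == 0)) as reader:
    for row in reader:
        print(row.id)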

TensorFlow API

To hook up the reader to a TensorFlow graph, you can use the tf_tensors function:

import tensorflow as tf

from petastorm import make_reader
from petastorm.tf_utils import tf_tensors

with make_reader('file:///some/localpath/a_dataset') as reader:
    row_tensors = tf_tensors(reader)
    with tf.Session() as session:
        for _ in range(3):
            print(session.run(row_tensors))

Alternatively, you can use the new tf.data.Dataset API:

import tensorflow as tf

from petastorm import make_reader
from petastorm.tf_utils import make_petastorm_dataset

with make_reader('file:///some/localpath/a_dataset') as reader:
    dataset = make_petastorm_dataset(reader)
    iterator = dataset.make_one_shot_iterator()
    tensor = iterator.get_next()
    with tf.Session() as sess:
        sample = sess.run(tensor)
        print(sample.id)

PyTorch API

As illustrated in pytorch_example.py, reading a petastorm dataset from PyTorch can be done via the adapter class petastorm.pytorch.DataLoader, which allows a custom PyTorch collating function and transforms to be supplied.

Be sure you have torch and torchvision installed:

pip install torchvision

The minimalist example below assumes the definition of a Net class and train and test functions, included in pytorch_example.py:

import torch
from torchvision import transforms

from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader

torch.manual_seed(1)
device = torch.device('cpu')
model = Net().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

def _transform_row(mnist_row):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    return (transform(mnist_row['image']), mnist_row['digit'])


transform = TransformSpec(_transform_row, removed_fields=['idx'])

with DataLoader(make_reader('file:///localpath/mnist/train', num_epochs=10,
                            transform_spec=transform), batch_size=64) as train_loader:
    train(model, device, train_loader, 10, optimizer, 1)
with DataLoader(make_reader('file:///localpath/mnist/test', num_epochs=10,
                            transform_spec=transform), batch_size=1000) as test_loader:
    test(model, device, test_loader)

If you are working with very large batch sizes and do not need support for Decimal/string types, we provide petastorm.pytorch.BatchedDataLoader, which can buffer using Torch tensors (CPU or CUDA) with significantly higher throughput.
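A minimal sketch of swapping BatchedDataLoader into the example above, reusing the transform spec defined there. We assume the constructor mirrors DataLoader (a reader plus batch_size) and that it can be used as a context manager; treat the exact signature as an assumption:

from petastorm import make_reader
from petastorm.pytorch import BatchedDataLoader

with BatchedDataLoader(make_reader('file:///localpath/mnist/train', num_epochs=10,
                                   transform_spec=transform),
                       batch_size=4096) as train_loader:
    for batch in train_loader:
        pass  # fields in `batch` arrive already collated into torch tensors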

Spark Dataset Converter API

The Spark Dataset Converter API simplifies converting data from Spark to TensorFlow or PyTorch. The input Spark DataFrame is first materialized in Parquet format and then loaded as a tf.data.Dataset or torch.utils.data.DataLoader.

The minimalist example below assumes the definition of a compiled tf.keras model and a Spark DataFrame containing a feature column followed by a label column.

from petastorm.spark import SparkDatasetConverter, make_spark_converter
import tensorflow.compat.v1 as tf  # pylint: disable=import-error

# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'hdfs:/...')

df = ... # `df` is a spark dataframe

# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter(df)

# make a tensorflow dataset from `converter`
with converter.make_tf_dataset() as dataset:
    # the `dataset` is `tf.data.Dataset` object
    # dataset transformation can be done if needed
    dataset = dataset.map(...)
    # we can train/evaluate model on the `dataset`
    model.fit(dataset)
    # when exiting the context, the reader of the dataset will be closed

# delete the cached files of the dataframe.
converter.delete()

The minimalist example below assumes the definition of a Net class and train and test functions, included in pytorch_example.py, and a Spark DataFrame containing a feature column followed by a label column.

from petastorm.spark import SparkDatasetConverter, make_spark_converter

# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'hdfs:/...')

df_train, df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net()

# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter(df_train)
converter_test = make_spark_converter(df_test)

# make a pytorch dataloader from `converter_train`
with converter_train.make_torch_dataloader() as dataloader_train:
    # the `dataloader_train` is `torch.utils.data.DataLoader` object
    # we can train model using the `dataloader_train`
    train(model, dataloader_train, ...)
    # when exiting the context, the reader of the dataset will be closed

# the same for `converter_test`
with converter_test.make_torch_dataloader() as dataloader_test:
    test(model, dataloader_test, ...)

# delete the cached files of the dataframes.
converter_train.delete()
converter_test.delete()

Analyzing petastorm datasets using PySpark and SQL

A Petastorm dataset can be read into a Spark DataFrame using PySpark, where you can use a wide range of Spark tools to analyze and manipulate the dataset.

# Create a dataframe object from a parquet file
dataframe = spark.read.parquet(dataset_url)

# Show a schema
dataframe.printSchema()

# Count all
dataframe.count()

# Show a single column
dataframe.select('id').show()

SQL can be used to query a Petastorm dataset:

spark.sql(
   'SELECT count(id) '
   'from parquet.`file:///tmp/hello_world_dataset`').collect()

You can find a full code sample here: pyspark_hello_world.py.

Non-Petastorm Parquet Stores

Petastorm can also be used to read data directly from Apache Parquet stores. To achieve that, use make_batch_reader (and not make_reader). The following list summarizes the differences between the make_reader and make_batch_reader functions; a short make_batch_reader example follows the list.

  • make_reader reads only Petastorm datasets (created using materialize_dataset); make_batch_reader reads any Parquet store (some native Parquet column types are not supported yet).
  • make_reader returns one record at a time; make_batch_reader returns batches of records. The batch size is not fixed and is defined by the Parquet row-group size.
  • Predicates passed to make_reader are evaluated per single row; predicates passed to make_batch_reader are evaluated per batch.
  • Both can filter Parquet files based on the filters argument.
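For example, reading directly from an existing Parquet store looks very similar to the make_reader flow; the store URL below is a placeholder:

from petastorm import make_batch_reader

with make_batch_reader('hdfs://myhadoop/some_parquet_store') as reader:
    for batch in reader:
        # each `batch` holds column value arrays covering up to one row group
        print(batch)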

Troubleshooting

See the Troubleshooting page and please submit a ticket if you can't find an answer.

See also

  1. Gruener, R., Cheng, O., and Litvin, Y. (2018) Introducing Petastorm: Uber ATG's Data Access Library for Deep Learning. URL: https://eng.uber.com/petastorm/
  2. QCon.ai 2019: "Petastorm: A Light-Weight Approach to Building ML Pipelines".

How to Contribute

We prefer to receive contributions in the form of GitHub pull requests. Please send pull requests against the github.com/uber/petastorm repository.

  • If you are looking for ideas on what to contribute, check out the GitHub issues and comment on the issue you would like to work on.
  • If you have an idea for an improvement, or you'd like to report a bug but don't have time to fix it, please create a GitHub issue.

To contribute a patch:

  • Break your work into small, single-purpose patches if possible. It's much harder to merge in a large change with a lot of disjoint features.
  • Submit the patch as a GitHub pull request against the master branch. For a tutorial, see the GitHub guides on forking a repo and sending a pull request.
  • Include a detailed description of the proposed change in the pull request.
  • Make sure that your code passes the unit tests. You can find instructions on how to run the unit tests here.
  • Add new unit tests for your code.

Thank you in advance for your contributions!

See the Development page for development-related information.

Comments
  • Leverage pyarrow predicate filtering

    Pyarrow ParquetDataset supports predicate filtering. We should replace our own implementation to utilize theirs: https://github.com/apache/arrow/blob/master/python/pyarrow/parquet.py#L789

    opened by rgruener 17
  • Unischema supports Parquet schema with more than 255 fields

    Many of our datasets have more than 255 fields. This commit provides an alternative namedtuple implementation 'namedtuple2' to support more than 255 fields with the Python 3.6 interpreter.

    opened by remysaissy 16
  • Pytorch example with DataLoader adapter, using MNIST data

    This code includes an MNIST dataset generator, a pytorch training example that uses the resulting dataset, and a simple README.md.

    As can be seen from main.py, there are a few limitations that came to light which could help us improve petastorm:

    • Batch shuffling
    • Support for custom transforms
    • Total data size (or some semblance of it?)

    Running pytorch/examples/mnist/main.py (in a Docker container) with the default 10 epochs yielded the following outcome (I just show the test output for the middle 8 epochs):

    ...
    Train Epoch: 1 [59520/60000 (99%)]	Loss: 0.505042
    
    Test set: Average loss: 0.2056, Accuracy: 9395/10000 (94%)
    
    ...
    Test set: Average loss: 0.1337, Accuracy: 9596/10000 (96%)
    Test set: Average loss: 0.1033, Accuracy: 9684/10000 (97%)
    Test set: Average loss: 0.0919, Accuracy: 9710/10000 (97%)
    Test set: Average loss: 0.0760, Accuracy: 9770/10000 (98%)
    Test set: Average loss: 0.0689, Accuracy: 9797/10000 (98%)
    Test set: Average loss: 0.0623, Accuracy: 9803/10000 (98%)
    Test set: Average loss: 0.0632, Accuracy: 9791/10000 (98%)
    Test set: Average loss: 0.0541, Accuracy: 9818/10000 (98%)
    
    ...
    Train Epoch: 10 [59520/60000 (99%)]	Loss: 0.040862
    
    Test set: Average loss: 0.0505, Accuracy: 9845/10000 (98%)
    
    real	3m3.021s
    user	20m4.680s
    sys	0m22.228s
    

    With the petastormed variant, the training accuracy looks on-par, with somewhat better runtime. I'll show just the test output:

    Test set: Average loss: 0.2035, Accuracy: 9385/10000 (94%)
    Test set: Average loss: 0.1326, Accuracy: 9591/10000 (96%)
    Test set: Average loss: 0.1040, Accuracy: 9675/10000 (97%)
    Test set: Average loss: 0.0887, Accuracy: 9705/10000 (97%)
    Test set: Average loss: 0.0761, Accuracy: 9752/10000 (98%)
    Test set: Average loss: 0.0715, Accuracy: 9774/10000 (98%)
    Test set: Average loss: 0.0627, Accuracy: 9797/10000 (98%)
    Test set: Average loss: 0.0606, Accuracy: 9810/10000 (98%)
    Test set: Average loss: 0.0582, Accuracy: 9824/10000 (98%)
    Test set: Average loss: 0.0548, Accuracy: 9828/10000 (98%)
    
    real	2m35.852s
    user	2m33.508s
    sys	0m6.576s
    
    opened by forbearer 16
  • Added tests for test_parquet_reader.py

    Added tests for selecting specific columns and requesting invalid columns to test_parquet_reader. Modified the specific-column test to request specific column names rather than regex patterns, so it could select even columns rather than odd ones and would always find at least one. Added a comment explaining why regex patterns were a problem.

    opened by gregw18 15
  • Error reading parquet files made by AWS Athena

    I made a bunch of parquet files using an Amazon Athena CTAS query. I downloaded these files to first test locally (the end goal is to access the data from S3).

    If I run the code below:

    import s3fs
    from petastorm.reader import make_batch_reader
    from petastorm.tf_utils import make_petastorm_dataset
    
    dataset_url = "file:///Data/test-parquet"
    
    with make_batch_reader(dataset_url) as reader:
        dataset = make_petastorm_dataset(reader)
        for batch in dataset:
            break
    batch.correct
    

    I receive a lot of warnings and then an error on `for batch in dataset`:

    pyarrow.lib.ArrowIOError: The file only has 1 row groups, requested metadata for row group: 1

    If I look at dataset.take(1) or something similar, I do see the correct schema of the table. However, I don't seem to be able to access the data.

    opened by RoelantStegmann 14
  • Add unit tests for compress in random shuffling buffer

    ~Compress remaining shuffling buffer should use remained size, that is, self._size.~

    self.size is actually a property defined afterwards. I just changed it to be consistent with other places in the code and to improve readability.

    Also, I added some unit tests to check compress results.

    opened by chongxiaoc 13
  • Expose the flag to disable Ømq copy buffers

    One of our engineers found an optimization involving disabling ZeroMQ copy buffers in the ProcessWorker, but this is not exposed in the top-level factory methods, make_reader and make_batch_reader. It's useful, and probably should be.

    opened by dmcguire81 13
  • Problem with HelloWorld Example on Front Page of Repo

    Hi I'm running the following code:

    from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
    from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
    from petastorm.etl.dataset_metadata import materialize_dataset
    from pyspark.sql.types import IntegerType
    import numpy as np
    from petastorm.fs_utils import FilesystemResolver
    
    resolver=FilesystemResolver(output_url + 'test', spark.sparkContext._jsc.hadoopConfiguration(),
                                 hdfs_driver='libhdfs')
    fact = resolver.filesystem_factory()
    
    HelloWorldSchema = Unischema('HelloWorldSchema', [
       UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
       UnischemaField('other_data', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
    ])
    
    
    def row_generator(x):
       """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
       return {'id': x,
               'other_data': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}
    
    def generate_hello_world_dataset(output_url, spark, sc):
       rows_count = 1000
       rowgroup_size_mb = 256
    
       # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
       # well as save petastorm specific metadata
       with materialize_dataset(spark, url, HelloWorldSchema, rowgroup_size_mb, filesystem_factory=fact):
    
           rows_rdd = sc.parallelize(range(rows_count))\
               .map(row_generator)\
               .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))
    
           spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema(), ) \
               .coalesce(10) \
               .write \
               .mode('overwrite') \
               .parquet(url)
        
    generate_hello_world_dataset(url, spark, sc)
    

    This is the only way that I can run with a libhdfs setup. I get the following error.

    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/worker.py", line 377, in main
        process()
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/worker.py", line 372, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/serializers.py", line 393, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "/basedir/home/aredd/venvs/prometheus/lib64/python3.6/site-packages/petastorm/etl/dataset_metadata.py", line 216, in get_row_group_info
      File "/basedir/home/aredd/venvs/prometheus/lib64/python3.6/site-packages/petastorm/fs_utils.py", line 108, in <lambda>
      File "/basedir/tmp/mapred.tmp1/yarn/nm/usercache/username/appcache/application_1576215002453_189781/container_e15_1576215002453_189781_01_000003/PRO/pro/lib64/python3.6/site-packages/petastorm/hdfs/namenode.py", line 266, in hdfs_connect_namenode
        return pyarrow.hdfs.connect(hostname, url.port or 8020, driver=driver, user=user)
      File "/basedir/tmp/mapred.tmp1/yarn/nm/usercache/username/appcache/application_1576215002453_189781/container_e15_1576215002453_189781_01_000003/PRO/pro/lib64/python3.6/site-packages/pyarrow/hdfs.py", line 215, in connect
        extra_conf=extra_conf)
      File "/basedir/tmp/mapred.tmp1/yarn/nm/usercache/username/appcache/application_1576215002453_189781/container_e15_1576215002453_189781_01_000003/PRO/pro/lib64/python3.6/site-packages/pyarrow/hdfs.py", line 40, in __init__
        self._connect(host, port, user, kerb_ticket, driver, extra_conf)
      File "pyarrow/io-hdfs.pxi", line 105, in pyarrow.lib.HadoopFileSystem._connect
      File "pyarrow/error.pxi", line 80, in pyarrow.lib.check_status
    pyarrow.lib.ArrowIOError: HDFS connection failed
    
            at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
            at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
            at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
            at scala.collection.Iterator$class.foreach(Iterator.scala:891)
            at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
            at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
            at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
            at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
            at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
            at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
            at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
            at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
            at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
            at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
            at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
            at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:121)
            at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    

    Thanks in advance

    opened by andrewredd 13
  • Train-Test Dataset Split

    Is there currently support for splitting a Petastorm dataset into train and test sets for PyTorch? In PyTorch, one would typically do this with a Dataset class, but since Petastorm only has the Reader and DataLoader classes (as below), I wonder if this feature has been implemented.

    trainloader = DataLoader(make_reader('file://' + filename), batch_size=128)

    opened by seranotannason 13
  • Predicting is slow and sometimes doesn't even work.

    Hi, I'm currently using PySpark 3.1.1 and I'm using petastorm to be able to use my TF models with Spark Dataframes. After much digging through the examples I'm struggling with some implementations. I'm trying to implement an AutoEncoder model and my dataset is as follows:

    +----------+-------------+--------------+------------+---------+--------------+----+
    |screw_id  |profile_1111|profile_2222   |profile_time|   gof   |profile_stepnr|rank|
    +----------+-------------+--------------+------------+---------+--------------+----+
    |12925510_1|0.0          |2.28          |1           |1.0      |0             |1   |
    |12925510_1|5.1          |0.0           |30          |1.0      |0             |1   |
    |12925510_1|10.3         |0.0           |40          |1.0      |0             |1   |
    |12925510_1|15.9         |0.0           |47          |1.0      |0             |1   |
    |12925510_1|21.0         |0.0           |52          |1.0      |0             |1   |
    |12925510_1|26.2         |2.16          |61          |1.0      |0             |1   |
    |12925510_1|31.4         |2.08          |68          |1.0      |0             |1   |
    |12925510_1|36.5         |2.2           |75          |1.0      |0             |1   |
    |12925510_1|41.7         |2.2           |87          |1.0      |0             |1   |
    +----------+-------------+--------------+------------+---------+--------------+----+
    

    After some feature engineering implemented via a pipeline my features get encoded into a vector format in a new column named "features". I create the AE model (I don't think is relevant for this use-case to post it here, but I can add it if needed) and then the spark converter for both my training and validation dataset:

    converter_train = make_spark_converter(train_tf.select('features'))
    converter_val = make_spark_converter(val_tf.select('features'))

    Using the examples provided in this repo, I have implemented the train_and_evaluate function as shown next. If I'm not mistaken, for unsupervised learning where no labels are provided, I should use my 'features' for both X and Y, or it will complain that I did not provide gradients for any variable:

    BATCH_SIZE = 2**11
    #Epochs set to 1 for testing purposes
    NUM_EPOCHS = 1
    import os
    import tensorflow as tf
    
    def train_and_evaluate(lr=0.001):
        model = get_compiled_model(lr)
        
    
        with converter_train.make_tf_dataset(batch_size=BATCH_SIZE) as train_dataset, \
               converter_val.make_tf_dataset(batch_size=BATCH_SIZE) as val_dataset:
            
            # tf.keras only accept tuples, not namedtuples
            train_dataset = train_dataset.map(lambda x: (x.features, x.features))
            steps_per_epoch = len(converter_train) // BATCH_SIZE
    
            val_dataset = val_dataset.map(lambda x: (x.features, x.features))
            validation_steps = max(1, len(converter_test) // BATCH_SIZE)
    
            print(f"steps_per_epoch: {steps_per_epoch}, validation_steps: {validation_steps}")
    
            hist = model.fit(train_dataset,
                             steps_per_epoch=steps_per_epoch,
                             epochs=NUM_EPOCHS,
                             validation_data=val_dataset,
                             validation_steps=validation_steps,
                             callbacks=ae_callback(),
                             verbose=2)
                    
            return hist.history['val_loss'][-1], hist.history['val_accuracy'][-1], model 
      
    loss, accuracy, model = train_and_evaluate()
    print("Validation Accuracy: {}".format(accuracy))
    

    The model trains "fine" (performance is not as good as it did in Pandas but I haven't spent much time calibrating it) and relatively fast (2/3 min). With this trained model I now want to infer on a new dataset:

    def pred():
        with converter_unit.make_tf_dataset(batch_size=BATCH_SIZE) as t_dataset:
            te_dataset = t_dataset.map(lambda x: (x.features, x.features))
            return model.predict(te_dataset, verbose=2)
    

    I run this function and never (or almost never) get the results, and it never errors out. The test dataframe has only 400 lines, so it should be pretty fast considering that training the model took only a couple of minutes. Any suggestions?

    opened by diogoribeiro09 12
  • Allow users to use s3, s3a and s3n protocols when saving / reading datasets

    s3, s3a and s3n URL protocols can be explicitly specified when saving petastorm datasets.

    Fixed a bug on the petastorm dataset write execution path that previously prevented writing directly to s3 buckets.

    Tested: modified examples/generate_external_dataset.py and examples/python_hello_world.py to write/read from an s3 bucket using s3a and s3n URLs (wasn't able to properly configure s3 authentication to check the s3 scheme itself). Was able to write/read data successfully.

    opened by selitvin 11
  • Make `make_spark_converter` support creating a converter from a saved dataframe path

    Signed-off-by: Weichen Xu [email protected]

    Make make_spark_converter support creating a converter from a saved dataframe path. In this case, we can skip the step of materializing the Spark dataframe, which might be slow.

    opened by WeichenXu123 2
  • make_batch_reader Documentation out of date? seed?

    I am looking at the documentation here:

    https://petastorm.readthedocs.io/en/latest/api.html. Under make_batch_reader it mentions that there is a seed parameter.

    I set this in my code:

    
    with peta_conv_train_df.make_torch_dataloader(transform_spec=transform_func,
                                                  num_epochs=1,
                                                  batch_size=test_batch_size,
                                                  cur_shard = 1,
                                                  shard_count = 2,
                                                  seed=10,
                                                  shard_seed=123,
                                                  reader_pool_type = pool_type) as reader:
    
    

    but I get:

    TypeError: make_batch_reader() got an unexpected keyword argument 'seed'
    

    It seems to exist in the source code, though...

    Also, what is the difference between seed and shard_seed?

    opened by Data-drone 0
  • Petastorm sharding and setting batch sizes

    With sharding in petastorm ie:

    
    with peta_conv_train_df.make_torch_dataloader(transform_spec=transform_func,
                                                  num_epochs=1,
                                                  batch_size=test_batch_size,
                                                  cur_shard = curr_shard,
                                                  shard_count = num_shards,
                                                  reader_pool_type = pool_type) as reader:
    
    

    Is the batch_size what we want per GPU or for the whole cluster? I.e., in the above, if I had test_batch_size = 64, does each shard get 64, or does each shard get 64 / num_shards?

    opened by Data-drone 0
  • Prediction issue using Keras and TransformSpec with PySpark

    Hello, I am trying to get predictions from a Keras model with two inputs: sequence info and a regular covariate.

    Using the TransformSpec function, I preprocess sequences so that they have the same length and to mask values.

    The model fits fine, but I have issues getting predictions.

    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    
    import tensorflow as tf
    
    from petastorm.spark import SparkDatasetConverter, make_spark_converter
    from petastorm import TransformSpec
    spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///dbfs/...')
    
    import numpy as np
    import pandas as pd
    
    # create data
    sequence = [[1, 1, 1, 1, 1], [2, 2, 2, 2], [3, 2, 2], [3, 3, 3], [3, 3, 3, 3, 3], [2, 2, 2, 2, 2]]
    y = [0, 1, 1, 2, 2, 1]
    x = [0.3, 0.1, 0.3, 0.5, 0.5, 0.1]
    df = pd.DataFrame({'y':y, 'x':x, 'sequence':sequence})
    sdf = spark.createDataFrame(df)
    target='y'
    all_features = ['sequence', 'x']
    
    # functions
    def preprocess(v, max_length=5):
        vv = list(v)
        vv = [0] * (max_length - len(vv)) + vv
        return np.array(vv)
    
    def format_sequence_data(pd_batch):
        pd_batch['sequence'] = pd_batch['sequence'].map(lambda x: preprocess(x))
        return pd_batch.loc[:,['sequence', 'x', 'y']]
        
    transform_spec_fn = TransformSpec(
      format_sequence_data, 
      edit_fields=[
            ('sequence', np.float32, (5,), False), 
            ('x', np.float32, (), False),
            ('y', np.int32, (), False)], 
      selected_fields=['sequence', 'x', 'y'])
    
    # petastorm
    df_converter = make_spark_converter(sdf)
    
    # model
    def createModel():
    
        seq_vec = tf.keras.Input(shape=(5,), name='sequence')
        e = tf.keras.layers.Embedding(input_dim=5, output_dim=5, 
            input_length=5, mask_zero=True, name='sequence_embedding')(seq_vec)
    
        x = tf.keras.Input(shape=(1,), name='x', dtype='float')
        x = tf.keras.layers.Normalization()(x)
    
        ml = tf.keras.layers.LSTM(10, return_sequences=True)(e)
        ml = tf.keras.layers.LSTM(5)(ml)
    
        combined = tf.keras.layers.Concatenate()([ml, x])
    
        mlp = tf.keras.layers.Dense(10)(combined)
        mlp = tf.keras.layers.Dense(5)(mlp)
        mlp = tf.keras.layers.Dense(3, activation='softmax')(mlp)
    
        model = tf.keras.Model([seq_vec, x], mlp)
        model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics='accuracy')
        return model
    
    model = createModel()
    
    # training
    batch_size=1
    def train_and_evaluate(): 
        with df_converter.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=batch_size) as data:
            data = data.map(lambda x: (tuple(getattr(x, col) for col in ['sequence', 'x']), getattr(x, target)))
            steps_per_epoch = int(len(df_converter) / batch_size)
    
            history = model.fit(data, 
                steps_per_epoch=steps_per_epoch,
                epochs=10,
                shuffle=False,
                verbose=2)
    
        return history
    
    history = train_and_evaluate()
    

    For prediction, I use:

    # udf function for prediction (pyspark)
    def model_prediction_prob_udf(model):
      def predict(input_batch_iter):
        for input_batch in input_batch_iter:
            input_batch['sequence'] = input_batch['sequence'].map(lambda x: preprocess(x))
            preds = model.predict([input_batch.loc[:,c] for c in all_features], batch_size=1000)
            yield pd.Series(preds.tolist())
      return_type = T.ArrayType(T.DoubleType())
      return F.pandas_udf(return_type, F.PandasUDFType.SCALAR_ITER)(predict) 
    
    pred_prob_udf = model_prediction_prob_udf(model)
    
    pred = sdf.withColumn('features', F.struct(all_features))
    pred = pred.withColumn('prediction_prob', pred_prob_udf(F.col('features')))
    display(pred)
    

    I get the error:

    'ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type numpy.ndarray).'

    If I use something like the code below, prediction takes forever:

    with df_converter.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=1) as data:
            data = data.map(lambda x: ((x.sequence, x.x),))
            tt = model.predict(data)
    

    Any ideas or suggestions on how to fix this? Thanks!

    opened by sdaza 0
  • when the hdfs-site.xml file has an xi:include tag, the function can't get the hadoop_configuration info

    https://github.com/uber/petastorm/blob/170b22a18ee1c0346d2b289f096804e34a0c5d25/petastorm/hdfs/namenode.py#L67 The hdfs configuration looks like the following, with the configuration info in a web file; currently the code can't load the configuration correctly:

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration xmlns:xi="http://www.w3.org/2001/XInclude">
    <xi:include href="http://10.*.*.*:*/us-hadoop-2.7.2/hdfs-site.xml">
          <xi:fallback>
             <xi:include href="http://10.*.*.*:*/us-hadoop-2.7.2/hdfs-site.xml"/>
          </xi:fallback>
    </xi:include>
    </configuration>
    
    opened by lytk01 0
  • Random seed doesn't seem to work well

    I'm new to petastorm and I'm facing some issues. I need to iterate over a dataset, getting three equal batches, and transform two of them to extract some info. The dataset consists of users rating movies (like the MovieLens dataset). I need to get three batches with the same ratings (rows) to extract each user (in the ratings a user can appear repeatedly) and each movie rated. I wrote this code.

    Creating fake dataset and spark converter:

    ratings_l = [
        {'uid_dec': 0, 'mid_dec': 6, 'eval': 2.18},
        {'uid_dec': 0, 'mid_dec': 7, 'eval': 3.83},
        {'uid_dec': 0, 'mid_dec': 8, 'eval': 3.94},
        {'uid_dec': 0, 'mid_dec': 9, 'eval': 4.31},
        {'uid_dec': 0, 'mid_dec': 10, 'eval': 4.48},
        {'uid_dec': 0, 'mid_dec': 11, 'eval': 3.74},
        {'uid_dec': 1, 'mid_dec': 6, 'eval': 3.21},
        {'uid_dec': 1, 'mid_dec': 7, 'eval': 2.05},
        {'uid_dec': 1, 'mid_dec': 8, 'eval': 2.24},
        {'uid_dec': 1, 'mid_dec': 9, 'eval': 2.08},
        {'uid_dec': 1, 'mid_dec': 10, 'eval': 4.94},
        {'uid_dec': 1, 'mid_dec': 11, 'eval': 4.22},
        {'uid_dec': 2, 'mid_dec': 6, 'eval': 3.52},
        {'uid_dec': 2, 'mid_dec': 7, 'eval': 2.67},
        {'uid_dec': 2, 'mid_dec': 8, 'eval': 2.69},
        {'uid_dec': 2, 'mid_dec': 9, 'eval': 2.75},
        {'uid_dec': 2, 'mid_dec': 10, 'eval': 4.93},
        {'uid_dec': 2, 'mid_dec': 11, 'eval': 2.9},
        {'uid_dec': 3, 'mid_dec': 6, 'eval': 2.0},
        {'uid_dec': 3, 'mid_dec': 7, 'eval': 2.9},
        {'uid_dec': 3, 'mid_dec': 8, 'eval': 4.74},
        {'uid_dec': 3, 'mid_dec': 9, 'eval': 2.5},
        {'uid_dec': 3, 'mid_dec': 10, 'eval': 2.18},
        {'uid_dec': 3, 'mid_dec': 11, 'eval': 4.93},
        {'uid_dec': 4, 'mid_dec': 6, 'eval': 4.46},
        {'uid_dec': 4, 'mid_dec': 7, 'eval': 2.23},
        {'uid_dec': 4, 'mid_dec': 8, 'eval': 4.42},
        {'uid_dec': 4, 'mid_dec': 9, 'eval': 4.67},
        {'uid_dec': 4, 'mid_dec': 10, 'eval': 2.65},
        {'uid_dec': 4, 'mid_dec': 11, 'eval': 2.11},
        {'uid_dec': 5, 'mid_dec': 6, 'eval': 2.31},
        {'uid_dec': 5, 'mid_dec': 7, 'eval': 2.69},
        {'uid_dec': 5, 'mid_dec': 8, 'eval': 2.41},
        {'uid_dec': 5, 'mid_dec': 9, 'eval': 4.62},
        {'uid_dec': 5, 'mid_dec': 10, 'eval': 3.96},
        {'uid_dec': 5, 'mid_dec': 11, 'eval': 2.23}
    ]
    
    train_ds = spark.createDataFrame(ratings_l)
    
    conv_train = make_spark_converter(train_ds)
    

    Get three batches from the same converter (hoping they are the same):

    epochs = 4
    batch_size = 6
    with conv_train.make_tf_dataset(batch_size=batch_size, num_epochs=epochs, seed=1) as train, \
         conv_train.make_tf_dataset(batch_size=batch_size, num_epochs=epochs, seed=1) as train1, \
         conv_train.make_tf_dataset(batch_size=batch_size, num_epochs=epochs, seed=1) as train2:
         epoch_eval = True
         for i, (b, b1, b2) in enumerate(zip(train, train1, train2)):
            if i%(36//batch_size) == 0:
                print('==========Epoch==========: {0}'.format(i//(36//batch_size)))
            print('==========Group of Batches  {}:'.format(i%(36//batch_size)))
            print(b[0].numpy())
            print(b1[0].numpy())
            print(b2[0].numpy())
    

    This is the output:

    ==========Epoch==========: 0
    ==========Group of Batches 0:
    [2.   2.9  4.74 2.5  2.18 4.93]
    [2.18 3.83 3.94 4.31 4.48 3.74]
    [2.   2.9  4.74 2.5  2.18 4.93]
    ==========Group of Batches  1: 
    [4.46 2.23 4.42 4.67 2.65 2.11]
    [3.21 2.05 2.24 2.08 4.94 4.22]
    [4.46 2.23 4.42 4.67 2.65 2.11]
    ==========Group of Batches 2:
    [2.31 2.69 2.41 4.62 3.96 2.23]
    [3.52 2.67 2.69 2.75 4.93 2.9 ]
    [2.31 2.69 2.41 4.62 3.96 2.23]
    ==========Group of Batches 3:
    [2.18 3.83 3.94 4.31 4.48 3.74]
    [2.18 3.83 3.94 4.31 4.48 3.74]
    [2.18 3.83 3.94 4.31 4.48 3.74]
    ==========Group of Batches 4:
    [3.21 2.05 2.24 2.08 4.94 4.22]
    [3.21 2.05 2.24 2.08 4.94 4.22]
    [3.21 2.05 2.24 2.08 4.94 4.22]
    ==========Group of Batches 5:
    [3.52 2.67 2.69 2.75 4.93 2.9 ]
    [3.52 2.67 2.69 2.75 4.93 2.9 ]
    [3.52 2.67 2.69 2.75 4.93 2.9 ]
    ==========Epoch==========: 1
    ==========Group of Batches 0:
    [2.18 3.83 3.94 4.31 4.48 3.74]
    [2.   2.9  4.74 2.5  2.18 4.93]
    [2.18 3.83 3.94 4.31 4.48 3.74]
    ==========Group of Batches 1:
    [3.21 2.05 2.24 2.08 4.94 4.22]
    [4.46 2.23 4.42 4.67 2.65 2.11]
    [3.21 2.05 2.24 2.08 4.94 4.22]
    ==========Group of Batches 2:
    [3.52 2.67 2.69 2.75 4.93 2.9 ]
    [2.31 2.69 2.41 4.62 3.96 2.23]
    [3.52 2.67 2.69 2.75 4.93 2.9 ]
    ==========Group of Batches 3:
    [2.   2.9  4.74 2.5  2.18 4.93]
    [2.   2.9  4.74 2.5  2.18 4.93]
    [2.   2.9  4.74 2.5  2.18 4.93]
    ==========Group of Batches 4:
    [4.46 2.23 4.42 4.67 2.65 2.11]
    [4.46 2.23 4.42 4.67 2.65 2.11]
    [4.46 2.23 4.42 4.67 2.65 2.11]
    ==========Group of Batches 5:
    [2.31 2.69 2.41 4.62 3.96 2.23]
    [2.31 2.69 2.41 4.62 3.96 2.23]
    [2.31 2.69 2.41 4.62 3.96 2.23]
    

    The question is: why are the batches different in some groups, for example in Epoch 1, Group of Batches 2? The expected behavior is that all the batches are always the same, as in Epoch 1, Groups of Batches 3, 4 and 5.

    opened by kisel4363 2