Distributed scikit-learn meta-estimators in PySpark

Overview

What is it?

sk-dist is a Python package for machine learning built on top of scikit-learn and distributed under the Apache 2.0 software license. The sk-dist module can be thought of as "distributed scikit-learn": its core functionality extends scikit-learn's built-in joblib parallelization of meta-estimator training to Spark. A popular use case is the parallelization of grid search, as in the sketch below.

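A minimal sketch of that pattern, assuming a running Spark installation and PySpark (the dataset and parameter grid here are illustrative):

    from sklearn import datasets, svm
    from skdist.distribute.search import DistGridSearchCV
    from pyspark.sql import SparkSession

    # attach to (or create) a Spark session and grab its SparkContext
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # small example dataset
    X, y = datasets.load_digits(return_X_y=True)

    # distribute the cross-validated grid search over the Spark cluster
    model = DistGridSearchCV(
        svm.SVC(),
        {"C": [0.1, 1.0, 10.0], "gamma": ["scale", "auto"]},
        sc=sc, cv=5, scoring="f1_weighted"
        )
    model.fit(X, y)  # the fitted result pickles like a plain scikit-learn estimator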

Check out the blog post for more information on the motivation and use cases of sk-dist.

Main Features

  • Distributed Training - sk-dist parallelizes the training of scikit-learn meta-estimators with PySpark. This allows distributed training of these estimators without any constraint on the physical resources of any one machine. In all cases, spark artifacts are automatically stripped from the fitted estimator. These estimators can then be pickled and un-pickled for prediction tasks, operating identically at predict time to their scikit-learn counterparts. Supported meta-estimators include hyperparameter search (e.g. DistGridSearchCV and DistRandomizedSearchCV) among others.
  • Distributed Prediction - sk-dist provides a prediction module which builds vectorized UDFs for PySpark DataFrames using fitted scikit-learn estimators. This distributes the predict and predict_proba methods of scikit-learn estimators, enabling large scale prediction with scikit-learn (see the sketch after this list).
  • Feature Encoding - sk-dist provides a flexible feature encoding utility called Encoderizer, which encodes mixed-type feature spaces using either default behavior or user-defined, customizable settings. It is particularly aimed at text features, but it also handles numeric and dictionary-type feature spaces.
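
As a rough illustration of the prediction module (here model is an already-fitted scikit-learn estimator, feature_names lists the feature columns, and df is a PySpark DataFrame containing those columns; the import path is assumed to follow the skdist.distribute layout used elsewhere on this page):

    from pyspark.sql import functions as F
    from skdist.distribute.predict import get_prediction_udf  # import path assumed

    # build a vectorized (pandas) UDF from the fitted estimator
    predict = get_prediction_udf(
        model, method="predict", feature_type="pandas", names=feature_names
        )

    # apply the UDF to the feature columns and collect a prediction column
    cols = [F.col(str(c)) for c in feature_names]
    prediction_df = df.withColumn("prediction", predict(*cols))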

Installation

Dependencies

sk-dist requires Python 3 and a supported version of scikit-learn, along with scikit-learn's core dependencies (numpy, scipy and joblib).

Dependency Notes

  • versions of numpy, scipy and joblib that are compatible with any supported version of scikit-learn should be sufficient for sk-dist
  • sk-dist is not supported with Python 2

Spark Dependencies

Most sk-dist functionality requires a spark installation as well as PySpark. Some functionality can run without spark, so spark-related dependencies are not strictly required. The connection between sk-dist and spark relies solely on a SparkContext passed as an argument to various sk-dist classes upon instantiation.

A variety of spark configurations and setups will work. It is left up to the user to configure their own spark setup. The testing suite runs spark 2.4 and spark 3.0, though any spark 2.0+ versions are expected to work.

An additional spark-related dependency is pyarrow, which is used only for skdist.predict functions. These use vectorized pandas UDFs, which require pyarrow>=0.8.0; testing is done with pyarrow==0.16.0. Depending on the spark version, it may be necessary to set spark.conf.set("spark.sql.execution.arrow.enabled", "true") in the spark configuration.
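
For example, on a Spark 2.x session this could be set as below; note that on Spark 3.x the equivalent configuration key is spark.sql.execution.arrow.pyspark.enabled:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # enable Arrow-based data transfer for vectorized pandas UDFs (Spark 2.x key)
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")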

User Installation

The easiest way to install sk-dist is with pip:

pip install --upgrade sk-dist

You can also download the source code:

git clone https://github.com/Ibotta/sk-dist.git

Testing

With pytest installed, you can run tests locally:

pytest sk-dist

Examples

The package contains numerous examples of how to use sk-dist in practice.

Gradient Boosting

sk-dist has been tested with a number of popular gradient boosting packages that conform to the scikit-learn API. This includes xgboost and catboost. These will need to be installed in addition to sk-dist on all nodes of the spark cluster via a node bootstrap script. Version compatibility is left up to the user.
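
For instance, a distributed grid search over an xgboost model would follow the same pattern as the scikit-learn sketch above, assuming sc, X and y are defined as before (the parameter values here are purely illustrative):

    from xgboost import XGBClassifier
    from skdist.distribute.search import DistGridSearchCV

    param_grid = {"max_depth": [3, 6], "n_estimators": [100, 200]}

    # xgboost must be installed on every node of the spark cluster
    model = DistGridSearchCV(XGBClassifier(), param_grid, sc=sc, cv=3, scoring="f1_weighted")
    model.fit(X, y)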

Support for lightgbm is not guaranteed, as it requires additional installations on all nodes of the spark cluster. It may work given proper installation but has not been tested with sk-dist.

Background

The project was started at Ibotta Inc. on the machine learning team and open sourced in 2019.

It is currently maintained by the machine learning team at Ibotta. Special thanks to those who contributed to sk-dist while it was initially in development at Ibotta.

Thanks to James Foley for logo artwork.

Comments
  • OSError: [WinError 123]: Spark on Windows local

    I'm trying to execute an example.

    Spark version: 2.3.4

    import time
    from sklearn import datasets, svm
    from skdist.distribute.search import DistGridSearchCV
    from pyspark.sql import SparkSession

    # instantiate spark session
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # the digits dataset
    digits = datasets.load_digits()
    X = digits["data"]
    y = digits["target"]

    # create a classifier: a support vector classifier
    classifier = svm.SVC()
    param_grid = {
        "C": [0.01, 0.01, 0.1, 1.0, 10.0, 20.0, 50.0],
        "gamma": ["scale", "auto", 0.001, 0.01, 0.1],
        "kernel": ["rbf", "poly", "sigmoid"]
        }
    scoring = "f1_weighted"
    cv = 10

    # hyperparameter optimization
    start = time.time()
    model = DistGridSearchCV(
        classifier, param_grid,
        sc=sc, cv=cv, scoring=scoring,
        verbose=True
        )
    

    When I try to train my model with model.fit(X, y), it fails with:

    OSError: [WinError 123] Die Syntax für den Dateinamen, Verzeichnisnamen oder die Datenträgerbezeichnung ist falsch: 'C:\C:\spark\jars\spark-core_2.11-2.3.4.jar' (in English: "The filename, directory name, or volume label syntax is incorrect")

    SPARK_HOME is set to "C:\spark" and the PATH contains %SPARK_HOME%\bin.

    Without the SparkContext I'm able to run the code, i.e. changing

    model = DistGridSearchCV(
        classifier, param_grid,
        sc=sc, cv=cv, scoring=scoring)

    to:

    model = DistGridSearchCV(
        classifier, param_grid,
        cv=cv, scoring=scoring)

    I also tried to pass the sparkHome argument, without the drive letter "C:", when defining the SparkContext:

    SparkContext(appName="Dist_Exmp", sparkHome="spark")  # sc.sparkHome --> "spark"

    But the value is still taken from the environment variable.

    Here is the whole trace:

    `--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) in ----> 1 model.fit(X_train, y_train)

    C:\MeineProgramme\anaconda3\lib\site-packages\skdist\distribute\search.py in fit(self, X, y, groups, **fit_params) 367 base_estimator_ = self.sc.broadcast(base_estimator) 368 partitions = _parse_partitions(self.partitions, len(fit_sets)) --> 369 out = self.sc.parallelize(fit_sets, numSlices=partitions).map(lambda x: [x[0], fit_and_score( 370 base_estimator, X, y, scorers, x[2][0], x[2][1], 371 verbose, x[1], fit_params=fit_params,

    C:\MeineProgramme\anaconda3\lib\site-packages\pyspark\rdd.py in collect(self) 812 """ 813 with SCCallSiteSync(self.context) as css: --> 814 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 815 return list(_load_from_socket(sock_info, self._jrdd_deserializer)) 816

    C:\MeineProgramme\anaconda3\lib\site-packages\py4j\java_gateway.py in call(self, *args) 1255 answer = self.gateway_client.send_command(command) 1256 return_value = get_return_value( -> 1257 answer, self.gateway_client, self.target_id, self.name) 1258 1259 for temp_arg in temp_args:

    C:\MeineProgramme\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString()

    C:\MeineProgramme\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError(

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 6, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "C:\SPARK\python\lib\pyspark.zip\pyspark\worker.py", line 240, in main File "C:\SPARK\python\lib\pyspark.zip\pyspark\worker.py", line 60, in read_command File "C:\SPARK\python\lib\pyspark.zip\pyspark\serializers.py", line 171, in read_with_length return self.loads(obj) File "C:\SPARK\python\lib\pyspark.zip\pyspark\serializers.py", line 566, in loads return pickle.loads(obj, encoding=encoding) File "C:\MeineProgramme\anaconda3\lib\site-packages\skdist\distribute\search.py", line 14, in from sklearn.model_selection import ( File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\model_selection_init.py", line 19, in from .validation import cross_val_score File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\model_selection_validation.py", line 27, in from ..metrics.scorer import check_scoring, check_multimetric_scoring File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\metrics_init.py", line 7, in from .ranking import auc File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\metrics\ranking.py", line 35, in from ..preprocessing import label_binarize File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\preprocessing_init.py", line 6, in from .function_transformer import FunctionTransformer File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\preprocessing_function_transformer.py", line 5, in from ..utils.testing import assert_allclose_dense_sparse File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\utils\testing.py", line 718, in import pytest File "C:\MeineProgramme\anaconda3\lib\site-packages\pytest.py", line 6, in from pytest.assertion import register_assert_rewrite File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion_init.py", line 7, in from pytest.assertion import rewrite File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion\rewrite.py", line 26, in from pytest.assertion import util File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion\util.py", line 8, in import pytest.code File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest_code_init.py", line 2, in from .code import Code # noqa File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest_code\code.py", line 23, in import pluggy File "C:\MeineProgramme\anaconda3\lib\site-packages\pluggy_init.py", line 16, in from .manager import PluginManager, PluginValidationError File "C:\MeineProgramme\anaconda3\lib\site-packages\pluggy\manager.py", line 11, in import importlib_metadata File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 547, in version = version(name) File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 509, in version return distribution(distribution_name).version File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 482, in distribution return Distribution.from_name(distribution_name) File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init_.py", line 183, in from_name dist = next(dists, None) File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init_.py", line 425, in for path in map(cls.switch_path, paths) File 
"C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 449, in _search_path if not root.is_dir(): File "C:\MeineProgramme\anaconda3\lib\pathlib.py", line 1358, in is_dir return S_ISDIR(self.stat().st_mode) File "C:\MeineProgramme\anaconda3\lib\pathlib.py", line 1168, in stat return self._accessor.stat(self) OSError: [WinError 123] Die Syntax für den Dateinamen, Verzeichnisnamen oder die Datenträgerbezeichnung ist falsch: 'C:\C:\spark\jars\spark-core_2.11-2.3.4.jar'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:336)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:475)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:458)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:290)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    

    Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.collect(RDD.scala:944) at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165) at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "C:\SPARK\python\lib\pyspark.zip\pyspark\worker.py", line 240, in main File "C:\SPARK\python\lib\pyspark.zip\pyspark\worker.py", line 60, in read_command File "C:\SPARK\python\lib\pyspark.zip\pyspark\serializers.py", line 171, in read_with_length return self.loads(obj) File "C:\SPARK\python\lib\pyspark.zip\pyspark\serializers.py", line 566, in loads return pickle.loads(obj, encoding=encoding) File "C:\MeineProgramme\anaconda3\lib\site-packages\skdist\distribute\search.py", line 14, in from sklearn.model_selection import ( File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\model_selection_init.py", line 19, in from .validation import cross_val_score File 
"C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\model_selection_validation.py", line 27, in from ..metrics.scorer import check_scoring, check_multimetric_scoring File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\metrics_init.py", line 7, in from .ranking import auc File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\metrics\ranking.py", line 35, in from ..preprocessing import label_binarize File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\preprocessing_init.py", line 6, in from .function_transformer import FunctionTransformer File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\preprocessing_function_transformer.py", line 5, in from ..utils.testing import assert_allclose_dense_sparse File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\utils\testing.py", line 718, in import pytest File "C:\MeineProgramme\anaconda3\lib\site-packages\pytest.py", line 6, in from pytest.assertion import register_assert_rewrite File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion_init.py", line 7, in from pytest.assertion import rewrite File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion\rewrite.py", line 26, in from pytest.assertion import util File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion\util.py", line 8, in import pytest.code File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest_code_init.py", line 2, in from .code import Code # noqa File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest_code\code.py", line 23, in import pluggy File "C:\MeineProgramme\anaconda3\lib\site-packages\pluggy_init.py", line 16, in from .manager import PluginManager, PluginValidationError File "C:\MeineProgramme\anaconda3\lib\site-packages\pluggy\manager.py", line 11, in import importlib_metadata File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 547, in version = version(name) File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 509, in version return distribution(distribution_name).version File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 482, in distribution return Distribution.from_name(distribution_name) File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init_.py", line 183, in from_name dist = next(dists, None) File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init_.py", line 425, in for path in map(cls.switch_path, paths) File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 449, in _search_path if not root.is_dir(): File "C:\MeineProgramme\anaconda3\lib\pathlib.py", line 1358, in is_dir return S_ISDIR(self.stat().st_mode) File "C:\MeineProgramme\anaconda3\lib\pathlib.py", line 1168, in stat return self._accessor.stat(self) OSError: [WinError 123] Die Syntax für den Dateinamen, Verzeichnisnamen oder die Datenträgerbezeichnung ist falsch: 'C:\C:\spark\jars\spark-core_2.11-2.3.4.jar'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:336)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:475)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:458)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:290)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more`
    
    bug 
    opened by progsurfer 13
  • Simple Voter Class Error - TypeError: Cannot cast array data from dtype('O') to dtype('int64') according to the rule 'safe'

    After creating a pipeline with my best models and including the feature-transformer I used to create the training data... I get the following error:

    TypeError: Cannot cast array data from dtype('O') to dtype('int64') according to the rule 'safe'

    My feature pipeline makes use of StandardScaler, OneHotEncoder, and HashingVectorizer, and uses make_column_transformer to build a column transformer over a pandas DataFrame.

    ensemble_pipeline = make_pipeline(
        feature_pipeline,
        SimpleVoter(
            best_models,
            classes=best_models[0][1].classes_, voting="hard"
        ),
    )
    

    Any idea the reason behind this error?

    Thanks!

    opened by S-C-H 5
  • Update Travis References

    Updating references from travis-ci.org to travis-ci.com

    Description

    On June 30, travis-ci.org will shut down. The new URL will be travis-ci.com. I have already migrated to the new URL. This PR updates all references from travis-ci.org to travis-ci.com.

    Types of changes

    • [x] Bug fix (non-breaking change which fixes an issue)
    • [ ] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
    • [x] I have read the CONTRIBUTING document.
    • [x] I have added reviewers to the PR.
    • [x] I have added tests to cover my changes.
    • [x] All new and existing tests passed.
    opened by rpcrimi 2
  • Python REPL connection issues

    Describe the bug

    When performing a grid search (distributed or randomized), the tasks finish but the connection is never returned. Eventually the connection is closed with an out-of-memory exception, although it does not appear to be a genuine out-of-memory problem, since the tasks complete.

    To Reproduce Steps to reproduce the behavior:

    Train a large Tree Based model across worker nodes in a grid search. It seems to only occur when a larger amount of time is required to construct the model.

    Additional context

    Seems related to: https://github.com/apache/spark/pull/24898

    I replaced sk-dist with joblibspark, trained the same model which completed (joblib reports tasks done) and received the error:

    because pyspark py4j is not in pinned thread mode, we could not terminate running spark jobs correctly.

    bug 
    opened by S-C-H 2
  • PicklingError: Could not serialize broadcast

    When I use a Tree Method (such as ExtraTrees or RandomForest) rather than another classification algorithm, I seem to run into the following error:

    PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB

    predict = get_prediction_udf(model, method="predict", feature_type="pandas", names=list(X_train.columns))
    
    cols = [F.col(str(c)) for c in list(X_train.columns)]
    # apply predict UDFs and select prediction output
    prediction_df = (
        unlabelled
        .withColumn("pred", predict(*cols))
        )
    
    

    Traceback below:

    ---------------------------------------------------------------------------
    OverflowError                             Traceback (most recent call last)
    /databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
        120         try:
    --> 121             pickle.dump(value, f, 2)
        122         except pickle.PickleError:
    
    OverflowError: cannot serialize a string larger than 4GiB
    
    During handling of the above exception, another exception occurred:
    
    PicklingError                             Traceback (most recent call last)
    <command-1349179484758480> in <module>()
          3 prediction_df = (
          4     unlabelled
    ----> 5     .withColumn("pred", predict(*cols))
          6     )
    
    /databricks/spark/python/pyspark/sql/udf.py in wrapper(*args)
        194         @functools.wraps(self.func, assigned=assignments)
        195         def wrapper(*args):
    --> 196             return self(*args)
        197 
        198         wrapper.__name__ = self._name
    
    /databricks/spark/python/pyspark/sql/udf.py in __call__(self, *cols)
        172 
        173     def __call__(self, *cols):
    --> 174         judf = self._judf
        175         sc = SparkContext._active_spark_context
        176         return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    
    /databricks/spark/python/pyspark/sql/udf.py in _judf(self)
        156         # and should have a minimal performance impact.
        157         if self._judf_placeholder is None:
    --> 158             self._judf_placeholder = self._create_judf()
        159         return self._judf_placeholder
        160 
    
    /databricks/spark/python/pyspark/sql/udf.py in _create_judf(self)
        165         sc = spark.sparkContext
        166 
    --> 167         wrapped_func = _wrap_function(sc, self.func, self.returnType)
        168         jdt = spark._jsparkSession.parseDataType(self.returnType.json())
        169         judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    
    /databricks/spark/python/pyspark/sql/udf.py in _wrap_function(sc, func, returnType)
         33 def _wrap_function(sc, func, returnType):
         34     command = (func, returnType)
    ---> 35     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
         36     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
         37                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    
    /databricks/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
       2464     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
       2465         # The broadcast will have same life cycle as created PythonRDD
    -> 2466         broadcast = sc.broadcast(pickled_command)
       2467         pickled_command = ser.dumps(broadcast)
       2468     broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
    
    /databricks/spark/python/pyspark/context.py in broadcast(self, value)
        897         be sent to each cluster only once.
        898         """
    --> 899         return Broadcast(self, value, self._pickled_broadcast_vars)
        900 
        901     def accumulator(self, value, accum_param=None):
    
    /databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path, sock_file)
         97                 # no encryption, we can just write pickled data directly to the file from python
         98                 broadcast_out = f
    ---> 99             self.dump(value, broadcast_out)
        100             if sc._encryption_enabled:
        101                 self._python_broadcast.waitTillDataReceived()
    
    /databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
        126                   % (e.__class__.__name__, _exception_message(e))
        127             print_exec(sys.stderr)
    --> 128             raise pickle.PicklingError(msg)
        129         f.close()
        130 
    
    PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB
    
    
    opened by S-C-H 2
  • cannot get answer with LGBMClassifier

    I changed xgb to lgb but can't get any result back.

    It takes about 1 second with GridSearchCV:

    grid = dict(num_leaves=[8, 15, 31],
                n_estimators=[100, 200, 300])
    for _ in trange(1):
        model_lgb = GridSearchCV(
            LGBMClassifier(),
            grid, n_jobs=4, cv=3
            )
        model_lgb.fit(X, y)

    but there is no result after 10 minutes with DistGridSearchCV:

    grid = dict(num_leaves=[8, 15, 31],
                n_estimators=[100, 200, 300],
                n_jobs=[1])
    for _ in trange(1):
        model_lgb = DistGridSearchCV(
            LGBMClassifier(),
            grid, sc, cv=3, n_jobs=1
            )
        model_lgb.fit(X, y)
    bug 
    opened by jiahengqi 2
  • Full pipeline tuning?

    Does DistGridSearchCV allow for full pipeline tuning? (It doesn't appear to, unless I am missing something.)

    It would be pretty useful to tune some of the feature extraction parts, e.g. the number of hashed n-gram features.

    • Would just like to add: well done with this project. It's excellent, and I appreciate that care and time has been taken to ensure it works as intended.
    opened by S-C-H 2
  • update spark 2.4 to 2.4.8 in test build

    Update Spark 2.4 Version

    Description

    The Spark 2.4 release was updated at the source, so we need to update the address and naming of the distribution.

    Motivation and Context

    Need a valid spark 2.4 version for test suite

    How Has This Been Tested?

    Types of changes

    • [x] Bug fix (non-breaking change which fixes an issue)
    • [ ] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [ ] My change requires a change to the documentation.
    • [ ] I have updated the documentation accordingly.
    • [x] I have read the CONTRIBUTING document.
    • [x] I have added reviewers to the PR.
    • [ ] I have added tests to cover my changes.
    • [x] All new and existing tests passed.
    opened by denver1117 1
  • _index_param_value not defined when searching params for CatBoostClassifier

    Getting a NameError: name '_index_param_value' is not defined error while fitting DistRandomizedSearchCV on a CatBoostClassifier model. Here is some sample code:

    params = {
        'depth': [4,6,8],
        'learning_rate': [0.02,0.05,0.1],
        'l2_leaf_reg': [1,5,10],
        'random_strength': [1,5,10],
        'bagging_temperature': [1,5,10],
        'one_hot_max_size': [2,10,50],
        'logging_level': ['Silent'],
        'use_best_model': [True],
        'od_type': ['Iter'],
        'od_wait': [20],
        'eval_metric': ['F1'],
        'iterations': [2000],
        'scale_pos_weight': [scale_multiplier],
        'colsample_bylevel': [0.6,0.8,1],
        'random_state': [42],
        'cat_features': [categorical_features_indices]
        }
    
    model = DistRandomizedSearchCV(
        CatBoostClassifier(),
        params, sc, cv=3, scoring='roc_auc', n_iter=10, verbose=1
        )
    
    model.fit(X_train, y_train, eval_set=(X_validation, y_validation))
    

    Possibly an error on my part. But the error message implies _index_param_value needs to be defined, which appears to be missing in the source code.

    I am able to run a DistRandomizedSearchCV search if I drop the eval_set and other overfitting detection parameters. However, it would be great to be able to utilize CatBoost's native overfitting detection.

    bug 
    opened by Stuart-D-King 1
  • Pyspark testing suite

    Description

    This PR adds functionality to test within a spark environment via TravisCI. It should address this issue. This enables us to test the various distribution components of skdist.

    Working Travis build: https://travis-ci.com/Ibotta/sk-dist

    TODO:

    • Add tests for encoderizer
    • Looking into 2 java install warnings
    • Think through supporting different Spark versions

    Motivation and Context

    I followed examples in these repos: https://github.com/malexer/pytest-spark https://github.com/fdosani/travis-pytest-spark https://github.com/databricks/spark-sklearn

    Additional notes:

    • added the openjdk8 for jdk (this is what spark 2.4 needs)
    • ensure correct java env variables (JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64)
    • separate install_requires, tests_require, and extras_require (install these in Travis) in setup.py
    • Tests are pretty quick (34s)
    • Install is fairly quick (38s)

    How Has This Been Tested?

    • Local Spark environment setup via TravisCI
    • Added initial tests
    • Ensured tests passed in all 3 supported python versions

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [X] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [X] My code follows the code style of this project.
    • [X] My change requires a change to the documentation.
    • [X] I have updated the documentation accordingly.
    • [X] I have read the CONTRIBUTING document.
    • [X] I have added reviewers to the PR.
    • [X] I have added tests to cover my changes.
    • [X] All new and existing tests passed.
    enhancement help wanted 
    opened by cfrazier91 1
  • Spark local-mode testing for Travis CI

    Is your feature request related to a problem? Please describe. The current test suite is meant to run only in a python environment after setup/installation. This doesn't allow testing Spark functionality in a spark environment.

    Describe the solution you'd like Ideally, we'd have either a separate test suite, or the entire test suite would be meant to run only in a Spark environment, setup locally by Travis CI.

    Additional context Most functionality can be tested without Spark, but the Spark parallelization itself cannot be tested, and Spark version compatibility cannot be tested. This would make the testing suite more robust to both sk-dist code changes and Spark/PySpark version changes/updates.

    enhancement 
    opened by denver1117 1
  • Any chance to use also the Pandas_UDF interface for even faster speeds?

    Is your feature request related to a problem? Please describe.

    Not a problem per se. But Spark UDFs are slower than PySpark pandas UDFs, and both are slower than Scala UDFs. pandas UDFs, however, are written in Python and use the pandas interface internally, so they are easier to code.

    Describe the solution you'd like

    Any chance that you could add functionality so things can be achieved via the pandas_udf interface?

    Additional context

    This is how PANDAS_UDFs work internally.


    For more info: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
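
    For illustration, a minimal scalar pandas UDF (Spark 2.x decorator form; on Spark 3.x the PandasUDFType argument can be replaced by Python type hints) looks roughly like this:

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf("double", PandasUDFType.SCALAR)
    def plus_one(v):
        # operates on a whole pandas Series per batch instead of one row at a time
        return v + 1.0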

    enhancement 
    opened by mamonu 0
  • Support for sklearn 0.24

    Is your feature request related to a problem? Please describe. sklearn updated to 0.24 and skdist is not compatible

    Describe the solution you'd like Full support for sklearn 0.24

    Additional context This only impacts the ensemble module. Maybe there is a workaround?

    enhancement 
    opened by denver1117 0
  • sk-dist Conda package

    Hi, I am sorry for not contacting you sooner, but I quite urgently needed to use sk-dist as a conda package, so I created a conda-forge feedstock for your package. As this package does not have any special dependencies, this was a rather straightforward process. This means that sk-dist is now available as a conda package through conda-forge. Right now I am the maintainer of this feedstock, but I will be happy to hand it over to you or add you as a maintainer.
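
    Presumably the package can then be installed the usual conda-forge way (command shown as an assumption, not verified here):

    conda install -c conda-forge sk-dist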

    opened by sevo 1
  • Multi-Metric Evaluation with DistGridSearchCV results in NameError

    Describe the bug Using multi-metric scoring in DistGridSearchCV results in a NameError: File "/home//.local/lib/python3.6/site-packages/skdist/distribute/search.py", line 315, in fit not isinstance(self.refit, six.string_types) or NameError: name 'six' is not defined

    To Reproduce Steps to reproduce the behavior: Create a DistGridSearchCV:

    GS_EVALUATION_METRICS_DICT = {'accuracy': 'accuracy', 'roc_auc': 'roc_auc'}

    model = GaussianNB()
    model_param_grid = {'var_smoothing': [1e-08, 0.0001, 0.01]}

    grid_search = DistGridSearchCV(estimator=model,
                                   param_grid=model_param_grid,
                                   sc=sc,
                                   scoring=GS_EVALUATION_METRICS_DICT,
                                   n_jobs=6,
                                   pre_dispatch=6,
                                   cv=3,
                                   refit='roc_auc',
                                   verbose=1,
                                   error_score=0,
                                   return_train_score=True,
                                   )
    

    Expected behavior No NameError is raised.

    Additional context I think the error is easily fixable: add an import of the six library.

    bug 
    opened by davcem 1
  • Pyarrow version 0.16.0 is not working with test suite

    Describe the bug The test suite cannot import pyarrow (0.16.0) in python 3.5 or python 3.7. There do not seem to be any errors in the installation. Failing travis build: https://travis-ci.org/Ibotta/sk-dist/jobs/648507535?utm_medium=notification&utm_source=github_status Addressed by pinning pyarrow in this PR: https://github.com/Ibotta/sk-dist/pull/36

    To Reproduce Steps to reproduce the behavior:

    1. Unpin pyarrow in setup.py
    2. Run the travis build

    Expected behavior Ideally we'd be able to unpin pyarrow in the test suite and regularly test on the most recent version (0.16.0)

    Additional context This was revealed when pyarrow released 0.16.0 on 2/7/2020

    bug 
    opened by denver1117 1
  • Warnings in Pyspark Tests

    Describe the bug There are many warnings in the testing suite for the pyspark tests.

    To Reproduce These can be seen directly in the travis logs: https://travis-ci.org/Ibotta/sk-dist

    Expected behavior Ideally, we have no warnings in the test suite.

    Additional context Some warnings are due to pyspark code under the hood which we do not have control over.

    bug 
    opened by denver1117 0