Apache Spark - A unified analytics engine for large-scale data processing

Overview

Apache Spark

Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.
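For a quick feel of the DataFrame and Spark SQL APIs mentioned above, here is a minimal PySpark sketch (illustrative only; the application name and table name are arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("quickstart").getOrCreate()
df = spark.createDataFrame([(1, "spark"), (2, "sql")], ["id", "word"])
df.createOrReplaceTempView("words")
spark.sql("SELECT word FROM words WHERE id > 1").show()  # prints the 'word' values for rows with id > 1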

https://spark.apache.org/


Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.

Building Spark

Spark is built using Apache Maven. To build Spark and its example programs, run:

./build/mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.)

More detailed documentation is available from the project site, at "Building Spark".

For general development tips, including info on developing Spark using an IDE, see "Useful Developer Tools".

Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1,000,000,000:

scala> spark.range(1000 * 1000 * 1000).count()

Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1,000,000,000:

>>> spark.range(1000 * 1000 * 1000).count()

Example Programs

Spark also comes with several sample programs in the examples directory. To run one of them, use ./bin/run-example <class> [params]. For example:

./bin/run-example SparkPi

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a mesos:// or spark:// URL, "yarn" to run on YARN, "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance:

MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

Running Tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

./dev/run-tests

Please see the guidance on how to run tests for a module, or individual tests.

There is also a Kubernetes integration test; see resource-managers/kubernetes/integration-tests/README.md

A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "Specifying the Hadoop Version and Enabling YARN" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions.

Configuration

Please refer to the Configuration Guide in the online documentation for an overview on how to configure Spark.
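As an illustration only (the property names are standard Spark configuration keys; the values are examples to tune for your cluster), configuration can also be supplied programmatically when building a session:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("configured-app")                      # arbitrary application name
    .config("spark.executor.memory", "4g")          # example value
    .config("spark.sql.shuffle.partitions", "200")  # 200 is the default
    .getOrCreate()
)

The same settings can equally be passed via conf/spark-defaults.conf or --conf flags to spark-submit, as described in the Configuration Guide.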

Contributing

Please review the Contribution to Spark guide for information on how to get started contributing to the project.

Comments
  • [SPARK-7017][Build][Project Infra]: Refactor dev/run-tests into Python

    [SPARK-7017][Build][Project Infra]: Refactor dev/run-tests into Python

    All, this is a first attempt at refactoring dev/run-tests into Python. Initially I merely converted all Bash calls over to Python, then moved to a much more modular approach (more functions, moved the calls around, etc.). What is here is the initial culmination and should provide a great base for various downstream issues (e.g. SPARK-7016, modularize / parallelize testing, etc.). Would love comments / suggestions for this initial first step!

    /cc @srowen @pwendell @nchammas

    opened by brennonyork 319
  • SPARK-1235: manage the DAGScheduler EventProcessActor with supervisor and refactor the DAGScheduler with Akka

    SPARK-1235: manage the DAGScheduler EventProcessActor with supervisor and refactor the DAGScheduler with Akka

    https://spark-project.atlassian.net/browse/SPARK-1235

    In the current implementation, the running job will hang if the DAGScheduler crashes for some reason (eventProcessActor throws an exception in receive()).

    The reason is that the actor automatically restarts when an exception is thrown during the run but is not captured properly (Akka behaviour), and the JobWaiters keep waiting for the completion of the tasks.

    In this patch, I refactored the DAGScheduler with Akka and manage the eventProcessActor with a supervisor, so that upon the failure of an eventProcessActor, the supervisor terminates the EventProcessActor and closes the SparkContext.

    Thanks to @kayousterhout and @markhamstra for giving the hints in JIRA.

    opened by CodingCat 311
  • [SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sources API

    [SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sources API

    This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path. Existing data sources like JSON and Parquet can be simplified with this work.

    New features provided

    1. Hive compatible partition discovery

      This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.

    2. Generalized partition pruning optimization

      Now partition pruning is handled during physical planning phase. Specific data sources don't need to worry about this harness anymore.

      (This also implies that we can remove CatalystScan after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)

    3. Insertion with dynamic partitions

      When inserting data to a FSBasedRelation, data can be partitioned dynamically by specified partition columns.

    New structures provided

    Developer API

    1. FSBasedRelation

      Base abstract class for file system based data sources.

    2. OutputWriter

      Base abstract class for output row writers, responsible for writing a single row object.

    3. FSBasedRelationProvider

      A new relation provider for FSBasedRelation subclasses. Note that data sources extending FSBasedRelation don't need to extend RelationProvider and SchemaRelationProvider.

    User API

    New overloaded versions of

    1. DataFrame.save()
    2. DataFrame.saveAsTable()
    3. SQLContext.load()

    are provided to allow users to save/load DataFrames with user defined dynamic partition columns.
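    For readers more familiar with today's API, the user-facing idea roughly corresponds to what is now expressed with partitionBy (a sketch only, assuming an existing DataFrame df; this is not the exact API added by this PR):

    # Write df partitioned dynamically by two columns (modern DataFrame API analogue)
    df.write.partitionBy("year", "month").parquet("/path/to/output")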

    Spark SQL query planning

    1. InsertIntoFSBasedRelation

      Used to implement write path for FSBasedRelations.

    2. New rules for FSBasedRelation in DataSourceStrategy

      These are added to hook FSBasedRelation into physical query plan in read path, and perform partition pruning.

    TODO

    • [ ] Use scratch directories when overwriting a table with data selected from itself.

      Currently, this is not supported, because the table being overwritten is always deleted before writing any data to it.

    • [ ] When inserting with dynamic partition columns, use external sorter to group the data first.

      This ensures that we only need to open a single OutputWriter at a time. For data sources like Parquet, OutputWriters can be quite memory consuming. One issue is that this approach breaks the row distribution in the original DataFrame. However, we didn't promise to preserve data distribution when writing a DataFrame.

    • [x] More tests. Specifically, test cases for

      • [x] Self-join
      • [x] Loading partitioned relations with a subset of partition columns stored in data files.
      • [x] SQLContext.load() with user defined dynamic partition columns.

    Parquet data source migration

    Parquet data source migration is covered in PR https://github.com/liancheng/spark/pull/6, which is against this PR branch and for preview only. A formal PR needs to be made after this one is merged.

    opened by liancheng 283
  • [MLLIB] [spark-2352] Implementation of an Artificial Neural Network (ANN)

    [MLLIB] [spark-2352] Implementation of an Artificial Neural Network (ANN)

    The code contains a multi-layer ANN implementation, with a variable number of inputs, outputs, and hidden nodes. It takes as input an RDD of vector pairs, corresponding to the training set with inputs and outputs.

    In addition to two automated tests, an example program is included, which also contains a graphical representation.

    opened by bgreeven 272
  • [WIP][SPARK-20628][CORE][K8S] Keep track of nodes (/ spot instances) which are going to be shutdown

    [WIP][SPARK-20628][CORE][K8S] Keep track of nodes (/ spot instances) which are going to be shutdown

    What changes were proposed in this pull request?

    Design document: https://docs.google.com/document/d/1bC2sxHoF3XbAvUHQebpylAktH6B3PSTVAGIOCYj0Mbg/edit?usp=sharing

    Keep track of nodes which are going to be shutdown to prevent scheduling tasks. The PR is designed with spot instances in mind, where there is some notice (depending on the cloud vendor) that the node will be shut down.

    Since Kubernetes has a first-class notion of pod shutdown and grace periods, the decommissioning support is available on Kubernetes. For other deployments it is left to the instance to notify the worker(s) of decommissioning with SIGPWR.

    SPARK-20628 is a sub-task of SPARK-20624 with follow-up tasks to perform migration of data and re-launching of tasks. SPARK-20628 is distinct from other mechanisms where Spark itself has control of executor decommissioning; however, the later follow-up tasks in SPARK-20624 should be usable across voluntary and involuntary termination (e.g. https://github.com/apache/spark/pull/19041 could provide a good mechanism for doing data copy during involuntary termination).

    How was this patch tested?

    Extension of AppClientSuite to cover decommissioning and addition of explicit worker decom suite.

    Areas of future work:

    • Follow up with relevant companion notification scripts in the relevant spark-on-[X] for X in {cloud providers} projects.
    • Integrate Yarn support ( see https://github.com/apache/spark/pull/19267 for how to get / handle Yarn state, but depends on YARN-6483 which is merged but not back ported).
    SPARK CORE 
    opened by holdenk 234
  • [SPARK-33212][BUILD] Move to shaded clients for Hadoop 3.x profile

    [SPARK-33212][BUILD] Move to shaded clients for Hadoop 3.x profile

    What changes were proposed in this pull request?

    This switches Spark to use shaded Hadoop clients, namely hadoop-client-api and hadoop-client-runtime, for Hadoop 3.x. For Hadoop 2.7, we'll still use the same modules such as hadoop-client.

    In order to keep the default Hadoop profile as hadoop-3.2, this defines the following Maven properties:

    hadoop-client-api.artifact
    hadoop-client-runtime.artifact
    hadoop-client-minicluster.artifact
    

    which default to:

    hadoop-client-api
    hadoop-client-runtime
    hadoop-client-minicluster
    

    but all switch to hadoop-client when the Hadoop profile is hadoop-2.7. A side effect of this is that we'll import the same dependency multiple times. For this I have to disable the Maven enforcer rule banDuplicatePomDependencyVersions.

    Besides above, there are the following changes:

    • explicitly add a few dependencies which are imported via transitive dependencies from Hadoop jars, but are removed from the shaded client jars.
    • removed the use of ProxyUriUtils.getPath from ApplicationMaster which is a server-side/private API.
    • modified IsolatedClientLoader to exclude hadoop-auth jars when Hadoop version is 3.x. This change should only matter when we're not sharing Hadoop classes with Spark (which is mostly used in tests).

    Why are the changes needed?

    This serves two purposes:

    • to unblock Spark from upgrading to Hadoop 3.2.2/3.3.0+. The latest Hadoop versions have upgraded to use Guava 27+, and in order to adopt them in Spark, we'll need to resolve the Guava conflicts. This PR takes the approach of switching to the shaded client jars provided by Hadoop.
    • avoid pulling 3rd party dependencies from Hadoop and avoid potential future conflicts.

    Does this PR introduce any user-facing change?

    When people use Spark with the hadoop-provided option, they should make sure the classpath contains the hadoop-client-api and hadoop-client-runtime jars. In addition, they may need to make sure these jars appear before other Hadoop jars in the classpath order. Otherwise, classes may be loaded from the other non-shaded Hadoop jars and cause potential conflicts.

    How was this patch tested?

    Relying on existing tests.

    SQL STRUCTURED STREAMING BUILD YARN DOCS CORE INFRA DSTREAM 
    opened by sunchao 230
  • [SPARK-1776] Have Spark's SBT build read dependencies from Maven.

    [SPARK-1776] Have Spark's SBT build read dependencies from Maven.

    The patch introduces the new way of working while also retaining the existing ways of doing things.

    For example, the YARN build instruction in Maven is mvn -Pyarn -Phadoop-2.2 clean package -DskipTests; in sbt it can become MAVEN_PROFILES="yarn, hadoop-2.2" sbt/sbt clean assembly. It also supports sbt/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 clean assembly.

    opened by ScrapCodes 229
  • [SPARK-33705][SQL][TEST] Fix HiveThriftHttpServerSuite flakiness

    [SPARK-33705][SQL][TEST] Fix HiveThriftHttpServerSuite flakiness

    What changes were proposed in this pull request?

    TO FIX flaky tests:

    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/132345/testReport/

    org.apache.spark.sql.hive.thriftserver.HiveThriftHttpServerSuite.JDBC query execution
    org.apache.spark.sql.hive.thriftserver.HiveThriftHttpServerSuite.Checks Hive version
    org.apache.spark.sql.hive.thriftserver.HiveThriftHttpServerSuite.SPARK-24829 Checks cast as float
    

    The root cause here is a jar conflict issue. NewCookie.isHttpOnly is not defined in the conflicting jsr311-api.jar. The transitive artifact jsr311-api.jar of hadoop-client is excluded on the Maven side. See https://issues.apache.org/jira/browse/SPARK-27179.

    The Jenkins PR builder and GitHub Actions use SBT as the build tool.

    First, the exclusion rule from Maven is not followed by SBT, so I could see jsr311-api.jar from the Maven cache being added to the classpath directly. This seems to be a bug in the sbt-pom-reader plugin, but I'm not sure.

    Then I added an ExcludeRule for the hive-thriftserver module at the SBT side and did see the jsr311-api.jar gone, but the CI jobs still failed with the same error.

    I added a trace log in ThriftHttpServlet

    ERROR ThriftHttpServlet: !!!!!!!!! Suspect???????? --->
    file:/home/jenkins/workspace/SparkPullRequestBuilder/assembly/target/scala-2.12/jars/jsr311-api-1.1.1.jar
    

    And the log pointed out that the assembly phase copied it to assembly/target/scala-2.12/jars/, which will be added to the classpath too. With the help of the SBT dependencyTree tool, I saw jsr311-api again as a transitive dependency of jersey-core from the YARN module with a test scope. So this seems to be another bug on the SBT side, in the sbt-assembly plugin: it copied a test-scope transitive artifact to the assembly output.

    In this PR, I defined some rules in SparkBuild.scala to bypass the potential bugs from the SBT side.

    First, exclude jsr311 across the whole project and then add it back separately to the YARN module for SBT.

    Additionally, the HiveThriftServerSuites were refactored to reduce flakiness too, but this is not related to the bugs I have found so far.

    Why are the changes needed?

    To fix the flaky tests described above.

    Does this PR introduce any user-facing change?

    NO

    How was this patch tested?

    Passing Jenkins and GitHub Actions.

    SQL BUILD YARN CORE 
    opened by yaooqinn 206
  • [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

    [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

    What changes were proposed in this pull request?

    Two new options, modifiedBefore and modifiedAfter, are provided, expecting a value in 'YYYY-MM-DDTHH:mm:ss' format. PartitioningAwareFileIndex considers these options while checking for files, just before applying PathFilters such as pathGlobFilter. A new PathFilter class was derived to filter file results, and general housekeeping was performed around the classes extending PathFilter. It became apparent that support was needed to handle multiple potential path filters, so logic was introduced for this purpose and the associated tests were written.

    Why are the changes needed?

    When loading files from a data source, there can often be thousands of files within a given path. In many cases I've seen, we want to start loading from a folder path and ideally begin with files whose modification dates are past a certain point. Out of thousands of potential files, only the ones with modification dates greater than the specified timestamp would then be considered. This saves a lot of time automatically and removes significant complexity from managing this in code.

    Does this PR introduce any user-facing change?

    This PR introduces an option that can be used with batch-based Spark file data sources. A documentation update was made to reflect an example and usage of the new data source option.

    Example Usages
    Load all CSV files modified after date: spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()

    Load all CSV files modified before date: spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()

    Load all CSV files modified between two dates: spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()

    How was this patch tested?

    A handful of unit tests were added to support the positive, negative, and edge case code paths. It's also live in a handful of our Databricks dev environments.

    SQL EXAMPLES DOCS PYTHON R 
    opened by cchighman 205
  • [SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort

    [SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort

    This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records.

    The new shuffle manager, UnsafeShuffleManager, can be enabled by setting spark.shuffle.manager=tungsten-sort in SparkConf.
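    For illustration, the setting named above could be applied like this (a PySpark sketch; this Spark 1.x-era option was later folded into the default sort-based shuffle):

    from pyspark import SparkConf, SparkContext

    # Enable the UnsafeShuffleManager described in this PR
    conf = SparkConf().set("spark.shuffle.manager", "tungsten-sort")
    sc = SparkContext(conf=conf)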

    The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles.

    UnsafeShuffleManager's optimizations will apply when all of the following conditions hold:

    • The shuffle dependency specifies no aggregation or output ordering.
    • The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers).
    • The shuffle produces fewer than 16777216 output partitions.
    • No individual record is larger than 128 MB when serialized.

    In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF compression codec.

    At a high level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output are spilled to disk and those on-disk files are merged to produce the final output file.

    UnsafeShuffleManager optimizes this process in several ways:

    • Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details.
    • It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache.
    • The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge.
    • When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's transferTo, to be used and avoids the need to allocate decompression or copying buffers during the merge.

    The shuffle read path is unchanged.

    This patch is similar to SPARK-4550 / #4450 but uses a slightly different implementation. The unsafe-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new unsafe operators (such as the new aggregation operator introduced in #5725).

    Future work

    There are several tasks that build upon this patch, which will be left to future work:

    • SPARK-7271 Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data.
    • Extension / redesign of the Serializer API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how UnsafeRowConverter works in Spark SQL).


    opened by JoshRosen 204
  • [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

    [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

    What is changed?

    This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

    To allow this change the MapOutputTracker has been extended to allow the location of shuffle files to be updated with updateMapOutput. When a shuffle block is put, a block update message will be sent which triggers the updateMapOutput.

    Instead of rejecting remote puts of shuffle blocks, BlockManager delegates the storage of shuffle blocks to its ShuffleManager's resolver (if supported). A new, experimental trait is added for shuffle resolvers to indicate that they handle remote puts of blocks.

    The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

    Why are the changes needed?

    Recomputing shuffle blocks can be expensive, so we should take advantage of our decommissioning time to migrate these blocks.

    Does this PR introduce any user-facing change?

    This PR introduces two new config parameters, spark.storage.decommission.shuffleBlocks.enabled & spark.storage.decommission.rddBlocks.enabled, that control which blocks should be migrated during storage decommissioning.
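    For example, the two configs could be enabled as follows (a minimal sketch; the builder usage is illustrative):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
        .config("spark.storage.decommission.rddBlocks.enabled", "true")
        .getOrCreate()
    )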

    How was this patch tested?

    New unit tests & an expansion of the Spark on K8s decommissioning test to assert that decommissioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

    This PR is a cleaned-up version of the previous WIP PR I made https://github.com/apache/spark/pull/28331 (thanks to @attilapiros for his very helpful reviewing on it :)).

    SQL KUBERNETES CORE DSTREAM 
    opened by holdenk 179
  • [SPARK-XXXX][CONNECT][PYTHON] Truncating nanosecond timestamps

    [SPARK-XXXX][CONNECT][PYTHON] Truncating nanosecond timestamps

    What changes were proposed in this pull request?

    This patch creates compatible behavior for truncating nanosecond timestamps to microseconds since Spark does not support nanosecond resolution.
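    As a rough illustration of the behavior (not the PR's implementation), truncating a nanosecond-precision pandas timestamp to microseconds simply drops the sub-microsecond digits:

    import pandas as pd

    ts = pd.Timestamp("2023-01-01 00:00:00.123456789")  # nanosecond precision
    print(ts.floor("us"))  # 2023-01-01 00:00:00.123456 -- microsecond resolution, which is what Spark stores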

    Why are the changes needed?

    Compatibility.

    Does this PR introduce any user-facing change?

    No

    How was this patch tested?

    Added UT.

    SQL CORE PYTHON CONNECT 
    opened by grundprinzip 0
  • [SPARK-41708][SQL][FOLLOWUP] WriteFiles should replace exprId using new query

    [SPARK-41708][SQL][FOLLOWUP] WriteFiles should replace exprId using new query

    What changes were proposed in this pull request?

    This is the follow-up of https://github.com/apache/spark/pull/39277 and does three things:

    • replace WriteFiles attribute exprId using new query to avoid potential issue
    • remove unnecessary explain info with WriteFiles
    • cleanup unnecessary Logging

    Why are the changes needed?

    Improve the implementation of WriteFiles

    Does this PR introduce any user-facing change?

    no

    How was this patch tested?

    add test

    SQL 
    opened by ulysses-you 1
  • [SPARK-41950][PS][BUILD] Pin mlflow to 2.0.X

    [SPARK-41950][PS][BUILD] Pin mlflow to 2.0.X

    What changes were proposed in this pull request?

    After the MLflow 2.1 release (https://github.com/mlflow/mlflow/releases), the unit test in pandas API on Spark is broken.

    # Imports for pd, np, and ps added so the snippet runs standalone; the original doctest assumes them.
    import numpy as np
    import pandas as pd
    import pyspark.pandas as ps
    from mlflow.tracking import MlflowClient, set_tracking_uri
    from sklearn.linear_model import LinearRegression
    import mlflow.sklearn
    from tempfile import mkdtemp
    d = mkdtemp("pandas_on_spark_mlflow")
    set_tracking_uri("file:%s"%d)
    client = MlflowClient()
    exp_id = mlflow.create_experiment("my_experiment")
    exp = mlflow.set_experiment("my_experiment")
    train = pd.DataFrame({"x1": np.arange(8), "x2": np.arange(8)**2,
                          "y": np.log(2 + np.arange(8))})
    train_x = train[["x1", "x2"]]
    train_y = train[["y"]]
    with mlflow.start_run():
        lr = LinearRegression()
        lr.fit(train_x, train_y)
        mlflow.sklearn.log_model(lr, "model")
    from pyspark.pandas.mlflow import load_model
    run_info = client.search_runs(exp_id)[-1].info
    model = load_model("runs:/{run_id}/model".format(run_id=run_info.run_id))
    prediction_df = ps.DataFrame({"x1": [2.0], "x2": [4.0]})
    prediction_df["prediction"] = model.predict(prediction_df)
    print(prediction_df)
    

    https://github.com/apache/spark/blob/06ec98b0d6a51e0c3ffec70e78d86d577b0e7a72/python/pyspark/pandas/mlflow.py#L134-L202

    File "/__w/spark/spark/python/pyspark/pandas/mlflow.py", line 172, in pyspark.pandas.mlflow.load_model
    Failed example:
        prediction_df
    Exception raised:
        Traceback (most recent call last):
          File "/usr/lib/python3.9/doctest.py", line 1336, in __run
            exec(compile(example.source, filename, "single",
          File "<doctest pyspark.pandas.mlflow.load_model[18]>", line 1, in <module>
            prediction_df
          File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13322, in __repr__
            pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
          File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13313, in _get_or_create_repr_pandas_cache
            self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
          File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13308, in _to_internal_pandas
            return self._internal.to_pandas_frame
          File "/__w/spark/spark/python/pyspark/pandas/utils.py", line 588, in wrapped_lazy_property
            setattr(self, attr_name, fn(self))
          File "/__w/spark/spark/python/pyspark/pandas/internal.py", line 1056, in to_pandas_frame
            pdf = sdf.toPandas()
          File "/__w/spark/spark/python/pyspark/sql/pandas/conversion.py", line 208, in toPandas
            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
          File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line 1197, in collect
            sock_info = self._jdf.collectToPython()
          File "/__w/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
            return_value = get_return_value(
          File "/__w/spark/spark/python/pyspark/sql/utils.py", line 209, in deco
            raise converted from None
        pyspark.sql.utils.PythonException: 
          An exception was thrown from the Python worker. Please see the stack trace below.
        Traceback (most recent call last):
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 829, in main
            process()
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 821, in process
            serializer.dump_stream(out_iter, outfile)
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 345, in dump_stream
            return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 86, in dump_stream
            for batch in iterator:
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 338, in init_stream_yield_batches
            for series in iterator:
          File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 519, in func
            for result_batch, result_type in result_iter:
          File "/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1253, in udf
            yield _predict_row_batch(batch_predict_fn, row_batch_args)
          File "/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1057, in _predict_row_batch
            result = predict_fn(pdf)
          File "/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1237, in batch_predict_fn
            return loaded_model.predict(pdf)
          File "/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 413, in predict
            return self._predict_fn(data)
          File "/usr/local/lib/python3.9/dist-packages/sklearn/linear_model/_base.py", line 355, in predict
            return self._decision_function(X)
          File "/usr/local/lib/python3.9/dist-packages/sklearn/linear_model/_base.py", line 338, in _decision_function
            X = self._validate_data(X, accept_sparse=["csr", "csc", "coo"], reset=False)
          File "/usr/local/lib/python3.9/dist-packages/sklearn/base.py", line 518, in _validate_data
            self._check_feature_names(X, reset=reset)
          File "/usr/local/lib/python3.9/dist-packages/sklearn/base.py", line 451, in _check_feature_names
            raise ValueError(message)
        ValueError: The feature names should match those that were passed during fit.
        Feature names unseen at fit time:
        - 0
        - 1
        Feature names seen at fit time, yet now missing:
        - x1
        - x2
    

    https://github.com/apache/spark/actions/runs/3871715040/jobs/6600578830

    Why are the changes needed?

    To recover the broken test.

    Does this PR introduce any user-facing change?

    No, test-only (for now). Maybe a regression but from MLflow.

    How was this patch tested?

    CI in this PR should test it out.

    BUILD 
    opened by HyukjinKwon 2
  • [WIP][SPARK-41948][SQL] Fix NPE for error classes: CANNOT_PARSE_JSON_FIELD

    [WIP][SPARK-41948][SQL] Fix NPE for error classes: CANNOT_PARSE_JSON_FIELD

    What changes were proposed in this pull request?

    The PR aims to fix an NPE for the error class CANNOT_PARSE_JSON_FIELD.

    Why are the changes needed?

    1. When I wanted to delete the redundant toString() in a code block (shown as an image in the original PR), I found the UT ("select from_json('[1, "2", 3]', 'array')") failed.

    A. Why could it succeed before the deletion? parse.getCurrentName.toString() => null.toString() => throws an NPE, but the following logic can cover it: https://github.com/apache/spark/blob/15a0f55246bee7b043bd6081f53744fbf74403eb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala#L569-L573 But obviously this is not our original intention.

    B. After the deletion, an IllegalArgumentException is thrown instead: parse.getCurrentName => throws java.lang.IllegalArgumentException as follows:

    Caused by: java.lang.IllegalArgumentException: Cannot resolve variable 'fieldName' (enableSubstitutionInVariables=false).
    at org.apache.commons.text.StringSubstitutor.substitute(StringSubstitutor.java:1532)
    at org.apache.commons.text.StringSubstitutor.substitute(StringSubstitutor.java:1389)
    at org.apache.commons.text.StringSubstitutor.replace(StringSubstitutor.java:893)
    at org.apache.spark.ErrorClassesJsonReader.getErrorMessage(ErrorClassesJSONReader.scala:51)
    ... 140 more

    The above code can't handle IllegalArgumentException, so the UT failed.

    2. So we should handle the case where parse.getCurrentName is null.

    Does this PR introduce any user-facing change?

    No.

    How was this patch tested?

    Passes GA. Existing UTs.

    SQL 
    opened by panbingkun 2
  • [SPARK-41947][CORE][DOCS] Update the contents of error class guidelines

    [SPARK-41947][CORE][DOCS] Update the contents of error class guidelines

    What changes were proposed in this pull request?

    This PR proposes to update error class guidelines for core/src/main/resources/error/README.md.

    Why are the changes needed?

    Because some of the contents are out of date and no longer valid for the current behavior.

    Does this PR introduce any user-facing change?

    No. It fixes the developer guidelines for error classes.

    How was this patch tested?

    The existing CI should pass.

    DOCS CORE 
    opened by itholic 0
  • [SPARK-41879][CONNECT][PYTHON] Make `DataFrame.collect` support nested types

    [SPARK-41879][CONNECT][PYTHON] Make `DataFrame.collect` support nested types

    What changes were proposed in this pull request?

    Make DataFrame.collect support nested types by introducing a new data converter.

    Note that duplicated field names are not supported in this PR, since we cannot even read the batches on the client side.
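    As a small usage sketch of what this enables (illustrative only, assuming an existing Spark Connect session named spark):

    # Collect a nested array-of-struct column back to the client
    rows = spark.sql("SELECT array(struct(1 AS a, 'x' AS b)) AS c").collect()
    print(rows[0].c)  # e.g. [Row(a=1, b='x')]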

    Why are the changes needed?

    to be consistent with PySpark

    Does this PR introduce any user-facing change?

    yes

    How was this patch tested?

    added UT and enabled doctests

    SQL CORE PYTHON CONNECT 
    opened by zhengruifeng 4