We would like to modify the SPEC in order to provide the following additional features:
- Schema Discovery - Allow a Tap to indicate what the Schema will be without actually streaming data
- Stream and Field Selection - Allow the user to select a subset of the available streams and fields
- Stream Renaming - Allow the user to provide names for streams in order to resolve name collisions
We'll refer to these features together as Schema Editing.
Motivation
Stream and Field Selection
Suppose we had a generic Tap that pulled data from a Postgres database. It could potentially produce one stream for every table in every schema, including every field in each table. But it's likely that users would want to only include a subset of the tables and a subset of the fields within each selected table. A Postgres Tap would not have a statically defined schema. It would need to connect to the database in order to find the available schemas, tables, and fields. So we need some way to allow the tap to query the database for the available schemas, tables, and fields, then allow the user to select which ones they want, then run another job taking the user's selections as input.
Stream Renaming
Currently the RECORD and SCHEMA messages use a "stream" field to identify a stream. For a data source that contains a hierarchy of namespaces, such as Postgres, it's not clear how a Tap should derive the name of the stream. For example, suppose we have a Postgres database called "prod", with a schema called "public", with a table called "users". If we were to call the stream "users", that might conflict with a table named "users" in some other schema. In order to disambiguate, we may want to allow the user to provide a mapping from the data source's notion of a stream or table to the Tap's stream name.
Note that Schema Discovery is required by both the Stream and Field Selection and Stream Renaming features.
Proposed Solution
Extend the specification as follows.
Add Discover Mode
Taps should allow an optional --discover command line flag. If --discover is provided, the Tap should print out a SCHEMA message for every stream that is available to it.
It is expected that the user will edit the discovered schemas through some interface in order to delete schemas for streams they don't want, or delete specific fields they don't want. Then they can pass the resulting pruned schemas back in via a --schemas option.
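As a sketch of how a Tap might implement this flag; the discover_streams helper and its return value are hypothetical, standing in for a real source inspection such as a Postgres catalog query:

```python
# Sketch of discover mode: emit one SCHEMA message per available stream
# and exit without emitting any RECORD messages.
import json
import sys


def discover_streams():
    """Hypothetical: query the source for its streams and fields."""
    return [
        {
            "stream": "public_users",
            "source": {"schema": "public", "table": "users"},
            "key_properties": ["id"],
            "schema": {
                "type": "object",
                "properties": {"id": {"type": "integer"}},
            },
        }
    ]


def do_discover():
    # Print a SCHEMA message for every stream available to the Tap.
    for stream in discover_streams():
        message = {"type": "SCHEMA", **stream}
        json.dump(message, sys.stdout)
        sys.stdout.write("\n")


if __name__ == "__main__":
    if "--discover" in sys.argv:
        do_discover()
```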
Add Schema Selection
A Tap should allow an optional --schemas SCHEMAS argument that points to a file containing the list of schemas describing the desired output. It is expected that the schemas provided will be a pruned version of the schemas produced by a previous run of the same Tap in discover mode. The Tap should attempt to produce output that conforms to the schemas provided with the --schemas option. If no --schemas option is provided, the Tap should fetch all fields of all streams available.
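A minimal sketch of how a Tap might honor a pruned schemas file, assuming the file holds one SCHEMA message per line; the helper names are illustrative and not part of the spec:

```python
# Sketch: load user-edited schemas and drop any record fields the user
# deleted from the pruned schema.
import json


def load_selected(path):
    """Map each stream name to its user-edited SCHEMA message.

    Keyed by stream name here for simplicity; a real Tap would likely
    key on the "source" field to survive stream renaming.
    """
    with open(path) as f:
        return {
            msg["stream"]: msg
            for msg in (json.loads(line) for line in f if line.strip())
        }


def select_record(record, schema_msg):
    """Keep only the properties the user left in the pruned schema."""
    allowed = schema_msg["schema"]["properties"]
    return {k: v for k, v in record.items() if k in allowed}
```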
Add "source" to SCHEMA message
Extend the SCHEMA message to add a "source" field, the structure of which is determined entirely by the Tap. The "source" field identifies the source of the stream. For a Tap that pulls from a database source, this could be something like
"source": {
"schema": "public",
"table": "users"
}
For a Tap that pulls from an API, it could be
"source": {
"endpoint": "users"
}
If the user wants to rename a stream, they can provide a --schemas argument that provides a new value for the "stream" field for the same source.
Example
Suppose we have a Postgres Tap, with a configuration that points to a database that has the following schema / table / field structure:
- public
- users
- orders
- id
- user_id
- amount
- credit_card_number
Suppose the Postgres tap normally names the stream "<schema>_<table>", so the stream names would be "public_users" and "public_orders".
So if we ran the Tap in discover mode:
$ tap_postgres --config config.json --discover > schemas.json
we would get the following output
{
"type": "SCHEMA",
"stream": "public_users",
"source": {"schema": "public", "table": "users"},
"key_properties": ["id"],
"schema": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"first_name": {"type": "string"},
"last_name": {"type": "string"},
}
}
}
{
"type": "SCHEMA",
"stream": "public_orders",
"source": {"schema": "public", "table": "orders"},
"key_properties": ["id"],
"schema": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"user_id": {"type": "integer"},
"amount": {"type": "number"},
"credit_card_number": {"type": "string"}
}
}
}
Now let's assume the user wants to make the following changes to the schema:
- Remove the public_ prefix from the stream names
- Get rid of the users table
- Get rid of the credit card field
The user could make those changes by deleting the schema message for the users table, deleting the schema property for the "credit_card_number" field, and changing the stream name for the orders table:
{
"type": "SCHEMA",
"stream": "orders",
"soruce": {"schema": "public",
"table": "orders"},
"key_properties": ["id"],
"schema": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"user_id": {"type": "integer"},
"amount": {"type": "number"},
}
}
}
So now the user would run the Tap again, specifying the edited schema file as input:
$ tap_postgres --config config.json --schemas schemas_edited.json
Concerns
- Can we come up with a better name for "discover mode"?
- How do we keep this from overly complicating taps that don't need schema selection?
The schema editing adds a lot of complexity. For Taps that can provide very large sets of streams and fields, this is necessary. But what about a Tap with a small static schema, that doesn't need to support schema selection? In particular:
- Can a Tap choose not to support schema selection? If so,
- What should a Tap that doesn't support schema selection do if I call it with the --discover flag? Print out the schema and exit 0? Exit non-0?
- What should a Tap that doesn't support schema selection do if I call it with a --schemas SCHEMAS option? Ignore it, or fail?
- What if a Tap that does support schema selection is invoked with a --schemas SCHEMAS option where the schemas provided do not match the schema that's available to it?
Given the complexity introduced by these changes, I'm inclined to say that we should make Stream and Field Selection and Stream Renaming optional parts of the spec, and say that a Tap that does not want to support these features should fail hard if it is invoked with a --discover or --schemas option.
opened by mdelaurentis 7
I can only think of two real options for specifying which fields should be used as the primary key:
- Put a "key": true property on each of the key fields in the SCHEMA message. This is what we have right now, in that the target looks for that property and will set the key fields based on that.
- Put a top-level "key_names" property in the message. For example:
{"type": "SCHEMA",
"stream": "users",
"schema": { "..." },
"key_names": ["customer_id", "email"]}
The advantage of the first option is that we avoid cluttering up the top level of the message with additional properties, which I think is desirable. The advantage of the second option is that it would make the choice of primary key columns more explicit and obvious. If we went with option 1, it would be easy for a tap author to simply forget to mark some fields as keys; we have already forgotten to do that with the last four taps we've done. With option 2, we could require a "key_names" field, even if it points to an empty list.
I would vote for option 2, because I would prefer to be as explicit as possible about what the key fields are.
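For concreteness, here is a sketch of how a target might extract the key fields under each option; the function names are illustrative:

```python
# Sketch contrasting the two proposed ways a target could find the
# primary key fields in a SCHEMA message.


def keys_option_1(schema_msg):
    """Option 1: look for "key": true on individual properties."""
    props = schema_msg["schema"]["properties"]
    return [name for name, prop in props.items() if prop.get("key")]


def keys_option_2(schema_msg):
    """Option 2: read an explicit top-level "key_names" list.

    Requiring the field (even as an empty list) turns a forgotten
    key declaration into an immediate, visible error.
    """
    return schema_msg["key_names"]
```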
question
opened by mdelaurentis 7
Updated Schema, Record, and State links in Developing a Target section of Running and Developing Singer Taps and Targets documentation page.
Closes #70
opened by alexvaldez-edge 3
As a best practice, a Tap or Target should provide a Dry Run mode, where it just verifies that it can connect to its data source or destination using the configuration provided.
Motivation
It would be helpful if a Tap could give the user quick feedback as to whether it can connect to the data source using the configuration provided. Currently if you run a tap with invalid configuration, it will exit with a non-zero status rather quickly. But if you run it with valid credentials, it will start streaming data. Users may find it desirable to have a mode of operation where the Tap just makes a quick attempt to connect to the data source and then exits zero or non-zero to indicate success or failure.
Proposed Solution
A Tap or Target should support a -n and --dry-run option. This option indicates that the Tap or Target should just attempt to connect to the data source or destination with the configuration provided. If it can connect, exit 0. If it can't, exit non-zero with a useful error message.
question wontfix
opened by mdelaurentis 3
I've been adding experimental metrics logging to singer-python and a few taps. I'd like to add a best practice recommendation for logging metrics from Taps. I would really appreciate feedback on this best practice recommendation and the singer-python changes. Below are links to diffs for a few taps that use the new singer-python stats utilities, so you can see how the implementation would work in practice.
- Updated Best Practices guide: https://github.com/singer-io/getting-started/blob/metrics/BEST_PRACTICES.md (look at that rather than the diff)
- singer-python - https://github.com/singer-io/singer-python/pull/19
- tap-shippo - https://github.com/singer-io/tap-shippo/pull/3/files
- tap-facebook - https://github.com/singer-io/tap-facebook/pull/3/files
- tap-closeio - https://github.com/singer-io/tap-closeio/pull/6/files
I'm particularly interested in feedback in the following areas:
- What terminology should we use? "stats"? "metrics"?
- Are the field names clear?
- For the singer-python change, is the distinction between a Counter and a Timer clear enough?
opened by mdelaurentis 2
{"type": "RECORD", "stream": "stream": "users", "record": {"id": 2, "name": "Mike"}}
Stream stream
opened by criccomini 2
The table_name property should be table, as per this source code: https://github.com/singer-io/singer-python/blob/0c066de21111d8572425083b4a8792d193c80af1/singer/catalog.py#L21
cla-missing
opened by burmecia 1
This is a part of a lot of Singer taps, and if done right it will make handling child streams a lot easier. We've wanted to include something like this for a while, so I'm putting up this PR to start talking about how best to convey this guidance.
opened by dmosorast 1
Here's the error I saw:
Either git or ssh (required by git to clone through SSH) is not installed
in the image. Falling back to CircleCI's native git client but the
behavior may be different from official git. If this is an issue, please
use an image that has official git and ssh installed.
ssh: no key found
The first section appeared on the last successful run as well.
I rebuilt the container with SSH, connected, and got
root@400194e20721:~# git
-bash: git: command not found
So this PR will install git and then check out the code.
opened by luandy64 1
For someone coming to Singer for the first time, a lot of time can be wasted wondering why a stream is skipped when running a tap. It appears a stream is only selected to be synced after the selected property is added to its metadata.
Problem
The getting started documentation should state that a stream is only processed if selected is true.
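For example, a minimal catalog entry marking a stream as selected might look like this (the stream name and schema are illustrative):

```json
{
  "streams": [
    {
      "tap_stream_id": "users",
      "stream": "users",
      "schema": {
        "type": "object",
        "properties": {"id": {"type": "integer"}}
      },
      "metadata": [
        {"breadcrumb": [], "metadata": {"selected": true}}
      ]
    }
  ]
}
```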
opened by Jagjit-Thind 1
Hey there!
I'm developing a tap for BigCommerce here https://github.com/chrisgoddard/tap-bigcommerce
What is the process for getting new taps integrated in Stitch? This is my first one but I have 3-4 other platforms that I want to write taps for and ideally have available as Stitch connectors.
Thanks.
-Chris
opened by chrisgoddard 1
Hello,
We have a use case where we want to run our tap and target continuously rather than stopping after one run. That is, the tap runs, takes some data, and passes it to the target for delivery to the destination. We don't want to stop there: the tap should be executed again so that it can pick up new data.
Is this possible, and how can we achieve it?
opened by shubhransh-locale 0
I have a field called country that contains a list of country values. How do I insert these using Singer?
{"country":["india","usa","japan","china"]}
This is the format of the data. I want to send it to a Postgres table under a column named country. What should the schema look like?
opened by santhoshvempali 0
Hi Community,
We are using the SFMC integration to sync data from SFMC to BigQuery. Intermittent errors happen quite often. Sometimes they last for over a day, sometimes for a couple of hours, and they can recover later on without any work from our side. Could you help us understand
- the root cause of such errors
- whether the integration needs to end up omitting some data in order to recover from such errors (or what changes for the sync to move on without errors)?
Thanks!
One error log example attached below
```
2021-05-27 11:55:57,858Z target - INFO replicated 6805 records from "data_extension.PROD_Businesses" endpoint
2021-05-27 11:55:58,008Z tap - INFO Getting more results from 'DataExtensionObject' endpoint
2021-05-27 11:55:58,555Z tap - ERROR <suds.sax.document.Document object at 0x7f6e142ad470>
2021-05-27 11:55:58,555Z tap - ERROR :2:62: syntax error
2021-05-27 11:55:58,555Z tap - Traceback (most recent call last):
2021-05-27 11:55:58,555Z tap - File "/root/.pyenv/versions/3.5.2/lib/python3.5/xml/sax/expatreader.py", line 210, in feed
2021-05-27 11:55:58,555Z tap - self._parser.Parse(data, isFinal)
2021-05-27 11:55:58,555Z tap - xml.parsers.expat.ExpatError: syntax error: line 2, column 62
2021-05-27 11:55:58,555Z tap -
2021-05-27 11:55:58,555Z tap - During handling of the above exception, another exception occurred:
2021-05-27 11:55:58,555Z tap -
2021-05-27 11:55:58,556Z tap - Traceback (most recent call last):
2021-05-27 11:55:58,556Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/tap_exacttarget/__init__.py", line 136, in do_sync
2021-05-27 11:55:58,556Z tap - stream_accessor.sync()
2021-05-27 11:55:58,556Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/tap_exacttarget/dao.py", line 74, in sync
2021-05-27 11:55:58,556Z tap - return self.sync_data()
2021-05-27 11:55:58,556Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/tap_exacttarget/endpoints/data_extensions.py", line 283, in sync_data
2021-05-27 11:55:58,556Z tap - replication_key=replication_key)
2021-05-27 11:55:58,556Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/tap_exacttarget/endpoints/data_extensions.py", line 209, in _replicate
2021-05-27 11:55:58,556Z tap - for row in result:
2021-05-27 11:55:58,556Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/tap_exacttarget/client.py", line 153, in request_from_cursor
2021-05-27 11:55:58,556Z tap - response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
2021-05-27 11:55:58,556Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/tap_exacttarget/fuel_overrides.py", line 32, in tap_exacttarget__getMoreResults
2021-05-27 11:55:58,556Z tap - obj = TapExacttarget__ET_Continue(cursor.auth_stub, cursor.last_request_id, batch_size)
2021-05-27 11:55:58,556Z target - INFO Serializing batch with 1518 messages for table data_extension.PROD_Businesses
2021-05-27 11:55:58,557Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/tap_exacttarget/fuel_overrides.py", line 26, in __init__
2021-05-27 11:55:58,557Z tap - response = auth_stub.soap_client.service.Retrieve(ws_continueRequest)
2021-05-27 11:55:58,557Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/suds/client.py", line 521, in __call__
2021-05-27 11:55:58,557Z tap - return client.invoke(args, kwargs)
2021-05-27 11:55:58,557Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/suds/client.py", line 581, in invoke
2021-05-27 11:55:58,557Z tap - result = self.send(soapenv)
2021-05-27 11:55:58,557Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/suds/client.py", line 621, in send
2021-05-27 11:55:58,557Z tap - original_soapenv=original_soapenv)
2021-05-27 11:55:58,557Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/suds/client.py", line 661, in process_reply
2021-05-27 11:55:58,557Z tap - replyroot = _parse(reply)
2021-05-27 11:55:58,557Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/suds/client.py", line 832, in _parse
2021-05-27 11:55:58,557Z tap - return Parser().parse(string=string)
2021-05-27 11:55:58,558Z tap - File "/code/orchestrator/tap-env/lib/python3.5/site-packages/suds/sax/parser.py", line 133, in parse
2021-05-27 11:55:58,558Z tap - sax.parse(source)
2021-05-27 11:55:58,558Z tap - File "/root/.pyenv/versions/3.5.2/lib/python3.5/xml/sax/expatreader.py", line 110, in parse
2021-05-27 11:55:58,558Z tap - xmlreader.IncrementalParser.parse(self, source)
2021-05-27 11:55:58,558Z tap - File "/root/.pyenv/versions/3.5.2/lib/python3.5/xml/sax/xmlreader.py", line 125, in parse
2021-05-27 11:55:58,558Z tap - self.feed(buffer)
2021-05-27 11:55:58,558Z tap - File "/root/.pyenv/versions/3.5.2/lib/python3.5/xml/sax/expatreader.py", line 214, in feed
2021-05-27 11:55:58,558Z tap - self._err_handler.fatalError(exc)
2021-05-27 11:55:58,558Z tap - File "/root/.pyenv/versions/3.5.2/lib/python3.5/xml/sax/handler.py", line 38, in fatalError
2021-05-27 11:55:58,558Z tap - raise exception
2021-05-27 11:55:58,558Z tap - xml.sax._exceptions.SAXParseException: :2:62: syntax error
2021-05-27 11:55:58,558Z tap - ERROR Failed to sync endpoint, moving on!
```
opened by FionaYiZhao 0
Hi,
We are planning to integrate Singer in our application, but right now we see that a Singer config can have only a single account in the configuration.
Is it possible to add multiple accounts to the configuration? Say we have 100 B2B companies that are using Stripe and we would like to fetch their data through Singer. Is this something that can be done through Singer?
Thanks.
opened by azhard4int 0