Hi,
I integrated the Flask example from upload-test.py into the Airflow webserver UI as an upload plugin. It lets a user upload a CSV file from the Airflow webserver UI and save it to a server directory ('/usr/local/airflow/uploads/'). However, the parser never picks up any header information for the registered target, and the chunked data is not written to the file through the parser.
- I have verified that upload-test.py works well on a local Flask host with @app.route.
- I have verified that the Airflow plugin interface below works well when I use request.files and .save(path_to_save) instead of the streaming parser.
Here is the Flask view (@expose) inside the Airflow plugin:
class PipelineLauncher(AppBuilderBaseView):  # from flask_appbuilder import BaseView as AppBuilderBaseView
    @expose('/', methods=('GET', 'POST'))
    def list(self):
        if request.method == 'POST':
            path_to_save = '/usr/local/airflow/uploads/temp.csv'  # path mounted with airflow
            file_ = FileTarget(path_to_save)
            parser = StreamingFormDataParser(headers=request.headers)
            parser.register('file', file_)
            while True:
                chunk = request.stream.read(8192)
                if not chunk:
                    break
                parser.data_received(chunk)
            # df = pd.read_csv(path_to_save)  # this will throw error 'pandas.errors.EmptyDataError: No columns to parse from file'
            # rows = df.shape[0]
            return self.render_template("debug.html",
                                        path_to_save=path_to_save,
                                        file_object=file_,
                                        header=request.headers,
                                        filename=file_.multipart_filename,
                                        content_type=file_.multipart_content_type)
        return self.render_template("index.html")
# debug.html
# path_to_save: {{ path_to_save }}
# file_object: {{ file_object }}
# header: {{ header }}
# filename: {{ filename }}
# content_type: {{ content_type }}
bp = Blueprint(
    "pipeline", __name__,
    template_folder='templates',
    static_folder='static',
    static_url_path='/static/pipeline_launcher')

class AirflowCustomLauncher(AirflowPlugin):
    name = "pipeline"
    pipeline_launcher = PipelineLauncher()
    pipeline_launcher_package = {
        "name": "Manual Upload Plugin",
        "category": "Launch Pipeline",
        "view": pipeline_launcher
    }
    appbuilder_views = [pipeline_launcher_package]
    admin_views = [pipeline_launcher_package]
    flask_blueprints = [bp]
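To look at the read loop in isolation from Airflow, here is a minimal sketch that uses only the standard library. `io.BytesIO` stands in for `request.stream`, and `CountingSink` and `drain` are hypothetical names introduced for illustration (they are not part of streaming_form_data). It shows that the loop feeds every chunk to the sink when the stream is fresh, but feeds nothing at all if the stream has already been read before the loop starts. Whether something in Airflow reads the body before the view runs is an assumption, not something confirmed here.

```python
import io

CHUNK_SIZE = 8192  # same chunk size as the view above


class CountingSink:
    """Hypothetical stand-in for the parser; it only counts what it receives."""

    def __init__(self):
        self.chunks = 0
        self.total_bytes = 0

    def data_received(self, chunk):
        self.chunks += 1
        self.total_bytes += len(chunk)


def drain(stream, sink, chunk_size=CHUNK_SIZE):
    """Read the stream in fixed-size chunks until it returns an empty read."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        sink.data_received(chunk)
    return sink


# Fresh stream: every byte reaches the sink.
fresh_sink = drain(io.BytesIO(b"x" * 20000), CountingSink())

# Stream that was already read once: the first read is empty, so the loop exits.
consumed = io.BytesIO(b"x" * 20000)
consumed.read()
consumed_sink = drain(consumed, CountingSink())

print(fresh_sink.chunks, fresh_sink.total_bytes)
print(consumed_sink.chunks, consumed_sink.total_bytes)
```

Logging the number of chunks and bytes inside the real view in the same way would show whether `parser.data_received` is ever called with data.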
index.html
{% include "airflow/master.html" %}
{% block body %}
<title>Upload XLS/XLSX/CSV files to InfluxDB.</title>
<form method="post" class="admin-form form-horizontal" enctype="multipart/form-data" role='form'>
<div class="col-md-12 text-center">
<h3>Manual Upload for InfluxDB</h3>
<br/>
<p> This plugin currently only supports .csv, .xls, and .xlsx files. Larger files and .xlsx files will take longer than usual to process. Upon submitting a file, you will be taken to a page to preview your file as well as configure upload parameters. </p>
</div>
{% if csrf_token %}
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
{% endif %}
<div class="form-group">
<!-- You can take parameters from the user using the form elements and pass them to backend -->
<label class="col-md-4 control-label">File: </label>
<div class="col-md-6">
<input class="form-control" type="file" name="file" />
</div>
</div>
<div class="col-md-offset-4 col-md-10 submit-row">
<button type="submit" class="btn btn-primary">Process File</button>
</div>
<div class="container">
{% for message in get_flashed_messages() %}
<div class="alert alert-warning">
{{ message }}
</div>
{% endfor %}
</div>
</form>
{% endblock %}
The plugin lets me choose a file to upload. After I selected a CSV file, this is the output from the debug.html page:
path_to_save: /usr/local/airflow/uploads/temp.csv
file_object: <streaming_form_data.targets.FileTarget object at 0x7f7be38fb550>
header: Host: localhost:8080 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Encoding: gzip, deflate Accept-Language: en-us Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryDSj0i1GXH4P0ITsx Origin: http://localhost:8080 User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15 Connection: keep-alive Upgrade-Insecure-Requests: 1 Referer: http://localhost:8080/pipelinelauncher/ Content-Length: 328842 Cookie: session=.eJwlj0tuQyEMRffCOAP-tt9mngzYbVSaVMAbRd17iSqPru5Hxy9z6pD5aQ7lPuVmznszhwnQFAGJrGvFh4aOxbP1PiOpT14qVScgBUKELATYgFJRpSSQhKu3pFmEi5Kr6kVjzM0iWqLkss9SiEKstlllZIGguGvIO6wCxWyQHxnf_JDHMsca10arc-i5nl_y2ISsEZLDrLW6nGy2GPYxSAQBTrYgqrftvdT445yL1zVPvfcl413vfTv9WbnLlnvyZq4p4_99Z37_AH8MU-Q.YTuY0A.t-_l07dcNPe_RN6CWI_Pg5cZ3vo
filename: None
content_type: None
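For context on the two None values: as far as I understand, the filename and content type of an uploaded file are carried in the headers of the individual multipart part, not in the request headers shown above, so they can only be filled in once the part itself has been parsed. The sketch below illustrates where those fields live, using the standard library's email parser rather than streaming_form_data. The boundary, field values, and CSV contents are made up for illustration.

```python
from email.parser import BytesParser

# Hypothetical boundary and file contents, for illustration only.
BOUNDARY = "----ExampleBoundary"

# A multipart/form-data body with a single part named "file".
body = (
    f"--{BOUNDARY}\r\n"
    'Content-Disposition: form-data; name="file"; filename="temp.csv"\r\n'
    "Content-Type: text/csv\r\n"
    "\r\n"
    "a,b\r\n"
    "1,2\r\n"
    f"--{BOUNDARY}--\r\n"
).encode("ascii")

# The stdlib email parser needs the top-level Content-Type header in front of the body.
raw = (
    f'Content-Type: multipart/form-data; boundary="{BOUNDARY}"\r\n\r\n'
).encode("ascii") + body

message = BytesParser().parsebytes(raw)
part = message.get_payload()[0]

# These values come from the part's own headers.
field_name = part.get_param("name", header="content-disposition")
filename = part.get_filename()
content_type = part.get_content_type()

print(field_name, filename, content_type)
```

If no part is ever parsed, for example because no body bytes reach the parser, there is nothing to read these fields from.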
Any help would be appreciated. Thank you.