@jankatins
Member

Refactors the file reader and adds an S3 file reader as an alternative to local file reads.

New commands:

  • data_integration.parallel_tasks.files.ParallelReadS3File: reads in a whole bucket
  • data_integration.commands.files.ReadS3File: reads a single file from S3
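
As a rough illustration of what a single-file S3 read could boil down to, here is a minimal sketch that builds a shell pipeline streaming one object to stdout. It assumes the AWS CLI is installed (`aws s3 cp <uri> -` writes to stdout); the helper name `build_s3_read_command` and the `DECOMPRESSORS` mapping are hypothetical and not taken from this PR.

```python
import shlex

# Hypothetical mapping from a compression name to the shell filter that undoes it.
DECOMPRESSORS = {
    "none": None,
    "gzip": "gunzip",
}


def build_s3_read_command(s3_uri: str, compression: str = "none") -> str:
    """Return a shell pipeline that streams one S3 object to stdout."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {s3_uri!r}")
    if compression not in DECOMPRESSORS:
        raise ValueError(f"unsupported compression: {compression!r}")

    # `aws s3 cp <uri> -` writes the object to stdout instead of a local file.
    command = f"aws s3 cp {shlex.quote(s3_uri)} -"

    decompressor = DECOMPRESSORS[compression]
    if decompressor:
        command += f" | {decompressor}"
    return command


if __name__ == "__main__":
    print(build_s3_read_command("s3://my-bucket/data/events.csv.gz", "gzip"))
```

The output of such a pipeline could then be piped into whatever loads the data into the target table, in the same way a local `cat` would be.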

In initial testing, this is a lot slower than syncing and then reading from the local file system (both iterating over the bucket to get the file list and the individual reads), but syncing the bucket to a local file system also takes time. From my perspective, this is only worth it if you have to do a "sync to local" on every run (which we have to: no volumes in our ETL container :-( ). In that case the second run saves time compared to doing a sync plus an incremental read via the file system. That's the theory at least; so far I have only tested locally.
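
The time saving on later runs depends on reading only the files that changed since the previous run. A minimal sketch of that selection step follows, assuming the bucket listing is available as (key, last-modified) pairs and the state of the previous run as a dict. The function name and data shapes are hypothetical, not what this PR implements.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Tuple


def select_changed_files(listing: Iterable[Tuple[str, datetime]],
                         processed: Dict[str, datetime]) -> List[str]:
    """Return the keys that are new or were modified after they were last read."""
    changed = []
    for key, last_modified in listing:
        previous = processed.get(key)
        # Unknown keys are new; known keys are re-read only when they are newer.
        if previous is None or last_modified > previous:
            changed.append(key)
    return sorted(changed)


if __name__ == "__main__":
    day1 = datetime(2020, 1, 1, tzinfo=timezone.utc)
    day2 = datetime(2020, 1, 2, tzinfo=timezone.utc)
    listing = [("a.csv", day1), ("b.csv", day2), ("c.csv", day2)]
    processed = {"a.csv": day1, "b.csv": day1}
    print(select_changed_files(listing, processed))
```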

The single file read will also come in handy as a replacement for Google Sheets imports.

WIP...

initial_node: Task = None
final_node: Task = None

def __init__(self, id: str,
Member

One can also add a pipeline or ParallelTask as initial / final node
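
To illustrate the point, here is a minimal sketch in which the initial and final nodes are annotated with a common base class, so tasks, parallel tasks, and sub-pipelines are all accepted. The classes below are simplified stand-ins written for this example, assuming all three derive from a shared `Node` base; they are not the library's real implementations.

```python
from typing import Optional


class Node:
    """Simplified stand-in for a common base class of all pipeline nodes."""

    def __init__(self, id: str):
        self.id = id


class Task(Node):
    pass


class ParallelTask(Node):
    pass


class Pipeline(Node):
    def __init__(self, id: str):
        super().__init__(id)
        # Annotated with the base class, so any kind of node is allowed.
        self.initial_node: Optional[Node] = None
        self.final_node: Optional[Node] = None

    def add_initial(self, node: Node) -> 'Pipeline':
        self.initial_node = node
        return self

    def add_final(self, node: Node) -> 'Pipeline':
        self.final_node = node
        return self


if __name__ == "__main__":
    pipeline = Pipeline("main").add_initial(Task("setup")).add_final(Pipeline("cleanup"))
    print(type(pipeline.initial_node).__name__, type(pipeline.final_node).__name__)
```

With this annotation, code that needs `Task`-only methods has to check the node type first, which is the trade-off raised in the reply.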

Member Author

Not really: there are places which expect a Task. At least, IntelliJ complained in some places that a method wasn't available.

Member Author

Fixed it in a different way


- def add_final(self, node: Node) -> 'Pipeline':
+ def add_final(self, node: Task) -> 'Pipeline':
      self.final_node = node
Member

same here

Member Author

Fixed it in a different way

Member

@martin-loetzsch left a comment

Looks very good otherwise. Please squash.

Let's hold off on a release until the other PR is in.

class ReadS3File(_ReadFile):
    """Reads data from an S3 file"""

    def __init__(self, s3_url: str, compression: Compression, target_table: str,

I think the parameter s3_url should be called s3_uri, to match the naming used by the cp command. A URL is always a URI, but not every URI is a URL. See also the Wikipedia article on URLs.
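
For illustration, an `s3://` identifier names a bucket and an object key rather than a network location to fetch directly. A minimal sketch of splitting such a URI follows; the helper name `split_s3_uri` is hypothetical and not part of this PR.

```python
from typing import Tuple


def split_s3_uri(s3_uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    prefix = "s3://"
    if not s3_uri.startswith(prefix):
        raise ValueError(f"not an S3 URI: {s3_uri!r}")

    # Everything up to the first slash is the bucket, the rest is the object key.
    bucket, _, key = s3_uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name in: {s3_uri!r}")
    return bucket, key


if __name__ == "__main__":
    print(split_s3_uri("s3://my-bucket/data/2020/events.csv"))
```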

# Conflicts:
#	mara_pipelines/commands/files.py
@ghost added the enhancement (New feature or request) label on Sep 15, 2020
@martin-loetzsch
Member

@jankatins is this running in production?

@jankatins
Member Author

@martin-loetzsch Nope, should also be integrated into https://github.com/mara/mara-storage where this looks much easier to do.
