neurocaas_contrib package
Subpackages
Submodules
neurocaas_contrib.Interface_S3 module
Script to download a video from the relevant Amazon S3 bucket into a temporary directory.
- class neurocaas_contrib.Interface_S3.ProgressPercentage_d(client, BUCKET, KEY, display=False)
Bases: object
Helper class to get and display the percentage of data downloaded. If display is set to False, assume that we're writing to a remote log file, and include newlines.
- class neurocaas_contrib.Interface_S3.ProgressPercentage_u(FILEPATH, display=False)
Bases: object
Helper class to get and display the percentage of data uploaded. If display is set to False, assume that we're writing to a remote log file, and include newlines.
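Both helper classes follow the callback pattern used by boto3 transfers: `S3.Client.download_file` and `upload_file` accept a `Callback` that is invoked with the number of bytes moved in each chunk, possibly from several threads. The sketch below is a hypothetical stand-in (`ProgressSketch` is not part of neurocaas_contrib); it takes the total size directly instead of querying S3, and shows one way the `display` flag could switch between an in-place progress line and newline-terminated log lines.

```python
import sys
import threading


class ProgressSketch:
    """Hypothetical progress callback, not the neurocaas_contrib implementation."""

    def __init__(self, total_bytes, display=False, stream=None):
        self._total = float(total_bytes)
        self._seen = 0
        self._display = display
        self._stream = stream if stream is not None else sys.stdout
        # Transfers may call the callback from several worker threads.
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen += bytes_amount
            percentage = 100.0 * self._seen / self._total if self._total else 100.0
            message = f"{self._seen} / {int(self._total)} bytes ({percentage:.2f}%)"
            if self._display:
                # Interactive terminal: overwrite the current line.
                self._stream.write("\r" + message)
            else:
                # Remote log file: one complete line per update.
                self._stream.write(message + "\n")
            self._stream.flush()
            return percentage
```

For a download, the size could come from `client.head_object(Bucket=bucket, Key=key)["ContentLength"]`, after which `client.download_file(bucket, key, localpath, Callback=ProgressSketch(size))` reports progress as chunks arrive.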
- neurocaas_contrib.Interface_S3.download(s3path, localpath, display=False)
Download function. Takes an S3 path to an object and a local object path as input.
- Parameters
s3path – full path to an object in S3. Assumes the s3://bucketname/key syntax.
localpath – full path to the object name locally (i.e. with basename attached).
display – (optional) Defaults to False. If True, displays a progress bar.
- neurocaas_contrib.Interface_S3.upload(localpath, s3path, display=False)
Upload function. Takes a local object path and an S3 path to the desired key as input.
- Parameters
localpath – full path to the object name locally (i.e. with basename attached).
s3path – full path to an object in S3. Assumes the s3://bucketname/key syntax.
display – (optional) Defaults to False. If True, displays a progress bar.
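Both functions take S3 locations in s3://bucketname/key form, while boto3 calls expect the bucket and key separately. A minimal sketch of that split using the standard library (`split_s3_path` is a hypothetical helper, not necessarily how the module parses paths):

```python
from urllib.parse import urlparse


def split_s3_path(s3path):
    """Split 's3://bucketname/key' into (bucketname, key)."""
    parsed = urlparse(s3path)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"expected s3://bucketname/key, got {s3path!r}")
    # urlparse keeps the leading '/' of the path; S3 keys do not have one.
    return parsed.netloc, parsed.path.lstrip("/")
```

One caveat of this choice: `urlparse` treats `?` and `#` as query and fragment markers, so keys containing those characters would be truncated; a plain `str.partition("/")` on the text after `s3://` avoids that.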
neurocaas_contrib.blueprint module
- class neurocaas_contrib.blueprint.Blueprint(path)
Bases: object
Blueprint object to manage reading and writing of blueprint entries.
- Parameters
path – path to a blueprint object.
- update_container_history(container_name)
Updates the container history with the most recent entry. TODO: check that this container exists.
- Parameters
container_name – name of the container.
- property active_container
First checks whether the container is running.
- property active_container_status
First checks whether the container is running.
- update_image_history(image_name)
Updates the image history with the most recent entry. TODO: check that this image exists.
- Parameters
image_name – name of the image to update with.
- property active_image
neurocaas_contrib.cli_commands module
- neurocaas_contrib.cli_commands.save_ami_to_cli(ami)
Save a dictionary representing the development history to the CLI's config file.
- Parameters
ami – NeuroCAAS Ami object
- neurocaas_contrib.cli_commands.delete_ami_from_cli(develop_dict, force=False)
Clears the instance and blueprint from the CLI's config file.
- Parameters
develop_dict – the development dictionary that holds details about development you have already done.
- Returns
bool, whether or not deletion happened.
- neurocaas_contrib.cli_commands.create_ctx(ctx, location, analysis_name, develop_dict)
Helper function that attempts to create as much of the context object as is available.
- Parameters
ctx – click context object, used to pass state to subcommands
location – path to the base blueprint directory. (or None)
analysis_name – name of the analysis we want to find in location. (or None)
develop_dict – dictionary holding details of development (or None)
- neurocaas_contrib.cli_commands.create_test_dir(path)
Given an analysis location, creates a directory within it containing correctly configured testing resources.
- Parameters
path – path to the analysis folder (location where stack_config_template.json files are stored).
neurocaas_contrib.connect module
- neurocaas_contrib.connect.splitall(path)
Splits a path into all of its components. Adapted from https://www.oreilly.com/library/view/python-cookbook/0596001673/ch04s16.html
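The cookbook recipe repeatedly applies `os.path.split` until it reaches either the filesystem root (absolute paths) or a bare name (relative paths). A sketch of that recipe, using `posixpath` explicitly on the assumption that it is applied to POSIX-style (for example, remote host) paths:

```python
import posixpath


def splitall(path):
    """Split a POSIX path into a list of all its components."""
    allparts = []
    while True:
        head, tail = posixpath.split(path)
        if head == path:  # reached the root of an absolute path
            allparts.insert(0, head)
            break
        if tail == path:  # reached the first component of a relative path
            allparts.insert(0, tail)
            break
        path = head
        allparts.insert(0, tail)
    return allparts
```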
- class neurocaas_contrib.connect.SSHConnection(hostname, hostuser, keypath)
Bases: neurocaas_contrib.connect.SSH
Context manager for paramiko-managed SSH clients. From https://extsoft.pro/safely-destroying-connections-in-python/
- class neurocaas_contrib.connect.FTPConnection(hostname, hostuser, keypath)
Bases: neurocaas_contrib.connect.SSH
Context manager for file transfer.
- get(remotepath, localpath)
Directly maps to paramiko.sftp_client.SFTPClient.get()
- Parameters
remotepath – path to the remote file we want to get.
localpath – location we want to write to locally.
- put(localpath, remotepath)
Directly maps to paramiko.sftp_client.SFTPClient.put()
- Parameters
localpath – path to the local file we want to put.
remotepath – location we want to write to remotely.
- exists(filepath)
Like the os.path.exists command, but through paramiko's SFTP client. See https://stackoverflow.com/questions/850749/check-whether-a-path-exists-on-a-remote-host-using-paramiko
- isdir(dirpath)
Checks whether the given path is a directory. See https://stackoverflow.com/questions/20507055/recursive-remove-directory-using-sftp/20507586#20507586
- Parameters
dirpath – path to check.
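Both checks in the linked answers rely on `stat` for the remote path: a missing path raises an `IOError` with `errno.ENOENT`, and a present one returns attributes whose `st_mode` can be tested with `stat.S_ISDIR`. A minimal sketch, assuming `sftp` is an open `paramiko.SFTPClient` (for example from `ssh_client.open_sftp()`); the function names are hypothetical:

```python
import errno
import stat


def sftp_exists(sftp, path):
    """Return True if `path` exists on the remote host, like os.path.exists."""
    try:
        sftp.stat(path)
    except IOError as e:  # paramiko reports missing paths as IOError/ENOENT
        if e.errno == errno.ENOENT:
            return False
        raise  # other failures (e.g. permission denied) are not "missing"
    return True


def sftp_isdir(sftp, path):
    """Return True if `path` exists and is a directory on the remote host."""
    try:
        return stat.S_ISDIR(sftp.stat(path).st_mode)
    except IOError as e:
        if e.errno == errno.ENOENT:
            return False  # a missing path is not a directory
        raise
```

Re-raising non-ENOENT errors is a deliberate choice: treating a permission error as "does not exist" could lead callers to try recreating a path they cannot see.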
- mkdir(dirpath)
Directly maps to paramiko.sftp_client.SFTPClient.mkdir()
- Parameters
dirpath – requested path (must be absolute).
- mkdir_notexists(dirpath)
Make a directory only if it does not exist.
- Parameters
dirpath – requested path (must be absolute).
- mkdir_r_notexists(dirpath)
Make a nested directory, creating new subdirectories as necessary. NOTE: Will not check whether dirpath is a file path. If it is, you might run into overwrite issues.
- Parameters
dirpath – requested path (must be absolute)
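Recursive creation amounts to walking the absolute path one component at a time and creating each prefix that is missing, in the manner of `mkdir -p`. A minimal sketch, again assuming a paramiko-style `sftp` object with `stat` and `mkdir`; `mkdir_recursive` is a hypothetical name, and like the method above it does not check whether an existing component is a file:

```python
import errno
import posixpath


def mkdir_recursive(sftp, dirpath):
    """Create `dirpath` and any missing parent directories on the remote host."""
    if not posixpath.isabs(dirpath):
        raise ValueError("dirpath must be absolute")
    current = "/"
    for part in dirpath.strip("/").split("/"):
        if not part:
            continue  # tolerate repeated slashes such as '/a//b'
        current = posixpath.join(current, part)
        try:
            sftp.stat(current)  # already present: nothing to do
        except IOError as e:
            if e.errno != errno.ENOENT:
                raise
            sftp.mkdir(current)
```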
neurocaas_contrib.local module
- class neurocaas_contrib.local.NeuroCAASImage(image_tag=None, container_name='neurocaasdevcontainer')
Bases: object
NeuroCAAS image management. Builds a docker image from the Dockerfile, if needed, or attaches to a known one.
- assign_default_image(image_tag)
Assigns a new default image to this object.
- Parameters
image_tag – the name of a docker image, with the tag parameter specified (as repository:tag).
- assign_default_container(container_name)
Assigns a new default container to this object.
- Parameters
container_name – the name of a docker container.
- find_image(image_tag)
Looks to see if the image requested is locally available. Raises an exception if not.
- Parameters
image_tag – a tag given to the image we are discussing.
- get_default_image()
Gets the default image. If it can't be found, pulls the anaconda3 image and builds from the Dockerfile.
- setup_container(image_tag=None, container_name=None, env=None)
Probably the most important method in this class. Runs a container off of the image that you created, or another image of your choice. If you include a new image tag, all subsequent commands (until you run this command again) will refer to the corresponding image.
- Parameters
image_tag – (optional) The name of an image, with the tag parameter specified. If given, will launch a container from this image, and set this object to interface with that image tag from now on (start containers from that image, test that image, etc.)
container_name – (optional) If given, will launch a container with that name attached. Note this must be lowercase. If not given, will launch with the default name at self.container_name.
env – (optional, NeuroCAASEnv) a NeuroCAASLocalEnv or NeuroCAASRemoteEnv instance. Files included here will be included in the environment on startup. Furthermore, the outputs of analysis commands and results will be written to the directory referenced in this environment for easy inspection.
- test_container(command, container_name=None)
Test the container with a command. If no container name is given, the container with name at self.container_name will be used. This command will print the output of the given command to the command line. If you want to examine the outputs of the command, do so by coordinating with the localenv object using method [TODO].
- Parameters
command – (str) a string representing the command you would like to be executed by the bash shell inside the container. Will be passed to /bin/bash inside the container as docker exec [container_name] /bin/bash -c '[command]'. We recommend passing this string with single quotes on the outside, and double quotes for shell arguments: ex. `NeuroCAASImage.test_container(command='run.sh "parameter1"')`
container_name – (optional) The name of the container where we should run the given command. If given, will be assigned status as the current container.
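The quoting recommendation is easier to see from the argument list that such a `docker exec` call corresponds to: the whole command string is a single argument to `bash -c`, and bash inside the container is what splits it into words. This sketch (a hypothetical helper, not the class's code) builds that list and uses `shlex.join` (Python 3.8+) to render the equivalent shell line:

```python
import shlex


def docker_exec_argv(container_name, command):
    """Argument list for: docker exec <container_name> /bin/bash -c '<command>'."""
    # `command` stays one argument; bash inside the container parses it.
    return ["docker", "exec", container_name, "/bin/bash", "-c", command]


argv = docker_exec_argv("neurocaasdevcontainer", 'run.sh "parameter1"')
print(shlex.join(argv))
```

Passing `argv` to `subprocess.run(argv, check=True)` would execute the same command without an intermediate local shell, so only the container's bash interprets the double quotes.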
- save_container_to_image_workflow(tag, force=False, script=None)
UNTESTED. Once you have made appropriate changes and tested, you will want to save your running container to a new image. This version of the code is compatible with the scripting module, and assumes that the whole neurocaas_contrib workflow will be dockerized. This means that the docker container will receive bucket, datapath, resultpath, and configpath parameters.
- Parameters
tag – The tag that will be used to identify this image. We recommend providing your tag as the name of your analysis repo + a git commit, like neurocaas/contrib:mockanalysis.356d78a, where 356d78a is the output of running git rev-parse --short HEAD from your git repo. If you provide a tag that is already in use, you will have to provide a force=True argument.
force – (optional) Whether or not to overwrite an existing image with this name. Default is force=False.
script – (optional) Path to a script inside the container that should be run at startup. Will be assigned to the Dockerfile command as follows: ["bash","-c","script","${bucketname}","${data}","${result}","${config}"], where data and config will be determined at runtime.
- save_container_to_image(tag, force=False, script=None)
Once you have made appropriate changes and tested, you will want to save your running container to a new image. This image will be specified as a tag; i.e., your image's name will be neurocaas/contrib:[tag].
- Parameters
tag – The tag that will be used to identify this image. We recommend providing your tag as the name of your analysis repo + a git commit, like neurocaas/contrib:mockanalysis.356d78a, where 356d78a is the output of running git rev-parse --short HEAD from your git repo. If you provide a tag that is already in use, you will have to provide a force=True argument.
force – (optional) Whether or not to overwrite an existing image with this name. Default is force=False.
script – (optional) Path to a script inside the container that should be run at startup. Will be assigned to the Dockerfile command as follows: ["bash","-c","script","${data}","${config}"], where data and config will be determined at runtime.
- run_analysis(command, env, image_tag=None)
Runs a full-fledged test of an analysis image. Expect outputs in the local environment after the analysis run, along with the logs that the user would see.
- Parameters
command – (str) a string representing the command you would like to be executed by the bash shell inside the container. Will be passed to /bin/bash inside the container as docker exec [container_name] /bin/bash -c '[command]'. We recommend passing this string with single quotes on the outside, and double quotes for shell arguments: ex. `NeuroCAASImage.test_container(command='run.sh "parameter1"')`
env – (NeuroCAASEnv) a NeuroCAASLocalEnv or NeuroCAASRemoteEnv instance. The outputs of analysis commands and results will be written to the directory referenced in this environment for easy inspection.
image_tag – (optional) The name of an image, with the tag parameter specified. If given, will launch a container from this image, and set this object to interface with that image tag from now on (start containers from that image, test that image, etc.)
- run_analysis_workflow(bucket, data, result, config, env, image_tag=None)
UNTESTED. New version of run_analysis (May 7th) that integrates the docker infrastructure built here with the scripting module. Assumes you will pass the bucketname, datapath, resultpath, and configpath variables as expected.
- Parameters
bucket – (str) the name of the bucket where these datasets are located.
data – (str) the path to the dataset to use for analysis.
result – (str) the path to the result folder where we will store outputs.
config – (str) the path to the config file to use for analysis.
env – (NeuroCAASEnv) a NeuroCAASLocalEnv or NeuroCAASRemoteEnv instance. The outputs of analysis commands and results will be written to the directory referenced in this environment for easy inspection.
image_tag – (optional) The name of an image, with the tag parameter specified. If given, will launch a container from this image, and set this object to interface with that image tag from now on (start containers from that image, test that image, etc.)
- run_analysis_parametrized(data, config, env, image_tag=None)
Runs a full-fledged test of an analysis image. Expect outputs in the local environment after the analysis run, along with the logs that the user would see. You don't need to submit a command, as it is assumed to be baked into the image as its CMD; instead, pass the data and config you would like to use.
- Parameters
data – (str) the name of the dataset to use for analysis. Assumed to live in ~/io-dir/inputs/
config – (str) the name of the config file to use for analysis. Assumed to live in ~/io-dir/configs/
env – (NeuroCAASEnv) a NeuroCAASLocalEnv or NeuroCAASRemoteEnv instance. The outputs of analysis commands and results will be written to the directory referenced in this environment for easy inspection.
image_tag – (optional) The name of an image, with the tag parameter specified. If given, will launch a container from this image, and set this object to interface with that image tag from now on (start containers from that image, test that image, etc.)
- track_job(env, datastatus, certificate, job_id, loginterval=1, timeout=None)
Writes job progress to a local file using the given logging objects. Logging will be terminated when the container enters any of the following states: exited (recorded in the "status" field of datastatus as success or failed), dead, or paused.
- Parameters
env – (NeuroCAASEnv) a NeuroCAASLocalEnv or NeuroCAASRemoteEnv instance. The outputs of analysis commands and results will be written to the directory referenced in this environment for easy inspection.
datastatus – NeuroCAASDataStatus object to use to log data status.
certificate – NeuroCAASCertificate object to use to log high level data.
job_id – A job id string that uniquely identifies the job being run. Assumed to take the form of a timestamp.
loginterval – Integer giving the amount of time in seconds to wait between writing logs.
timeout – The total time to wait before giving up on tracking the job. Not yet implemented.
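The tracking described above is a polling loop: refresh the container state, write logs, and stop once the state is terminal. A minimal sketch with hypothetical callables standing in for the docker and logging objects (with the docker SDK, `get_status` could be `lambda: (container.reload(), container.status)[1]`; that wiring is an assumption, not the method's code). Like the documented method, it has no timeout.

```python
import time

# States after which tracking stops, as listed in the track_job description.
TERMINAL_STATES = {"exited", "dead", "paused"}


def poll_until_terminal(get_status, write_logs, loginterval=1, sleep=time.sleep):
    """Write logs every `loginterval` seconds until the container stops running.

    get_status: callable returning a docker-style state string, e.g. "running".
    write_logs: callable invoked once per polling round, including the last one.
    Returns the terminal state that ended tracking.
    """
    while True:
        status = get_status()
        write_logs()  # record the final state too, not just intermediate ones
        if status in TERMINAL_STATES:
            return status
        sleep(loginterval)
```

Injecting `sleep` keeps the loop testable without real waiting.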
- class neurocaas_contrib.local.NeuroCAASEnv(path)
Bases: object
- class neurocaas_contrib.local.NeuroCAASRemoteEnv(path, remote_path, remote_host, remote_username, key_path)
Bases: neurocaas_contrib.local.NeuroCAASEnv
Class to explicitly manage an environment around a docker container hosted on a remote instance, and to further sync that with the local environment indicated. One thing I dislike is that in the current implementation, the local directory is a docker volume if we use LocalEnv, but just a directory if we use RemoteEnv.
- Parameters
path – local path where we will create a directory called io-dir.
remote_path – remote location (must be an absolute path) where we will create a docker volume to coordinate with the docker container.
remote_host – the IP address of the remote host.
remote_username – the username to use on the remote machine.
key_path – path to the SSH key we will use to connect to the remote host.
- setup_client()
Sets up a paramiko client for the duration of this object's life. Separates out the SSH and FTP connection context managers.
- config_io_path()
Configure the local path using the path and io_path variables as in the abstract class. However
- class neurocaas_contrib.local.NeuroCAASLocalEnv(path)
Bases: neurocaas_contrib.local.NeuroCAASEnv
A class to explicitly manage the local environment around a docker container, and a key feature for running local tests. Will create or locate a local directory named "io-dir" at the specified location, with appropriately named subdirectories, and designate it as a docker volume ready to be mounted on testing runs. Volume setup adapted from https://stackoverflow.com/questions/39496564/docker-volume-custom-mount-point
- class neurocaas_contrib.local.NeuroCAASAutoScript(scriptjson, templatepath)
Bases: object
Developer tool to automate creation and testing of an analysis-specific bash script.
- add_dlami()
Sources the DLAMI bash script to correctly configure the EC2 OS environment with GPU support.
- append_conda_path_command(path=None)
Generates the material we want to append to the python path to find the anaconda environment correctly. Will assume that anaconda(3) is installed in the user's home directory. An alternative path to anaconda3/bin can be supplied if this is not the case.
- Parameters
path – (optional) if given, will check that the anaconda bin exists at that location, instead of being installed in the user's root directory.
- Returns
the bash command we will use to appropriately format the anaconda path.
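As described, the generated command puts the anaconda `bin` directory ahead of the existing search path. A minimal sketch, assuming the command is a bash `export PATH=...` line and that a missing directory should raise (both assumptions; `conda_path_command` is a hypothetical name):

```python
import os


def conda_path_command(path=None):
    """Return a bash line that puts an anaconda bin directory first on PATH.

    Assumes anaconda3 lives in the user's home directory unless `path`
    (the anaconda3/bin directory itself) is given.
    """
    if path is None:
        path = os.path.join(os.path.expanduser("~"), "anaconda3", "bin")
    if not os.path.isdir(path):
        raise FileNotFoundError(f"no anaconda bin directory at {path}")
    # Double quotes let bash expand $PATH when the script runs.
    return f'export PATH="{path}:$PATH"'
```

Checking the directory when the script is generated, rather than when it runs, surfaces a wrong path early; it assumes the script will run on the same machine it is generated on.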
- check_conda_env(env_name)
Checks if a conda env exists on this machine, and returns a boolean exists/not exists.
- Parameters
env_name – environment name.
- Returns
boolean, if this environment exists or not.
- add_conda_env(check=True, path=None)
Adds commands to enter a conda virtual environment to template script. If check, will check that this virtual environment exists before adding.
- Parameters
check – boolean asking if we should check that the environment exists first or not.
path – (optional) if provided, looks here for the conda installation. Otherwise defaults to $user_root_dir/conda_dir (anaconda3 if not provided).
- write_new_script(filename)
Writes the current contents of scriptlines to file.
- Parameters
filename – name of file to write to.
- class neurocaas_contrib.local.NeuroCAASDeveloperInterface(pipelinename)
Bases: object
New developer interface that will form the basis for a python package.
- initialize_blueprint(ami, region)
Initializes a blueprint with the basic info needed to get an instance up and running. Creates a special directory where pipeline-specific information will be stored.
neurocaas_contrib.log module
- neurocaas_contrib.log.find_linebreaks(tup)
Finds the part of the file indicating the per-dataset log. Compares against the global variable "divider" to find linebreaks.
- Parameters
tup – tuple where the first element is an index and the second is the corresponding line of a text file.
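The (index, line) signature fits naturally with `enumerate` and `filter`: enumerate the file's lines, keep the pairs whose line matches the divider, and read off the indices. A sketch under that assumption; the `DIVIDER` value here is a placeholder, since the module's actual "divider" string is not shown:

```python
DIVIDER = "=" * 20  # placeholder; the real value is the module's global "divider"


def find_linebreaks(tup):
    """Return True if the (index, line) pair is a divider line."""
    _, line = tup
    return line.strip() == DIVIDER


def divider_indices(text):
    """Line indices of all divider lines in a text file's contents."""
    return [i for i, _ in filter(find_linebreaks, enumerate(text.splitlines()))]
```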
- class neurocaas_contrib.log.WriteObj(init_dict)
Bases: object
Wrapper to handle cases where we want to write locally or to S3. If S3, acts like an S3 resource object. If local, acts like a standard file object.
- class neurocaas_contrib.log.NeuroCAASLogObject(s3_path, write_localpath)
Bases: object
Abstract base class for logging objects. Defines an init method that does the following:
1. Looks for an initialization file from Amazon S3.
2. If the file is available, uses it to initialize internal information. The object will write updates back to this same file.
3. If the file is not available, initializes from a local source. The object will write back to the file specified at the path given in parameter write_localpath.
This init behavior also determines the initialization of a write object that will write either to S3 (back to the same file given to initialize) or to a local file path. This local fallback ensures that we don't lose valuable logging info in cases where processes are being run locally.
- load_init_s3(bucketname, path)
Load in a file to use as initialization for this logging object.
- Parameters
bucketname – The name of the S3 bucket we are reading from.
path – The name of the key within the S3 bucket corresponding to the initialization object.
- Returns
the content of the S3 file, without further processing.
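The S3-first, local-fallback initialization can be sketched with the loaders passed in as callables, which also records where later writes should go. This is a hypothetical illustration of the pattern, not the class's code; catching `Exception` broadly is a simplification, and real code might narrow it to botocore errors such as `ClientError` and `NoCredentialsError`.

```python
def initialize_log(load_from_s3, load_local, s3_path, write_localpath):
    """Try S3 first; on failure, fall back to a local source.

    Returns (content, write_target), where write_target records where
    subsequent updates should be written back.
    """
    try:
        content = load_from_s3(s3_path)
        return content, ("s3", s3_path)
    except Exception:  # e.g. no credentials, no network, missing key
        content = load_local()
        return content, ("local", write_localpath)
```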
- class neurocaas_contrib.log.NeuroCAASCertificate(s3_path, write_localpath='/home/docs/checkouts/readthedocs.org/user_builds/neurocaas-contrib/checkouts/stable/src/neurocaas_contrib/template_mats/certificate_update.txt', parse=True)
Bases: neurocaas_contrib.log.NeuroCAASLogObject
Per-submission log file that captures the setup of resources on NeuroCAAS, and provides basic summary information about each instance started by the job as it runs. Captures the git commit of the NeuroCAAS blueprint version to ensure reproducibility.
- load_init_s3(bucketname, path)
Load in a file to use as initialization for this logging object.
- Parameters
bucketname – The name of the S3 bucket we are reading from.
path – The name of the key within the S3 bucket corresponding to the initialization object.
- Returns
the content of the S3 file, without further processing.
- load_reinit_local()
Load in an arbitrary file to use as reinitialization for this logging object. Should be a dictionary.
- reload()
Reloads the certificate from its designated location and reprocesses it. Returns rawfile as expected.
- assign_template()
Assigns template strings to allow for easy fill-in of certificate updates.
- get_default_rawfile()
Get the default certificate from a local location. This ensures we can continue with processing even when the job is not launched from remote.
- Returns
raw certificate file.
- process_rawcert(cert)
Takes the raw certificate and preprocesses it for easier handling. In particular, separates it into line breaks, identifies the parts of the file that we should write to, and identifies individual lines by their corresponding data. Will assign values to the self.certlines and self.writedict attributes.
- Parameters
cert – raw data containing certificate file.
- Returns
tuple (certdict, writedict, writearea) of two dictionaries and a range object. The first entry has line numbers as keys and the content of those lines as values. The second entry has line numbers as keys, and a dictionary of format {"dataname":dataname,"line":text} as values. The third entry indicates the range of lines where we can write.
- update_instance_info(updatedict, loc=0)
Updates the info on an instance in the certificate. The update takes the form of a dictionary with the following entries, where all values are strings:
{"n": datasetname, "s": job status (INITIALIZING, IN PROGRESS, FAILED, SUCCESS), "t": time of last update, "r": last command run, "u": CPU usage}
If any of these entries are not given, they are set to N/A. Additionally, the location in the certificate where these values should be written will be inferred from the provided dataset name: i.e., if there are multiple instances being tracked by the certificate file at once, we need to know which one to write this data to. If no dataset name is provided, we fall back to writing on the line indexed by the variable "loc", which is given relative to all writable lines.
- Parameters
updatedict – A dictionary giving the values to update individual parameters.
loc – (optional) The relative line number that this update should be written to. Default is 0.
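The two rules above (missing fields become N/A; the target line is found by dataset name, else by `loc` within the writable range) can be sketched against the `writedict`/`writearea` structures that process_rawcert returns. The helper names are hypothetical, and falling back to `loc` when a given dataset name is not found is an assumption the text does not specify.

```python
FIELDS = ("n", "s", "t", "r", "u")  # dataset, status, time, last command, CPU usage


def fill_update(updatedict):
    """Return a complete update record, with missing fields set to 'N/A'."""
    return {key: updatedict.get(key, "N/A") for key in FIELDS}


def choose_line(writedict, writearea, dataset_name=None, loc=0):
    """Pick the certificate line an update should be written to.

    writedict: {line_number: {"dataname": ..., "line": ...}}
    writearea: range of writable line numbers.
    """
    if dataset_name is not None:
        for lineno, entry in writedict.items():
            if entry.get("dataname") == dataset_name:
                return lineno
    # No (matching) dataset name: use the loc-th writable line.
    return writearea[loc]
```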
- initialize_writeobj(mode, bucket=None, path=None, localpath=None)
Method to initialize the WriteObj object passed to self.writeobj. Determines whether we are writing to S3 (as in service mode) or to a local location (debugging). Note that if mode is local, the bucket and path arguments are not required, and vice versa for s3 and localpath. However, if they are not included for a particular mode, an error will be thrown.
- Parameters
mode – processing mode; either "local" or "s3".
bucket – (optional) name of the S3 bucket to write to. Will not be used if mode is local.
path – (optional) name of the key in the indicated S3 bucket to write to.
- write()
Writes the contents of the file as dictated by the self.writeobj attribute. If writeobj is s3 (default), the updated certificate will be written to the path at self.s3_path. If not (s3 not reachable for any reason) will be written to the file ./template_mats/certificate_update.txt for inspection. If you intend to write to a different file location, use the method write_local instead.
- class neurocaas_contrib.log.NeuroCAASDataStats(s3_path, write_localpath)
Bases: neurocaas_contrib.log.NeuroCAASLogObject
Base class for original and docker-based DataStatus log objects.
- load_init_s3(bucketname, path)
Load in a file to use as initialization for this logging object. Should be a dictionary.
- Parameters
bucketname – The name of the S3 bucket we are reading from.
path – The name of the key within the S3 bucket corresponding to the initialization object.
- Returns
the content of the S3 file, without further processing (will be a dictionary).
- load_reinit_local()
Load in a local file to use as reinitialization for this logging object. Should be a dictionary.
- Returns
dictionary of the status file.
- get_default_rawfile()
Get the default dataset status file from a local location. This ensures we can continue with processing even when the job is not launched from remote. For this analysis, this file is a dictionary.
- Returns
raw certificate file.
- write()
Writes the contents of rawfile as dictated by the self.writeobj attribute. Will sort entries with an OrderedDict according to the attribute self.writeorder. If writeobj is s3 (default), the updated certificate will be written to the path at self.s3_path. If not (S3 not reachable for any reason), it will be written to the file ./template_mats/certificate_update.txt for inspection. If you intend to write to a different file location, use the method write_local instead.
- class neurocaas_contrib.log.NeuroCAASDataStatusLegacy(s3_path, write_localpath='/home/docs/checkouts/readthedocs.org/user_builds/neurocaas-contrib/checkouts/stable/src/neurocaas_contrib/template_mats/DATASET_NAME-dataset_update.ext_STATUS.txt.json')
Bases: neurocaas_contrib.log.NeuroCAASDataStats
Per-instance log file that captures details about data analyses. Captures stdout/err, exit code, error info, and available information, but does not assume docker-based deployment.
- Parameters
dataset_name – name of the dataset this status object is tracking.
suffix – any changes to the name of the dataset you want to make.
- get_stdout(filename)
Assumes stdout/err are already routed to an existing file. Reads in from that file, line by line.
- get_usage()
Outputs usage statistics for the machine as a whole.
- Returns
Output dictionary with the following form:
outdict = {"cpu_total": cpu_percent, "memory_total_mb": memory_total_mb}
- class neurocaas_contrib.log.NeuroCAASDataStatus(s3_path, container, write_localpath='/home/docs/checkouts/readthedocs.org/user_builds/neurocaas-contrib/checkouts/stable/src/neurocaas_contrib/template_mats/DATASET_NAME-dataset_update.ext_STATUS.txt.json')
Bases: neurocaas_contrib.log.NeuroCAASDataStats
Per-instance log file that captures details about each individual dataset analysis run: the entire history of messages printed to stdout/stderr, the exit code, any error information, etc. Written as a JSON file for convenience. Takes a running docker container and does everything needed to parse out relevant arguments from it. This includes the output to stdout and stderr, the current CPU and memory usage, and the docker container object that we will be querying for relevant status information. Note that this file is also assumed to be initialized by a lambda-generated file, so we should treat it like the certificate file, with similar failsafes to fall back on local processing. We inherit an init method from NeuroCAASLogObject to enable this.
- Parameters
dataset_name – name of the dataset this status object is tracking.
container – docker container object that we will be querying for relevant status information.
- get_stdout()
Get the current output to container.logs() and format without escape characters.
- Returns
Most recent logs, formatted as a list of strings.
- get_usage()
Get the current usage information for the container. Unfortunately, docker does not itself calculate CPU percentages for you. We will take the raw, high-level usage stats and return them as a dictionary. NOTE: It's very difficult to find confirmation that these numbers are reported in bytes, but that is the assumption given the way that other commands (i.e. docker run) work.
- Returns
dictionary containing output statistics
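If a CPU percentage is needed downstream, it can be derived from one raw stats sample (for instance, the dictionary returned by the docker SDK's `container.stats(stream=False)`) using the formula given in the Docker Engine API documentation: the container's CPU time delta over the system's CPU time delta, scaled by the number of CPUs. A sketch of that calculation; the memory helper carries the same bytes assumption as the note above, and both function names are hypothetical.

```python
def cpu_percent(stats):
    """CPU usage (%) from one docker stats sample (Docker Engine API formula)."""
    cpu = stats["cpu_stats"]
    precpu = stats["precpu_stats"]
    cpu_delta = cpu["cpu_usage"]["total_usage"] - precpu["cpu_usage"]["total_usage"]
    system_delta = cpu.get("system_cpu_usage", 0) - precpu.get("system_cpu_usage", 0)
    if cpu_delta <= 0 or system_delta <= 0:
        return 0.0  # first sample, or no measurable change yet
    ncpus = cpu.get("online_cpus") or len(cpu["cpu_usage"].get("percpu_usage") or []) or 1
    return cpu_delta / system_delta * ncpus * 100.0


def memory_mb(stats):
    """Memory usage in MB, assuming the API reports bytes."""
    return stats["memory_stats"]["usage"] / (1024 * 1024)
```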
neurocaas_contrib.monitor module
- class neurocaas_contrib.monitor.RangeFinder
Bases: object
Object class to keep track of the range of dates we are considering.
- diff(datetime_str)
Takes in a string formatted datetime (formatted as self.form), and compares it with now.
- neurocaas_contrib.monitor.ls_name(bucket_name, path)
Get the names of all objects in a bucket under a given prefix path as strings. Takes the name of the bucket as input, not the bucket itself, for usage outside of the utils module.
- Parameters
bucket_name (str) – name of s3 bucket to list.
path (str) – prefix path specifying the location you want to list.
- Returns
A list of strings describing the objects under the specified path in the bucket.
- Return type
list of strings
- neurocaas_contrib.monitor.load_json(bucket_name, key)
Function to load the contents of a json file stored in S3 into memory for a lambda function.
- Parameters
bucket_name (str) – the name of the bucket where the json file lives.
key (str) – the path to the json object.
- Raises
ValueError. If the key does not point to a properly formatted json file, an exception will be raised.
- Returns
json content: the content of the json file.
- Return type
dict
- neurocaas_contrib.monitor.get_analysis_cost(path, bucket_name)
Given a username and the name of a bucket to look in, gets the cost incurred so far by a given group (as recorded in logs)
- neurocaas_contrib.monitor.get_users(dict_files)
Presented with a dict of files (the response of list_objects_v2), gets usernames from them. Asserts that buckets are correctly formatted for logging (have an active and a logs subfolder).
- neurocaas_contrib.monitor.get_jobs(dict_files)
Given the raw response output, returns a flat list of all the jobs that have been run.
- Parameters
dict_files – raw output of list objects api.
- neurocaas_contrib.monitor.sort_activity_by_users(dict_files, userlist)
When given the raw response output and a list of usernames, returns a dictionary of files organized by username. Passes on debugging logs and logs that are currently active.
- Parameters
dict_files – raw output of the list objects API.
userlist – a list of usernames to whom we will assign jobs.
- Returns
userdict, a dictionary indexed by username, with values giving lists of jobs attributed to that user.
- neurocaas_contrib.monitor.get_user_logs(bucket_name)
Returns a list of S3 paths corresponding to logged users inside a bucket.
- Parameters
bucket_name – the name of the s3 bucket we are looking for
- neurocaas_contrib.monitor.get_duration(start, end)
Get the duration of a job from a pair of timestamp strings using datetime. Timestamps take the form 2020-05-17T01:21:05Z.
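With timestamps of that form, the duration is the difference of two `datetime.strptime` results. A sketch of the parsing step (`duration_between` is a hypothetical name, not the module's function), assuming every timestamp uses exactly this format with a literal trailing Z:

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # e.g. 2020-05-17T01:21:05Z


def duration_between(start, end):
    """Return the elapsed time between two timestamp strings as a timedelta."""
    return datetime.strptime(end, TIME_FORMAT) - datetime.strptime(start, TIME_FORMAT)
```

Calling `.total_seconds() / 3600` on the result gives hours, which is convenient when aggregating usage per month.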
- neurocaas_contrib.monitor.calculate_usage(bucket_name, usage_list, user)
Gets the JSON files containing the usage for a particular user, and returns the totals (number of hours, cost, and number of jobs run) per month.
- Parameters
bucket_name – string giving the S3 bucket we are reading from.
usage_list – a list of job logs for a particular user authorized to use this analysis.
user – the user to whom we should assign this usage.
- neurocaas_contrib.monitor.calculate_parallelism(bucket_name, usage_list, user)[source]¶
Calculates the parallelism of a user’s usage: how much of the total running job time was spent on jobs running together?
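One way to make this question concrete is to measure the fraction of total job-time during which at least two jobs were running at once. This definition is an assumption, not necessarily how calculate_parallelism computes it. The sweep-line sketch below takes hypothetical (start, end) pairs as plain numbers (e.g. seconds):

```python
def parallel_fraction(intervals):
    """Fraction of summed job-time spent while 2+ jobs overlapped.

    intervals: iterable of (start, end) numeric pairs, one per job.
    """
    events = []
    for start, end in intervals:
        if end < start:
            raise ValueError("a job cannot end before it starts")
        events.append((start, 1))   # job begins
        events.append((end, -1))    # job ends
    # At equal times, -1 sorts before +1, so jobs that merely touch don't overlap
    events.sort()

    total = 0.0      # sum of all job durations
    together = 0.0   # job-time accrued while at least two jobs were active
    active = 0
    prev = None
    for time, delta in events:
        if prev is not None and active > 0:
            dt = time - prev
            total += active * dt
            if active >= 2:
                together += active * dt
        active += delta
        prev = time
    return together / total if total > 0 else 0.0
```

Two jobs spanning (0, 10) and (5, 15) give 0.5: of 20 job-seconds in total, 10 were spent while both ran.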
- neurocaas_contrib.monitor.postprocess_jobdict(by_job)[source]¶
Given a dictionary where the keys are job names and the values are dictionaries of metadata about each job, looks for jobs where some of the time entries are missing. If only the start time is missing, replaces it with the last recorded start time as an estimate and fills in the corresponding duration. If a job has no start or end times at all, removes it.
- neurocaas_contrib.monitor.calculate_parallelism_nones(bucket_name, usage_list, user)[source]¶
Organizes individual runs into jobs, even when some of their time entries are None.
- class neurocaas_contrib.monitor.LambdaMonitor(stackname)[source]¶
Bases:
object
Base class for Lambda monitoring. Has specific subtypes for main and sub Lambdas.
- get_logs(hours=1)[source]¶
Get the Lambda logs indicating NeuroCAAS job processing for the last {hours} hours. The result is returned as a list of dictionaries, each keyed by a request ID, with the corresponding lines of log text as the value. Adapted from https://stackoverflow.com/questions/59240107/how-to-query-cloudwatch-logs-using-boto3-in-python
- Parameters
hours – how many hours back to start collecting logs from.
- Returns
a list of dictionaries containing logs for requests, in reverse chronological order.
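Retrieving the logs from CloudWatch requires AWS access, but the step that shapes them into per-request dictionaries can be sketched on its own. The sketch below assumes the messages have already been fetched in chronological order, that they contain the standard Lambda "START RequestId: …" / "END RequestId: …" marker lines, and that requests do not interleave. The function name is hypothetical and this is not the library's implementation.

```python
import re

START_RE = re.compile(r"^START RequestId: (\S+)")
END_RE = re.compile(r"^END RequestId: (\S+)")


def group_by_request(messages):
    """Group chronologically ordered Lambda log messages by request ID.

    Returns a list of {request_id: [lines]} dicts, most recent request
    first. Lines outside START/END pairs (such as REPORT lines) are dropped.
    """
    groups = []
    current_id, current_lines = None, []
    for message in messages:
        start = START_RE.match(message)
        if start:
            current_id, current_lines = start.group(1), [message]
            continue
        if current_id is None:
            continue  # not inside a request; skip
        current_lines.append(message)
        if END_RE.match(message):
            groups.append({current_id: current_lines})
            current_id, current_lines = None, []
    groups.reverse()  # reverse chronological order
    return groups
```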
- class neurocaas_contrib.monitor.LogMonitor(stackname)[source]¶
Bases:
neurocaas_contrib.monitor.LambdaMonitor
Monitor the logs coming off of a given analysis.
- class neurocaas_contrib.monitor.JobMonitor(stackname)[source]¶
Bases:
neurocaas_contrib.monitor.LambdaMonitor
Monitor a job as it is running. Given a submit file as input, uses it to trace details about a running job.
- get_lambda_id()[source]¶
Get the physical resource ID of the main Lambda function of a CloudFormation stack, given the stack name.
- Returns
physical resource id of the cloudformation lambda function.
- get_certificate(submitfile)[source]¶
Get the certificate file corresponding to a given submit file.
- Parameters
submitfile – path to a submit file.
- Returns
a NeuroCAASCertificate object.
- get_certificate_values(timestamp, groupname)[source]¶
Get the certificate file given only the timestamp and groupname of a job (useful if running as dev).
- Parameters
groupname – name of the group where we’re going to look for jobs.
timestamp – timestamp field of a submit file.
- Returns
a NeuroCAASCertificate object
- get_datasets(submitfile)[source]¶
Get the list of datasets associated with a given submit file.
- Parameters
submitfile – path to a submit file.
- Returns
dictionary of instances.
- get_datastatus(submitfile, dataset)[source]¶
Get the datastatus file associated with a given submit file and dataset.
- Parameters
submitfile – path to a submit file.
dataset – basename of the dataset to use.
- Returns
dictionary of instances.
- get_datastatus_values(groupname, timestamp, dataset)[source]¶
Get the datastatus file associated with a given group name, timestamp, and dataset.
- Parameters
groupname – name of the group where we’re going to look for jobs.
timestamp – timestamp field of a submit file.
dataset – basename of the dataset to use.
- Returns
a NeuroCAASDataStatusLegacy object
neurocaas_contrib.remote module¶
- neurocaas_contrib.remote.return_tags(timeout)[source]¶
Formats tags for launching instances so that they will not be shut down by NeuroCAAS AWS account monitoring.
- Parameters
timeout – The amount of time, in minutes, for which you are requesting this instance to be up. Should be given as an integer.
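As a rough illustration of what "formatting tags" means for EC2, the sketch below builds the TagSpecifications list structure accepted by the boto3 EC2 client's run_instances call. The tag key "Timeout" is hypothetical: the keys that NeuroCAAS account monitoring actually checks are not documented here.

```python
def return_tags_sketch(timeout: int) -> list:
    """Build an EC2 TagSpecifications list carrying a requested lifetime.

    The tag key "Timeout" is a hypothetical placeholder.
    """
    if not isinstance(timeout, int):
        raise TypeError("timeout should be given as an integer number of minutes")
    return [
        {
            "ResourceType": "instance",
            "Tags": [{"Key": "Timeout", "Value": str(timeout)}],
        }
    ]
```

EC2 tag values are strings, which is why the timeout is converted with str() before being stored.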
- class neurocaas_contrib.remote.NeuroCAASAMI(path)[source]¶
Bases:
object
This class streamlines the experience of building an AMI for a new pipeline, or improving one within an existing pipeline. It has three main functions: 1) launching a development instance from AMIs associated with a particular algorithm or pipeline, 2) testing those AMIs with simulated job submission events, and 3) creating new images once development instances are stable and ready for deployment.
This class only allows for one development instance to be launched at a time to encourage responsible usage.
This class assumes that you have already configured a pipeline, having created a folder for it, and filled out the template with relevant details [not the ami, as this is what we will build here.]
Inputs: path (str): the path to the directory for a given pipeline.
Example Usage:
```python
from neurocaas_contrib.remote import NeuroCAASAMI

devenv = NeuroCAASAMI("../../sam_example_stack/")  ## Declare in reference to a particular NCAP pipeline
devenv.launch_devinstance()  ## function 1 referenced above
### Do some development on the remote instance
devenv.submit_job("/path/to/submit/file")  ## function 2 referenced above
### Monitor the remote instance to make sure that everything is running as expected and outputs are returned
devenv.create_devami("new_ami")  ## function 3 referenced above
devenv.terminate_devinstance()  ## clean up after done developing
```
- assign_instance(instance_id)[source]¶
Assign an existing instance as the development instance.
- Parameters
instance_id – takes the instance id as a string.
- launch_devinstance(ami=None, volume_size=None, timeout=60, DryRun=False)[source]¶
Launches an instance from an ami. If ami is not given, launches the default ami of the pipeline as indicated in the stack configuration file. Launches on the instance type given in this same stack configuration file.
Inputs:
- Parameters
ami (str) – (Optional) if not given, the default AMI of the pipeline is used. Several shorthand text options are also accepted (AMIs current as of 3/16):
ubuntu18: Ubuntu Linux 18.04, 64-bit x86 (ami-07ebfd5b3428b6f4d)
ubuntu16: Ubuntu Linux 16.04, 64-bit x86 (ami-08bc77a2c7eb2b1da)
dlami18: Deep Learning AMI (Ubuntu 18.04), version 27 (ami-0dbb717f493016a1a)
dlami16: Deep Learning AMI (Ubuntu 16.04), version 27 (ami-0a79b70001264b442)
volume_size (int) – (Optional) the size of the volume to attach to this development instance.
timeout (int) – (Optional) the amount of time, in minutes, for which you are requesting this instance. Defaults to 60 (1 hour). THE INSTANCE WILL BE STOPPED AFTER THIS TIMEOUT EXPIRES.
DryRun (bool) – for debugging. If True, will not launch an instance.
- change_owner(owner, DryRun=True)[source]¶
Change the owner of a pipeline. Currently does not work with testdev permissions; included for testing purposes.
- extend_lifetime(additional_time, DryRun=False)[source]¶
If you need more time to develop, extend the requested lifetime of your instance by additional_time minutes.
- Parameters
additional_time – the amount of time, in minutes, by which you would like to extend the lifetime of your instance.
- submit_job(submitpath)[source]¶
Submit a test job with a submit.json file.
- Parameters
submitpath (str) – path to a submit.json formatted file.
- job_status(jobind=-1)[source]¶
Method to get the stdout and stderr from the jobs that were run on the instance.
- Parameters
jobind (int) – index giving which job to pay attention to. Defaults to -1.
- job_output(jobind=-1)[source]¶
Method to get the stdout and stderr from the jobs that were run on the instance.
- Parameters
jobind (int) – index giving which job to pay attention to. Defaults to -1.
- start_devinstance(timeout=60)[source]¶
Method to start the current development instance. Specify a timeout, in minutes, for how long you expect the instance to be active.
- terminate_devinstance(force=False)[source]¶
Method to terminate the current development instance.
- Parameters
force (bool) – if set to True, will terminate even if results have not been saved into an AMI.
- create_devami(name)[source]¶
Method to create a new ami from the current development instance.
Inputs: name (str): the name to give to the new ami.
- update_blueprint(ami_id=None, message=None)[source]¶
Method to take recently developed AMIs, assign them to the stack_config_template of the relevant instance, and create a git commit documenting this change.
NOTE (update 4/28): this function no longer updates the whole blueprint, only the AMI ID. In most cases this does not matter, but it will when you change the command used to run a development job, or initialize blueprints from a separate blueprint.
- Parameters
ami_id (str) – the AMI ID with which to update the blueprint for the pipeline in question. If none is given, defaults to the most recent AMI in the ami_hist list.
message (str) – (Optional) the message to associate with this commit.
- get_instance_state()[source]¶
Checks the instance associated with the DevAMI object, and determines its state. Used to maintain a limit of one live instance at a time during development.
Outputs: (dict): a dictionary giving the status of the instance associated with this object.
- check_running()[source]¶
A function to check if the instance associated with this object is live.
Outputs: (bool): a boolean representing if the current instance is in the state “running” or not.
- check_clear()[source]¶
A function to check the state of the current instance to determine whether it can be replaced by a new one for active development. Prevents rampant instance propagation. Related to check_running, but the two are not direct negations of each other.
Outputs: (bool): a boolean representing if the current instance is inactive, and can be replaced by an active one.
neurocaas_contrib.scripting module¶
- neurocaas_contrib.scripting.get_yaml_field(yamlfile, fieldname)[source]¶
Returns the value of a field in a YAML file. If the value is a dict, returns a JSON string; if it is a list, returns a string parsable as a bash array; otherwise, returns the standard string conversion.
- Parameters
yamlfile – path to the yaml file you want to parse.
fieldname – the name of the field you want to extract.
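The conversion rules above can be sketched for a value that has already been parsed from YAML (for instance with PyYAML's yaml.safe_load, omitted here to keep the sketch dependency-free). The exact bash array format used by get_yaml_field is not documented; space-separated, shell-quoted items are an assumption, and the function name is hypothetical.

```python
import json
import shlex


def format_field_value(value) -> str:
    """Convert an already-parsed YAML value to a shell-friendly string.

    dict -> JSON string; list -> space-separated, shell-quoted items
    (assumed bash array format); anything else -> str(value).
    """
    if isinstance(value, dict):
        return json.dumps(value)
    if isinstance(value, list):
        return " ".join(shlex.quote(str(item)) for item in value)
    return str(value)
```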
- neurocaas_contrib.scripting.parse_zipfile(zipname, path=None)[source]¶
Given a zipfile, confirms that it is a zipfile, and that it contains one top level directory. Unzips the zip file, and returns the name of the top level directory. Will throw an error if 1) the file path is not a zip file, or 2) if it contains more than one top level directory.
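The checks described above can be sketched with Python's standard zipfile module. This is an illustrative version, not the library's code: raising ValueError is an assumption about the error type, and the function name is hypothetical.

```python
import zipfile


def parse_zipfile_sketch(zipname, path=None):
    """Unzip zipname into path (default: current directory) and return its single top-level directory."""
    if not zipfile.is_zipfile(zipname):
        raise ValueError(f"{zipname} is not a zip file")
    with zipfile.ZipFile(zipname) as archive:
        names = archive.namelist()
        # Zip member names always use "/" as the separator
        top_level = {name.split("/", 1)[0] for name in names}
        if len(top_level) != 1:
            raise ValueError(f"expected one top-level directory, found {sorted(top_level)}")
        top = top_level.pop()
        # Every member must live under top/, otherwise top is a file, not a directory
        if not all(name.startswith(top + "/") for name in names):
            raise ValueError(f"top-level entry {top!r} is not a directory")
        archive.extractall(path)
    return top
```

Validating the member list before calling extractall means a malformed archive is rejected without writing anything to disk.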
- neurocaas_contrib.scripting.log_process(command, logpath, s3status)[source]¶
Runs the given command, logs its stdout/stderr output locally, and prints it to stdout.
- Parameters
command – command you want to run.
logpath – path where you will log the stdout/err outputs locally.
s3status – s3 path where the dataset is stored
- Returns
return code of the command.
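A simplified sketch of the run-and-log pattern, using only the standard subprocess module. It streams the command's combined output to both stdout and a local log file and returns the exit code. The s3status argument and any S3 status updates are deliberately omitted, and the function name is hypothetical.

```python
import subprocess
import sys


def log_process_sketch(command, logpath):
    """Run command, echo its combined stdout/stderr to our stdout, and copy it to logpath.

    Returns the command's exit code.
    """
    with open(logpath, "w") as log, subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into the same stream
        text=True,
    ) as proc:
        for line in proc.stdout:  # stream output line by line as it arrives
            sys.stdout.write(line)
            log.write(line)
    # Leaving the Popen context waits for the process, so returncode is set
    return proc.returncode
```

Streaming line by line, rather than calling communicate(), keeps long-running analyses visible in the log while they are still running.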
- class neurocaas_contrib.scripting.NeuroCAASScriptManager(path, write=True)[source]¶
Bases:
object
An object that takes care of the management logic for input/output and logging on a NeuroCAAS job. All of its state is stored in a JSON file called “registration.json” in the io-dir folder where job inputs and outputs are kept.
- classmethod from_registration(path)[source]¶
If a registration file “registration.json” already exists at a given location, initialize from this file.
- register_data(s3path)[source]¶
Given an S3 path, registers it as the location of the data we care about.
- Parameters
s3path – path to a file in AWS S3, given in “s3://bucket/path” format.
- register_data_local(localpath)[source]¶
Given a local path, registers it as the location of the data we care about.
- Parameters
localpath – path to a file on the machine itself.
- register_config(s3path)[source]¶
Given an S3 path, registers it as the location of the config file we care about.
- Parameters
s3path – path to a file in AWS S3, given in “s3://bucket/path” format.
- register_config_local(localpath)[source]¶
Given a local path, registers it as the location of the config file we care about.
- Parameters
localpath – path to a file on the machine itself.
- register_file(name, s3path)[source]¶
Given an S3 path, registers it as the location of a file we care about.
- Parameters
name – name under which to register this data path.
s3path – path to a file in AWS S3, given in “s3://bucket/path” format.
- register_file_local(name, localpath)[source]¶
Given a local path, registers it as the location of a file we care about.
- Parameters
name – name under which to register this data path.
localpath – path to a file on the machine itself.
- register_resultpath(s3path)[source]¶
Given an S3 path, registers it as the location where job data will be uploaded. Give a folder in which two subdirectories, “logs” and “process_results”, will be generated; logs and analysis results are sent to these respective locations.
- register_resultpath_local(localpath)[source]¶
Given a local path, registers it as the location where job data will be uploaded. Give a folder in which two subdirectories, “logs” and “process_results”, will be generated; logs and analysis results are sent to these respective locations.
- get_data(path=None, force=False, display=False)[source]¶
Get the currently registered data. If desired, you can pass a path where you would like the data to be moved; otherwise, it will be moved to self.path/self.subdirs[data].
- Parameters
path – (optional) the location you want to write data to.
force – (optional) by default, will not redownload if data of the same name already lives here. Can override with force=True.
display – (optional) by default, will not display download progress.
- Returns
bool (True if downloaded, False if not)
- get_config(path=None, force=False, display=False)[source]¶
Get the currently registered config. If desired, you can pass a path where you would like the config to be moved; otherwise, it will be moved to self.path/self.subdirs[config].
- Parameters
path – (optional) the location you want to write the config to.
force – (optional) by default, will not redownload if a config of the same name already lives here. Can override with force=True.
display – (optional) by default, will not display download progress.
- Returns
bool (True if downloaded, False if not)
- get_file(varname, path=None, force=False, display=False)[source]¶
Get a currently registered file. If desired, you can pass a path where you would like the file to be moved; otherwise, it will be moved to self.path/self.subdirs[data].
- Parameters
varname – name of the file key in the registration dictionary.
path – (optional) the location you want to write the file to.
force – (optional) by default, will not redownload if a file of the same name already lives here. Can override with force=True.
display – (optional) by default, will not display download progress.
- Returns
bool (True if downloaded, False if not)
- put_result(localfile, display=False)[source]¶
- Parameters
localfile – the location you want to write data from.
display – (optional) by default, will not display upload progress.
- Returns
bool (True if uploaded, False if not)
- get_name(contents)[source]¶
Given a generic dictionary of structure self.pathtemplate, correctly returns the filename if available.
- Parameters
contents – a dictionary of structure {“s3”: location, “local”: location}
- get_group(contents)[source]¶
Given a generic dictionary of structure self.pathtemplate, correctly returns the filename if available.
- Parameters
contents – a dictionary of structure {“s3”: location, “local”: location}
- get_path(contents)[source]¶
Given a generic dictionary of structure self.pathtemplate, correctly returns the local filepath if available.
- Parameters
contents – a dictionary of structure {“s3”: location, “local”: location}
- get_bucket_name()[source]¶
Using the registered dataset’s entry (a dictionary of structure self.pathtemplate), returns the bucket name if a dataset is registered.
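To illustrate how a {“s3”: location, “local”: location} entry can yield a file name and bucket name, here is a hypothetical sketch. Preferring the S3 entry over the local one is an assumption, and these helper names are illustrative, not part of NeuroCAASScriptManager.

```python
import os
import posixpath


def split_s3_path(s3path: str):
    """Split "s3://bucket/key" into (bucket, key)."""
    prefix = "s3://"
    if not s3path.startswith(prefix):
        raise ValueError(f"not an s3 path: {s3path}")
    bucket, _, key = s3path[len(prefix):].partition("/")
    return bucket, key


def name_from_template(contents: dict):
    """File name from a {"s3": ..., "local": ...} dict, preferring the S3 entry (assumed)."""
    if contents.get("s3"):
        return posixpath.basename(split_s3_path(contents["s3"])[1])
    if contents.get("local"):
        return os.path.basename(contents["local"])
    return None


def bucket_from_template(contents: dict):
    """Bucket name from the S3 entry, or None if no S3 location is registered."""
    if contents.get("s3"):
        return split_s3_path(contents["s3"])[0]
    return None
```

S3 keys always use "/", so posixpath.basename is used for them regardless of the local operating system.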
- get_resultpath_tmp()[source]¶
Get the local path to a directory where you can write easily erasable data.
- get_resultpath(filepath)[source]¶
Given the local path to a file or directory, gives the path it would be uploaded to in S3 (useful when using aws s3 sync).
- neurocaas_contrib.scripting.register_data(s3_datapath)[source]¶
Register the dataset. Get the dataset name and local path, and write it to a persistent file stored at “configpath”.
- neurocaas_contrib.scripting.register_config(s3_configpath)[source]¶
Register the config file to use. Get the config name and local path, and write it to a persistent file stored at “configpath”.