Source code for neurocaas_contrib.blueprint
## Module to manage blueprint creation and updating.
import docker
from collections import deque
import json
[docs]class Blueprint(object):
"""Blueprint object to manage blueprint entry read/write.
Inputs:
:param path: Path to a blueprint object.
"""
def __init__(self,path):
"""Constructor.
"""
self.config_filepath = path
with open(self.config_filepath,"r") as f:
config = json.load(f)
self.blueprint_dict = config
[docs] def reload(self):
"""Reload the blueprint from file.
"""
with open(self.config_filepath,"r") as f:
self.blueprint_dict = config
[docs] def write(self):
"""Write back to the original source file:
"""
with open(self.config_filepath,"w") as f:
json.dump(self.blueprint_dict,f,indent = 4)
[docs] def update_container_history(self,container_name):
"""Updates the container history with a most recent entry. #TODO check that this container exists.
:param container_name: name of the container.
"""
container_history = deque(self.blueprint_dict.get("container_history",[]),maxlen = 5)
container_history.append(container_name)
self.blueprint_dict["container_history"] = list(container_history)
@property
def active_container(self):
"""First check if container is running
"""
return self.blueprint_dict.get("container_history",[None])[-1]
@property
def active_container_status(self):
"""First check if container is running
"""
containername = self.blueprint_dict.get("container_history",[None])[-1]
if containername is None:
return containername
try:
client = docker.from_env()
cont = client.containers.get(containername)
status = cont.status
containernamestatus = f"{containername} ({status})"
except docker.errors.NotFound:
containernamestatus = containername
return containernamestatus
[docs] def update_image_history(self,image_name):
"""Updates the image history with a most recent entry. #TODO check that this image exists.
:param image_name: name of the image to update with.
"""
image_history = deque(self.blueprint_dict.get("image_history",[]),maxlen = 5)
image_history.append(image_name)
self.blueprint_dict["image_history"] = list(image_history)
@property
def active_image(self):
return self.blueprint_dict.get("image_history",[None])[-1]
#return self._active_image
#@active_image.getter
#def active_image(self):
# self._active_image = self.blueprint_dict.get("image_history",[None])[-1]
[docs] def update_develop_history(self,develop_dict):
"""Updates the development history with a most recent entry.
:param develop_dict: development dictionary to specify the NeuroCAASAMI object.
"""
develop_history = deque(self.blueprint_dict.get("develop_history",[]),maxlen = 5)
develop_history.append(develop_dict)
self.blueprint_dict["develop_history"] = list(develop_history)