sparcur developer guide
Table of Contents
- Demos
- Extending implementation
- Workflow
- All datasets
- Single dataset
- Future correct single dataset workflow
- Protocols
- Release
- SCKAN
- Internal Structure
Demos
Remote only connection
If you have sparcur.simple
this is the simplest way to get a remote
only connection to Pennsieve.
from sparcur.simple.utils import backend_pennsieve
from sparcur.config import auth

# the project is identified by the configured `remote-organization` value
project_id = auth.get('remote-organization')

# backend_pennsieve hands back the remote class to use for this project
PennsieveRemote = backend_pennsieve(project_id)

# instantiate the project root and list its children
root = PennsieveRemote(project_id)
datasets = list(root.children)
Otherwise you have to do what backend_pennsieve does for you
from sparcur.backends import PennsieveRemote
from sparcur.config import auth
from sparcur.paths import PennsieveCache, Path

# the project is identified by the configured `remote-organization` value
project_id = auth.get('remote-organization')

# derive a remote class bound to the local path and cache classes,
# rebinding the imported name to the derived class
PennsieveRemote = PennsieveRemote._new(Path, PennsieveCache)
PennsieveRemote.init(project_id)

# after init the class carries the root identifier
root = PennsieveRemote(PennsieveRemote.root)
datasets = list(root.children)
Validate a dataset
This code is an example of how to use sparcur to get structured metadata from a local dataset.
You can run this example block and it will validate the DatasetTemplate.
Example usage
rsync -r path/to/some/curation/dataset test pushd test/dataset SCIGRAPH_API_KEY=$(python -c 'from pyontutils.config import auth; print(auth.get("scigraph-api-key"))') \ HOME=/tmp/test-home python -m sparcur.simple.validate
Example python usage
from pathlib import Path
from sparcur.simple.validate import main as validate

# validate the dataset template that ships in the resources folder
path = Path('../resources/DatasetTemplate')
blob = validate(path)
Implementation of sparcur.simple.validate
.
import augpathlib as aug
from sparcur import pipelines as pipes
from sparcur.paths import PathL, CacheL
from sparcur.utils import GetTimeNow


def makeValidator(dataset_path, time_now=None):
    """ Build a pipeline for the dataset at `dataset_path`.

    Fresh subclasses of `PathL` and `CacheL` are created and bound to
    each other so the dataset can be treated as its own anchor, then a
    `context` and a `lifters` namespace are filled in from the local
    path alone (`remote` is the literal 'local').

    dataset_path: path to the dataset folder, cast to the local path class
    time_now: a `GetTimeNow` instance, a new one is created when None

    Returns the `pipes.PipelineEnd` constructed for the dataset. """

    if time_now is None:
        time_now = GetTimeNow()

    class CacheX(CacheL):
        def __new__(cls, *args, **kwargs):
            return super().__new__(cls, *args, **kwargs)

    CacheX._bind_flavours()

    class PathX(PathL):
        """ Workaround absence of cache. """

        _cache_class = CacheX

        def __new__(cls, *args, **kwargs):
            return super().__new__(cls, *args, **kwargs)

        # TODO likely will also need to rebind the cache class as well

        #@property
        #def dataset_relative_path(self, __drp=dataset_path):
            #return self.relative_path_from(self.__class__(__drp))

    CacheX._local_class = PathX
    PathX._bind_flavours()

    # XXX monkey patch TODO sigh FIXME DatasetStructure calls Path directly inside
    #PathL.dataset_relative_path = Path.dataset_relative_path

    # must cast before anything else is done so that anchor and
    # datasets are known
    dataset_path = PathX(dataset_path)

    CacheX._dataset_dirs = [CacheX(dataset_path)]
    # FIXME this is very much not ideal because we don't actually want
    # the parent in this case
    CacheX._asserted_anchor = CacheX(dataset_path.parent)

    class context:
        path = dataset_path.resolve()
        id = path.id
        uri_api = path.as_uri()
        uri_human = path.as_uri()

    class lifters:
        id = context.id
        remote = 'local'
        folder_name = context.path.name
        uri_api = context.uri_api
        uri_human = context.uri_human
        timestamp_export_start = time_now.START_TIMESTAMP

        affiliations = lambda *args, **kwargs: None
        techniques = tuple()
        modality = None
        organ_term = None
        protocol_uris = tuple()

        award_manual = None

    # NOTE a second, unreachable return used to follow this one:
    #return pipes.SDSPipeline(dataset_path, lifters, context)  # shouldn't need network
    return pipes.PipelineEnd(dataset_path, lifters, context)


def main(path=None, time_now=None,
         export_local=False, export_parent_path=None,
         _entry_point=False, validate=False, **kwargs):
    """ Run the validation pipeline for the dataset at `path`.

    path: dataset folder, defaults to the working directory at call time
          (previously the default was captured once at import time)
    time_now: passed through to `makeValidator` and `export_blob`
    export_parent_path: where to export, defaults to `path`
    _entry_point: when true the data is written out with `export_blob`
                  as 'curation-export.json' and the result of that call
                  is returned, otherwise the pipeline data is returned
    export_local, validate: accepted for interface compatibility, unused here
    kwargs: forwarded to `export_blob` """

    # ('../resources/DatasetTemplate')
    if path is None:
        path = PathL.cwd()

    pipeline = makeValidator(path, time_now=time_now)
    data = pipeline.data

    if not _entry_point:
        return data

    from sparcur.simple.export import export_blob
    if export_parent_path is None:
        export_parent_path = path

    export_blob_path = export_blob(
        data, 'curation-export.json', time_now=time_now,
        export_parent_path=export_parent_path,
        **kwargs)
    return export_blob_path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)
Load Python IR
from sparcur.utils import path_ir ir = path_ir('curation-export.json')
def main(): from sparcur.reports import Report from sparcur.utils import path_ir from pyontutils.core import OntResIri ori = OntResIri('https://cassava.ucsd.edu/sparc/preview/exports/curation-export.ttl') graph = ori.graph ir = path_ir('/tmp/curation-export-test.json') rows = Report._hubmap_terms(graph, ir) anat = [r for r in rows if r[1].prefix in ('UBERON', 'FMA', 'ILX')] all_r = expand_label_curie(rows) ana_r = expand_label_curie(anat) return all_r, ana_r if __name__ == '__main__': return main()
Retrieve protocols
Create a protocols.io account and get API keys.
Then run the following to register with protocols.io. NOTE this is broken at the moment. Manual steps can be found in /docs/sparc-curation/docs/setup.html#config-templates
python -c "import idlib; idlib.Pio._setup()"
You can then run the following to retrieve protocol data.
import idlib
from pyontutils.core import OntResIri
from pyontutils.namespaces import sparc, rdf


def getpio(i):
    """ Wrap `i` in an `idlib.Pio`, None when idlib raises an IdlibError. """
    try:
        stream = idlib.Pio(i)
    except idlib.exc.IdlibError:
        # best effort, callers filter out the None
        return None

    return stream


def getdata(s):
    """ Return `s.data()`, or print the error and return None when the
    record is not authorized or does not exist. """
    try:
        return s.data()
    except (idlib.exc.NotAuthorizedError, idlib.exc.IdDoesNotExistError) as err:
        print(err)
        return None


def main():
    """ Retrieve data for every sparc:Protocol in the protcur export. """
    ori = OntResIri("https://cassava.ucsd.edu/sparc/preview/exports/protcur.ttl")
    graph = ori.graph
    pids = list(graph[:rdf.type:sparc.Protocol])

    # keep only the identifiers that idlib accepted
    streams = []
    for pid in pids:
        stream = getpio(pid)
        if stream:
            streams.append(stream)

    return [getdata(stream) for stream in streams]


if __name__ == '__main__':
    main()
Extending implementation
Adding a new xml type
def new_xml_format(path):
    """ Starting point for supporting a new xml type, reads the tag
    of the root element of the file at `path`. Nothing is returned. """
    from sparcur.extract import xml
    source = xml.XmlSource(path)
    root = source.e.getroot()
    top_tag = root.tag
Workflow
All datasets
Retrieve
The dependency DAG is as follows.
# Create a new directory in the working directory whose name is the
# current timestamp with utc offset, with the colons removed so that
# it is safe to use as a file name. Prints the directory name.
# Returns the exit code of mv if the rename fails.
function sparc-time-friendly () {

    local UTC_OFFSET_START
    local TIME_START_NO_OFFSET
    # the format strings are only needed here, keep them out of the caller's shell
    local iso8601millis
    local utcoffset

    # gnu coreutils gdate needed for osx support/freebsd
    # gdate on darwin only has millisecond resolution?
    # this also won't work on freebsd without gnu coreutils
    iso8601millis="+%FT%T,%6N"  # FIXME do we _really_ need millis!? yemaybe? concurrent startups?
    utcoffset="+%:z"
    # I really hope the utc offset doesn't change between start & end
    # but laptops and airplanes do exist, so it could
    # also how utterly annoying that the date separator and the
    # negative utc offset share the same symbol ... talk about
    # an annoying design flaw that is going to haunt humanity
    # with double the number of calls to date for # ... as
    # long as anyone is writing code to deal with time
    TIME_START_NO_OFFSET=$(date ${iso8601millis} || gdate ${iso8601millis})
    UTC_OFFSET_START=$(date ${utcoffset} || gdate ${utcoffset})
    local TIME_START="${TIME_START_NO_OFFSET}${UTC_OFFSET_START}"  # XXX unused

    local TIME_START_NO_OFFSET_FS_OK=${TIME_START_NO_OFFSET//:/}
    local UTC_OFFSET_START_FS_OK=${UTC_OFFSET_START//:/}
    local TIME_START_FRIENDLY=${TIME_START_NO_OFFSET_FS_OK}${UTC_OFFSET_START_FS_OK}
    # So. iso8601 guidance on what to do about subsecond time and the utc offset in the compact
    # representation is not entirely clear, however I _think_ that %FT%T%H%M%S,%6N%z is ok but
    # the -/+ must stay between the timezone and the rest, so we will have to grab tz by itself
    local TIME_START_SAFE=${TIME_START_NO_OFFSET_FS_OK//-/}${UTC_OFFSET_START_FS_OK}  # XXX unused
    mv "$(mktemp --directory sparcur-all-XXXXXX)" "${TIME_START_FRIENDLY}" || \
        { CODE=$?; echo 'mv failed'; return $CODE; }
    echo "${TIME_START_FRIENDLY}"

}

# Fetch everything needed from the remote so that export can run later.
#
# Options
#   --project-id ID            defaults to remote-organization from the sparcur config
#   --symlink-objects-to PATH  passed through to sparcur.simple.clone
#   --log-path PATH            defaults to ${PARENT_PATH}/logs
#   --parent-path PATH         defaults to a new sparc-time-friendly directory
#   --only-filesystem          skip fetching annotations and remote metadata
#
# Prints the parent path first so that logs can be followed, exports
# SPARCUR_PROJECT_PATH once the clone succeeds, and returns non-zero
# if any step fails.
function sparc-get-all-remote-data () {
    # NOTE not quite all the remote data, the google sheets
    # don't have caching functionality yet

    # parse args
    local POSITIONAL=()
    while [[ $# -gt 0 ]]
    do
    key="$1"
    case $key in # (ref:(((((((sigh)
        --project-id)           local PROJECT_ID="${2}"; shift; shift ;;
        --symlink-objects-to)   local SYMLINK_OBJECTS_TO="${2}"; shift; shift ;;
        --log-path)             local LOG_PATH="${2}"; shift; shift ;;
        --parent-path)          local PARENT_PATH="${2}"; shift; shift ;;
        --only-filesystem)      local ONLY_FILESYSTEM="ONLY_FS"; shift ;;
        -h|--help)              echo "${HELP}"; return ;;
        *)                      POSITIONAL+=("$1"); shift ;;
    esac
    done

    # Why, you might be asking, are we declaring a local project path here without assignment?
    # Well. Let me tell you. Because local is a command with an exit status. So it _always_
    # returns zero. So if you need to check the output of the command running in a subshell
    # that you are assigning to a local variable _ALWAYS_ set local separately first.
    # Yes, shellcheck does warn about this. See also https://superuser.com/a/1103711
    local PROJECT_PATH

    # declare the pid arrays up front so that they are local on every
    # path, including --only-filesystem where the first block is skipped
    local pids=()
    local pids_final=()

    if [[ -z "${PARENT_PATH}" ]]; then
        local PARENT_PATH
        set -o pipefail
        PARENT_PATH=$(sparc-time-friendly) || {
            CODE=$?;
            echo "Creating "'${PARENT_PATH}'" failed!"
            set +o pipefail
            return $CODE;
        }
        set +o pipefail
    fi

    local LOG_PATH=${LOG_PATH:-"${PARENT_PATH}/logs"}

    #local LOG_PATH=$(python -c "from sparcur.config import auth; print(auth.get_path('log-path'))")
    local PROJECT_ID=${PROJECT_ID:-$(python -c "from sparcur.config import auth; print(auth.get('remote-organization'))")}

    local maybe_slot=()
    if [[ -n "${SYMLINK_OBJECTS_TO}" ]]; then
        # MUST use arrays to capture optional arguments like this otherwise
        # arg values with spaces in them will destroy your sanity
        maybe_slot+=(--symlink-objects-to "${SYMLINK_OBJECTS_TO}")
    fi

    echo "${PARENT_PATH}"  # needed to be able to follow logs

    if [ ! -d "${LOG_PATH}" ]; then
        # double quotes so that the path actually shows up in the message
        mkdir "${LOG_PATH}" || { CODE=$?; echo "mkdir of ${LOG_PATH} failed"; return $CODE; }
    fi

    if [[ -z "${ONLY_FILESYSTEM}" ]]; then
        # fetch annotations (ref:bash-pipeline-fetch-annotations)
        echo "Fetching annotations metadata"
        python -m sparcur.simple.fetch_annotations > "${LOG_PATH}/fetch-annotations.log" 2>&1 &
        pids_final[0]=$!

        # fetch remote metadata (ref:bash-pipeline-fetch-remote-metadata-all)
        # if this fails with 503 errors, check the
        # remote-backoff-factor config variable
        echo "Fetching remote metadata"
        python -m sparcur.simple.fetch_remote_metadata_all \
            --project-id "${PROJECT_ID}" \
            > "${LOG_PATH}/fetch-remote-metadata.log" 2>&1 &
        pids[0]=$!
    fi

    local FAIL=0

    # clone aka fetch top level

    # we do not background this assignment because it runs quickly
    # and everything that follows depends on it finishing, plus we
    # need it to finish to set the PROJECT_PATH variable here
    echo python -m sparcur.simple.clone --project-id "${PROJECT_ID}" --parent-path "${PARENT_PATH}" "${maybe_slot[@]}"
    echo "Cloning top level"
    set -o pipefail
    PROJECT_PATH=$(python -m sparcur.simple.clone \
                          --project-id "${PROJECT_ID}" \
                          --parent-path "${PARENT_PATH}" \
                          "${maybe_slot[@]}" \
                          2>&1 | tee "${LOG_PATH}/clone.log" | tail -n 1) || {
        # TODO tee the output when verbose is passed
        CODE=$?;
        tail -n 100 "${LOG_PATH}/clone.log";
        echo "Clone failed! The last 100 lines of ${LOG_PATH}/clone.log are listed above.";
        # stop the background fetches, nothing downstream can use them
        apids=( "${pids[@]}" "${pids_final[@]}" );
        for pid in "${apids[@]}"; do kill $pid; done;
        set +o pipefail
        return $CODE;
    }
    set +o pipefail

    # explicit export of the current project path for pipelines
    # ideally we wouldn't need this, and when this pipeline
    # finished the export pipeline would kick off, or the export
    # pipeline would search for ... an existing project path ...
    # by ... oh right, looking for an environment variable or
    # checking some other persistent state ... so this is the one
    # unless some controlling process sets it top down from the start
    # but we can't assume that
    export SPARCUR_PROJECT_PATH="${PROJECT_PATH}"

    for pid in "${pids[@]}"; do
        wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
    done
    if [[ $FAIL -ne 0 || -z "${PROJECT_PATH}" ]]; then
        echo "${FAIL} commands failed. Cannot continue."
        echo "${PROJECT_PATH}"
        return 1
    fi

    # pull aka fetch file system metadata
    echo "Fetching file system metadata"
    echo python -m sparcur.simple.pull --project-path "${PROJECT_PATH}"
    python -m sparcur.simple.pull \
           --project-path "${PROJECT_PATH}" \
           > "${LOG_PATH}/pull.log" 2>&1 || {
        CODE=$?;
        tail -n 100 "${LOG_PATH}/pull.log";
        echo "Pull failed! The last 100 lines of ${LOG_PATH}/pull.log are listed above.";
        echo "${PROJECT_PATH}";
        return $CODE; }

    # fetch metadata files
    echo "Fetching metadata files"
    # have to pass project path as a position argument here so that it
    # does not try to pull aka fetch the file system metadata again
    echo python -m sparcur.simple.fetch_metadata_files --project-path "${PROJECT_PATH}"
    python -m sparcur.simple.fetch_metadata_files \
           --project-path "${PROJECT_PATH}" \
           > "${LOG_PATH}/fetch-metadata-files.log" 2>&1 &

    pids_final[1]=$!

    # fetch files
    echo "Fetching files"
    # XXX at some point this will probably also depend on the manifests
    # so we don't fetch everything with a matching extension
    # TODO derive --extension from manifests or allow it to be passed in
    # the trace names the module that is actually run below
    echo python -m sparcur.simple.fetch_files --project-path "${PROJECT_PATH}" --extension xml

    # FIXME fetch_files fails silently here :/
    python -m sparcur.simple.fetch_files \
           --project-path "${PROJECT_PATH}" \
           --extension xml \
           > "${LOG_PATH}/fetch-files.log" 2>&1 &

    pids_final[2]=$!

    local FAIL=0
    for pid in "${pids_final[@]}"; do
        wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
    done

    # FIXME HACK
    #find -type f -size 0 -exec getfattr -d {} \;
    #find -type f -size 0 -exec spc fetch --limit=-1 {} \;

    if [[ $FAIL -ne 0 ]]; then
        echo "${FAIL} commands failed. Cannot continue."
        echo "${PROJECT_PATH}"
        return 1
    fi
    echo "All fetching completed successfully."

}
Validate
This is the graph of the existing approach more or less as implemented
by spc export
.
A slightly more sane version is being implemented as part of
sparcur.simple
which will sandbox the network dependencies.
Export
In the current implementation validation and export are conflated. This is bad, and will be changed.
spc export
must only be run after sparc-get-all-remote-data
,
otherwise there will be network sandbox violations.
For the record there are multiple ways to invoke spc export
.
# In each variant the fallback is the value of SPARCUR_PROJECT_PATH,
# which needs the leading $ inside the expansion, without it the
# fallback is the literal text SPARCUR_PROJECT_PATH.

# pushd to the project location
pushd "${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"
spc export
popd

# pass the project location as a positional argument
spc export "${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"

# pass the project location as an option
spc export --project-path "${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"
At the moment sparc-export-all
is just a wrapper around spc export
.
This will change as we move to a single dataset export model. There
will then likely be a function that checks for datasets that have
changed since last export, updates only those and then collects the
outputs.
# Run spc export for a whole project.
#   --project-path PATH  project to export, falls back to any existing
#                        PROJECT_PATH and then to SPARCUR_PROJECT_PATH
#   -h, --help           print ${HELP} and return
# Any other arguments are collected in POSITIONAL.
function sparc-export-all () {
    # parse args
    local POSITIONAL=()
    until [[ $# -eq 0 ]]
    do
    key="$1"
    case $key in # (ref:(((sigh)
        --project-path)
            local PROJECT_PATH="${2}"
            shift; shift
            ;;
        -h|--help)
            echo "${HELP}"
            return
            ;;
        *)
            POSITIONAL+=("$1")
            shift
            ;;
    esac
    done

    # an explicit option wins, otherwise use the exported project path
    local PROJECT_PATH="${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"

    spc export --project-path "${PROJECT_PATH}"
}
Single dataset
Retrieve
Desired invocation.
sparc-fexport ${UUID} sparc-fexport dataset:${UUID} sparc-fexport N:dataset:${UUID} sparc-fexport https://api.pennsieve.io/datasets/N:dataset:${UUID} sparc-fexport https://app.pennsieve.io/N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0/datasets/N:dataset:${UUID}
Desired behavior.
dataset state | objects state | action | outcome |
---|---|---|---|
not-existing | do symlink to | retrieve | spc export; path-metadata |
existing | either don't fail | retrieve or equiv | spc export; path-metadata |
Desired output.
Dataset to exports/datasets/${UUID}/${TIMESTAMP_FRIENDLY}/
and is
symlinked to exports/${UUID}/LATEST
. The last run dataset itself
goes to exports/datasets/LATEST
. We don't have to care about the
project identifier because each UUID is unique.
TODO Logging goes ????
This is a quick and dirty version that should just do the right thing given only the dataset id as an input.
Usage sparc-fexport N:dataset:totally-not-a-uuid
. The id provided
may be any of the variants, url, curie, api, human, uuid, etc.
# Placeholder, always fails.
function sparc-export () {
    echo TODO not ready yet
    return 1
}

# Quick and dirty retrieve and export of a single dataset.
#   $1  the dataset id, handed to sparcur.simple.utils to obtain the uuid
# Returns non-zero if the id cannot be resolved, if retrieve or the
# change of directory fails, or if export fails. When export fails the
# shell is left inside the dataset directory, as before.
function sparc-fexport () {

    local DATASET_ID="${1}"
    local DATASET_UUID
    local DATASET_PATH
    local EXPORT_PATH
    local pids=()

    DATASET_UUID="$(python -m sparcur.simple.utils --dataset-id "${DATASET_ID}")" || {
        CODE=$?; echo "Could not resolve dataset id ${DATASET_ID}"; return $CODE; }

    # stop here if any of these fail, otherwise export would run in
    # whatever directory we happen to be in and the popd at the end
    # would pop a directory that was never pushed
    python -m sparcur.simple.retrieve --dataset-id "${DATASET_UUID}" &&

    EXPORT_PATH="$(realpath "${DATASET_UUID}/exports")" &&
    DATASET_PATH="$(realpath "${DATASET_UUID}/dataset")" &&
    pushd "${DATASET_PATH}" || {
        CODE=$?; echo "Retrieve of ${DATASET_UUID} failed"; return $CODE; }

    # FIXME we shouldn't need this refetch so I think that retrieve is
    # broken if files/folders already exist
    # a failure here did not stop the export before and still does not
    python -m sparcur.cli find \
           --name '*.xlsx' \
           --name '*.xml' \
           --name 'submission*' \
           --name 'code_description*' \
           --name 'dataset_description*' \
           --name 'subjects*' \
           --name 'samples*' \
           --name 'manifest*' \
           --name 'resources*' \
           --name 'README*' \
           --no-network \
           --limit -1 \
           --fetch || echo "Refetch failed, continuing with export."

    # NOTE there used to be a `wait $!` here, find runs in the foreground
    # so $! was empty or pointed at some earlier unrelated background job
    python -m sparcur.cli export --export-path "${EXPORT_PATH}" &  # FIXME TODO this conflates phases
    pids[0]=$!

    # FIXME TODO for now export_single_dataset produces this so we don't run it independently
    # FIXME there is also a difference in the export folder because the path metadata targets
    # the last updated data and thus overwrites if the data has not changed but the code has
    #python -m sparcur.simple.path_metadata_validate --export-path "${EXPORT_PATH}" &
    #pids[1]=$!

    local FAIL=0
    # TODO log/notify export failure

    for pid in "${pids[@]}"; do
        wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
    done
    if [[ $FAIL -ne 0 ]]; then
        echo "${FAIL} commands failed. Cannot continue."
        echo "${DATASET_UUID}"
        echo "${DATASET_PATH}"
        return 1
    fi
    popd # or do it yourself because we might need to explore??
}
for d in $(ls *-* -d); do refetch ${d}; done for d in $(ls *-* -d); do find ~/".local/share/sparcur/export/datasets/${d}/LATEST/curation-export.json" -name 'curation-export.json'; done
Validate
See ref:graph-validate-all for all the related bits.
Extract
python -m sparcur.simple.extract "${DATASET_PATH}"
Retrieve 2
Export
Future correct single dataset workflow
Network access should only be possible during the retrieve phase.
The validate step may happen during extract and transform as well since structure or data content issues may be easier to detect during certain phases. Ideally this would not be the case, but practically it will take more work than necessary given our use cases.
We have to be careful to separate basic validation of the structure of the dataset data from the validation that the identifiers provided in that structure point to values known to their respective remotes.
For example we need to be able to say you are missing a protocol reference
at a separate point in time from saying the remote(s) we asked had no record
of the protocol reference you provided
.
Hypothes.is
This is pulled in bulk independently in a different workflow but it is probably worth checking to see if we need to run it again whenever we run a dataset.
Structure metadata
Retrieve
This is ref:clone.py, ref:fetch_remote_metadata_all.py, and ref:pull.py.
Extract
This is now called ref:path_metadata, but it is also dealt with as part of specimen_dirs.
Transform
Validate
We can't do this right now because the current dataset template cannot be statically validated. Only some of it can be validated when we have the data from the subjects and samples sheets. In this future pipeline the type prefixes will be required so that the structure can be statically verified.
Dataset metadata
Run this if the structure metadata is in a state where we can proceed (i.e. that there is a dataset_description file).
Retrieve
This is ref:fetch_metadata_files.py.
Extract
Transform
Validate
File metadata
Retrieve
This is ref:fetch_files.py. Depends on the manifest files and the dataset structure.
Extract
Transform
Validate
This is local validation, not remote networked validation.
Identifiers and mapping
Retrieve
This is the retrieval/dereferencing of identifier metadata.
It must happen after the file metadata step has been completed so that e.g. identifiers used in MBF segmentation files can be validated. In this step in particular validation and retrieval are essentially the same step. If there is an error during retrieval then it must produce a validation error.
Protocols
Checking and/or retrieving these depends on 3 things. The protocols.io group, the hypothesis group, and the dataset metadata.
Protc
Needs hypothesis and protocols.
Export
Convert
Serialize
Probably also includes load in some cases e.g. for the file level metadata that will be attached to package-id file-id pairs.
Protocols
Release
Since we are moving to run individual datasets, the aggregate release process is decoupled and mediated via git:36a749b5c321cdb81ba81f9d35e050ceb8479976
Testing
Code
Data
Reporting
After a release
SCKAN
Overview
Implementation
It should be possible to run all of these steps in a container
derived from a tgbugs/musl:kg-dev-user
docker image. See
https://github.com/tgbugs/dockerfiles/blob/master/source.org#kg-dev-user
https://hub.docker.com/repository/docker/tgbugs/musl/tags?name=kg-dev-user
Data synchronization
All of these things either run manually or are independent of the SCKAN release process. In most cases manual intervention is needed to ensure that various component sources are up to date.
SPARC curation export
See /docs/sparc-curation/docs/setup.html#export-v4 for the most recent workflow for batch release.
NPO
python -m neurondm.build release python -m neurondm.models.apinat_pops_more
Super manual curation, identifier sync, review, merge, commit, push, etc.
ApiNATOMY
~/git/sparc-curation/docs/apinatomy.org --all
deploy to remote
apinat-build --deploy # XXX manual and not fully implemented
For new ApiNATOMY models also update https://github.com/SciCrunch/sparc-curation/blob/master/resources/scigraph/sparc-data.ttl.
NIF-Ontology dev branch
Maybe update sparc community terms (sparc community termset now).
# sh ~/git/pyontutils/nifstd/scigraph/README.org --sct # XXX old use prrequaestor.el approach instead
Currently, run /docs/pyontutils/nifstd/scigraph/README.html#sparc-community-terms-update, review, merge, push to NIF-Ontology
Use ~/ni/sparc/prrequaestor.el
with ~/ni/sparc/sync-specs.el
once they are published (still trying to figure out the best way to do
that … orgstrap repo? new repo? here?).
Blazegraph and SciGraph builds
### make sure all code is up-to-date # scigraph/README.org sh ~/git/pyontutils/nifstd/scigraph/README.org --tangle ### release artifacts # TODO prepare all ontology files first up here # release.org to retrieve ~/git/sparc-curation/docs/release.org --build --sckan --no-blaze --no-load # XXX add --no-annos if ~/.ssh/config doesn't have cassava-sparc # we have a circular dependency issue here, which is that # we want to use the apinatomy models from blazegraph for scigraph # but we also want to use the configured NIF-Ontology repo from # scigraph for blazegraph, too many layers ... ~/git/pyontutils/nifstd/scigraph/bin/run-load-graph-sparc-sckan # if you are iterating hard on apinat model development, this is the point # at which you usually want to deploy to dev scigraph and test # release.org ~/git/sparc-curation/docs/release.org --build --sckan --no-blaze --resume # XXX add --no-annos if ~/.ssh/config doesn't have cassava-sparc ### testing phase 1, probably move to docker # XXX manual echo manual step waiting for signal from human to start next automated portion
Initial testing
Deploy to dev scigraph.
~/ni/dev/bin/run-deploy-graph-selene
Deploy to local dev blazegraph. /docs/sparc-curation/docs/release.html#*Deploy journal to local server
critical checks:
- run the ApiNATOMY dashboard queries and inspect the numbers
- check files in data folder for empty classes e.g. via
grep 'a owl:Class \.$'
(can do this before scigraph load)
- check prov-record matches expected
- future run the NPO dashboard
Deploy to staging
### deploy to sparc-scigraph # XXX NOTE we NEVER deploy to sckan-scigraph directly always via # sparc-scigraph and then once that passes we do the release ~/git/pyontutils/nifstd/scigraph/bin/run-deploy-graph-sparc-sckan ### docker image build echo manual step waiting for signal from human to start next automated portion
Test staging
Use mapknowledge to test over all neurons where all neurons is the list from some other query
(defun advise--obe-python-path (command &rest args) (let* ((params (cadr args)) (_ (message "%s" params)) (python-path (cdr (assq :python-path params))) (process-environment (or (and python-path (cons (format "PYTHONPATH=%s" python-path) process-environment)) process-environment))) (apply command args))) (advice-add #'org-babel-execute:python :around #'advise--obe-python-path)
#import sys
#return [[p] for p in sys.path]
import random  # needed by dewit, was missing
from mapknowledge import KnowledgeStore
from mapknowledge.scicrunch import SCICRUNCH_PRODUCTION, SCICRUNCH_STAGING, SciCrunch
from pyontutils.scigraph_codegen import moduleDirect
from pyontutils.config import auth
from sparcur.utils import log

# NOTE a bare # is used instead of a blank line inside function bodies,
# a comment further down reports that a newline there breaks everything

def test_prod(curie):
    """ Look up `curie` against the production endpoint. """
    return test_endpoint(curie, SCICRUNCH_PRODUCTION)

def test_stage(curie):
    """ Look up `curie` against the staging endpoint. """
    return test_endpoint(curie, SCICRUNCH_STAGING)

def test_endpoint(curie, endpoint):
    """ Return the non-empty knowledge fields for `curie` from `endpoint`,
    leaving out the fields listed in `skip`. The store is always closed. """
    store = KnowledgeStore(scicrunch_release=endpoint)
    store._KnowledgeStore__scicrunch._SciCrunch__scicrunch_key = auth.get('scigraph-api-key')  # XXX SSIIIIIGH
    try:
        wat, *rest = curie.rsplit('/', 1)
        result = store.entity_knowledge(wat)
        log.info(result)
    finally:
        store.close()
    #
    skip = ('id', 'label', 'long-label', 'phenotypes', 'references')
    out = {k:v for k, v in result.items() if v and k not in skip}
    return out  # !?!??!?!!?! apparently a newline before this breaks everything !??!

def testn(curie):
    """ Compare production and staging for one curie.
    Returns (ok, (status, production, staging)). """
    p = test_prod(curie)
    s = test_stage(curie)
    # XXX probably not quite right given ps and s values
    if p and not s:
        return False, ('removed', p, s)
    elif not s:
        return False, ('missing', p, s)
    if s and not p:
        log.info(f'new neuron {curie}')
    #
    return True, ('ok', p, s)

def testnall(curies):
    """ Run testn over `curies`, return the (curie, data) pairs that
    were not ok. An exception from testn is recorded as an error. """
    bads = []
    for curie in curies:
        try:
            ok, data = testn(curie)
        except Exception as e:
            ok, data = False, ('error', e, e)
        if not ok:
            bads.append((curie, data))
    if bads:
        log.error(bads)
    #
    return bads

def dewit(endpoint):
    """ Query `endpoint` for curies and test random samples of them.
    Returns the accumulated failures from testnall. """
    base = f'https://scicrunch.org/api/1/{endpoint}'  # XXX no trailing path
    scigraphd = moduleDirect(base, 'scigraphd')
    scigraphd.restService._api_key = auth.get('scigraph-api-key')
    sgc = scigraphd.Cypher()
    #sgd = scigraphd.Dynamic()
    # XXX FIXME all pops no in the dynamic endpoints atm
    # XXX must use `org-babel-lob-ingest' on queries.org for this to work
    # the pieces below concatenate to the same query text as before
    query = (
        ' OPTIONAL MATCH (start:Ontology)'
        ' <-[:isDefinedBy]-(graph:NamedIndividual)'
        ' -[:type]->({iri: "https://apinatomy.org/uris/elements/Graph"})'
        ' , (start) \n'
        '<-[:isDefinedBy]-(external:Class)'
        ' -[:subClassOf*]->(:Class {iri: "http://uri.interlex.org/tgbugs/uris/readable/NeuronEBM"})'
        ' return external ')
    curies_raw = sgc.execute(query, limit=99999, output='application/json')
    #curies_raw = sgd
    curies = [c['id'] for c in curies_raw['nodes']]
    #breakpoint()
    count = 2
    sample_size = 15
    bads = []
    for n in range(count):
        # randomly sample the subset to test, random.sample raises
        # ValueError if asked for more items than there are
        test_set = random.sample(curies, min(sample_size, len(curies)))
        _bads = testnall(test_set)
        bads.extend(_bads)
    return bads

result = dewit(SCICRUNCH_STAGING)
Future: similar for NPO
Build docker images
Run the docker build blocks in /docs/dockerfiles/source.html for the sckan release; these do all the copying of the build state to the requisite locations. Order is bottom to top (maybe use ob-lob): /docs/dockerfiles/source.html#&musl-build-sckan-base /docs/dockerfiles/source.html#&musl-build-sckan-services
dump the docker image for the zenodo release /docs/dockerfiles/source.html#&sckan-save-image
Archive release artifacts
# archive all publication artifacts at one of the usual locations e.g. ~/nas/data # FIXME hardcoded paths # FIXME need to set -e on ALL of these blocks I think? _archive=~/"nas/data/" _sckanrz="$(ls -d /tmp/build/release-*-sckan.zip | sort -u | tail -n 1)" _sckansz=/tmp/scigraph-build/sparc-sckan/$(readlink /tmp/scigraph-build/sparc-sckan/LATEST) _sckandz="$(ls -d /tmp/docker-sckan-data-*Z.tar.gz | sort -u | tail -n 1)" declare -a _arts=(${_sckanrz} ${_sckansz} ${_sckandz}) for _art in "${_arts[@]}"; do echo $(basename ${_art}) rsync -a ${_art} ${_archive} done
GitHub release
Create a new pre-release at https://github.com/SciCrunch/NIF-Ontology/releases
tag: sckan-%Y-%m-pre-{n}
target: dev
title: SCKAN %Y-%m-pre-{n}
Upload the blazegraph and scigraph zips.
Push docker images
# push docker images docker push tgbugs/sckan:$(docker image ls tgbugs/sckan:base-*Z | sort | tail -n 1 | awk '{ print $2 }') docker push tgbugs/sckan:$(docker image ls tgbugs/sckan:data-*Z | sort | tail -n 1 | awk '{ print $2 }') docker push tgbugs/sckan:base-latest docker push tgbugs/sckan:latest
Test docker images
Most of the testing at this stage is of the functionality in tgbugs/musl:kg-release-user so it is ok to push the data images first.
### testing echo manual step waiting for signal from human to start next automated portion
TODO automate. Create the new docker container volume for sckan data, run it with the latest version of tgbugs/musl:kg-release-user, and run all the examples etc. Use the shebang block in queries.org or similar to execute all the blocks and check to make sure they are working out as expected. Better yet if we can write some internal consistency checks, i.e. between NPO and ApiNATOMY.
Update changelog
### publication echo manual step waiting for signal from human to start next automated portion
### changelog # XXX manual
One way to generate a changelog, not the best, but possible.
pushd ~/git/apinatomy-models git diff $(the last commit before the previous release)..HEAD -- models pushd ~/git/NIF-Ontology git diff $(the last commit before the previous release)..HEAD -- *.ttl ttl
Zenodo release
# prepare the new zenodo release # deploy scigraph image to production
Promote to production
Once all tests have passed and we receive the ok from MAP we promote sparc-scigraph to sckan-scigraph.
- back up existing prod services.yaml
- copy data from aws-scigraph-scigraph to aws-scigraph-sckan-scigraph
- alternately curl from github release or rsync from internal stores, the issue is the release id is not predictable right now
- copy services.yaml (and the raw) from aws-scigraph-scigraph to aws-scigraph-sckan-scigraph
- unzip data
- stop service
- unlink/ln -s
- start service
echo "not running to avoid accidental deployment" echo "if you want to do this comment out these lines and the exit line" exit 1 source "$(eval echo ~/git/pyontutils/nifstd/scigraph/bin/scigraph-functions.sh)" host_stage=aws-scigraph-scigraph host_prod=aws-scigraph-sckan-scigraph host_prod_sudo=aws-scigraph-sckan path_stamped="$(ssh ${host_stage} "realpath graph")" path_zip="${path_stamped}.zip" path_yaml="$(ssh ${host_stage} "realpath services.yaml")" path_yaml_raw="$(ssh ${host_stage} 'realpath $(head -n 1 services.yaml | cut -d" " -f 2)')" __TEST='echo ${USER}@${HOSTNAME} ' __TEST='' # prod backup existing services ssh ${host_prod} "${__TEST}"'cp services.yaml services.yaml-'"$(date +%s)" || exit $? # stage>prod data and services # XXX no rsync because the service users should not have ssh access to anything # FIXME the egress on this is stupid, ideally run this whole script from within aws paths=("${path_zip}" "${path_yaml}" "${path_yaml_raw}") for path in "${paths[@]}"; do ssh ${host_stage} "${__TEST}"'sha256sum '"${path}"; # XXX FIXME BEWARE cannot test the pipe and redirection easily ssh ${host_stage} "${__TEST}"'cat '"${path}" |\ ssh ${host_prod} 'cat - > '"${path}"; ssh ${host_prod} "${__TEST}"'sha256sum '"${path}"; done # prod unzip ssh ${host_prod} "${__TEST}"'unzip -n '"${path_zip}" || exit $? # prod stop relink start ssh -t ${host_prod_sudo} "$(typeset -f service-manager);"\ "${__TEST}"'service-manager scigraph stop &&'\ "${__TEST}"'sudo unlink /var/lib/scigraph/graph;'\ "${__TEST}"'sudo ln -s '"${path_stamped}"' /var/lib/scigraph/graph;'\ "${__TEST}"'service-manager scigraph start'
Internal Structure
Pipelines
Easier to read, harder to debug. The python paradox.
Retrieve
Protocols
Cache annotations. See the fetch annotations step (ref:bash-pipeline-fetch-annotations) in sparc-get-all-remote-data for usage.
from pathlib import Path
from hyputils import hypothesis as hyp
from sparcur.config import auth


def _group_id_from_name(group_name):
    """ look up the hypothesis group id for group_name in the user secrets """
    return auth.user_config.secrets('hypothesis', 'group', group_name)


def _memfile_for_group(group_id):
    """ path of the local annotation cache file used for group_id """
    return Path(hyp.group_to_memfile(group_id + 'sparcur'))


def from_group_name_fetch_annotations(group_name):
    """ pull hypothesis annotations from remote to local

        returns the path of the cache file, not the annotations """
    gid = _group_id_from_name(group_name)
    memfile = _memfile_for_group(gid)
    memoizer = hyp.Memoizer(memfile, group=gid)
    memoizer.api_token = auth.get('hypothesis-api-key')  # FIXME ?
    memoizer()  # called for its effect, the returned annos are not used
    return memfile  # needed for next phase, annos are not


def from_group_name_cached_annos(group_name):
    """ read the annotations for group_name back from the local cache file """
    gid = _group_id_from_name(group_name)
    reader = hyp.AnnoReader(_memfile_for_group(gid), group=gid)
    return reader()


def from_user_name_group_name_h(user_name, group_name):
    """ build a HypothesisUtils instance for user_name in group_name
        with the api token for that user taken from the user secrets """
    gid = _group_id_from_name(group_name)
    utils = hyp.HypothesisUtils(username=user_name, group=gid)
    utils.token = auth.user_config.secrets('hypothesis', 'api', user_name)
    return utils


def main(hypothesis_group_name=None, **kwargs):
    """ fetch annotations, the group defaults to sparc-curation """
    group_name = ('sparc-curation'
                  if hypothesis_group_name is None else
                  hypothesis_group_name)
    from_group_name_fetch_annotations(group_name)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)
Temporary location for some helper code to clone protcur annos to a new group.
from sparcur.simple.fetch_annotations import (
    from_group_name_fetch_annotations,
    from_group_name_cached_annos,
    from_user_name_group_name_h)
from protcur import document as ptcdoc


def sort_debug(pool):
    """ Debug helper for the topological sort of the annotations in pool.

        Not called by main, run it by hand when the sort order is suspect.
        Returns (qq, sigh, qq2, sigh2): the out of order (index, parent indexes)
        pairs and the sorted indexes involved, first for pool.topological_annos
        and then again after running ptcdoc.toposort on just the bad annos. """

    def out_of_order(ta):
        ids = [a.id for a in ta]
        # for each anno with text parents, the index of each parent in ta
        # 'oops' marks a parent that is not present in ta at all
        parent_indexes = [
            (i, [(ids.index(pid) if pid in ids else 'oops')
                 for pid in a.text_parent_ids])
            for i, a in enumerate(ta) if a.text_parent_ids]
        # keep only annos that appear before at least one of their parents
        qq = [(i, l) for i, l in parent_indexes
              if [_ for _ in l if _ != 'oops' and i < _]]
        sigh = sorted(set([x for i, l in qq for x in [i] + l if x != 'oops']))
        return qq, sigh

    ta = pool.topological_annos
    qq, sigh = out_of_order(ta)
    bads = [ta[f] for f in sigh]
    sbads = ptcdoc.toposort(bads)
    qq2, sigh2 = out_of_order(sbads)
    return qq, sigh, qq2, sigh2


def main(test=False,
         source_group_name='sparc-curation',
         group_name='fixed-sparc-curation',
         user_name='protbot'):
    """ Clone the protcur annotations of one hypothesis group to another.

        The cached annotations of source_group_name are loaded into a pool,
        the annotations of group_name are fetched and loaded into a second
        pool, and then the first pool is cloned to the second one acting
        as user_name.

        If test is true only the annotations for a single protocol
        are cloned.

        Returns the list of _row values for the annotations in the
        target pool.

        Previously this code ran at module level and ended with a call to
        an undefined main, so importing the module did all of the work and
        running it as a script ended in a NameError. """

    annos = from_group_name_cached_annos(source_group_name)
    bannos = [ptcdoc.Annotation(a) for a in annos]
    pool = ptcdoc.Pool(bannos)
    anno_counts = ptcdoc.AnnoCounts(pool)  # unused, kept from the original flow

    cache_file = from_group_name_fetch_annotations(group_name)  # refresh the local cache first
    html_annos = from_group_name_cached_annos(group_name)
    html_bannos = [ptcdoc.Annotation(a) for a in html_annos]
    #html_bannos = []
    html_pool = ptcdoc.Pool(html_bannos)
    h_p_f = from_user_name_group_name_h(user_name, group_name)

    if test:
        test_bannos = list(pool.bySlugTail('wzuff6w'))  # published with most annos
        #test_bannos = list(pool.bySlugTail('ba8hiht6'))  # published with low number of annos
        test_pool = ptcdoc.Pool(test_bannos)
        test_pool.clone_to(html_pool, h_p_f, test=False)
    else:
        pool.clone_to(html_pool, h_p_f, test=test)

    # to undo a clone
    # [h_p_f.delete_annotation(a.id) for a in html_pool._annos]
    return [a._row for a in html_pool._annos]


if __name__ == '__main__':
    main()
Datasets
Clone
This is an example of how to clone the top level of a project.
See utils.py for a good way to instantiate RemotePath.
from pathlib import Path


def _from_path_id_remote_project(parent_path, project_id, RemotePath, symlink_objects_to):
    """ bind the backend to project_id, anchor it under parent_path
        and return the local path of the anchored project """
    # init has to come first because it is what binds RemotePath._api
    RemotePath.init(project_id)
    project_cache = RemotePath.smartAnchor(parent_path)
    project_cache.local_data_dir_init(symlink_objects_to=symlink_objects_to)
    return project_cache.local


def _from_project_path_top_level(project_path):
    """ given a project path with existing cached metadata
        pull the top level children

        WARNING: be VERY careful about using this because it
        does not guarantee that rmeta is available to mark
        sparse datasets. It may be the case that the process
        will fail if the rmeta is missing, or it may not. Until
        we are clear on the behavior this warning will stay
        in place. """
    # kept separate (and private) so that it can be rerun if the
    # previous step fails, it assumes too much to be used on its own

    # iterating the cache children is what fetches data from the
    # remote path to the local path, the values are not needed
    for _child in project_path.cache.children:
        pass

    # the project path is what the next phase needs, not the children
    return project_path


def _from_project_path_id_dataset(project_path, dataset_id):
    """ anchor the single dataset dataset_id under project_path
        and return its local path """
    project_cache = project_path.cache
    dataset_remote = project_cache._remote_class(dataset_id)
    return (project_cache / dataset_remote).local


# clone top level
def from_path_id_and_backend_project_top_level(parent_path,
                                               project_id,
                                               RemotePath,
                                               symlink_objects_to=None,):
    """ given the enclosing path to clone to, the project_id, and a fully
        configured (with Local and Cache) backend remote path, anchor the
        project pointed to by project_id along with the first level of children """

    return _from_project_path_top_level(
        _from_path_id_remote_project(
            parent_path, project_id, RemotePath, symlink_objects_to))


def from_path_project_backend_id_dataset(parent_path,
                                         project_id,
                                         dataset_id,
                                         RemotePath,
                                         symlink_objects_to=None,):
    """ anchor the project as for the top level clone, but then
        anchor only the dataset pointed to by dataset_id """

    return _from_project_path_id_dataset(
        _from_path_id_remote_project(
            parent_path, project_id, RemotePath, symlink_objects_to),
        dataset_id)


def main(parent_path=None,
         project_id=None,
         parent_parent_path=Path.cwd(),
         project_id_auth_var='remote-organization',  # FIXME move default to clifun
         symlink_objects_to=None,
         id=None,
         dataset_id=None,
         **kwargs):
    """ clone a project into a random subfolder of the current folder
        or specify the parent path to clone into """

    from sparcur.simple.utils import backend_pennsieve

    if parent_path is None:
        breakpoint()  # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX FIXME
        import tempfile
        parent_path = Path(tempfile.mkdtemp(dir=parent_parent_path))

    if project_id is None:
        from sparcur.config import auth
        from sparcur.utils import PennsieveId
        # FIXME abstract the hardcoded backend
        project_id = PennsieveId(auth.get(project_id_auth_var))

    RemotePath = backend_pennsieve()

    if not (id and dataset_id):
        # project case
        return from_path_id_and_backend_project_top_level(
            parent_path,
            project_id,
            RemotePath,
            symlink_objects_to,)

    # dataset case
    # FIXME doesn't check for existing so if the name changes we get duped folders
    # this issue possibly upstream in retrieve, clone just clones whatever you tell
    # it to clone, but maybe it should check the existing metadata and fail or warn?
    return from_path_project_backend_id_dataset(
        parent_path,
        project_id,
        id,  # FIXME multiple datasets
        RemotePath,
        symlink_objects_to,)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
Remote metadata
Remote metadata must be retrieved prior to the first pull in order to ensure that large datasets can be marked as sparse datasets before they are pulled.
Remote metadata can be retrieved using only a project_id. However, for all retrieval after the first pull it is usually more effective to retrieve it at the same time as fetching metadata files since it runs in parallel per dataset.
See for usage.
from joblib import Parallel, delayed
from sparcur.backends import PennsieveDatasetData
from sparcur.simple.utils import backend_pennsieve


def from_id_fetch_remote_metadata(id, project_id=None, n_jobs=12):
    """ given an dataset id fetch its associated dataset metadata

        an organization id fetches the metadata for every child
        of the organization, any other id type is not implemented """
    id_type = id.type

    if id_type == 'dataset':
        RemotePath = backend_pennsieve(project_id)
        PennsieveDatasetData(RemotePath(id))()
        return

    if id_type != 'organization':
        raise NotImplementedError(id)

    RemotePath = backend_pennsieve()
    prepared = [PennsieveDatasetData(child) for child in RemotePath(id).children]
    if n_jobs <= 1:
        for fetch in prepared:
            fetch()
    else:
        # FIXME Paralle isn't really parallel here ...
        # can't use multiprocessing due to broken aug.RemotePath implementation
        # LOL PYTHON everything is an object, except when you want to pickle it
        # then some objects are more equal than others
        Parallel(n_jobs=n_jobs)(delayed(p._no_return)() for p in prepared)


def main(id=None,
         project_id=None,
         project_id_auth_var='remote-organization',  # FIXME move to clifun
         n_jobs=12,
         **kwargs):
    """ fetch remote metadata for id, when id is not provided
        the project id is used, when project_id is not provided
        it is read from auth using project_id_auth_var """

    if project_id is None:
        from sparcur.utils import PennsieveId
        from sparcur.config import auth
        # FIXME abstract the hardcoded backend
        project_id = PennsieveId(auth.get(project_id_auth_var))

    from_id_fetch_remote_metadata(project_id if id is None else id,
                                  project_id=project_id,
                                  n_jobs=n_jobs,)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)  # nothing to print or do after
The implementation of sparcur.backends.PennsieveDatasetData
supports the ability
to retrieve metadata directly from the remote without the need for an intervening
local path. However this functionality is obscured here because we want to derive
a consistent view of the data from the file system snapshot.
from joblib import Parallel, delayed
from sparcur.paths import Path
from sparcur.backends import PennsieveDatasetData


def _from_project_path_fetch_remote_metadata(project_path, n_jobs=12, cached_ok=False):
    """ fetch remote metadata for every child of project_path, children
        whose cache_path already exists are skipped when cached_ok is set """

    def already_cached(bdd):
        return cached_ok and bdd.cache_path.exists()

    if n_jobs > 1:
        def fetch_path(dataset_path):
            bdd = PennsieveDatasetData(dataset_path.cache)
            return None if already_cached(bdd) else bdd()

        Parallel(n_jobs=n_jobs)(delayed(fetch_path)(dataset_path)
                                 for dataset_path in project_path.children)
        return

    # serial case, prepare everything first and then fetch
    prepared = [PennsieveDatasetData(dataset_path.cache)
                for dataset_path in project_path.children]
    for bdd in prepared:
        if not already_cached(bdd):
            bdd()


# fetch remote metadata
def from_path_fetch_remote_metadata(path, n_jobs=12, cached_ok=False):
    """ Given a path fetch remote metadata associated with that path. """

    cache = path.cache
    if cache.is_organization():
        _from_project_path_fetch_remote_metadata(path, n_jobs=n_jobs, cached_ok=cached_ok)
        return

    # dataset_path
    # TODO more granular rather than roll up to dataset if inside?
    bdd = PennsieveDatasetData(cache)
    if cached_ok and bdd.cache_path.exists():
        return

    bdd()


def main(path=Path.cwd(), n_jobs=12, rmeta_cached_ok=False, **kwargs):
    """ fetch remote metadata for path, cloning first if path
        is None or is not located inside of a cache root """
    needs_clone = (path is None or
                   path.find_cache_root() not in (path, *path.parents))
    if needs_clone:
        from sparcur.simple.clone import main as clone
        # NOTE path is passed along here, but kwargs is expected to contain
        # parent_path or parent_parent_path and project_id note that if that
        # happens then the path returned from clone will change accordingly
        path = clone(path=path, n_jobs=n_jobs, **kwargs)

    from_path_fetch_remote_metadata(path, n_jobs=n_jobs, cached_ok=rmeta_cached_ok)
    return path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)  # we probably don't print here?
Pull
Pull a single dataset or pull all datasets or clone and pull all datasets.
from joblib import Parallel, delayed
from sparcur.paths import Path
from sparcur.utils import GetTimeNow


# pull dataset
def from_path_dataset_file_structure(path, time_now=None, exclude_uploaded=False):
    """ pull the file structure and file system metadata for a single dataset
        right now only works from a dataset path

        time_now defaults to a fresh GetTimeNow() when not provided """

    if time_now is None:
        time_now = GetTimeNow()

    path._pull_dataset(time_now, exclude_uploaded)


# pull all in parallel
def from_path_dataset_file_structure_all(project_path,
                                         *args,
                                         paths=None,
                                         time_now=None,
                                         n_jobs=12,
                                         exclude_uploaded=False):
    """ pull all of the file structure and file system metadata for a project
        paths is a keyword argument that accepts a list/tuple of the subset of
        paths that should be pulled

        extra positional args are accepted and ignored
        time_now defaults to a fresh GetTimeNow() when not provided """

    if time_now is None:
        time_now = GetTimeNow()

    project_path.pull(
        paths=paths,
        time_now=time_now,  # TODO
        debug=False,  # TODO
        n_jobs=n_jobs,
        log_level='DEBUG' if False else 'INFO',  # TODO
        Parallel=Parallel,
        delayed=delayed,
        exclude_uploaded=exclude_uploaded,)


# mark datasets as sparse
def sparse_materialize(path, sparse_limit:int=None):
    """ given a path mark it as sparse if it is a dataset and
        beyond the sparse limit

        when path is an organization this recurses over its children """

    cache = path.cache
    if cache.is_organization():
        # don't iterate over cache children because that pulls remote data
        for child in path.children:
            sparse_materialize(child, sparse_limit=sparse_limit)
    else:
        cache._sparse_materialize(sparse_limit=sparse_limit)


def main(path=Path.cwd(),
         time_now=None,
         sparse_limit:int=None,
         n_jobs=12,
         exclude_uploaded=False,
         **kwargs):
    """ pull a single dataset, or pull all datasets in a project

        If path is None or is not inside a cache root then remote metadata
        is fetched first (which transitively clones) and the path that it
        returns is used as the project path.

        If path is a dataset path inside a project only that dataset is
        pulled and path is returned, otherwise every dataset in the project
        is pulled and the project path is returned.

        Raises ValueError if path is not already resolved.
        Raises FileNotFoundError if the project path has no children. """

    # path may be None (see below), only a real path can be checked here
    # previously None hit an AttributeError on resolve before it could be handled
    if path is not None and path != path.resolve():
        raise ValueError(f'Path not resolved! {path}')

    project_path = None  # path could be None so can't find_cache_root here
    if path is None or path.find_cache_root() not in (path, *path.parents):
        from sparcur.simple.fetch_remote_metadata import main as remote_metadata
        project_path = remote_metadata(path=path, **kwargs)  # transitively calls clone
    else:
        project_path = path.find_cache_root()
        if path != project_path:
            # dataset_path case
            sparse_materialize(path, sparse_limit=sparse_limit)
            from_path_dataset_file_structure(path,
                                             time_now=time_now,
                                             exclude_uploaded=exclude_uploaded)
            if path == Path.cwd():
                # the trailing space after 'this' was missing which
                # ran the two halves of the sentence together
                print('NOTE: you probably need to run `pushd ~/ && popd` '
                      'to get a sane view of the filesystem if you ran this '
                      'from within a dataset folder')

            return path

    if not list(project_path.children):
        raise FileNotFoundError(f'{project_path} has no children.')
        # NOTE the recovery below cannot run while the raise above is in place
        # somehow clone failed
        # WARNING if rmeta failed you may get weirdness  # FIXME
        from sparcur.simple.clone import _from_project_path_top_level
        _from_project_path_top_level(project_path)

    sparse_materialize(project_path, sparse_limit=sparse_limit)
    from_path_dataset_file_structure_all(project_path,
                                         time_now=time_now,
                                         n_jobs=n_jobs,
                                         exclude_uploaded=exclude_uploaded)
    return project_path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
The non-simple implementation of this is quite convoluted so here are links to the current implementation, from outside in. In reverse order the basic steps are pull from dataset packages endpoint, resolve hierarchy, convert to remote paths, and convert to cache paths which materialize the pull as folders or symlinks.
Fetch
def main(path=None, **kwargs):
    """ fetch the file at path with no size limit, do nothing if path is None """
    if path is None:
        return

    # FIXME this will fail if the remote for the file is not in
    # the current project, or if the cachedir is not a child of
    # the top level project directory e.g. in .operations/objects
    path.cache.fetch(size_limit_mb=None)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)
from itertools import chain
from sparcur import exceptions as exc
from sparcur.utils import log, logd
from sparcur.paths import Path
from sparcur.datasets import DatasetStructure
from sparcur.simple.utils import fetch_paths_parallel, rglob

# fetch metadata files
fetch_prefixes = (
    ('dataset_description', 'glob'),
    ('subjects',            'glob'),
    ('samples',             'glob'),
    ('submission',          'glob'),
    ('manifest',           'rglob'),  # XXX NOTE the rglob here
)


def _from_path_fetch_metadata_files_simple(path, fetch=True):
    """ transitive yield paths to all metadata files,
        fetch them from the remote if fetch == True """
    for prefix, kind in fetch_prefixes:
        if kind != 'rglob':
            structure = DatasetStructure(path)
            # FIXME fetch here is broken
            yield from structure._abstracted_paths(prefix,
                                                   glob_type=kind,
                                                   fetch=fetch)
        else:
            # match the prefix with either case for the first letter
            initial = prefix[0]
            pattern = f'[{initial.upper()}{initial}]{prefix[1:]}*'
            for found in rglob(path, pattern):
                yield found


def _from_path_fetch_metadata_files_parallel(path, n_jobs=12):
    """ Fetch all metadata files within the current path in parallel. """
    # FIXME passing in a generator here fundamentally limits the whole fetching
    # process to a single thread because the generator is stuck feeding from a
    # single process, IF you materialize the paths first then the parallel fetch
    # can actually run in parallel, but bugs/errors encountered late in collecting
    # the paths will force all previous work to be redone
    # XXX as a result of this we use the posix find command to implement rglob
    # in a way that is orders of magnitude faster
    found = list(_from_path_fetch_metadata_files_simple(path, fetch=False))
    if not found:
        log.warning('No paths to fetch, did you pull the file system metadata?')
        return

    fetch_paths_parallel(found, n_jobs=n_jobs)


def from_path_fetch_metadata_files(path, n_jobs=12):
    """ fetch metadata files located within a path """
    # the parallel implementation is always used for now
    #if n_jobs <= 1:
        #gen = _from_path_fetch_metadata_files_simple(path)
        # FIXME broken ??? somehow abstracted paths doesn't fetch when
        # we run in directly, or somehow fetch_paths_parallel does something
        # different
        #paths = list(gen)
    #else:
    _from_path_fetch_metadata_files_parallel(path, n_jobs=n_jobs)


def main(path=Path.cwd(), n_jobs=12, **kwargs):
    """ fetch the metadata files under path, pulling first if path
        is None or is not located inside of a cache root """
    outside_cache = (path is None or
                     path.find_cache_root() not in (path, *path.parents))
    if outside_cache:
        from sparcur.simple.pull import main as pull
        path = pull(path=path, n_jobs=n_jobs, **kwargs)

    from_path_fetch_metadata_files(path, n_jobs=n_jobs)
    return path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
Fetch files by extension.
import os
from sparcur.paths import Path
from sparcur.utils import _find_command
from sparcur.simple.utils import fetch_paths_parallel


def _datasets_with_extension(path, extension):
    """ Hack around the absurd slowness of python's rglob

        Returns the children of path whose cache_id shows up in the
        output of a shell loop that runs find over each subdirectory
        looking for a file or symlink named *.extension """

    # TODO query multiple extensions with -o at the same time
    # for each directory, on the first match print the user.bf.id xattr
    # of the directory followed by a newline and stop searching (-quit)
    command = fr"""for d in */; do
    {_find_command} "$d" \( -type l -o -type f \) -name '*.{extension}' \
    -exec getfattr -n user.bf.id --only-values "$d" \; -printf '\n' -quit ;
done"""

    # NOTE(review): presumably entering path changes the working directory
    # so that the */ glob is relative to path -- confirm
    with path:
        with os.popen(command) as p:
            string = p.read()

    # one id per line of output
    has_extension = string.split('\n')
    datasets = [p for p in path.children if p.cache_id in has_extension]
    return datasets


def _from_path_fetch_files_simple(path, filter_function, fetch=True):
    """ Select files under path with filter_function and return them,
        if fetch is true fetch any that do not exist with no size limit. """
    files = filter_function(path)
    if fetch:
        [f.fetch(size_limit_mb=None) for f in files if not f.exists()]
        #Async(rate=5)(deferred(f.fetch)(size_limit_mb=None)
        #              for f in files if not f.exists())

    return files


def _from_path_fetch_files_parallel(path, filter_function, n_jobs=12):
    """ Select files without fetching and hand them to fetch_paths_parallel. """
    paths_to_fetch = _from_path_fetch_files_simple(path, filter_function, fetch=False)
    fetch_paths_parallel(paths_to_fetch, n_jobs=n_jobs)


def filter_extensions(extensions):
    """ return a function that selects files in a path by extension """
    def filter_function(path):
        cache = path.cache
        if cache.is_organization():
            # narrow to the datasets that contain at least one match
            # before running the slower rglob below
            paths = set()
            for ext in extensions:
                ds = _datasets_with_extension(path, ext)
                paths.update(ds)

        else:  # dataset_path
            paths = path,

        files = [matching  # FIXME stream ?
                 for path in paths
                 for ext in extensions
                 for matching in path.rglob(f'*.{ext}')]
        return files

    return filter_function


def filter_manifests(dataset_blob):
    """ return a function that selects certain files listed in manifest records """
    # FIXME this needs a way to handle organization level?
    # NOTE this filter is used during the second fetch phase after the inital
    # metadata has been ingested to the point where it can be use to guide further fetches
    # TODO this is going to require the implementation of partial fetching I think
    # TODO preprocessing here?
    def filter_function(path):
        # TODO check that the path and the dataset blob match
        cache = path.cache
        if cache.id != dataset_blob['id']:
            msg = f'Blob is not for this path! {dataset_blob["id"]} != {cache.id}'
            raise ValueError(msg)

        # nothing is selected yet, the selection logic is not implemented
        files = []  # TODO get_files_for_secondary_fetch(dataset_blob)
        return files

    return filter_function


def from_path_fetch_files(path, filter_function, n_jobs=12):
    """ Fetch the files selected by filter_function under path,
        serially when n_jobs <= 1 otherwise in parallel. """
    if n_jobs <= 1:
        _from_path_fetch_files_simple(path, filter_function)
    else:
        _from_path_fetch_files_parallel(path, filter_function, n_jobs=n_jobs)


def main(path=Path.cwd(), n_jobs=12, extensions=('xml',), **kwargs):
    """ Fetch files by extension under path, pulling first if path
        is None or is not located inside of a cache root. """
    #breakpoint() # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    if path is None or path.find_cache_root() not in (path, *path.parents):
        from sparcur.simple.pull import main as pull
        path = pull(path=path, n_jobs=n_jobs, **kwargs)

    filter_function = filter_extensions(extensions)

    from_path_fetch_files(path, filter_function, n_jobs=n_jobs)
    return path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)
Once the initial pass over the dataset is complete, extract the list of additional files that need to be retrieved and fetch them.
from sparcur.paths import Path
from sparcur.simple.fetch_files import from_path_fetch_files, filter_manifests


def from_blob_fetch_files(dataset_blob, path=None, n_jobs=12):
    """ Fetch the files under path that are selected by the manifest
        records in dataset_blob.

        n_jobs is passed through to from_path_fetch_files. It used to be
        read from an undefined name, which raised NameError on every call. """
    # should the blob contain a reference to the path
    # it was derived from?
    filter_function = filter_manifests(dataset_blob)
    from_path_fetch_files(path, filter_function, n_jobs=n_jobs)


def main(path=Path.cwd(), n_jobs=12, dataset_blob=None, **kwargs):
    """ Second phase fetch for the dataset at path.

        dataset_blob must be provided by the caller for now because
        retrieving the blob for path.cache_id is not implemented.

        Raises NotImplementedError if dataset_blob is None. """
    #breakpoint() # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    # if not dataset_blob: get_blob vs path blob pairs?
    # starting from a partial blob means that this probably
    # should not kick off from the file system, but we know
    # that we will want to be able to kick it off from the
    # file system ... maybe the intermediate blobs can encode
    # the prov of where the file system reference they were
    # derived from lives ?
    if dataset_blob is None:
        # FIXME TODO this was a call to get_blob(path.cache_id)
        # get_blob does not exist so fail explicitly instead
        # of with a NameError
        raise NotImplementedError(
            f'retrieving the dataset blob for {path.cache_id} is not implemented, '
            'pass dataset_blob explicitly')

    from_blob_fetch_files(dataset_blob, path, n_jobs=n_jobs)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)
Retrieve
Putting it all together into a single command.
The behavior of retrieve is exactly the same as it is for clone; the difference is that it runs for just a single dataset and the parent_path is made to be the dataset_id uuid. If you are running a single dataset pipeline you will still need the project folder structure for logs, jobs, etc. You can also still run all datasets together off of a single SPARC Consortium folder, in which case all you need to do is pass the communal parent_path.
Usage example.
python -m sparcur.simple.retrieve \ --dataset-id N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18 \ --symlink-objects-to /mnt/str/tom/cache/bf-object-cache
Example python usage.
from sparcur.paths import Path
from sparcur.utils import PennsieveId
from sparcur.simple.retrieve import main as retrieve

# project (organization) and dataset identifiers
p = PennsieveId('N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0')
d = PennsieveId('N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18')
# folder that will contain the per dataset parent folder
ppp = Path('~/files/sparc-datasets').expanduser().resolve()
# the dataset id is passed as both id and dataset_id
retrieve(id=d, dataset_id=d, project_id=p, parent_parent_path=ppp)
sparcur.simple.retrieve
implementation
from sparcur.paths import Path
from sparcur.utils import symlink_latest
from sparcur.simple.clone import main as clone
from sparcur.simple.fetch_remote_metadata_all import main as remote_metadata
from sparcur.simple.pull import main as pull
from sparcur.simple.fetch_metadata_files import main as fetch_metadata_files
from sparcur.simple.fetch_files import main as fetch_files


def main(id=None,
         dataset_id=tuple(),
         parent_path=None,
         parent_parent_path=Path.cwd(),
         path=None,  # keep path out of kwargs
         invariant_local_path='dataset',
         #extensions=('xml',),  # not needed right now
         **kwargs):
    """ Run every retrieve step in order for a single dataset.

        The steps are remote metadata, clone, symlink to the invariant
        path, pull, fetch metadata files, and fetch additional files.

        If parent_path is None it becomes parent_parent_path / id.uuid,
        and it is created if it does not exist.

        Returns the path returned by clone.
        Raises TypeError if id is None. """
    # FIXME parent_path and project_id seem like they probably need to
    # be passed here, it would be nice if project_path could just be
    # the current folder and if the xattrs are missing for the
    # project_id then ... it is somehow inject from somewhere else?
    # this doesn't really work, because that would mean that we would
    # have to carry the project id around in the xattr metadata for
    # all dataset folders, which might not be the worst thing, but
    # definitely redundant

    if id is None:
        raise TypeError('id is a required argument!')

    if parent_path is None:
        uuid = id.uuid  # FIXME hardcoded backend assumption
        parent_path = parent_parent_path / uuid
        parent_path.mkdir(exist_ok=True)
    elif not parent_path.exists():
        parent_path.mkdir()

    # stable location that is linked to whatever path clone returns
    invariant_path = parent_path / invariant_local_path

    # XXX for now we do these steps in order here
    # rather than trusting that calling simple.pull.main will do
    # the right thing if there is no path ... it should but probably
    # doesn't right now due to assumptions about paths existing

    # remote metadata from path (could do from id too?)
    remote_metadata(id=id, **kwargs)  # could run parallel to clone, easier in bash
    # clone single without organization parent somehow seems likely broken?
    path = clone(id=id,
                 dataset_id=dataset_id,
                 parent_path=parent_path,
                 parent_parent_path=parent_parent_path,
                 **kwargs)  # XXX symlink_objects_to will just work if you pass it

    symlink_latest(path, invariant_path)

    # pull single
    pull(path=path, **kwargs)
    # fetch metadata files
    fetch_metadata_files(path=path, **kwargs)  # FIXME symlink_to
    # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX fetch_metadata_files does NOT USE the extensions kwarg!
    # fetch additional files
    # kwargs are not passed along here so the defaults of fetch_files are used
    fetch_files(path=path)

    return path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
Validate
Protocols
Datasets
Extract
Usage example.
python -m sparcur.simple.extract \ --dataset-id N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18 \ --export-parent-path 21957eae-0824-4fb5-b18f-04d6ed12ce18/exports
from sparcur import datasets as dat
from sparcur import pipelines as pipes
from sparcur import exceptions as exc
from sparcur.utils import log, logd
from sparcur.paths import Path
from sparcur.backends import PennsieveDatasetData
from sparcur.simple.utils import combinate, multiple, early_failure, DataWrapper
from sparcur.simple.fetch_metadata_files import fetch_prefixes


class ManifestFiles(DataWrapper):
    """ wrapper for manifest files. """


def merge_manifests(vs):
    """ Merge individual manifest records into the same list """
    # FIXME massive hack :/
    k = 'manifest_file'
    # FIXME errors key ... ? is it allowed up there? it shouldn't be ...
    # FIXME {'content': m}
    # flatten the list found under k in the data of every value in vs
    return ManifestFiles([m for v in vs for m in v.data[k]])


def object_from_find_path(glob_prefix, object_from_path_function, glob_type='glob', onfail=None):
    """ Return a function that will find files that start with glob_prefix

        The returned function is a generator over the result of calling
        object_from_path_function on each matching path. If there were
        no matches and onfail is not None then onfail is raised once the
        generator is consumed.

        Raises ValueError if glob_prefix is not in fetch_prefixes. """
    # NOTE(review): glob_type is accepted but not used in this function -- confirm intended
    # FIXME should be in utils but depends on fetch_prefixes
    if glob_prefix not in dict(fetch_prefixes):
        raise ValueError('glob_prefix not in fetch_prefixes! '
                         f'{glob_prefix!r} not in {fetch_prefixes}')

    def func(path, *args, **kwargs):
        ds = dat.DatasetStructure(path)
        rpath = None  # still None after the loop only if nothing matched
        for rpath in ds._abstracted_paths(glob_prefix, sandbox=True):
            yield object_from_path_function(rpath, *args, **kwargs)

        if rpath is None and onfail is not None:
            raise onfail(f'No match for {glob_prefix} in {path.name}')

    return func


# file type -> dataset blob key indirection

# sentinel key, data for these types is merged into the top level of the blob
_TOP = object()  # SIGH SIGH SIGH always need a escape hatch
# object type -> key where the data for that object goes in the dataset blob
otkm = {ThingFilePath.obj:prefix + '_file' for prefix, ThingFilePath
        in dat.DatasetStructure.sections.items()}
otkm[ManifestFiles] = 'manifest_file'
otkm[PennsieveDatasetData] = 'remote_dataset_metadata'
otkm[type(dat.DatasetStructure())] = 'structure'  # hack around Pathlib type mangling
otkm[type(dat.DatasetMetadata())] = _TOP


# stream of objects -> place in dataset blob

def dataset_raw(*objects, object_type_key_map=otkm):
    """ Combine the data from objects into a single dict.

        The key for each object is looked up by type(obj) in
        object_type_key_map, data for objects that map to _TOP is merged
        into the top level instead of being nested under a key. """
    data = {}
    log.debug(objects)
    #path_structure, description, subjects, samples, submission, manifests, *rest = objects
    for obj in objects:
        log.debug(obj)
        key = object_type_key_map[type(obj)]  # KeyError here is not caught
        try:
            if key is not _TOP:
                data.update({key: obj.data})
            else:
                data.update(obj.data)
        except Exception as e:
            # FIXME current things that leak through
            # MalformedHeaderError
            # something in the objects list is a dict
            # NOTE(review): any error here drops into the debugger and is
            # then discarded, the object is left out of data
            breakpoint()
            pass

    return data


# path + version -> python object

# TODO how to attach and validate schemas orthogonally in this setting?
# e.g. so that we can write dataset_1_0_0 dataset_1_2_3 etc.

# we capture version as early as possible in the process, yes we
# could also gather all the files and folders and then pass the version
# in as an argument when we validate their structure, but there are
# elements of the locations or names of those files that might depend
# on the template version, therefore we get maximum flexibility by only
# need to look for the dataset description file
def description(path):         return dat.DatasetDescriptionFilePath(path).object
def submission(path, version): return dat.SubmissionFilePath(path).object_new(version)
def subjects(path, version):   return dat.SubjectsFilePath(path).object_new(version)
def samples(path, version):    return dat.SamplesFilePath(path).object_new(version)
def manifest(path, version):   return dat.ManifestFilePath(path).object_new(version)

# dataset path -> python object

def from_path_remote_metadata(path): return PennsieveDatasetData(path.cache)
def from_path_local_metadata(path): return dat.DatasetMetadata(path)
# a missing dataset description raises MissingFileError, see from_path_dataset_raw
from_path_dataset_description = object_from_find_path('dataset_description', description,
                                                      onfail=exc.MissingFileError)

comb_metadata = combinate(
    # dataset description is not included here because it is special
    # see from_path_dataset_raw for details
    from_path_remote_metadata,
    from_path_local_metadata,
)

# dataset path + version -> python object

# version is accepted so that the signature matches the other functions, it is not used
def from_path_dataset_path_structure(path, version): return dat.DatasetStructure(path)
from_path_subjects   = object_from_find_path('subjects',   subjects)
from_path_samples    = object_from_find_path('samples',    samples)
from_path_submission = object_from_find_path('submission', submission)
from_path_manifests  = multiple(object_from_find_path('manifest', manifest, 'rglob'),
                                merge_manifests)

# combinate all the individual dataset path + version -> data functions

comb_dataset = combinate(
    #from_path_dataset_description,  # must be run prior to combination to get version
    from_path_dataset_path_structure,
    from_path_subjects,
    from_path_samples,
    from_path_submission,
    from_path_manifests,
    #from_path_file_metadata,  # this must wait until 2nd fetch phase
)


# dataset path -> raw data

def from_path_dataset_raw(dataset_path):
    """ Main entry point for getting dataset metadata from a path.

        The dataset description is read first because it provides the
        template_schema_version that is passed to comb_dataset. If the
        description is missing the result of early_failure is returned
        using only the data from comb_metadata. """
    gen = from_path_dataset_description(dataset_path)
    try:
        ddo = dataset_description_object = next(gen)
    except exc.MissingFileError as e:
        # TODO return a stub with embedded error
        logd.critical(e)
        dataset_blob = dataset_raw(*comb_metadata(dataset_path))
        return early_failure(dataset_path, e, dataset_blob)

    # advance the generator a second time, StopIteration is the expected
    # case, a second description would be returned here and is ignored
    try:
        next(gen)
        # TODO return a stub with embedded error
    except StopIteration:
        pass

    data = ddo.data
    ddod = type('ddod', tuple(), {'data': data})  # not used below
    dtsv = data['template_schema_version']
    return dataset_raw(ddo, *comb_metadata(dataset_path), *comb_dataset(dataset_path, dtsv))


# unused

def from_path_file_metadata(path, _version):  # FIXME doesn't go in this file probably
    # FIXME this is going to depend on the manifests
    # and a second fetch step where we kind of cheat
    # and prefetch file files we know we will need
    pass


# the functions below are unimplemented placeholders
def from_export_path_protocols_io_data(curation_export_json_path): pass
def protocols_io_ids(datasets): pass
def protocols_io_data(protocols_io_ids): pass

def from_group_name_protcur(group_name): pass
def protcur_output(): pass

def summary(datasets, protocols_io_data, protcur_output): pass

def from_path_summary(project_path):
    # NOTE(review): sketch only, dataset_path_structure, dataset, manifests,
    # dataset_description and rest are not defined in this block so calling
    # this raises NameError
    dataset_path_structure
    summary((
        dataset(
            dataset_path_structure,
            dataset_description,
            subjects,
            samples,
            submission,
            manifests,
            *rest
    )))


def main(path=Path.cwd(),
         id=None,
         dataset_id=tuple(),
         time_now=None,
         export_path=None,
         export_parent_path=None,
         _entry_point=False,
         **kwargs):
    """ Extract the raw dataset blob for a single dataset.

        If id is provided the dataset is expected at path / id.uuid / 'dataset'.
        When _entry_point is true the blob is exported as ir.json and the
        path to the export is returned, otherwise the blob itself is returned.

        Raises TypeError if the path is not a dataset. """
    # TODO path from dataset_id and retrieve conventions? or just pass path from retrieve final output?
    # TODO parallelize if multiple paths
    # This assumes that all retrieve operations have
    # finished successfully for the current dataset
    from sparcur.simple.export import prepare_dataset_export, export_blob

    if id is not None:  # XXX note that id should be dataset_id
        # TODO parent_path ??
        uuid = id.uuid  # FIXME hardcoded backend assumption
        path = path / uuid / 'dataset'  # FIXME hardcoded see invariant_path
        path = path.resolve()  # make sure that invariant_path is expanded as we expect.

    cache = path.cache
    if not cache.is_dataset():
        raise TypeError('can only run on a single dataset')

    if _entry_point:
        if export_parent_path is None:
            export_parent_path = prepare_dataset_export(path, export_path)

        # these are passed along to export_blob below
        kwargs.update({'export_path': export_path,
                       'export_parent_path': export_parent_path,
                       'time_now': time_now,})

    # this local shadows the module level dataset_raw function inside main
    dataset_raw = from_path_dataset_raw(path)

    if _entry_point:
        export_blob_path = export_blob(dataset_raw, 'ir.json', **kwargs)
        return export_blob_path
    else:
        return dataset_raw


if __name__ == '__main__':
    import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=pprint.pprint)
Transform
There is not a clean separation between transformation and validation because there are multiple transformation and validation steps that are interleaved.
from pathlib import Path
from sparcur import schemas as sc
from sparcur import pipelines as pipes
from sparcur.core import DictTransformer as DT
from sparcur.utils import logd


def __apply_step(step, spec, data, **kwargs):
    """ Call ``step`` with ``data`` first and ``spec`` second, forwarding ``kwargs``. """
    return step(data, spec, **kwargs)


def popl(data, pops, source_key_optional=False):
    """ Pop every path in ``pops`` from ``data`` and return ``data``.

    The result of ``DT.pop`` is exhausted with ``list`` (presumably because
    it is lazy, TODO confirm) and the popped values are discarded. """
    list(DT.pop(data, pops, source_key_optional))
    return data


def simple_add(data, adds):
    """ Placeholder, not implemented. """
    pass


def execute_pipeline(pipeline, data):
    """ Run each step of ``pipeline`` against ``data`` and return ``data``.

    Every step is a tuple ``(func, *args, kwargs)`` where the last element
    is a dict of keyword arguments or a falsy value such as ``None``.
    The return value of each step is ignored, steps are expected to
    modify ``data`` in place. """
    for func, *args, kwargs in pipeline:
        # man variable arity is a pain to deal with here
        # where are lambda lists when you need them :/
        # FIXME maybe we can make steps functional instead of mutating?
        if not kwargs:
            kwargs = {}

        func(data, *args, **kwargs)

    return data


def __wd(transformer):  # not needed atm since transformers do in place modification
    """ Wrap ``transformer`` so that the wrapped call returns ``data``. """
    def inner(data, *args, **kwargs):
        transformer(data, *args, **kwargs)
        return data

    # the wrapper has to be returned, otherwise __wd(...) evaluates to None
    return inner


def schema_validate(data, schema, fail=False, pipeline_stage_name=None):
    """ Validate ``data`` against ``schema`` and record any failure.

    ``schema`` may be a schema instance or a schema class, a class is
    instantiated here. If validation fails and ``fail`` is true the error
    returned by ``schema.validate`` is raised. Otherwise the error is
    appended to ``data['errors']`` under ``pipeline_stage_name``.
    Returns ``None``. """
    if isinstance(schema, type):
        # we were passed a class so init it
        # doing it this way makes it easier to
        # use remote schemas that hit the network
        # since the network call doesn't have to
        # happen at the top level but can mutate
        # the class later before we init here
        schema = schema()

    ok, norm_or_error, data = schema.validate(data)
    if not ok:
        if fail:
            logd.error('schema validation has failed and fail=True')
            raise norm_or_error

        if 'errors' not in data:
            data['errors'] = []

        if pipeline_stage_name is None:
            pipeline_stage_name = f'Unknown.checked_by.{schema.__class__.__name__}'

        data['errors'] += norm_or_error.json(pipeline_stage_name)
        # TODO make sure the step is noted even if the schema is the same


simple_moves = (
    [['structure', 'dirs',], ['meta', 'dirs']],  # FIXME not quite right ...
    [['structure', 'files',], ['meta', 'files']],
    [['structure', 'size',], ['meta', 'size']],
    [['remote_dataset_metadata'], ['inputs', 'remote_dataset_metadata']],
    *pipes.SDSPipeline.moves[3:]
)

# function, *args, **kwargs
# functions should accept data as the first argument
core_pipeline = (
    # FIXME validation of dataset_raw is not being done right now
    (DT.copy, pipes.SDSPipeline.copies, dict(source_key_optional=True)),
    (DT.move, simple_moves, dict(source_key_optional=True)),
    # TODO clean has custom behavior
    (popl, pipes.SDSPipeline.cleans, dict(source_key_optional=True)),
    (DT.derive, pipes.SDSPipeline.derives, dict(source_key_optional=True)),
    #(DT.add, pipes.SDSPipeline.adds),  # FIXME lifter issues
    (schema_validate, sc.DatasetOutSchema, None),
    # extras (missing)
    # xml files
    # contributors
    # submission
    (DT.copy, pipes.PipelineExtras.copies, dict(source_key_optional=True)),
    # TODO clean has custom behavior
    (DT.update, pipes.PipelineExtras.updates, dict(source_key_optional=True)),
    (DT.derive, pipes.PipelineExtras.derives, dict(source_key_optional=True)),
    #(DT.add, pipes.PipelineExtras.adds),  # TODO and lots of evil custom behavior here
    # TODO filter_failures
    (schema_validate, sc.DatasetOutSchema, None),
    (pipes.PipelineEnd._indexes, None),  # FIXME this is conditional and in adds
    # TODO pipeline end has a bunch of stuff in here
    (schema_validate, sc.PostSchema, dict(fail=True)),
)


def main(path=Path.cwd(), path_dataset_raw=None, dataset_raw=None, **kwargs):
    """ Run ``core_pipeline`` over a raw dataset blob and return the result.

    The raw blob is taken from ``dataset_raw`` if given, otherwise it is
    loaded from ``path_dataset_raw`` with ``path_ir``, otherwise it is
    extracted from ``path``. Raises ``TypeError`` when extraction is
    needed and ``path.cache`` is not a dataset. """
    # FIXME TODO need to follow the behavior of main in extract
    if dataset_raw is None:
        if path_dataset_raw is None:
            cache = path.cache
            if not cache.is_dataset():
                raise TypeError('can only run on a single dataset')

            from sparcur.simple.extract import main as extract
            dataset_raw = extract(path)
        else:
            from sparcur.utils import path_ir
            dataset_raw = path_ir(path_dataset_raw)

    # return the transformed blob so that pipe_main can hand it to after=
    return execute_pipeline(core_pipeline, dataset_raw)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
Network resources
Dataset Path Metadata
This is a bit out of order since validation is run after an initial export.
# FIXME this is not really doing what we want yet
from sparcur.paths import Path
from sparcur.simple import path_metadata


def from_path_validated_json_metadata(path):
    """ Collect transitive path metadata for ``path``, validate it in place
    and return the resulting blob. """
    tm = path_metadata.from_path_transitive_metadata(path)
    from_blob_validated_json_metadata(tm)
    return tm


def from_blob_validated_json_metadata(blob):
    """ Mutates in place. """
    Path.validate_path_json_metadata(blob)
    # preferred
    # accepted
    # banned
    # known
    # unknown


def main(_entry_point=False, **kwargs):
    """ Run ``path_metadata.main`` with ``validate=True`` and return its result. """
    # FIXME we want to be able to accept
    # --dataset-id, <json-file-to-validate>, and some other things probably?
    # TODO dispatch on export_path vs path once validating an existing
    # json blob is supported, for now everything goes through path_metadata
    return path_metadata.main(validate=True, _entry_point=_entry_point, **kwargs)


if __name__ == '__main__':
    #import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main)#, after=pprint.pprint)
Export
Setup for per-dataset export
import json
from socket import gethostname
import augpathlib as aug
from pyontutils.utils import timeformat_friendly
import sparcur
from sparcur.core import JEncode
from sparcur.paths import Path
from sparcur.utils import loge, symlink_latest
from sparcur.config import auth


def prepare_dataset_export(path, export_path=None):  # FIXME do we need export_base?
    """ Create the export folder for the dataset at ``path`` and return it.

    The folder is ``export_path / 'datasets' / uuid / timestamp`` where the
    timestamp comes from ``path.updated_cache_transitive()``. ``LATEST_RUN``
    in the dataset export folder is pointed at it with ``symlink_latest``.
    When ``export_path`` is None the configured ``export-path`` is used. """
    if export_path is None:  # FIXME confusing and breaks w/ convention -> Options maybe?
        export_path = Path(auth.get_path('export-path'))  # FIXME allow alt?

    from sparcur.utils import PennsieveId
    identifier = PennsieveId(path.cache.id)
    uuid = identifier.uuid
    # we don't use cache._fs_safe_id here because we know the
    # identifier type from the folder structure

    # FIXME dataset metadata export setup basically needs to do all of this first
    # set latest run and then latest complete at the end, but complete is kind of arbitrary
    # from the atomic point of view
    tupdated = path.updated_cache_transitive()  # FIXME this causes us to traverse all files twice
    # XXX TODO I think that the dataset updated date is now transitive as well? though the dataset
    # updated timestamp seems to happen a bit before the created/updated date on the file itself?
    # FIXME somehow tupdated can be None !??!?! XXX yes, it happens on empty sparse datasets
    export_dataset_folder = export_path / 'datasets' / uuid
    export_parent = export_dataset_folder / timeformat_friendly(tupdated)
    if not export_parent.exists():
        export_parent.mkdir(parents=True)

    export_latest_run = export_dataset_folder / 'LATEST_RUN'
    symlink_latest(export_parent, export_latest_run)
    # FIXME need to symlink to LATEST
    return export_parent


def export_blob(blob, blob_file_name, time_now=None,
                export_path=None, export_parent_path=None, path=None, **kwargs):
    """ Add prov to ``blob``, write it as json and return the written path.

    The file is ``export_parent_path / blob_file_name``. When
    ``export_parent_path`` is None it is derived from the dataset ``path``
    with ``prepare_dataset_export``, so ``path`` is required in that case.
    Raises ``TypeError`` if both ``export_parent_path`` and ``path`` are None. """
    if export_parent_path is None:
        # NOTE if export_parent_path is not None then it is up to the user
        # to make sure that the contents of the dataset directory do not change
        # resulting in confusion about mismatched transitive update dates
        if path is None:
            # the dataset path is the only thing the export folder can be derived from
            raise TypeError('export_blob needs export_parent_path or path')

        export_parent_path = prepare_dataset_export(path, export_path)
    elif not export_parent_path.exists():
        # safe to mkdir here since outside has a record of the path name
        export_parent_path.mkdir(parents=True)

    export_blob_path = export_parent_path / blob_file_name
    add_prov(blob, time_now)
    with open(export_blob_path, 'wt') as f:
        json.dump(blob, f, indent=2, cls=JEncode)

    loge.info(f'path metadata exported to {export_blob_path}')
    return export_blob_path


def add_prov(blob, time_now):
    """ Mutate blob in place to add prov. """
    # FIXME this will klobber cases where prov was embedded by the pipelines
    blob['prov'] = {'timestamp_export_start': time_now.START_TIMESTAMP,
                    'export_system_identifier': Path.sysid,
                    'export_hostname': gethostname(),
                    'sparcur_version': sparcur.__version__,
                    'sparcur_internal_version': sparcur.__internal_version__,
                    }
    rp = aug.RepoPath(sparcur.core.__file__)
    # the commit is only recorded when sparcur is running from a repository
    if rp.working_dir is not None:
        blob['prov']['sparcur_commit'] = rp.repo.active_branch.commit.hexsha


def main(path=Path.cwd(), export_path=None, **kwargs):
    """ Prepare the export folder for the dataset at ``path`` and return it. """
    return prepare_dataset_export(path, export_path=export_path)


if __name__ == '__main__':
    import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=pprint.pprint)
Dataset Path Metadata
In principle this could run as part of the dataset metadata export; however, since it produces separate files and can run at the same time, it is its own module.
Usage example
find -maxdepth 1 -type d -not -path '.operations*' -not -path '.' -exec python -m sparcur.simple.path_metadata {} \;
xargs
variant.
find -maxdepth 1 -type d -not -path '.operations*' -not -path '.' -print0 | \ xargs -0 -I{} -P8 -r -n 1 python -m sparcur.simple.path_metadata {}
Alternate example
python -m sparcur.simple.path_metadata \ 21957eae-0824-4fb5-b18f-04d6ed12ce18/dataset \ --export-parent-path 21957eae-0824-4fb5-b18f-04d6ed12ce18/exports/
from pathlib import Path


def from_path_transitive_metadata(path):
    """ Wrap the transitive metadata of ``path`` in a ``path-metadata`` blob. """
    # FIXME TODO handle sparse cases
    return {'type': 'path-metadata',
            'data': path._transitive_metadata(),}


def main(path=Path.cwd(), id=None, time_now=None,
         parent_path=None, invariant_local_path='dataset',
         parent_parent_path=Path.cwd(),
         export_local=False, export_path=None, export_parent_path=None,
         _entry_point=False, validate=False, **kwargs):
    """ Produce the path metadata blob for a single dataset.

    Returns the blob itself, or when ``_entry_point`` is set, the path
    that ``export_blob`` wrote it to. Raises ``TypeError`` if the cache of
    the resolved path is not a dataset. """
    from sparcur.paths import Path

    from sparcur.simple.export import prepare_dataset_export, export_blob

    path_is_cwd = path == Path.cwd()
    if path_is_cwd and (id is not None or parent_path is not None):
        # no explicit path was given, build it from the parent conventions
        if parent_path is None:
            parent_path = parent_parent_path / id.uuid

        path = (parent_path / invariant_local_path).expanduser().resolve()
    else:
        parent_parent_path = None

    # TODO path from dataset_id and retrieve conventions? or just pass path from retrieve final output?
    # TODO parallelize if multiple paths
    # This assumes that all retrieve operations have
    # finished successfully for the current dataset
    # FIXME Options calls resolve on all paths but not if Path.cwd slips through ...
    path = Path(path)  # FIXME even here some paths don't have caches ?!
    # XXX this should have errored when Path was applied below !?!?!??! pipe_main wat ???
    if not path.cache.is_dataset():
        raise TypeError('can only run on a single dataset')

    if _entry_point:
        if export_parent_path is None:
            export_parent_path = prepare_dataset_export(path, export_path)

        kwargs.update(export_path=export_path,
                      export_parent_path=export_parent_path,
                      time_now=time_now)

    blob = from_path_transitive_metadata(path)
    # FIXME TODO validate file formats, which means this also needs to support the remoteless case
    # FIXME TODO process metadata for each timepoint when things enter should go in prov I think
    # or we need to be collecting prov along the way, we don't have an overseer or conductor
    # so we can't keep everything embedded
    # FIXME TODO embed the transitive cache updated value that is used in prepare_dataset_export
    if validate:  # FIXME raw vs validated and FIXME pipeline
        from sparcur import schemas as sc
        from sparcur.simple.transform import schema_validate
        Path.validate_path_json_metadata(blob)
        schema_validate(blob, sc.PathTransitiveMetadataSchema)

    if not _entry_point:
        return blob

    # FIXME naming for raw vs validated
    return export_blob(blob, 'path-metadata.json', **kwargs)


if __name__ == '__main__':
    #import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main)#, after=pprint.pprint)
Combine
This is equivalent to creating objects that match the summary schema for legacy organization level curation-export.json files.
FIXME it might be the case that individual datasets were exported under different schemas, which means that there would no longer be a single consistent schema for checking the merge; this is why we sometimes need to rerun all datasets when there is a schema change. Even if there is not a schema change, it very well may be the case that individual datasets will have been run on different versions of the export pipeline, and on slightly different images.
import json
import rdflib
from pathlib import Path
from dateutil import parser as dateparser
from pyontutils.core import OntResPath
from pyontutils.utils import TZLOCAL, timeformat_friendly
from pyontutils.namespaces import TEMP, rdf, sparc
#from sparcur.utils import fromJson, register_all_types  # FIXME this should also go in sparcron
from sparcur.export.triples import TriplesExportSummary
from sparcur.export.published import _merge_graphs
from sparcur.simple.export import add_prov

# keys used to look things up in the per-dataset curation-export.json blobs
tu = 'timestamp_updated'
tuc = 'timestamp_updated_contents'
ip = 'inputs'
rmk = 'remote_dataset_metadata'


class TriplesExportSummaryPublished(TriplesExportSummary):
    """ Summary triples export whose ontology id and header label
    are rewritten for the published graph. """

    @property
    def ontid(self):
        # same id as the parent with 'ontologies/' replaced by 'ontologies/published/'
        return rdflib.URIRef(super().ontid.replace('ontologies/', 'ontologies/published/'))

    @property
    def header_label(self):
        return rdflib.Literal(f'{self.folder_name} curation export published graph')


def max_dataset_or_contents_updated(datasets_list):
    """ Return the largest truthy ``timestamp_updated`` or
    ``timestamp_updated_contents`` value found in the ``meta`` of any
    dataset in ``datasets_list``.

    ``max`` raises ``ValueError`` if no dataset has either value. """
    return max(set.union(
        set([a['meta'][tuc] for a in datasets_list
             if tuc in a['meta'] and a['meta'][tuc]]),
        set([a['meta'][tu] for a in datasets_list
             if tu in a['meta'] and a['meta'][tu]])))


def from_dataset_export_path_snapshot(dataset_export_path, snapshots_path, time_now):
    """ Create a new snapshot folder and return its path.

    The folder is named by ``time_now.START_TIMESTAMP_LOCAL_FRIENDLY`` and
    gets one relative symlink for every child of ``dataset_export_path``
    that has an existing ``LATEST``, pointing at the resolved target.
    ``mkdir`` is called without ``exist_ok`` so an existing snapshot
    folder is an error. """
    derefs = [l.resolve() for l in [c / 'LATEST' for c in dataset_export_path.children] if l.exists()]
    snapshot_path = snapshots_path / time_now.START_TIMESTAMP_LOCAL_FRIENDLY
    snapshot_path.mkdir(parents=True)
    # the link is named after the parent folder of the resolved export,
    # presumably the dataset uuid, verify against prepare_dataset_export
    # 'asdf' is only a placeholder name for computing the relative path
    [(snapshot_path / d.parts[-2]).symlink_to((snapshot_path / 'asdf').relative_path_to(d)) for d in derefs]
    return snapshot_path


def from_snapshot_path_datasets_lists(snapshot_path):
    """ Load ``curation-export.json`` from every child of ``snapshot_path``.

    Returns ``(alld, pubd)`` where ``alld`` holds every loaded blob and
    ``pubd`` only those with ``id_published`` in their remote dataset
    metadata inputs. """
    alld = []
    pubd = []
    for uuid_path in snapshot_path.children:
        with open(uuid_path / 'curation-export.json', 'rt') as f:
            j = json.load(f)
            # TODO validate the load XXX this should probably
            # happen as a final roundtrip check during export
            # TODO filter by organization
            alld.append(j)
            if ip in j and rmk in j[ip] and 'id_published' in j[ip][rmk]:
                pubd.append(j)

    return alld, pubd


def from_snapshot_path_summary_json(snapshot_path, project_id, time_now):
    """ Return the summary blobs ``(sum_all, sum_pub)`` for all datasets
    and for published datasets in the snapshot. """
    l_all, l_pub = from_snapshot_path_datasets_lists(snapshot_path)
    sum_all = from_datasets_list_summary_json(l_all, project_id, time_now)
    sum_pub = from_datasets_list_summary_json(l_pub, project_id, time_now)
    return sum_all, sum_pub


def from_snapshot_path_summary_ttl(snapshot_path, project_id, time_now, blob):
    """ Merge the per-dataset ``curation-export.ttl`` graphs of a snapshot.

    Returns ``(merged_all, merged_pub, graphs_all, graphs_pub)``. A graph
    counts as published when one of its ``sparc.Dataset`` subjects has a
    ``TEMP.hasDoi`` value. ``project_id`` and ``time_now`` are not used
    in the body. """
    tes = TriplesExportSummary(blob)
    tesp = TriplesExportSummaryPublished(blob)

    # sorted by folder name so that the graph order is stable
    orps = [OntResPath(uuid_path / 'curation-export.ttl')
            for uuid_path in sorted(snapshot_path.children, key=lambda p: p.name)]

    graphs_all = [o.graph for o in orps]
    graphs_pub = [
        g for g, doi in [(g, list(g[ds:TEMP.hasDoi]))
                         for g in graphs_all
                         for ds in g[:rdf.type:sparc.Dataset]]
        if doi]
    merged_all = _merge_graphs(graphs_all)
    merged_all.populate_from_triples(tes.triples_header)
    merged_pub = _merge_graphs(graphs_pub)
    merged_pub.populate_from_triples(tesp.triples_header)
    return merged_all, merged_pub, graphs_all, graphs_pub


def from_snapshot_path_summary_ttl_BAD(snapshot_path, project_id, time_now, blob):
    """ Abandoned variant, kept for reference only.

    NOTE(review): this cannot run as written, it uses names that are not
    defined in this module (OntGraph, _fix_for_pub, published_graphs,
    _populate_published, curation_export, graph) and calls ``add`` on a list. """
    # this variant is too complex, trying to reuse the published graph as the all graph
    # and the implementation of the OntConjunctiveGraph is not far enough along to do it
    tes = TriplesExportSummary(blob)  # FIXME nasty annoying dep
    graph_meta = OntGraph()
    graph_meta.populate_from_triples(tes._triples_header(tes.ontid, time_now._start_time))
    rev_replace_pairs = _fix_for_pub(graph_meta, graph_meta)
    replace_pairs = tuple([(b, a) for a, b in rev_replace_pairs])

    orps = [OntResPath(uuid_path / 'curation_export.ttl')
            for uuid_path in snapshot_path.children]

    graphs = [o.graph for o in orps]
    # FIXME should use id_published here as well, but that isn't being
    # propagated at the moment
    graphs_pub = []
    graphs_nop = []
    for g, doi in [(g, list(g[ds:TEMP.hasDoi]))
                   for g in graphs for ds in g[:rdf.type:sparc.Dataset]]:
        if doi:
            graphs_pub.append(g)
        else:
            graphs_nop.add(g)
    graph_pub = _merge_graphs(published_graphs)
    graph_pub.populate_from(graph_meta)
    # FIXME this is manually aligned with TriplesExport.triples_header
    graph_pub.asdf

    for g in graphs_nop:
        graph_pub.namespace_manager.populate_from(
            {k:v for k, v in dict(g.namespace_manager).items()
             if k not in ('contributor', 'sample', 'subject')})

    ttl_all = None
    ttl_pub = _populate_published(curation_export, graph)


def from_dataset_export_path_datasets_lists(dataset_export_path):
    """ Load ``curation-export.json`` from the resolved ``LATEST`` of every
    child of ``dataset_export_path``, in order of resolved folder name.

    Returns ``(alld, pubd)`` with the same meaning as
    ``from_snapshot_path_datasets_lists``. """
    dep = dataset_export_path
    alld = []
    pubd = []
    derefs = [l.resolve() for l in [c / 'LATEST' for c in dep.children] if l.exists()]
    # TODO consider creating a folder that is just symlinks before this
    for lp in sorted(derefs, key=lambda p: p.name):
        with open(lp / 'curation-export.json', 'rt') as f:
            j = json.load(f)
            # TODO validate the load XXX this should probably
            # happen as a final roundtrip check during export
            # TODO filter by organization
            alld.append(j)
            if ip in j and rmk in j[ip] and 'id_published' in j[ip][rmk]:
                pubd.append(j)

    return alld, pubd


def from_datasets_list_summary_json(datasets_list, project_id, time_now):
    """ Build the summary blob for ``datasets_list`` and add prov to it.

    The folder name is taken from ``export_project_path`` in the prov of
    the first dataset.

    NOTE(review): an empty ``datasets_list`` raises ``IndexError`` here,
    which looks reachable for the published list when no dataset has
    ``id_published``, confirm the intended behavior. """
    # XXX FIXME issue with datasets from multiple projects
    fn = Path(datasets_list[0]['prov']['export_project_path']).name
    out = {
        'id': project_id.id,
        'meta': {
            'count': len(datasets_list),
            'folder_name': fn,  # WHAT A RELIEF we don't need network
            'uri_api': project_id.uri_api,
            'uri_human': project_id.uri_human(),
        },
        'datasets': datasets_list,
    }
    add_prov(out, time_now)
    # some potential prov summaries, but lets not do that here
    # most prov stats should be on the single dataset level
    #'export_timestamp_start_min': min(tes),
    #'export_timestamp_start_max': max(tes),
    return out


def from_dataset_export_path_summary_json(dataset_export_path, project_id, time_now):
    """ Return the summary blobs ``(sum_all, sum_pub)`` built directly from
    the ``LATEST`` exports under ``dataset_export_path``. """
    l_all, l_pub = from_dataset_export_path_datasets_lists(dataset_export_path)
    #[a['meta']['timestamp_updated'] < a['meta']['timestamp_updated_contents']
     #for a in l_all if a['meta']['timestamp_updated_contents']]
    sum_all = from_datasets_list_summary_json(l_all, project_id, time_now)
    sum_pub = from_datasets_list_summary_json(l_pub, project_id, time_now)
    return sum_all, sum_pub


def main(project_id=None, export_path=None, time_now=None,
         project_id_auth_var='remote-organization',  # FIXME move to clifun
         disco=False, **kwargs):
    """ Snapshot the latest dataset exports and write the combined summary.

    Writes the all and published summary json and ttl files into
    ``export_path / 'summary' / project_id.uuid / <max updated timestamp>``,
    links the snapshot into that folder and repoints ``LATEST`` at it.
    Returns ``(npath_ce, sum_all, sum_pub, graphs_all, graphs_pub)``. """
    from sparcur.paths import Path
    from sparcur.config import auth

    if project_id is None:
        from sparcur.config import auth
        from sparcur.utils import PennsieveId
        project_id = auth.get(project_id_auth_var)
        project_id = PennsieveId(project_id)  # FIXME abstract the hardcoded backend

    if export_path is None:  # XXX see issues mentioned above
        export_path = Path(auth.get_path('export-path'))

    dataset_export_path = export_path / 'datasets'
    snapshots_path = export_path / 'snapshots'
    snapshot_path = from_dataset_export_path_snapshot(
        dataset_export_path, snapshots_path, time_now)
    sum_all, sum_pub = from_snapshot_path_summary_json(
        snapshot_path, project_id, time_now)

    # write symlink LATEST_PARTIAL
    ttl_all, ttl_pub, graphs_all, graphs_pub = from_snapshot_path_summary_ttl(
        snapshot_path, project_id, time_now, sum_all)

    # write symlink LATEST
    maxdt = max_dataset_or_contents_updated(sum_all['datasets'])
    dt_maxdt = dateparser.parse(maxdt)
    dt_maxdt_local = dt_maxdt.astimezone(TZLOCAL())
    friendly_maxdt_local = timeformat_friendly(dt_maxdt_local)

    # FIXME there are some bad assumptions in here that should be refactored out
    # at some point, but for now we implicitly assume that all datasets come from
    # the same organization, which can easily be violated because we don't check
    # however the existing internal schema requires an id for the summary which is
    # currenty the organization id
    # FIXME summary is a hardcoded path
    # XXX WARNING it is possible to overwrite since maxdt might not change between runs
    # this is desirable behavior for development, but could cause issues in other cases
    pexpath = export_path / 'summary' / project_id.uuid
    latest = pexpath / 'LATEST'
    npath = pexpath / friendly_maxdt_local
    snapshot_link = npath / 'snapshot'
    if not npath.exists():
        npath.mkdir(parents=True)
    else:
        # FIXME not sure if this makes sense?
        if snapshot_link.is_symlink():
            snapshot_link.unlink()

    snapshot_link.symlink_to(snapshot_link.relative_path_to(snapshot_path))

    npath_ce = npath / 'curation-export.json'
    npath_cep = npath / 'curation-export-published.json'
    for path, blob in ((npath_ce, sum_all), (npath_cep, sum_pub)):
        with open(path, 'wt') as f:
            json.dump(blob, f, indent=2)

    npath_ttl = npath / 'curation-export.ttl'
    npath_ttlp = npath / 'curation-export-published.ttl'
    ttl_all.write(npath_ttl)
    ttl_pub.write(npath_ttlp)

    if disco:
        # export xml and tsv for disco
        from sparcur.simple.disco import from_curation_export_json_path_datasets_json_xml_and_disco
        from_curation_export_json_path_datasets_json_xml_and_disco(
            npath_ce, sum_all['datasets'], graphs_all)

    # LATEST is only repointed after every output has been written
    if latest.is_symlink():
        latest.unlink()

    latest.symlink_to(friendly_maxdt_local)

    return npath_ce, sum_all, sum_pub, graphs_all, graphs_pub


if __name__ == '__main__':
    #import pprint
    from sparcur.simple.utils import pipe_main
    # these are really big, don't print them
    # pipe_main(main, after=pprint.pprint)
    pipe_main(main)
disco
Legacy export for disco.
import json
from pyontutils.core import OntGraph
from sparcur import export as ex
from sparcur.utils import PennsieveId
from sparcur.utils import fromJson, register_all_types  # FIXME this should also go in sparcron


def from_curation_export_json_path_xml_and_disco(curation_export_json_path):
    """ Load the summary at ``curation_export_json_path`` and run the xml
    and disco exports for the datasets it lists. """
    with open(curation_export_json_path, 'rt') as f:
        summary = json.load(f)

    datasets_json = summary['datasets']
    from_curation_export_json_path_datasets_json_xml_and_disco(
        curation_export_json_path, datasets_json)


def from_curation_export_json_path_datasets_json_xml_and_disco(
        curation_export_json_path, datasets_json, graphs=None):
    """ Run ``export_xml`` and ``export_disco`` for ``datasets_json``.

    When ``graphs`` is empty or None the per-dataset ``curation-export.ttl``
    files are parsed from the ``snapshot`` folder that sits next to
    ``curation_export_json_path``. """
    # FIXME need the snapshot linked somehow, export time started if we are using summary
    # or summary prov timestamp_export_start will give us the snapshot path as well if we
    # parse it back to a date
    if not graphs:
        snapshot_path = curation_export_json_path.parent / 'snapshot'
        paths = [(snapshot_path / PennsieveId(d['id']).uuid / 'curation-export.ttl')
                 for d in datasets_json]
        graphs = [OntGraph().parse(path) for path in paths]

    datasets_ir = fromJson(datasets_json)
    ex.export_xml(curation_export_json_path, datasets_ir)
    ex.export_disco(curation_export_json_path, datasets_ir, graphs)
    # XXX not doing jsonld here, it will be combined
    # from single dataset jsonld or similar


def main(path=None, **kwargs):
    """ Export xml and disco files, running combine first when no ``path``
    to an existing ``curation-export.json`` is given. """
    register_all_types()
    # FIXME the problem with this approach is that can cannot run
    # multiple downstream steps from the same upstream step, we would
    # need a compositional way to tell each downstream which upstreams
    # we wanted to run in any given situation, all to save additional
    # reads from disk
    if path is None:  # assume the user wants to run combine first
        from sparcur.simple.combine import main as combine
        curation_export_json_path, summary_all, _, graphs_all, _ = combine(**kwargs)
        datasets_json = summary_all['datasets']
        # pass the graphs combine already loaded so they are not parsed again
        from_curation_export_json_path_datasets_json_xml_and_disco(
            curation_export_json_path, datasets_json, graphs_all)
    else:
        curation_export_json_path = path
        from_curation_export_json_path_xml_and_disco(curation_export_json_path)


if __name__ == '__main__':
    #import pprint
    from sparcur.simple.utils import pipe_main
    # these are really big, don't print them
    # pipe_main(main, after=pprint.pprint)
    pipe_main(main)
compact
Find all export dataset folders that are older than the current snapshot and that are not in a snapshot that is in a summary, and compact them.
""" Examples: # look at the second one and then run the first one since it is easier and safer to stop and resume # python -m sparcur.simple.compact | xargs -P4 -r -I{} sh -c 'tar -cvJf "{}.tar.xz" "{}" && rm -r "{}"' python -m sparcur.simple.compact | xargs -P12 -r -I{} echo tar -cvJf '{}.tar.xz' '{}' # python -m sparcur.simple.compact | xargs -P6 -r -I{} echo rm -r '{}' """ from sparcur.paths import Path from sparcur.config import auth __dep_cache = {} def latest_snapped(dataset_export_path, snapped): if dataset_export_path not in __dep_cache: cs = set(c for c in dataset_export_path.children if c.is_dir() and not c.is_symlink()) csnap = cs.intersection(snapped) if not csnap: # no snap, pretend that latest is snapped # this can happen if there is no LATEST because we # were e.g. just exporting path metadata and nothing else maxsnap = sorted(cs, key=lambda p: p.name)[-1] else: maxsnap = sorted(csnap, key=lambda p: p.name)[-1] __dep_cache[dataset_export_path] = maxsnap.name return __dep_cache[dataset_export_path] def main(): export_path = Path(auth.get_path('export-path')) summary_path = export_path / 'summary' snapshots_path = export_path / 'snapshots' datasets_path = export_path / 'datasets' snap_shotted = [ dss.resolve() for d in summary_path.rchildren_dirs for l in d.rchildren if l.is_symlink() and l.name == 'snapshot' for dss in l.resolve().children] snapped = set(snap_shotted) latest_sums = [ d.resolve() for c in summary_path.children for d in (c / 'LATEST',) if d.exists()] all_builds = [ build for date in datasets_path.children if date.is_dir() and not date.is_symlink() for build in date.children if build.is_dir() and not build.is_symlink()] older_not_snap = [ a for a in all_builds if a not in snapped and a.name < latest_snapped(a.parent, snapped)] assert not snapped.intersection(older_not_snap) # newer = set(all_builds) - snapped - set(older_not_snap) _ = [print(p) for p in older_not_snap] if __name__ == '__main__': main()
Utility
Init
Utils
from sparcur.config import auth
__doc__ = f"""Common command line options for all sparcur.simple modules
Usage:
    sparcur-simple manifest  [options]  [<path>...]
    sparcur-simple get-uuid  <remote-id>...
    sparcur-simple datasets
    sparcur-simple for-racket
    sparcur-simple check-names
    sparcur-simple git-repos [update] [options]
    sparcur-simple [options] [<path>...]
    sparcur-simple [options] [--dataset-id=<ID>...]
    sparcur-simple [options] [--extension=<EXT>...] [<path>...]

Commands:
    manifest                        generate manifest files for path
    for-racket                      print data for reading into Racket

Options:
    -h --help                       show this

    --hypothesis-group-name=NAME    the hypotheis group name

    --project-id=ID                 the project id
    --dataset-id=<ID>...            one or more datset ids
    --project-id-auth-var=VAR       name of the auth variable holding the project-id

    --project-path=PATH             the project path, will be path if <path>... is empty
    --parent-path=PATH              the parent path where the project will be cloned to
    --parent-parent-path=PATH       parent in which a random tempdir is generated
                                    or the dataset uuid is used as the parent path, don't use this ...
    --invariant-local-path=PATH     path relative to parent path for dataset [default: dataset]
    --export-local                  set export-parent-path to {{:parent-path}}/exports/
    --export-path=PATH              base export path containing the standard path structure [default: {auth.get_path('export-path')}]
    --cleaned-parent-path=PATH      direct parent path into which cleaned paths will be placed
    --cleaned-path=PATH             base cleaned path containing the standard path structure [default: {auth.get_path('cleaned-path')}]
    --export-parent-path=PATH       direct parent path into which exports will be placed
    --extension=<EXT>...            one or more file extensions to fetch

    -j --jobs=N                     number joblib jobs [default: 12]
    --exclude-uploaded              do not pull files from remote marked as uploaded
    --sparse-limit=N                package count that forces a sparse pull [default: {auth.get('sparse-limit')}]
    --symlink-objects-to=PATH       path to an existing objects directory
    --log-level=LEVEL               log level info [default: INFO]
    --open=PROGRAM                  show file with PROGRAM
    --show                          show file with xopen
    --pretend                       show what would be done for update
"""

import os
import sys
from types import GeneratorType
from pyontutils import clifun as clif
from sparcur import exceptions as exc
from sparcur.utils import _find_command
from sparcur.utils import log, logd, loge, GetTimeNow, PennsieveId
from sparcur.paths import Path, PennsieveCache
from sparcur.backends import PennsieveRemote


def backend_pennsieve(project_id=None, Local=Path, Cache=PennsieveCache):  # (def_babf)
    """return a configured pennsieve backend

    calling this is sufficient to get everything set up correctly

    You must call RemotePath.init(project_id) before using
    RemotePath. Passing the project_id argument to this function
    will do that for you. It is not required because there are
    cases where the project_id may not be known at the time that
    this function is called.
    """

    RemotePath = PennsieveRemote._new(Local, Cache)

    if project_id is not None:
        RemotePath.init(project_id)

    return RemotePath


class Options(clif.Options):
    """Typed accessors over the parsed command line arguments in
    ``self._args`` for the usage string defined in this module."""

    @property
    def id(self):
        # dataset_id has priority since project_id can occur without a
        # dataset_id, but dataset_id may sometimes come with a project_id
        # in which case the dataset_id needs priority for functions that
        # branch on the most granular identifier type provided
        return (self.dataset_id[0]
                if self.dataset_id else
                (self.project_id
                 if self.project_id else
                 None))

    @property
    def project_id(self):
        """PennsieveId of type organization, or None if --project-id was not given."""
        if not hasattr(self, '_cache_project_id'):
            id = self._args['--project-id']
            if id is not None:
                identifier = PennsieveId(id, type='organization')
                self._cache_project_id = identifier
            else:
                return

        return self._cache_project_id

    @property
    def dataset_id(self):
        """List of PennsieveId of type dataset, or the raw empty value if none were given."""
        if not hasattr(self, '_cache_dataset_id'):
            ids = self._args['--dataset-id']
            if ids:
                identifiers = [PennsieveId(id, type='dataset') for id in ids]
                self._cache_dataset_id = identifiers
            else:
                return ids

        return self._cache_dataset_id

    @property
    def remote_id(self):
        """List of PennsieveId built from <remote-id>, or the raw empty value if none were given."""
        if not hasattr(self, '_cache_remote_id'):
            ids = self._args['<remote-id>']
            if ids:
                identifiers = [PennsieveId(id) for id in ids]
                self._cache_remote_id = identifiers
            else:
                return ids

        return self._cache_remote_id

    @property
    def jobs(self):
        return int(self._args['--jobs'])

    n_jobs = jobs  # match internal kwargs conventions

    @property
    def paths(self):
        return [Path(p).expanduser().resolve() for p in self._args['<path>']]

    @property
    def path(self):
        paths = self.paths
        if paths:
            return paths[0]
        elif self.project_path:
            return self.project_path
        else:
            # if no paths were listed default to cwd
            # consistent with how the default kwargs
            # are set on a number of mains
            # this is preferable to allow path=None
            # to be overwritten by the conventions of
            # individual pipeline mains
            return Path.cwd()

    @property
    def project_path(self):
        pp = self._args['--project-path']
        if pp:
            return Path(pp).expanduser().resolve()

    @property
    def parent_parent_path(self):
        ppp = self._args['--parent-parent-path']
        if ppp:
            return Path(ppp).expanduser().resolve()
        else:
            return Path.cwd()

    @property
    def parent_path(self):
        pap = self._args['--parent-path']
        did = self.dataset_id
        if pap:
            return Path(pap).expanduser().resolve()
        elif did:
            id = self.id  # dataset_id is a list so use id which handles that
            uuid = id.uuid
            return (self.parent_parent_path / uuid).expanduser().resolve()

    @property
    def export_path(self):
        ep = self._args['--export-path']
        epp = self.export_parent_path
        if ep and epp:
            raise TypeError('Only one of --export-path and --export-parent-path may be set.')
        elif ep:
            return Path(ep).expanduser().resolve()
        else:
            raise Exception('should not happen')

    @property
    def export_parent_path(self):
        epp = self._args['--export-parent-path']
        el = self.export_local
        pap = self.parent_path
        if epp and el:
            raise TypeError('Only one of --export-local and --export-parent-path may be set.')
        elif epp:
            return Path(epp).expanduser().resolve()
        elif el and pap:
            # it is ok to do this here because the TypeError above prevents
            # the case where both epp and el are set, so even though epp
            # is no longer always what was set on the command line, it is
            # the case that there won't be conflicting sources
            return pap / 'exports'

    @property
    def cleaned_path(self):
        cp = self._args['--cleaned-path']
        # NOTE(review): this reads export_parent_path while the error
        # message below names --cleaned-parent-path; it looks like it
        # should read cleaned_parent_path. Left unchanged because
        # --cleaned-path always has a default, so switching the check
        # would make --cleaned-parent-path always raise -- confirm intent.
        cpp = self.export_parent_path
        if cp and cpp:
            raise TypeError('Only one of --cleaned-path and --cleaned-parent-path may be set.')
        elif cp:
            return Path(cp).expanduser().resolve()
        else:
            raise Exception('should not happen')

    @property
    def cleaned_parent_path(self):
        cpp = self._args['--cleaned-parent-path']
        if cpp:
            return Path(cpp).expanduser().resolve()

    @property
    def extensions(self):
        return self.extension

    @property
    def symlink_objects_to(self):
        slot = self._args['--symlink-objects-to']
        if slot:
            return Path(slot).expanduser()

    @property
    def sparse_limit(self):  # FIXME not being pulled in by asKwargs ??
        return int(self._args['--sparse-limit'])

    @property
    def time_now(self):  # FIXME make it possible to pass this in?
        if not hasattr(self, '_time_now'):
            self._time_now = GetTimeNow()

        return self._time_now

    @property
    def log_level(self):  # FIXME not being pulled in by asKwargs ??
        """Return --log-level as an int when it is a (possibly negative)
        integer literal, otherwise return the string unchanged."""
        ll = self._args['--log-level']
        if ll.isdigit() or (ll[0] == '-' and ll[1:].isdigit()):
            return int(ll)
        else:
            return ll


def pipe_main(main, after=None, argv=None):
    """Parse the common options and call ``main`` with them as kwargs.

    ``main`` is called with ``_entry_point=True`` plus ``options.asKwargs()``.
    Any exception is logged together with ``options.path`` and re-raised.
    If ``after`` is given it is called with the value returned by ``main``.
    Returns the value returned by ``main``.
    """
    options, args, defaults = Options.setup(__doc__, argv=argv)
    # _entry_point is used as a way to switch behavior when a
    # pipeline main is run directly or actually in a pipeline
    try:
        log.setLevel(options.log_level)
        logd.setLevel(options.log_level)
        loge.setLevel(options.log_level)
        out = main(_entry_point=True, **options.asKwargs())
    except Exception as e:
        log.exception(e)
        log.error(options.path)
        raise

    if after:
        after(out)

    return out


def rglob(path, pattern):
    """ Hack around the absurd slowness of python's rglob """

    if sys.platform == 'win32':
        log.warning('no findutils on windows, watch out for unexpected files')
        return list(path.rglob(pattern))

    doig = (hasattr(path, 'cache') and
            path.cache and
            path.cache.cache_ignore)
    exclude = ' '.join([f"-not -path './{p}*'" for p in path.cache.cache_ignore]) if doig else ''
    # the command is run through a shell by os.popen; pattern is only
    # quoted via its python repr
    command = f"""{_find_command} {exclude} -name {pattern!r}"""

    with path:  # run the command with path as the context
        with os.popen(command) as p:
            string = p.read()

    path_strings = string.split('\n')  # XXX posix path names can contain newlines
    paths = [path / s for s in path_strings if s]
    return paths


def _fetch(cache):  # sigh joblib multiprocessing pickling
    # lambda functions are great right up until you have to handle an
    # error function inside of them ... thanks python for yet another
    # failure to be homogenous >_<
    meta = cache.meta
    try:
        size_mb = meta.size.mb
    except AttributeError as e:
        if meta.errors:
            logd.debug(f'remote errors {meta.errors} for {cache!r}')
            return
        else:
            raise exc.StopTheWorld(cache) from e

    return cache.fetch(size_limit_mb=size_mb + 1)  # FIXME somehow this is not working !?


def _fetch_path(path):  # sigh joblib multiprocessing pickling
    path = Path(path)
    cache = path.cache
    if cache is None:
        raise exc.NoCachedMetadataError(path)

    # do not return to avoid cost of serialization back to the control process
    _fetch(cache)


def fetch_paths_parallel(paths, n_jobs=12, use_async=True):
    """Fetch every path in ``paths``.

    Runs serially when ``n_jobs <= 1``, via pyontutils ``Async`` when
    ``use_async`` is true, and via joblib ``Parallel`` otherwise.
    """
    if n_jobs <= 1:
        for path in paths:
            _fetch_path(path)
    elif use_async:
        from pyontutils.utils import Async, deferred
        Async()(deferred(_fetch_path)(path) for path in paths)
    else:
        import pathlib
        from joblib import Parallel, delayed
        backend = 'multiprocessing' if hasattr(sys, 'pypy_version_info') else None
        # FIXME FIXME FIXME somehow this can result in samples.xlsx being fetched to subjects.xlsx !?!??!!
        # XXX either a race condition on our end or something insane from the remote
        Parallel(n_jobs=n_jobs, backend=backend)(delayed(_fetch_path)(pathlib.Path(path)) for path in paths)
        #Parallel(n_jobs=n_jobs)(delayed(fetch_path)(path) for path in paths)


def combinate(*functions):
    """Return a generator function that calls each of ``functions`` with
    the same arguments and yields their results in order; results that
    are generators are flattened with ``yield from``."""

    def combinator(*args, **kwargs):
        for f in functions:
            # NOTE no error handling is needed here
            # in no cases should the construction of
            # the python object version of a path fail
            # all failures should happen _after_ construction
            # the way we have implemented this they fail when
            # the data attribute is accessed
            obj = f(*args, **kwargs)
            if isinstance(obj, GeneratorType):
                yield from obj  # FIXME last one wins, vs yield tuple vs ...?
                # FIXME manifests are completely broken for this
            else:
                yield obj

    return combinator


def multiple(func, merge=None):
    """ combine multiple results """

    def express(*args, **kwargs):
        vs = tuple(func(*args, **kwargs))
        if merge is not None:
            yield merge(vs)
        else:
            yield vs

    return express


def early_failure(path, error, dataset_blob=None):
    """Record ``error`` for a dataset that failed early.

    With no ``dataset_blob`` a new minimal blob is built from
    ``path.cache``. Otherwise ``error`` is appended to the blob's
    'errors' list (created if missing) and the same blob is returned.
    """
    # these are the 9999 5555 and 4444 errors
    # TODO match the minimal schema reqs as
    # we did in pipelines
    if dataset_blob is None:
        cache = path.cache
        return {'id': cache.id,
                'meta': {'uri_api': cache.uri_api,
                         'uri_human': cache.uri_human,},
                #'status': 'early_failure',  # XXX note that status is not required
                # if we cannot compute it, but if there are inputs there should be
                # a status
                'errors': [error],  # FIXME format error
                }

    # the original referenced the misspelled name datset_blob here,
    # which raised NameError whenever a blob was passed in
    dataset_blob.setdefault('errors', []).append(error)
    return dataset_blob


class DataWrapper:
    # sigh patterns are stupid, move this to elsewhere so it doesn't taint everything
    def __init__(self, data):
        self.data = data


def main(id=None, **kwargs):
    """Dispatch the sparcur-simple subcommands present in ``kwargs``.

    Handles get_uuid, datasets, for_racket, check_names, git_repos and
    manifest. All output is printed, nothing is returned.
    """

    def ik(key):
        return key in kwargs and kwargs[key]

    if id is not None:
        print(id.uuid)

    if ik('get_uuid'):
        for id in kwargs['remote_id']:
            print(id.uuid)

        return

    if (ik('datasets') or
        ik('for_racket') or
        ik('check_names')):
        log.setLevel(60)  # logging.CRITICAL = 50
        from sparcur.config import auth
        from sparcur.simple.utils import backend_pennsieve
        if ik('project_id'):
            # project_id from command line; the original left the name
            # unbound in this branch which raised NameError below
            project_id = kwargs['project_id']
        else:
            project_id = auth.get('remote-organization')

        PennsieveRemote = backend_pennsieve(project_id)
        root = PennsieveRemote(project_id)
        datasets = list(root.children)

    if ik('datasets'):
        print('\n'.join([d.id for d in datasets]))

    if ik('for_racket'):
        import json
        _dsmeta = '\n'.join([f"({json.dumps(d.id)} {json.dumps(d.name)})"
                             for d in datasets])
        dsmeta = f"({_dsmeta})"
        # lab pi last name should be cached in some other way
        print(dsmeta)

    if ik('check_names'):
        # you will want to run sparcur.simple.fetch_remote_metadata_all
        from pathlib import PurePosixPath

        def report(pid, exp, pub):
            """Return a description of how the exported record ``exp`` and
            the published record ``pub`` disagree, or '' if they agree."""
            pname = pub['name']
            name_mismatch = (
                False if exp['basename'] == pname
                else (exp['basename'], pname))

            ppname = PurePosixPath(pub['path']).name
            pathname_mismatch = (
                False if exp['basename'] == ppname
                else (exp['basename'], ppname))

            eppp = PurePosixPath(exp['dataset_relative_path']).parent.as_posix()
            # [6:] to strip files/
            pppp = PurePosixPath(pub['path'][6:]).parent.as_posix()
            parent_path_mismatch = (
                False if eppp == pppp
                else (eppp, pppp))

            # once we fix things on our end names should match
            # parent paths should match
            # name and pathname might match but do not have to match
            pid = f' {pid}'
            nsp = '\n '
            if name_mismatch:
                if pathname_mismatch and pname != ppname:
                    return (f'{pid} name mismatch and pathname mismatch '
                            f'{nsp}{nsp.join(name_mismatch)}{nsp}{nsp.join(pathname_mismatch)}')
                else:
                    return (f'{pid} name mismatch '
                            f'{nsp}{nsp.join(name_mismatch)}')

            if parent_path_mismatch:
                return (f'{pid} parent mismatch'
                        f'{nsp}{nsp.join(parent_path_mismatch)}')

            return ''
            #return (f'{pid} ok ')

        import json
        from sparcur.backends import PennsieveDatasetData
        export_path = kwargs['export_path']
        epd = export_path / 'datasets'
        for dataset in datasets:
            latest = epd / dataset.identifier.uuid / 'LATEST'
            export_path_metadata = latest / 'path-metadata.json'
            exported = export_path_metadata.exists()

            # pass the remote not just the id so that bfobject is
            # accessible to the RemoteDatasetData class
            pdd = PennsieveDatasetData(dataset)
            rmeta = pdd.fromCache()
            published = 'id_published' in rmeta
            if exported and published:
                with open(export_path_metadata, 'rt') as f:
                    j = json.load(f)

                epni = {'N:' + o['remote_id']: o
                        for o in j['data']
                        if o['remote_id'].startswith('package:')}
                ppni = pdd._published_package_name_index()
                # only packages present on both sides are compared,
                # sorted so that the report order is deterministic
                s_common = sorted(set(ppni) & set(epni))
                rep = [report(c, epni[c], ppni[c]) for c in s_common]
                repstr = '\n'.join([r for r in rep if r])
                if repstr:
                    print(dataset.id, 'bad')
                    print(repstr)
                else:
                    print(dataset.id, 'ok')
            elif exported:
                print(dataset.id, 'unpublished')
            elif published:
                print(dataset.id, 'unexported')
            else:
                print(dataset.id, 'unpublished and unexported')

    if ik('git_repos'):
        import augpathlib as aug
        import sparcur
        from sparcur.config import auth
        from importlib import metadata
        dist = metadata.distribution(sparcur.__package__)
        rps = [p for p in [aug.RepoPath(found._path) for found in dist.discover()]
               if p.working_dir]
        setups = [p for p in [p.parent / 'setup.py' for p in rps] if p.exists()]
        wds = sorted(set([p.working_dir for p in rps]))

        never_update = auth.get('never-update')
        pretend = ik('pretend') or never_update
        if pretend:
            if never_update:
                print(f'never-update: true is set in {auth.user_config._path}')

            print('These are the commands that would be run.')

        def doupdate(rp):
            if pretend:
                print(f'git -C {rp.as_posix()} stash')
                print(f'git -C {rp.as_posix()} pull')
                return

            print(f'Pulling {rp.as_posix()}')
            print(rp.repo.git.stash())
            # TODO checkout to a safety branch and tag for rollback
            print(rp.repo.git.pull())

        if ik('update'):
            for wd in wds:
                doupdate(wd)

        # indescriminately run setup.py with --release set to tangle
        from importlib import util as imu
        oldargv = sys.argv
        try:
            for s in setups:
                if pretend:
                    print(f'pushd {s.parent.as_posix()}; python setup.py --release; popd')
                    continue

                sys.argv = ['setup.py', '--release']  # reset every time since most will mutate
                print(f'Maybe tangling via {s.as_posix()}')
                spec = imu.spec_from_file_location(f'setup_{s.parent.name}', s)
                mod = imu.module_from_spec(spec)
                try:
                    with s.parent:  # ah relative paths
                        spec.loader.exec_module(mod)
                except SystemExit:
                    pass
                except Exception as e:
                    log.exception(e)
        finally:
            sys.argv = oldargv

    if ik('manifest'):
        from sparcur.utils import write_manifests
        paths = kwargs['paths']
        if not paths:
            paths = [Path.cwd()]

        manifests_rendered = write_manifests(parents=paths)
        manifests, rendered = zip(*manifests_rendered)
        nl = '\n'
        print(f'generated manifests at:\n{nl.join([m.as_posix() for m in manifests])}')
        if ik('open'):
            cmd = kwargs['open']
            for m in manifests:
                m.xopen(cmd)
        elif ik('show'):
            for m in manifests:
                m.xopen()


if __name__ == '__main__':
    pipe_main(main)
Test
from sparcur.simple.utils import pipe_main


def test_pipe_main():
    """Smoke test: run pipe_main with a bare argv and a main that only prints."""

    def echo_main(id=None, project_path=None, **kwargs):
        print(id, project_path, kwargs)

    pipe_main(echo_main, argv=['sparcur-simple'])
Clean metadata files
dataset id dataset path dataset relative paths
cleaned file root metadata file objects cleaned objects write objects cleaned files at cleaned file paths
from sparcur.paths import Path
from sparcur import datasets as dat
from sparcur.utils import symlink_latest
from sparcur.config import auth
from sparcur.simple.utils import rglob


def prepare_dataset_cleaned(dataset_path, cleaned_path=None, time_now=None):
    """Create the timestamped output folder for cleaned files of a dataset.

    The folder is ``cleaned_path / <dataset uuid> / <start timestamp>``
    and a sibling ``LATEST`` symlink is pointed at it.

    dataset_path: path whose ``cache.id`` identifies the dataset
    cleaned_path: base folder, defaults to the 'cleaned-path' auth value
    time_now:     object providing START_TIMESTAMP_LOCAL_FRIENDLY,
                  defaults to a fresh GetTimeNow()

    Returns the timestamped folder.
    """
    from sparcur.utils import PennsieveId, GetTimeNow

    if cleaned_path is None:  # FIXME confusing and breaks w/ convention -> Options maybe?
        cleaned_path = Path(auth.get_path('cleaned-path'))

    if time_now is None:
        # the signature defaults to None, previously that failed with
        # an AttributeError on the timestamp lookup below
        time_now = GetTimeNow()

    identifier = PennsieveId(dataset_path.cache.id)
    uuid = identifier.uuid
    cleaned_dataset_folder = cleaned_path / uuid
    cleaned_parent = cleaned_dataset_folder / time_now.START_TIMESTAMP_LOCAL_FRIENDLY
    if not cleaned_parent.exists():
        cleaned_parent.mkdir(parents=True)

    cleaned_latest = cleaned_dataset_folder / 'LATEST'
    # FIXME do we symlink before we know things have succeeded ???
    symlink_latest(cleaned_parent, cleaned_latest)
    return cleaned_parent


def from_dataset_path_metadata_file_paths(dataset_path):
    """Return the xlsx files under ``dataset_path`` that sit directly in
    the dataset folder or have 'anifest' in their file name."""
    matches = []
    for candidate in rglob(dataset_path, '*.xlsx'):
        rp = candidate.relative_path_from(dataset_path)
        # an empty parent name means the file is at the top level
        if not rp.parent.name or 'anifest' in rp.name:
            matches.append(candidate)

    return matches


def from_path_cleaned_object(path):
    """Return the workbook produced by ``Tabular._openpyxl_fixes`` for ``path``."""
    t = dat.Tabular(path)
    sheet, wb, sparse_empty_rows = t._openpyxl_fixes()
    return wb


def from_file_paths_objects(paths):
    """Yield one cleaned workbook per path, or None for paths that are
    not .xlsx, so the output stays aligned with ``paths``."""
    for path in paths:
        if path.suffix == '.xlsx':
            yield from_path_cleaned_object(path)
        else:
            yield None


def from_dataset_path_cleaned_files(dataset_path, cleaned_parent):
    "NOTE this actually does the cleaning"

    paths = from_dataset_path_metadata_file_paths(dataset_path)
    for path, obj in zip(paths, from_file_paths_objects(paths)):
        if obj is None:
            continue

        # mirror the dataset relative layout under cleaned_parent
        drp = path.dataset_relative_path
        target = cleaned_parent / drp
        if not target.parent.exists():
            target.parent.mkdir(parents=True)

        obj.save(target)


def main(path=Path.cwd(), id=None, time_now=None,
         parent_path=None, invariant_local_path='dataset',
         parent_parent_path=Path.cwd(),
         cleaned_path=None, cleaned_parent_path=None, **kwargs):
    """Write cleaned copies of the metadata files of a single dataset.

    Raises TypeError if the resolved path is not a dataset.
    """
    # setup taken from path_metadata.py::main so check the notes there
    if path == Path.cwd() and (id is not None or parent_path is not None):
        if parent_path is None:
            uuid = id.uuid
            parent_path = parent_parent_path / uuid

        invariant_path = parent_path / invariant_local_path
        path = invariant_path.expanduser().resolve()
    else:
        parent_parent_path = None

    path = Path(path)
    cache = path.cache
    if not cache.is_dataset():
        raise TypeError('can only run on a single dataset')

    cleaned_parent = prepare_dataset_cleaned(path, cleaned_path, time_now)
    from_dataset_path_cleaned_files(path, cleaned_parent)


if __name__ == '__main__':
    import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=pprint.pprint)