Index > sparcur developer guide Edit on GitHub

sparcur developer guide

Table of Contents

Demos

Remote only connection

If you have sparcur.simple this is the simplest way to get a remote only connection to Pennsieve.

from sparcur.config import auth
from sparcur.simple.utils import backend_pennsieve
project_id = auth.get('remote-organization')
PennsieveRemote = backend_pennsieve(project_id)
root = PennsieveRemote(project_id)
datasets = list(root.children)

Otherwise you have to do what backend_pennsieve does for you

from sparcur.paths import PennsieveCache, Path
from sparcur.config import auth
from sparcur.backends import PennsieveRemote
project_id = auth.get('remote-organization')
PennsieveRemote = PennsieveRemote._new(Path, PennsieveCache)
PennsieveRemote.init(project_id)
root = PennsieveRemote(PennsieveRemote.root)
datasets = list(root.children)

Validate a dataset

This code is an example of how to use sparcur to get structured metadata from a local dataset.

You can run this example block and it will validate the DatasetTemplate.

Example usage

rsync -r path/to/some/curation/dataset test
pushd test/dataset
SCIGRAPH_API_KEY=$(python -c 'from pyontutils.config import auth; print(auth.get("scigraph-api-key"))') \
HOME=/tmp/test-home python -m sparcur.simple.validate

Example python usage

from sparcur.simple.validate import main as validate
from pathlib import Path

path = Path('../resources/DatasetTemplate')
blob = validate(path)

Implementation of sparcur.simple.validate.

import augpathlib as aug
from sparcur import pipelines as pipes
from sparcur.paths import PathL, CacheL
from sparcur.utils import GetTimeNow


def makeValidator(dataset_path, time_now=None):

    if time_now is None:
        time_now = GetTimeNow()

    class CacheX(CacheL):
        def __new__(cls, *args, **kwargs):
            return super().__new__(cls, *args, **kwargs)

    CacheX._bind_flavours()

    class PathX(PathL):
        """ Workaround absense of cache. """

        _cache_class = CacheX

        def __new__(cls, *args, **kwargs):
            return super().__new__(cls, *args, **kwargs)

        # TODO likely will also need to rebind the cache class as well

        #@property
        #def dataset_relative_path(self, __drp=dataset_path):
            #return self.relative_path_from(self.__class__(__drp))

    CacheX._local_class = PathX
    PathX._bind_flavours()

    # XXX monkey patch TODO sigh FIXME DatasetStructure calls Path directly inside
    #PathL.dataset_relative_path = Path.dataset_relative_path

    # must caste before anything else is done so that anchor and
    # datasets are known
    dataset_path = PathX(dataset_path)

    CacheX._dataset_dirs = [CacheX(dataset_path)]
    # FIXME this is very much not ideal because we don't actually want
    # the parent in this case
    CacheX._asserted_anchor = CacheX(dataset_path.parent)

    class context:
        path = dataset_path.resolve()
        id = path.id
        uri_api = path.as_uri()
        uri_human = path.as_uri()

    class lifters:
        id = context.id
        remote = 'local'
        folder_name = context.path.name
        uri_api = context.uri_api
        uri_human = context.uri_human
        timestamp_export_start = time_now.START_TIMESTAMP

        affiliations = lambda *args, **kwargs: None
        techniques = tuple()
        modality = None
        organ_term = None
        protocol_uris = tuple()

        award_manual = None

    return pipes.PipelineEnd(dataset_path, lifters, context)
    return pipes.SDSPipeline(dataset_path, lifters, context)  # shouldn't need network


def main(path=PathL.cwd(), time_now=None,
         export_local=False, export_parent_path=None,
         _entry_point=False, validate=False, **kwargs):

    # ('../resources/DatasetTemplate')
    pipeline = makeValidator(path, time_now=time_now)
    data = pipeline.data

    if _entry_point:
        from sparcur.simple.export import export_blob
        export_blob_path = export_blob(
            data, 'curation-export.json', time_now=time_now,
            export_parent_path=export_parent_path if export_parent_path is not None else path,
            **kwargs)
        return export_blob_path
    else:
        return data


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)

Load Python IR

from sparcur.utils import path_ir
ir = path_ir('curation-export.json')
def main():
    from sparcur.reports import Report
    from sparcur.utils import path_ir
    from pyontutils.core import OntResIri
    ori = OntResIri('https://cassava.ucsd.edu/sparc/preview/exports/curation-export.ttl')
    graph = ori.graph

    ir = path_ir('/tmp/curation-export-test.json')

    rows = Report._hubmap_terms(graph, ir)

    anat = [r for r in rows if r[1].prefix in ('UBERON', 'FMA', 'ILX')]
    all_r = expand_label_curie(rows)
    ana_r = expand_label_curie(anat)
    return all_r, ana_r


if __name__ == '__main__':
    return main()

Retrieve protocols

Create a protocols.io account and get API keys.

Then run the following to register with protocols.io. NOTE this is broken at the moment. Manual steps can be found in /docs/sparc-curation/docs/setup.html#config-templates

python -c "import idlib; idlib.Pio._setup()"

You can then run the following to retrieve protocol data.

import idlib
from pyontutils.core import OntResIri
from pyontutils.namespaces import sparc, rdf


def getpio(i):
    try:
        return idlib.Pio(i)
    except idlib.exc.IdlibError as e:
        pass


def getdata(s):
    try:
        return s.data()
    except (idlib.exc.NotAuthorizedError) as e:
        print(e)
    except (idlib.exc.IdDoesNotExistError) as e:
        print(e)


def main():
    ori = OntResIri("https://cassava.ucsd.edu/sparc/preview/exports/protcur.ttl")
    g = ori.graph
    pids = list(g[:rdf.type:sparc.Protocol])
    streams = [s for i in pids for s in (getpio(i),) if s]
    datas = [getdata(s) for s in streams]
    return datas


if __name__ == '__main__':
    main()

Extending implementation

Adding a new xml type

def new_xml_format(path):
    from sparcur.extract import xml
    ex = xml.XmlSource(path)
    top_tag = ex.e.getroot().tag

Workflow

All datasets

Retrieve

The dependency DAG is as follows.

graph-retrieve-all.png

function sparc-time-friendly () {

    local UTC_OFFSET_START
    local TIME_START_NO_OFFSET

    # gnu coreutils gdate needed for osx support/freebsd
    # gdate on darwin only has millisecond resolution?
    # this also won't work on freebsd without gnu coreutils
    iso8601millis="+%FT%T,%6N"  # FIXME do we _really_ need millis!? yemaybe? concurrent startups?
    utcoffset="+%:z"
    # I really hope the utc offset doesn't change between start & end
    # but laptops and airplains do exist, so it could
    # also how utterly annoying that the date separator and the
    # negative utc offset share the same symbol ... talk about
    # an annoying design flaw that is going to haunt humanity
    # with double the number of calls to date for # ... as
    # long as anoyone is writing code to deal with time
    TIME_START_NO_OFFSET=$(date ${iso8601millis} || gdate ${iso8601millis})
    UTC_OFFSET_START=$(date ${utcoffset} || gdate ${utcoffset})
    local TIME_START="${TIME_START_NO_OFFSET}${UTC_OFFSET_START}"  # XXX unused

    local TIME_START_NO_OFFSET_FS_OK=${TIME_START_NO_OFFSET//:/}
    local UTC_OFFSET_START_FS_OK=${UTC_OFFSET_START//:/}
    local TIME_START_FRIENDLY=${TIME_START_NO_OFFSET_FS_OK}${UTC_OFFSET_START_FS_OK}
    # So. iso8601 guidance on what to do about subsecond time and the utc offset in the compact
    # representation is not entirely clear, however I _think_ that %FT%T%H%M%S,%6N%z is ok but
    # the -/+ must stay between the timezone and the rest, so we will have to grab tz by itself
    local TIME_START_SAFE=${TIME_START_NO_OFFSET_FS_OK//-/}${UTC_OFFSET_START_FS_OK}  # XXX unused
    mv "$(mktemp --directory sparcur-all-XXXXXX)" "${TIME_START_FRIENDLY}" || \
        { CODE=$?; echo 'mv failed'; return $CODE; }
    echo "${TIME_START_FRIENDLY}"

}

function sparc-get-all-remote-data () {
    # NOTE not quite all the remote data, the google sheets
    # don't have caching functionality yet

    # parse args
    local POSITIONAL=()
    while [[ $# -gt 0 ]]
    do
    key="$1"
    case $key in # (ref:(((((((sigh)
        --project-id)         local PROJECT_ID="${2}"; shift; shift ;;
        --symlink-objects-to) local SYMLINK_OBJECTS_TO="${2}"; shift; shift ;;
        --log-path)           local LOG_PATH="${2}"; shift; shift ;;
        --parent-path)        local PARENT_PATH="${2}"; shift; shift ;;
        --only-filesystem)    local ONLY_FILESYSTEM="ONLY_FS"; shift ;;
        -h|--help)            echo "${HELP}"; return ;;
        *)                    POSITIONAL+=("$1"); shift ;;
    esac
    done

    # Why, you might be asking, are we declaring a local project path here without assignment?
    # Well. Let me tell you. Because local is a command with an exist status. So it _always_
    # returns zero. So if you need to check the output of the command running in a subshell
    # that you are assigning to a local variable _ALWAYS_ set local separately first.
    # Yes, shellcheck does warn about this. See also https://superuser.com/a/1103711
    local PROJECT_PATH

    if [[ -z "${PARENT_PATH}" ]]; then
        local PARENT_PATH
        set -o pipefail
        PARENT_PATH=$(sparc-time-friendly) || {
        CODE=$?;
        echo "Creating "'${PARENT_PATH}'" failed!"
        set +o pipefail
        return $CODE;
        }
        set +o pipefail
    fi

    local LOG_PATH=${LOG_PATH:-"${PARENT_PATH}/logs"}

    #local LOG_PATH=$(python -c "from sparcur.config import auth; print(auth.get_path('log-path'))")
    local PROJECT_ID=${PROJECT_ID:-$(python -c "from sparcur.config import auth; print(auth.get('remote-organization'))")}

    local maybe_slot=()
    if [[ -n "${SYMLINK_OBJECTS_TO}" ]]; then
        # MUST use arrays to capture optional arguments like this otherwise
        # arg values with spaces in them will destroy your sanity
        maybe_slot+=(--symlink-objects-to "${SYMLINK_OBJECTS_TO}")
    fi

    echo "${PARENT_PATH}"  # needed to be able to follow logs

    if [ ! -d "${LOG_PATH}" ]; then
        mkdir "${LOG_PATH}" || { CODE=$?; echo 'mkdir of ${LOG_PATH} failed'; return $CODE; }
    fi

    if [[ -z "${ONLY_FILESYSTEM}" ]]; then
        # fetch annotations (ref:bash-pipeline-fetch-annotations)
        echo "Fetching annotations metadata"
        python -m sparcur.simple.fetch_annotations > "${LOG_PATH}/fetch-annotations.log" 2>&1 &
        local pids_final[0]=$!

        # fetch remote metadata (ref:bash-pipeline-fetch-remote-metadata-all)
        # if this fails with 503 errors, check the
        # remote-backoff-factor config variable
        echo "Fetching remote metadata"
        python -m sparcur.simple.fetch_remote_metadata_all \
            --project-id "${PROJECT_ID}" \
            > "${LOG_PATH}/fetch-remote-metadata.log" 2>&1 &
        local pids[0]=$!
    fi

    local FAIL=0

    # clone aka fetch top level

    # we do not background this assignment because it runs quickly
    # and everything that follows depends on it finishing, plus we
    # need it to finish to set the PROJECT_PATH variable here
    echo python -m sparcur.simple.clone --project-id "${PROJECT_ID}" --parent-path "${PARENT_PATH}" "${maybe_slot[@]}"
    echo "Cloning top level"
    set -o pipefail
    PROJECT_PATH=$(python -m sparcur.simple.clone \
                          --project-id "${PROJECT_ID}" \
                          --parent-path "${PARENT_PATH}" \
                          "${maybe_slot[@]}" \
                          2>&1 | tee "${LOG_PATH}/clone.log" | tail -n 1) || {
        # TODO tee the output when verbose is passed
        CODE=$?;
        tail -n 100 "${LOG_PATH}/clone.log";
        echo "Clone failed! The last 100 lines of ${LOG_PATH}/clone.log are listed above.";
        apids=( "${pids[@]}" "${pids_final[@]}" );
        for pid in "${apids[@]}"; do
            kill $pid;
        done;
        set +o pipefail
        return $CODE;
    }
    set +o pipefail

    # explicit export of the current project path for pipelines
    # ideally we wouldn't need this, and when this pipeline
    # finished the export pipeline would kick off, or the export
    # pipeline would search for ... an existing project path ...
    # by ... oh right, looking for an environment variable or
    # checksing some other persistent state ... so this is the one
    # unless some controlling process sets it top down from the start
    # but we can't assume that
    export SPARCUR_PROJECT_PATH="${PROJECT_PATH}"

    for pid in "${pids[@]}"; do
        wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
    done
    if [[ $FAIL -ne 0 || -z "${PROJECT_PATH}" ]]; then
        echo "${FAIL} commands failed. Cannot continue."
        echo "${PROJECT_PATH}"
        return 1
    fi

    # pull aka fetch file system metadata
    echo "Fetching file system metadata"
    echo python -m sparcur.simple.pull --project-path "${PROJECT_PATH}"
    python -m sparcur.simple.pull \
           --project-path "${PROJECT_PATH}" \
           > "${LOG_PATH}/pull.log" 2>&1 || {
        CODE=$?;
        tail -n 100 "${LOG_PATH}/pull.log";
        echo "Pull failed! The last 100 lines of ${LOG_PATH}/pull.log are listed above.";
        echo "${PROJECT_PATH}";
        return $CODE; }

    # fetch metadata files
    echo "Fetching metadata files"
    # have to pass project path as a position argument here so that it
    # does not try to pull aka fetch the file system metadata again
    echo python -m sparcur.simple.fetch_metadata_files --project-path "${PROJECT_PATH}"
    python -m sparcur.simple.fetch_metadata_files \
           --project-path "${PROJECT_PATH}" \
           > "${LOG_PATH}/fetch-metadata-files.log" 2>&1 &

    pids_final[1]=$!

    # fetch files
    echo "Fetching files"
    # XXX at some point this will probably also depend on the manifests
    # so we don't fetch everything with a matching extension
    # TODO derive --extension from manifests or all it to be passed in
    echo python -m sparcur.simple.fetch_metadata_files --project-path "${PROJECT_PATH}" --extension xml

    # FIXME fetch_files fails silently here :/
    python -m sparcur.simple.fetch_files \
           --project-path "${PROJECT_PATH}" \
           --extension xml \
           > "${LOG_PATH}/fetch-files.log" 2>&1 &

    pids_final[2]=$!

    local FAIL=0
    for pid in "${pids_final[@]}"; do
        wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
    done

    # FIXME HACK
    #find -type f -size 0 -exec getfattr -d {} \;
    #find -type f -size 0 -exec spc fetch --limit=-1 {} \;

    if [[ $FAIL -ne 0 ]]; then
        echo "${FAIL} commands failed. Cannot continue."
        echo "${PROJECT_PATH}"
        return 1
    fi
    echo "All fetching completed successfully."

}

Validate

This is the graph of the existing approach more or less as implemented by spc export.

A slightly more sane version is being implemented as part of sparcur.simple which will sandbox the network dependencies.

graph-validate-all.png

Export

In the current implementation validation and export are conflated. This is bad, and will be changed.

spc export must only be run after sparc-get-all-remote-data, otherwise there will be network sandbox violations.

For the record there are multiple way invoke spc export.

# pushd to the project location
pushd "${PROJECT_PATH:-SPARCUR_PROJECT_PATH}"
spc export
popd
# pass the project location as a positional argument
spc export "${PROJECT_PATH:-SPARCUR_PROJECT_PATH}"
# pass the project location as an option
spc export --project-path "${PROJECT_PATH:-SPARCUR_PROJECT_PATH}"

At the moment sparc-export-all is just a wrapper around spc export. This will change as we move to a single dataset export model. There will then likely be a function that checks for datasets that have changed since last export, updates only those and then collects the outputs.

function sparc-export-all () {
    # parse args
    local POSITIONAL=()
    while [[ $# -gt 0 ]]
    do
    key="$1"
    case $key in # (ref:(((sigh)
        --project-path) local PROJECT_PATH="${2}"; shift; shift ;;
        -h|--help)      echo "${HELP}"; return ;;
        *)              POSITIONAL+=("$1"); shift ;;
    esac
    done

    local PROJECT_PATH="${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"
    spc export --project-path "${PROJECT_PATH}"
}

Single dataset

Retrieve

graph-retrieve-single.png

Desired invocation.

sparc-fexport ${UUID}
sparc-fexport dataset:${UUID}
sparc-fexport N:dataset:${UUID}
sparc-fexport https://api.pennsieve.io/datasets/N:dataset:${UUID}
sparc-fexport https://app.pennsieve.io/N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0/datasets/N:dataset:${UUID}

Desired behavior.

dataset state objects state action outcome
not-existing do symlink to retrieve spc export; path-metadata
existing either don't fail retrieve or equiv spc export; path-metadata

Desired output. Dataset to exports/datasets/${UUID}/${TIMESTAMP_FRIENDLY}/ and is symlinked to exports/${UUID}/LATEST. The last run dataset itself goes to exports/datasets/LATEST. We don't have to care about the project identifier because each UUID is unique. TODO Logging goes ????

This is a quick and dirty version that should just do the right thing given only the dataset id as an input.

Usage sparc-fexport N:dataset:totally-not-a-uuid. The id provided may be any of the variants, url, curie, api, human, uuid, etc.

function sparc-export () {
    echo TODO not ready yet
    return 1
}

function sparc-fexport () {

    local DATASET_ID="${1}"
    local DATASET_UUID
    local DATASET_PATH
    local EXPORT_PATH

    DATASET_UUID="$(python -m sparcur.simple.utils --dataset-id ${DATASET_ID})"

    python -m sparcur.simple.retrieve --dataset-id ${DATASET_UUID} &&

    EXPORT_PATH="$(realpath "${DATASET_UUID}/exports")" &&
    DATASET_PATH="$(realpath "${DATASET_UUID}/dataset")" &&
    pushd "${DATASET_PATH}" &&

    # FIXME we shouldn't need this refetch so I think that retrieve is
    # broken if files/folders already exist
    python -m sparcur.cli find \
        --name '*.xlsx' \
        --name '*.xml' \
        --name 'submission*' \
        --name 'code_description*' \
        --name 'dataset_description*' \
        --name 'subjects*' \
        --name 'samples*' \
        --name 'manifest*' \
        --name 'resources*' \
        --name 'README*' \
        --no-network \
        --limit -1 \
        --fetch
    wait $!
    python -m sparcur.cli export --export-path "${EXPORT_PATH}" &  # FIXME TODO this conflates phases
    local pids[0]=$!
    # FIXME TODO for now export_single_dataset produces this so we don't run it independently
    # FIXME there is also a difference in the export folder because the path metadata targets
    # the last updated data and thus overwrites if the data has not changed but the code has
    #python -m sparcur.simple.path_metadata_validate --export-path "${EXPORT_PATH}" &
    #local pids[1]=$!
    local FAIL=0
    # TODO log/notify export failure

    for pid in "${pids[@]}"; do
        wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
    done
    if [[ $FAIL -ne 0 ]]; then
        echo "${FAIL} commands failed. Cannot continue."
        echo "${DATASET_UUID}"
        echo "${DATASET_PATH}"
        return 1
    fi
    popd # or do it yourself because we might need to explore??
}
for d in $(ls *-* -d); do refetch ${d}; done

for d in $(ls *-* -d); do
    find ~/".local/share/sparcur/export/datasets/${d}/LATEST/curation-export.json" -name 'curation-export.json';
done

Validate

See ref:graph-validate-all for all the related bits.

graph-validate-single.png

Extract
python -m sparcur.simple.extract "${DATASET_PATH}"
Retrieve 2

Export

Future correct single dataset workflow

Network access should only be possible during the retrieve phase.

The validate step may happen during extract and transform as well since structure or data content issues may be easier to detect during certain phases. Ideally this would not be the case, but practically it will take more work than necessary given our use cases.

We have to be careful to separate basic validation of the structure of the dataset data from the validation that the identifiers provided in that structure point to values known to their respective remotes.

For example we need to be able to say you are missing a protocol reference at a separate point in time from saying the remote(s) we asked had no record of the protocol reference you provided.

Hypothes.is

This is pulled in bulk independently in a different workflow but it is probably worth checking to see if we need to run it again whenever we run a dataset.

Structure metadata

Retrieve

This is ref:clone.py, ref:fetch_remote_metadata_all.py, and ref:pull.py.

Extract

This is now called ref:path_metadata, but it is also dealt with as part of specimen_dirs.

Transform
Validate

We can't do this right now because the current dataset template cannot be statically validated. Only some of it can be validated when we have the data from the subjects and samples sheets. In this future pipeline the type prefixes will be required so that the structure can be statically verified.

Dataset metadata

Run this if the structure metadata is in a state where we can proceed (i.e. that there is a dataset_description file).

Retrieve

This is ref:fetch_metadata_files.py.

Extract
Transform
Validate

File metadata

Retrieve

This is ref:fetch_files.py. Depends on the manifest files and the dataset structure.

Extract
Transform
Validate

This is local validation, not remote networked validation.

Identifiers and mapping

Retrieve

This is the retrieval/dereferencing of identifier metadata.

It must happen after the file metadata step has been completed so that e.g. identifiers used in MBF segmentation files can be validated. In this step in particular validation and retrieval are essentially the same step. If there is an error during retrieval then it must produce a validation error.

Protocols

Checking and/or retrieving these depends on 3 things. The protocols.io group, the hypothesis group, and the dataset metadata.

Protc

Needs hypothesis and protocols.

Export

Convert
Serialize

Probably also includes load in some cases e.g. for the file level metadata that will be attached to package-id file-id pairs.

Protocols

graph-protocols.png

Release

Since we are moving to run individual datasets the aggregate release process is decouple, and mediated via git:36a749b5c321cdb81ba81f9d35e050ceb8479976

Testing

Code
Data

Reporting

After a release

SCKAN

Overview

sckan-ideal-run.png

Implementation

It should be possible to run all of these steps in a container derived from a tgbugs/musl:kg-dev-user docker image. See https://github.com/tgbugs/dockerfiles/blob/master/source.org#kg-dev-user https://hub.docker.com/repository/docker/tgbugs/musl/tags?name=kg-dev-user

Data synchronization

All of these things either run manually or are independent of the SCKAN release process. In most cases manual intervention is needed to ensure that various component sources are up to date.

SPARC curation export

See /docs/sparc-curation/docs/setup.html#export-v4 for the most recent workflow for batch release.

NPO
python -m neurondm.build release
python -m neurondm.models.apinat_pops_more

Super manual curation, identifier sync, review, merge, commit, push, etc.

ApiNATOMY
~/git/sparc-curation/docs/apinatomy.org --all

deploy to remote

apinat-build --deploy  # XXX manual and not fully implemented

For new ApiNATOMY models also update https://github.com/SciCrunch/sparc-curation/blob/master/resources/scigraph/sparc-data.ttl.

NIF-Ontology dev branch

Maybe update sparc community terms (sparc community termset now).

# sh ~/git/pyontutils/nifstd/scigraph/README.org --sct  # XXX old use prrequaestor.el approach instead

Currently, run /docs/pyontutils/nifstd/scigraph/README.html#sparc-community-terms-update, review, merge, push to NIF-Ontology

Use ~/ni/sparc/prrequaestor.el with ~/ni/sparc/sync-specs.el once they are published (still trying to figure out the best way to do that … orgstrap repo? new repo? here?).

Blazegraph and SciGraph builds
### make sure all code is up-to-date

# scigraph/README.org
sh ~/git/pyontutils/nifstd/scigraph/README.org --tangle

### release artifacts

# TODO prepare all ontology files first up here

# release.org to retrieve
~/git/sparc-curation/docs/release.org --build --sckan --no-blaze --no-load
# XXX add --no-annos if ~/.ssh/config doesn't have cassava-sparc

# we have a circular dependency issue here, which is that
# we want to use the apinatomy models from blazegraph for scigraph
# but we also want to use the configured NIF-Ontology repo from
# scigraph for blazegraph, too many layers ...
~/git/pyontutils/nifstd/scigraph/bin/run-load-graph-sparc-sckan
# if you are iterating hard on apinat model development, this is the point
# at which you usually want to deploy to dev scigraph and test

# release.org
~/git/sparc-curation/docs/release.org --build --sckan --no-blaze --resume
# XXX add --no-annos if ~/.ssh/config doesn't have cassava-sparc

### testing phase 1, probably move to docker

# XXX manual
echo manual step waiting for signal from human to start next automated portion
Initial testing

Deploy to dev scigraph.

~/ni/dev/bin/run-deploy-graph-selene

Deploy to local dev blazegraph. /docs/sparc-curation/docs/release.html#*Deploy journal to local server

critical checks:

  1. run the ApiNATOMY dashboard queries and inspect the numbers
  2. check files in data folder for empty classes e.g. via grep 'a owl:Class \.$' (can do this before scigraph load)
  3. check prov-record matches expected
  4. future run the NPO dashboard
Deploy to staging
### deploy to sparc-scigraph
# XXX NOTE we NEVER deploy to sckan-scigraph directly always via
# sparc-scigraph and then once that passes we do the release
~/git/pyontutils/nifstd/scigraph/bin/run-deploy-graph-sparc-sckan

### docker image build

echo manual step waiting for signal from human to start next automated portion
Test staging

Use mapknowledge to test over all neurons where all neurons is the list from some other query

(defun advise--obe-python-path (command &rest args)
  (let* ((params (cadr args))
         (_ (message "%s" params))
         (python-path (cdr (assq :python-path params)))
         (process-environment
          (or (and python-path
                   (cons (format "PYTHONPATH=%s" python-path)
                         process-environment))
              process-environment)))
    (apply command args)))

(advice-add #'org-babel-execute:python :around
            #'advise--obe-python-path)
#import sys
#return [[p] for p in sys.path]
from mapknowledge import KnowledgeStore
from mapknowledge.scicrunch import SCICRUNCH_PRODUCTION, SCICRUNCH_STAGING, SciCrunch
from pyontutils.scigraph_codegen import moduleDirect
from pyontutils.config import auth
from sparcur.utils import log


def test_prod(curie):
    return test_endpoint(curie, SCICRUNCH_PRODUCTION)


def test_stage(curie):
    return test_endpoint(curie, SCICRUNCH_STAGING)


def test_endpoint(curie, endpoint):
    store = KnowledgeStore(scicrunch_release=endpoint)
    store._KnowledgeStore__scicrunch._SciCrunch__scicrunch_key = auth.get('scigraph-api-key')  # XXX SSIIIIIGH
    try:
        wat, *rest = curie.rsplit('/', 1)
        result = store.entity_knowledge(wat)
        log.info(result)
    finally:
        store.close()
    #
    skip = ('id', 'label', 'long-label', 'phenotypes', 'references')
    out = {k:v for k, v in result.items() if v and k not in skip}
    return out  # !?!??!?!!?! apparently a newline before this breaks everything !??!


def testn(curie):
    p = test_prod(curie)
    s = test_stage(curie)
    # XXX probably not quite right given ps and s values
    if p and not s:
        return False, ('removed', p, s)
    elif not s:
        return False, ('missing', p, s)

    if s and not p:
        log.info(f'new neuron {curie}')

    #
    return True, ('ok', p, s)


def testnall(curies):
    bads = []
    for curie in curies:
        try:
            ok, data = testn(curie)
        except Exception as e:
            ok, data = False, ('error', e, e)

        if not ok:
            bads.append((curie, data))
    if bads:
        log.error(bads)
    #
    return bads


def dewit(endpoint):
    base = f'https://scicrunch.org/api/1/{endpoint}'  # XXX no trailing path
    scigraphd = moduleDirect(base, 'scigraphd')
    scigraphd.restService._api_key = auth.get('scigraph-api-key')
    sgc = scigraphd.Cypher()
    #sgd = scigraphd.Dynamic()  # XXX FIXME all pops no in the dynamic endpoints atm
    curies_raw = sgc.execute(
    # XXX must use `org-babel-lob-ingest' on queries.org for this to work
    """
    OPTIONAL MATCH (start:Ontology)
    <-[:isDefinedBy]-(graph:NamedIndividual)
    -[:type]->({iri: "https://apinatomy.org/uris/elements/Graph"})
    , (start)
    <-[:isDefinedBy]-(external:Class)
    -[:subClassOf*]->(:Class {iri: "http://uri.interlex.org/tgbugs/uris/readable/NeuronEBM"})
    return external
    """, limit=99999, output='application/json')
    #curies_raw = sgd
    curies = [c['id'] for c in curies_raw['nodes']]
    #breakpoint()
    count = 2
    sample_size = 15
    bads = []
    for n in range(count):
        # randomly sample the subset to test
        test_set = random.sample(curies, sample_size)
        _bads = testnall(test_set)
        bads.extend(_bads)

    return bads


result = dewit(SCICRUNCH_STAGING)

Future: similar for NPO

Build docker images

run the docker build blocks in /docs/dockerfiles/source.html for the sckan release that do all the copying of the build state to the requisite locations order is bottom to top (maybe use ob-lob) /docs/dockerfiles/source.html#&musl-build-sckan-base /docs/dockerfiles/source.html#&musl-build-sckan-services

dump the docker image for the zenodo release /docs/dockerfiles/source.html#&sckan-save-image

Archive release artifacts
# archive all publication artifacts at one of the usual locations e.g. ~/nas/data
# FIXME hardcoded paths
# FIXME need to set -e on ALL of these blocks I think?
_archive=~/"nas/data/"
_sckanrz="$(ls -d /tmp/build/release-*-sckan.zip | sort -u | tail -n 1)"
_sckansz=/tmp/scigraph-build/sparc-sckan/$(readlink /tmp/scigraph-build/sparc-sckan/LATEST)
_sckandz="$(ls -d /tmp/docker-sckan-data-*Z.tar.gz | sort -u | tail -n 1)"
declare -a _arts=(${_sckanrz} ${_sckansz} ${_sckandz})
for _art in "${_arts[@]}"; do
echo $(basename ${_art})
rsync -a ${_art} ${_archive}
done
GitHub release

Create a new pre-release at https://github.com/SciCrunch/NIF-Ontology/releases tag: sckan-%Y-%m-pre-{n} target: dev title: SCKAN %Y-%m-pre-{n} Upload the blazegraph and scigraph zips.

Push docker images
# push docker images
docker push tgbugs/sckan:$(docker image ls tgbugs/sckan:base-*Z | sort | tail -n 1 | awk '{ print $2 }')
docker push tgbugs/sckan:$(docker image ls tgbugs/sckan:data-*Z | sort | tail -n 1 | awk '{ print $2 }')
docker push tgbugs/sckan:base-latest
docker push tgbugs/sckan:latest
Test docker images

Most of the testing at this stage is of the functionality in tgbugs/musl:kg-release-user so it is ok to push the data images first.

### testing

echo manual step waiting for signal from human to start next automated portion

TODO automate. Create the new docker container volume for sckan data run it with the latest version of tgbugs/musl:kg-release-user run all the examples etc. Use the shebang block in queries.org or similar to execute all the blocks and check to make sure they are working out as expected. Better yet if we can write some internal consistency checks i.e. between NPO and ApiNATOMY.

Update changelog
### publication

echo manual step waiting for signal from human to start next automated portion
### changelog

# XXX manual

One way to generate a changelog, not the best, but possible.

pushd ~/git/apinatomy-models
git diff $(the last commit before the previous release)..HEAD -- models
pushd ~/git/NIF-Ontology
git diff $(the last commit before the previous release)..HEAD -- *.ttl ttl
Zenodo release
# prepare the new zenodo release

# deploy scigraph image to production
Promote to production

Once all tests have passed and we receive the ok from MAP we promote sparc-scigraph to sckan-scigraph.

  1. back up existing prod services.yaml
  2. copy data from aws-scigraph-scigraph to aws-scigraph-sckan-scigraph
    1. alternately curl from github release or rsync from internal stores, the issue is the release id is not predictable right now
  3. copy services.yaml (and the raw) from aws-scigraph-scigraph to aws-scigraph-sckan-scigraph
  4. unzip data
  5. stop service
  6. unlink/ln -s
  7. start service
echo "not running to avoid accidental deployment"
echo "if you want to do this comment out these lines and the exit line"
exit 1
source "$(eval echo ~/git/pyontutils/nifstd/scigraph/bin/scigraph-functions.sh)"
host_stage=aws-scigraph-scigraph
host_prod=aws-scigraph-sckan-scigraph
host_prod_sudo=aws-scigraph-sckan

path_stamped="$(ssh ${host_stage} "realpath graph")"
path_zip="${path_stamped}.zip"
path_yaml="$(ssh ${host_stage} "realpath services.yaml")"
path_yaml_raw="$(ssh ${host_stage} 'realpath $(head -n 1 services.yaml | cut -d" " -f 2)')"

__TEST='echo ${USER}@${HOSTNAME} '
__TEST=''

# prod backup existing services
ssh ${host_prod} "${__TEST}"'cp services.yaml services.yaml-'"$(date +%s)" || exit $?

# stage>prod data and services
# XXX no rsync because the service users should not have ssh access to anything
# FIXME the egress on this is stupid, ideally run this whole script from within aws
paths=("${path_zip}" "${path_yaml}" "${path_yaml_raw}")
for path in "${paths[@]}"; do
    ssh ${host_stage} "${__TEST}"'sha256sum '"${path}";
    # XXX FIXME BEWARE cannot test the pipe and redirection easily
    ssh ${host_stage} "${__TEST}"'cat '"${path}" |\
        ssh ${host_prod} 'cat - > '"${path}";
    ssh ${host_prod} "${__TEST}"'sha256sum '"${path}";
done

# prod unzip
ssh ${host_prod} "${__TEST}"'unzip -n '"${path_zip}" || exit $?

# prod stop relink start
ssh -t ${host_prod_sudo} "$(typeset -f service-manager);"\
"${__TEST}"'service-manager scigraph stop &&'\
"${__TEST}"'sudo unlink /var/lib/scigraph/graph;'\
"${__TEST}"'sudo ln -s '"${path_stamped}"' /var/lib/scigraph/graph;'\
"${__TEST}"'service-manager scigraph start'

Internal Structure

Pipelines

Easier to read, harder to debug. The python paradox.

Retrieve

Protocols

Cache annotations. See for usage.

from pathlib import Path
from hyputils import hypothesis as hyp
from sparcur.config import auth


def from_group_name_fetch_annotations(group_name):
    """ pull hypothesis annotations from remote to local """
    group_id = auth.user_config.secrets('hypothesis', 'group', group_name)
    cache_file = Path(hyp.group_to_memfile(group_id + 'sparcur'))
    get_annos = hyp.Memoizer(cache_file, group=group_id)
    get_annos.api_token = auth.get('hypothesis-api-key')  # FIXME ?
    annos = get_annos()
    return cache_file  # needed for next phase, annos are not


def from_group_name_cached_annos(group_name):
    group_id = auth.user_config.secrets('hypothesis', 'group', group_name)
    cache_file = Path(hyp.group_to_memfile(group_id + 'sparcur'))
    get_annos = hyp.AnnoReader(cache_file, group=group_id)
    annos = get_annos()
    return annos


def from_user_name_group_name_h(user_name, group_name):
    group_id = auth.user_config.secrets('hypothesis', 'group', group_name)
    h = hyp.HypothesisUtils(username=user_name, group=group_id)
    h.token = auth.user_config.secrets('hypothesis', 'api', user_name)
    return h


def main(hypothesis_group_name=None, **kwargs):
    if hypothesis_group_name is None:
        hypothesis_group_name = 'sparc-curation'

    from_group_name_fetch_annotations(hypothesis_group_name)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)

Temporary location for some helper code to clone protcur annos to a new group.

from sparcur.simple.fetch_annotations import (
    from_group_name_fetch_annotations,
    from_group_name_cached_annos,
    from_user_name_group_name_h)
from protcur import document as ptcdoc


#def main():
annos = from_group_name_cached_annos('sparc-curation')

bannos = [ptcdoc.Annotation(a) for a in annos]
pool = ptcdoc.Pool(bannos)
anno_counts = ptcdoc.AnnoCounts(pool)

if False: # sort debug
    ta = pool.topological_annos
    def asdf(ta):
        hrm = [a.id for a in ta]
        de = [(i, [(hrm.index(pid) if pid in hrm else 'oops') for pid in a.text_parent_ids]) for i, a in enumerate(ta) if a.text_parent_ids]
        qq = [(i, l) for i, l in de if [_ for _ in l if _ != 'oops' and i < _]]
        sigh = sorted(set([x for i, l in qq for x in [i] + l if x != 'oops']))
        return qq, sigh

    qq, sigh = asdf(ta)
    bads = [ta[f] for f in sigh]
    sbads = ptcdoc.toposort(bads)
    qq2, sigh2 = asdf(sbads)

group_name = 'fixed-sparc-curation'
cache_file = from_group_name_fetch_annotations(group_name)
html_annos = from_group_name_cached_annos(group_name)
html_bannos = [ptcdoc.Annotation(a) for a in html_annos]
#html_bannos = []
html_pool = ptcdoc.Pool(html_bannos)

h_p_f = from_user_name_group_name_h('protbot', group_name)

#pool.clone_to(html_pool, h_p_f)

test = False
if test:
    test_bannos = list(pool.bySlugTail('wzuff6w'))  # published with most annos
    #test_bannos = list(pool.bySlugTail('ba8hiht6'))  # published with low number of annos
    test_pool = ptcdoc.Pool(test_bannos)
    test_pool.clone_to(html_pool, h_p_f, test=False)
else:
    pool.clone_to(html_pool, h_p_f, test=test)

[a._row for a in html_pool._annos]

# [h_p_f.delete_annotation(a.id) for a in html_pool._annos]



if __name__ == '__main__':
    main()
Datasets
Clone

This is an example of how to clone the top level of a project. See ref:utils.py for a good way to instantiate RemotePath.

from pathlib import Path


# clone top level
def from_path_id_and_backend_project_top_level(parent_path,
                                               project_id,
                                               RemotePath,
                                               symlink_objects_to=None,):
    """ given the enclosing path to clone to, the project_id, and a fully
        configured (with Local and Cache) backend remote path, anchor the
        project pointed to by project_id along with the first level of children """

    project_path = _from_path_id_remote_project(parent_path,
                                                project_id,
                                                RemotePath,
                                                symlink_objects_to)
    return _from_project_path_top_level(project_path)


def from_path_project_backend_id_dataset(parent_path,
                                         project_id,
                                         dataset_id,
                                         RemotePath,
                                         symlink_objects_to=None,):
    project_path = _from_path_id_remote_project(parent_path,
                                                project_id,
                                                RemotePath,
                                                symlink_objects_to)
    return _from_project_path_id_dataset(project_path, dataset_id)


def _from_path_id_remote_project(parent_path, project_id, RemotePath, symlink_objects_to):
    RemotePath.init(project_id)  # calling init is required to bind RemotePath._api
    anchor = RemotePath.smartAnchor(parent_path)
    anchor.local_data_dir_init(symlink_objects_to=symlink_objects_to)
    project_path = anchor.local
    return project_path


def _from_project_path_top_level(project_path):
    """ given a project path with existing cached metadata
        pull the top level children

        WARNING: be VERY careful about using this because it
        does not gurantee that rmeta is available to mark
        sparse datasets. It may be the case that the process
        will fail if the rmeta is missing, or it may not. Until
        we are clear on the behavior this warning will stay
        in place. """
    # this is a separate function in case the previous step fails
    # which is also why it is hidden, it makes too many assuptions
    # to be used by itself

    anchor = project_path.cache
    list(anchor.children)  # this fetchs data from the remote path to the local path
    return project_path  # returned instead of anchor & children because it is needed by next phase


def _from_project_path_id_dataset(project_path, dataset_id):
    anchor = project_path.cache
    remote = anchor._remote_class(dataset_id)
    cache = anchor / remote
    return cache.local


def main(parent_path=None,
         project_id=None,
         parent_parent_path=Path.cwd(),
         project_id_auth_var='remote-organization',  # FIXME move default to clifun
         symlink_objects_to=None,
         id=None,
         dataset_id=None,
         **kwargs):
    """ clone a project into a random subfolder of the current folder
        or specify the parent path to clone into """

    from sparcur.simple.utils import backend_pennsieve

    if parent_path is None:
        breakpoint()  # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX FIXME
        import tempfile
        parent_path = Path(tempfile.mkdtemp(dir=parent_parent_path))

    if project_id is None:
        from sparcur.config import auth
        from sparcur.utils import PennsieveId
        project_id = auth.get(project_id_auth_var)
        project_id = PennsieveId(project_id)  # FIXME abstract the hardcoded backend

    RemotePath = backend_pennsieve()

    if id and dataset_id:
        # FIXME doesn't check for existing so if the name changes we get duped folders
        # this issue possibly upstream in retrieve, clone just clones whatever you tell
        # it to clone, but maybe it should check the existing metadata and fail or warn?
        dataset_path = from_path_project_backend_id_dataset(
            parent_path,
            project_id,
            id,  # FIXME multiple datasets
            RemotePath,
            symlink_objects_to,)

        return dataset_path

    project_path = from_path_id_and_backend_project_top_level(
        parent_path,
        project_id,
        RemotePath,
        symlink_objects_to,)

    return project_path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
Remote metadata

Remote metadata must be retrieved prior to the first pull in order to ensure that large datasets can be marked as sparse datasets before they are pulled.

From id

Remote metadata can be retrieved using only a project_id. However, for all retrieval after the first pull it is usually more effective to retrieve it at the same time as fetching metadata files since it runs in parallel per dataset.

See for usage.

from joblib import Parallel, delayed
from sparcur.backends import PennsieveDatasetData
from sparcur.simple.utils import backend_pennsieve


def from_id_fetch_remote_metadata(id, project_id=None, n_jobs=12):
    """ given an dataset id fetch its associated dataset metadata """
    if id.type == 'organization':
        RemotePath = backend_pennsieve()
        project = RemotePath(id)
        prepared = [PennsieveDatasetData(r) for r in project.children]
        if n_jobs <= 1:
            [p() for p in prepared]
        else:
            # FIXME Paralle isn't really parallel here ...
            # can't use multiprocessing due to broken aug.RemotePath implementation
            # LOL PYTHON everything is an object, except when you want to pickle it
            # then some objects are more equal than others
            Parallel(n_jobs=n_jobs)(delayed(p._no_return)() for p in prepared)

    elif id.type == 'dataset':
        RemotePath = backend_pennsieve(project_id)
        dataset = RemotePath(id)
        bdd = PennsieveDatasetData(dataset)
        bdd()
    else:
        raise NotImplementedError(id)


def main(id=None,
         project_id=None,
         project_id_auth_var='remote-organization',  # FIXME move to clifun
         n_jobs=12,
         **kwargs):

    if project_id is None:
        from sparcur.utils import PennsieveId
        from sparcur.config import auth
        project_id = auth.get(project_id_auth_var)
        project_id = PennsieveId(project_id)  # FIXME abstract the hardcoded backend

    if id is None:
        id = project_id

    from_id_fetch_remote_metadata(id,
                                  project_id=project_id,
                                  n_jobs=n_jobs,)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)  # nothing to print or do after
From path

The implementation of sparcur.backends.PennsieveDatasetData supports the ability to retrieve metadata directly from the remote without the need for an intervening local path. However this functionality is obscured here because we want to derive a consistent view of the data from the file system snapshot.

from joblib import Parallel, delayed
from sparcur.paths import Path
from sparcur.backends import PennsieveDatasetData


def _from_project_path_fetch_remote_metadata(project_path, n_jobs=12, cached_ok=False):
    if n_jobs <= 1:
        prepared = [PennsieveDatasetData(dataset_path.cache)
                    for dataset_path in project_path.children]
        [bdd() for bdd in prepared if not (cached_ok and bdd.cache_path.exists())]
    else:
        fetch = lambda bdd: bdd() if not (cached_ok and bdd.cache_path.exists()) else None
        fetch_path = (lambda path: fetch(PennsieveDatasetData(path.cache)))
        Parallel(n_jobs=n_jobs)(delayed(fetch_path)(dataset_path)
                 for dataset_path in project_path.children)


# fetch remote metadata
def from_path_fetch_remote_metadata(path, n_jobs=12, cached_ok=False):
    """ Given a path fetch remote metadata associated with that path. """

    cache = path.cache
    if cache.is_organization():
        _from_project_path_fetch_remote_metadata(path, n_jobs=n_jobs, cached_ok=cached_ok)
    else:  # dataset_path
        # TODO more granular rather than roll up to dataset if inside?
        bdd = PennsieveDatasetData(cache)
        if not (cached_ok and bdd.cache_path.exists()):
            bdd()


def main(path=Path.cwd(), n_jobs=12, rmeta_cached_ok=False, **kwargs):
    if path is None or path.find_cache_root() not in (path, *path.parents):
        from sparcur.simple.clone import main as clone
        path = clone(path=path, n_jobs=n_jobs, **kwargs)
        # NOTE path is passed along here, but kwargs is expected to contain
        # parent_path or parent_parent_path and project_id note that if that
        # happens then the path returned from clone will change accordingly

    from_path_fetch_remote_metadata(path, n_jobs=n_jobs, cached_ok=rmeta_cached_ok)
    return path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)  # we probably don't print here?
Pull

Pull a single dataset or pull all datasets or clone and pull all datasets.

from joblib import Parallel, delayed
from sparcur.paths import Path
from sparcur.utils import GetTimeNow


# pull dataset
def from_path_dataset_file_structure(path, time_now=None, exclude_uploaded=False):
    """ pull the file structure and file system metadata for a single dataset
        right now only works from a dataset path """

    if time_now is None:
        time_now = GetTimeNow()

    path._pull_dataset(time_now, exclude_uploaded)


# pull all in parallel
def from_path_dataset_file_structure_all(project_path,
                                         *args,
                                         paths=None,
                                         time_now=None,
                                         n_jobs=12,
                                         exclude_uploaded=False):
    """ pull all of the file structure and file system metadata for a project
        paths is a keyword argument that accepts a list/tuple of the subset of
        paths that should be pulled """

    if time_now is None:
        time_now = GetTimeNow()

    project_path.pull(
        paths=paths,
        time_now=time_now,  # TODO
        debug=False,  # TODO
        n_jobs=n_jobs,
        log_level='DEBUG' if False else 'INFO',  # TODO
        Parallel=Parallel,
        delayed=delayed,
        exclude_uploaded=exclude_uploaded,)


# mark datasets as sparse 
def sparse_materialize(path, sparse_limit:int=None):
    """ given a path mark it as sparse if it is a dataset and
        beyond the sparse limit """

    cache = path.cache
    if cache.is_organization():
        # don't iterate over cache children because that pulls remote data
        for child in path.children:
            sparse_materialize(child, sparse_limit=sparse_limit)
    else:
        cache._sparse_materialize(sparse_limit=sparse_limit)


def main(path=Path.cwd(),
         time_now=None,
         sparse_limit:int=None,
         n_jobs=12,
         exclude_uploaded=False,
         **kwargs):
    if path != path.resolve():
        raise ValueError(f'Path not resolved! {path}')

    project_path = None  # path could be None so can't find_cache_root here
    if path is None or path.find_cache_root() not in (path, *path.parents):
        from sparcur.simple.fetch_remote_metadata import main as remote_metadata
        project_path = remote_metadata(path=path, **kwargs)  # transitively calls clone
    else:
        project_path = path.find_cache_root()
        if path != project_path:
            # dataset_path case
            sparse_materialize(path, sparse_limit=sparse_limit)
            from_path_dataset_file_structure(path, time_now=time_now, exclude_uploaded=exclude_uploaded)
            if path == Path.cwd():
                print('NOTE: you probably need to run `pushd ~/ && popd` '
                      'to get a sane view of the filesystem if you ran this'
                      'from within a dataset folder')
            return path

    if not list(project_path.children):
        raise FileNotFoundError(f'{project_path} has no children.')
        # somehow clone failed
        # WARNING if rmeta failed you may get weirdness  # FIXME
        from sparcur.simple.clone import _from_project_path_top_level
        _from_project_path_top_level(project_path)

    sparse_materialize(project_path,
                       sparse_limit=sparse_limit)
    from_path_dataset_file_structure_all(project_path,
                                         time_now=time_now,
                                         n_jobs=n_jobs,
                                         exclude_uploaded=exclude_uploaded)
    return project_path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
Internal implementation

The non-simple implementation of this is quite convoluted so here are links to the current implementation, from outside in. In reverse order the basic steps are pull from dataset packages endpoint, resolve hierarchy, convert to remote paths, and covert to cache paths which materialize the pull as folders or symlinks.

  1. entry point to pull
    1. the call in pull that actually retrieves data
  2. the implementation of rchildren for Pennsieve
  3. looping over packages to covert them to paths
  4. transform dataset packages endpoint json into Pennsieve api objects
Fetch
Individual file

def main(path=None, **kwargs):
    if path is not None:
        # FIXME this will fail if the remote for the file is not in
        # the current project, or if the cachedir is not a child of
        # the top level project directory e.g. in .operations/objects
        cache = path.cache
        cache.fetch(size_limit_mb=None)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)
Metadata files
from itertools import chain
from sparcur import exceptions as exc
from sparcur.utils import log, logd
from sparcur.paths import Path
from sparcur.datasets import DatasetStructure
from sparcur.simple.utils import fetch_paths_parallel, rglob

# fetch metadata files
fetch_prefixes = (
    ('dataset_description', 'glob'),
    ('subjects',            'glob'),
    ('samples',             'glob'),
    ('submission',          'glob'),
    ('manifest',           'rglob'),  # XXX NOTE the rglob here
)


def _from_path_fetch_metadata_files_simple(path, fetch=True):
    """ transitive yield paths to all metadata files, fetch them from
        the remote if fetch == True """
    for glob_prefix, glob_type in fetch_prefixes:
        if glob_type == 'rglob':
            gp0 = glob_prefix[0]
            pattern = f'[{gp0.upper()}{gp0}]{glob_prefix[1:]}*'
            yield from rglob(path, pattern)
            continue
        ds = DatasetStructure(path)
        for path_to_metadata in ds._abstracted_paths(glob_prefix,
                                                     glob_type=glob_type,
                                                     fetch=fetch):  # FIXME fetch here is broken
            yield path_to_metadata


def _from_path_fetch_metadata_files_parallel(path, n_jobs=12):
    """ Fetch all metadata files within the current path in parallel. """
    paths_to_fetch = _from_path_fetch_metadata_files_simple(path, fetch=False)
    try:
        first = next(paths_to_fetch)
        paths_to_fetch = chain((first,), paths_to_fetch)
    except StopIteration:
        log.warning('No paths to fetch, did you pull the file system metadata?')
        return

    # FIXME passing in a generator here fundamentally limits the whole fetching
    # process to a single thread because the generator is stuck feeding from a
    # single process, IF you materialize the paths first then the parallel fetch
    # can actually run in parallel, but bugs/errors encountered late in collecting
    # the paths will force all previous work to be redone
    # XXX as a result of this we use the posix find command to implement rglob
    # in a way that is orders of magnitude faster
    paths_to_fetch = list(paths_to_fetch)
    fetch_paths_parallel(paths_to_fetch, n_jobs=n_jobs)


def from_path_fetch_metadata_files(path, n_jobs=12):
    """ fetch metadata files located within a path """
    #if n_jobs <= 1:
        #gen = _from_path_fetch_metadata_files_simple(path)
        # FIXME broken ??? somehow abstracted paths doesn't fetch when
        # we run in directly, or somehow fetch_paths_parallel does something
        # different
        #paths = list(gen)
    #else:
    _from_path_fetch_metadata_files_parallel(path, n_jobs=n_jobs)


def main(path=Path.cwd(), n_jobs=12, **kwargs):
    if path is None or path.find_cache_root() not in (path, *path.parents):
        from sparcur.simple.pull import main as pull
        path = pull(path=path, n_jobs=n_jobs, **kwargs)

    from_path_fetch_metadata_files(path, n_jobs=n_jobs)
    return path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
File level metadata extraction

Fetch files by extension.

import os
from sparcur.paths import Path
from sparcur.utils import _find_command
from sparcur.simple.utils import fetch_paths_parallel


def _datasets_with_extension(path, extension):
    """ Hack around the absurd slowness of python's rglob """

    # TODO query multiple extensions with -o at the same time
    command = fr"""for d in */; do
    {_find_command} "$d" \( -type l -o -type f \) -name '*.{extension}' \
    -exec getfattr -n user.bf.id --only-values "$d" \; -printf '\n' -quit ;
done"""

    with path:
        with os.popen(command) as p:
            string = p.read()

    has_extension = string.split('\n')
    datasets = [p for p in path.children if p.cache_id in has_extension]
    return datasets


def _from_path_fetch_files_simple(path, filter_function, fetch=True):
    files = filter_function(path)
    if fetch:
        [f.fetch(size_limit_mb=None) for f in files if not f.exists()]
        #Async(rate=5)(deferred(f.fetch)(size_limit_mb=None)
                      #for f in files if not f.exists())

    return files


def _from_path_fetch_files_parallel(path, filter_function, n_jobs=12):
    paths_to_fetch = _from_path_fetch_files_simple(path, filter_function, fetch=False)
    fetch_paths_parallel(paths_to_fetch, n_jobs=n_jobs)


def filter_extensions(extensions):
    """ return a function that selects files in a path by extension """
    def filter_function(path):
        cache = path.cache
        if cache.is_organization():
            paths = set()
            for ext in extensions:
                ds = _datasets_with_extension(path, ext)
                paths.update(ds)

        else:  # dataset_path
            paths = path,

        files = [matching  # FIXME stream ?
                for path in paths
                for ext in extensions
                for matching in path.rglob(f'*.{ext}')]
        return files

    return filter_function


def filter_manifests(dataset_blob):
    """ return a function that selects certain files listed in manifest records """
    # FIXME this needs a way to handle organization level?
    # NOTE this filter is used during the second fetch phase after the inital
    # metadata has been ingested to the point where it can be use to guide further fetches
    # TODO this is going to require the implementation of partial fetching I think
    # TODO preprocessing here?
    def filter_function(path):
        # TODO check that the path and the dataset blob match
        cache = path.cache
        if cache.id != dataset_blob['id']:
            msg = f'Blob is not for this path! {dataset_blob["id"]} != {cache.id}'
            raise ValueError(msg)

        files = []  # TODO get_files_for_secondary_fetch(dataset_blob)
        return files

    return filter_function


def from_path_fetch_files(path, filter_function, n_jobs=12):
    if n_jobs <= 1:
        _from_path_fetch_files_simple(path, filter_function)
    else:
        _from_path_fetch_files_parallel(path, filter_function, n_jobs=n_jobs)


def main(path=Path.cwd(), n_jobs=12, extensions=('xml',), **kwargs):
    #breakpoint()  # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    if path is None or path.find_cache_root() not in (path, *path.parents):
        from sparcur.simple.pull import main as pull
        path = pull(path=path, n_jobs=n_jobs, **kwargs)

    filter_function = filter_extensions(extensions)

    from_path_fetch_files(path, filter_function, n_jobs=n_jobs)
    return path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)
Second fetch

Once the initial pass over the dataset is complete extract the list of additional files that need to be retrieved and fetch them.

from sparcur.paths import Path
from sparcur.simple.fetch_files import from_path_fetch_files, filter_manifests


def from_blob_fetch_files(dataset_blob, path=None):
    # should the blob contain a reference to the path
    # it was derived from?
    filter_function = filter_manifests(dataset_blob)
    from_path_fetch_files(path, filter_function, n_jobs=n_jobs)


def main(path=Path.cwd(), n_jobs=12, **kwargs):
    #breakpoint()  # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    # if not dataset_blob: get_blob vs path blob pairs?
    # starting from a partial blob means that this probably
    # should not kick off from the file system, but we know
    # that we will want to be able to kick it off from the
    # file system ... maybe the intermediate blobs can encode
    # the prov of where the file system reference they were
    # derived from lives ?
    dataset_blob = get_blob(path.cache_id)  # FIXME TODO
    from_blob_fetch_files(dataset_blob, path)


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main)
Retrieve

Putting it all together into a single command.

The behavior of retrieve works exactly as it does for clone the difference is that it runs for just a single dataset and the parent_path is made to be the dataset_id uuid if you are running a single dataset pipeline you will still need the project folder structure for logs and jobs etc. you can also still run all datasets together off of a single SPARC Consoritum folder, in which case all you need to do is pass the communal parent_path

Usage example.

python -m sparcur.simple.retrieve \
--dataset-id N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18 \
--symlink-objects-to /mnt/str/tom/cache/bf-object-cache

Example python usage.

from sparcur.paths import Path
from sparcur.utils import PennsieveId
from sparcur.simple.retrieve import main as retrieve
p = PennsieveId('N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0')
d = PennsieveId('N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18')
ppp = Path('~/files/sparc-datasets').expanduser().resolve()
retrieve(id=d, dataset_id=d, project_id=p, parent_parent_path=ppp)

sparcur.simple.retrieve implementation

from sparcur.paths import Path
from sparcur.utils import symlink_latest
from sparcur.simple.clone import main as clone
from sparcur.simple.fetch_remote_metadata_all import main as remote_metadata
from sparcur.simple.pull import main as pull
from sparcur.simple.fetch_metadata_files import main as fetch_metadata_files
from sparcur.simple.fetch_files import main as fetch_files

def main(id=None,
         dataset_id=tuple(),
         parent_path=None,
         parent_parent_path=Path.cwd(),
         path=None,  # keep path out of kwargs
         invariant_local_path='dataset',
         #extensions=('xml',),  # not needed right now
         **kwargs):
    # FIXME parent_path and project_id seem like they probably need to
    # be passed here, it would be nice if project_path could just be
    # the current folder and if the xattrs are missing for the
    # project_id then ... it is somehow inject from somewhere else?
    # this doesn't really work, because that would mean that we would
    # have to carry the project id around in the xattr metadata for
    # all dataset folders, which might not be the worst thing, but
    # definitely redundant
    if id is None:
        raise TypeError('id is a required argument!')

    if parent_path is None:
        uuid = id.uuid  # FIXME hardcoded backend assumption
        parent_path = parent_parent_path / uuid
        parent_path.mkdir(exist_ok=True)
    elif not parent_path.exists():
        parent_path.mkdir()

    invariant_path = parent_path / invariant_local_path

    # XXX for now we do these steps in order here
    # rather than trusting that calling simple.pull.main will do
    # the right thing if there is no path ... it should but probably
    # doesn't right now due to assumptions about paths existing

    # remote metadata from path (could do from id too?)
    remote_metadata(id=id, **kwargs)  # could run parallel to clone, easier in bash
    # clone single without organization parent somehow seems likely broken?
    path = clone(id=id,
                 dataset_id=dataset_id,
                 parent_path=parent_path,
                 parent_parent_path=parent_parent_path,
                 **kwargs)  # XXX symlink_objects_to will just work if you pass it

    symlink_latest(path, invariant_path)

    # pull single
    pull(path=path, **kwargs)
    # fetch metadata files
    fetch_metadata_files(path=path, **kwargs)  # FIXME symlink_to
    # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX fetch_metadata_files does NOT USE the extensions kwarg!
    # fetch additional files
    fetch_files(path=path)

    return path


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)

Validate

Protocols
Datasets
Extract

Usage example.

python -m sparcur.simple.extract \
--dataset-id N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18 \
--export-parent-path 21957eae-0824-4fb5-b18f-04d6ed12ce18/exports
from sparcur import datasets as dat
from sparcur import pipelines as pipes
from sparcur import exceptions as exc
from sparcur.utils import log, logd
from sparcur.paths import Path
from sparcur.backends import PennsieveDatasetData
from sparcur.simple.utils import combinate, multiple, early_failure, DataWrapper
from sparcur.simple.fetch_metadata_files import fetch_prefixes


class ManifestFiles(DataWrapper):
    """ wrapper for manifest files. """


def merge_manifests(vs):
    """ Merge individual manifest records into the same list """
    # FIXME massive hack :/
    k = 'manifest_file'
    # FIXME errors key ... ? is it allowed up there? it shouldn't be ...
    # FIXME {'content': m}
    return ManifestFiles([m for v in vs for m in v.data[k]])


def object_from_find_path(glob_prefix, object_from_path_function, glob_type='glob', onfail=None):
    """ Return a function that will find files that start with glob_prefix"""
    # FIXME should be in utils but depends on fetch_prefixes
    if glob_prefix not in dict(fetch_prefixes):
        raise ValueError('glob_prefix not in fetch_prefixes! '
                         f'{glob_prefix!r} not in {fetch_prefixes}')
    def func(path, *args, **kwargs):
        ds = dat.DatasetStructure(path)
        rpath = None
        for rpath in ds._abstracted_paths(glob_prefix, sandbox=True):
            yield object_from_path_function(rpath, *args, **kwargs)

        if rpath is None and onfail is not None:
            raise onfail(f'No match for {glob_prefix} in {path.name}')

    return func

# file type -> dataset blob key indirection

_TOP = object()  # SIGH SIGH SIGH always need a escape hatch

otkm = {ThingFilePath.obj:prefix + '_file' for prefix, ThingFilePath
        in dat.DatasetStructure.sections.items()}
otkm[ManifestFiles] = 'manifest_file'
otkm[PennsieveDatasetData] = 'remote_dataset_metadata'
otkm[type(dat.DatasetStructure())] = 'structure'  # hack around Pathlib type mangling
otkm[type(dat.DatasetMetadata())] = _TOP

# stream of objects -> place in dataset blob

def dataset_raw(*objects, object_type_key_map=otkm):
    data = {}
    log.debug(objects)
    #path_structure, description, subjects, samples, submission, manifests, *rest = objects
    for obj in objects:
        log.debug(obj)
        key = object_type_key_map[type(obj)]
        try:
            if key is not _TOP:
                data.update({key: obj.data})
            else:
                data.update(obj.data)
        except Exception as e:
            # FIXME current things that leak through
            # MalformedHeaderError
            # something in the objects list is a dict
            breakpoint()
            pass

    return data


# path + version -> python object

# TODO how to attach and validate schemas orthogonally in this setting?
# e.g. so that we can write dataset_1_0_0 dataset_1_2_3 etc.

# we capture version as early as possible in the process, yes we
# could also gather all the files and folders and then pass the version
# in as an argument when we validate their structure, but there are
# elements of the locations or names of those files that might depend
# on the template version, therefore we get maximum flexibility by only
# need to look for the dataset description file
def description(path):          return dat.DatasetDescriptionFilePath(path).object
def submission(path, version):  return dat.SubmissionFilePath(path).object_new(version)
def subjects(path, version):    return dat.SubjectsFilePath(path).object_new(version)
def samples(path, version):     return dat.SamplesFilePath(path).object_new(version)
def manifest(path, version):    return dat.ManifestFilePath(path).object_new(version)

# dataset path -> python object

def from_path_remote_metadata(path): return PennsieveDatasetData(path.cache)
def from_path_local_metadata(path): return dat.DatasetMetadata(path)
from_path_dataset_description = object_from_find_path('dataset_description', description,
                                                      onfail=exc.MissingFileError)

comb_metadata = combinate(
    # dataset description is not included here because it is special
    # see from_path_dataset_raw for details
    from_path_remote_metadata,
    from_path_local_metadata,
)

# dataset path + version -> python object

def from_path_dataset_path_structure(path, version): return dat.DatasetStructure(path)
from_path_subjects   = object_from_find_path('subjects',            subjects)
from_path_samples    = object_from_find_path('samples',             samples)
from_path_submission = object_from_find_path('submission',          submission)
from_path_manifests  = multiple(object_from_find_path('manifest',   manifest,
                                                      'rglob'),
                                merge_manifests)

# combinate all the individual dataset path + version -> data functions

comb_dataset = combinate(
    #from_path_dataset_description,  # must be run prior to combination to get version
    from_path_dataset_path_structure,
    from_path_subjects,
    from_path_samples,
    from_path_submission,
    from_path_manifests,
    #from_path_file_metadata,  # this must wait until 2nd fetch phase
    )

# dataset path -> raw data

def from_path_dataset_raw(dataset_path):
    """ Main entry point for getting dataset metadata from a path. """
    gen  = from_path_dataset_description(dataset_path)
    try:
        ddo = dataset_description_object = next(gen)
    except exc.MissingFileError as e:
        # TODO return a stub with embedded error
        logd.critical(e)
        dataset_blob = dataset_raw(*comb_metadata(dataset_path))
        return early_failure(dataset_path, e, dataset_blob)

    try:
       next(gen)
       # TODO return a stub with embedded error
    except StopIteration:
        pass

    data = ddo.data
    ddod = type('ddod', tuple(), {'data': data})
    dtsv = data['template_schema_version']
    return dataset_raw(ddo, *comb_metadata(dataset_path), *comb_dataset(dataset_path, dtsv))


# unused

def from_path_file_metadata(path, _version):  # FIXME doesn't go in this file probably
    # FIXME this is going to depend on the manifests
    # and a second fetch step where we kind of cheat
    # and prefetch file files we know we will need
    pass


def from_export_path_protocols_io_data(curation_export_json_path): pass
def protocols_io_ids(datasets): pass
def protocols_io_data(protocols_io_ids): pass

def from_group_name_protcur(group_name): pass
def protcur_output(): pass

def summary(datasets, protocols_io_data, protcur_output): pass

def from_path_summary(project_path):
    dataset_path_structure
    summary((
        dataset(
            dataset_path_structure,
            dataset_description,
            subjects,
            samples,
            submission,
            manifests,
            *rest
)))


def main(path=Path.cwd(), id=None, dataset_id=tuple(), time_now=None,
         export_path=None, export_parent_path=None, _entry_point=False, **kwargs):
    # TODO path from dataset_id and retrieve conventions? or just pass path from retrieve final output?
    # TODO parallelize if multiple paths
    # This assumes that all retrieve operations have
    # finished successfully for the current dataset
    from sparcur.simple.export import prepare_dataset_export, export_blob
    if id is not None:  # XXX note that id should be dataset_id # TODO parent_path ??
        uuid = id.uuid  # FIXME hardcoded backend assumption
        path = path / uuid / 'dataset'  # FIXME hardcoded see invariant_path
        path = path.resolve()  # make sure that invariant_path is expanded as we expect.

    cache = path.cache
    if not cache.is_dataset():
        raise TypeError('can only run on a single dataset')

    if _entry_point:
        if export_parent_path is None:
            export_parent_path = prepare_dataset_export(path, export_path)

        kwargs.update({'export_path': export_path,
                       'export_parent_path': export_parent_path,
                       'time_now': time_now,})

    dataset_raw = from_path_dataset_raw(path)

    if _entry_point:
        export_blob_path = export_blob(dataset_raw, 'ir.json', **kwargs)
        return export_blob_path
    else:
        return dataset_raw


if __name__ == '__main__':
    import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=pprint.pprint)

Transform

There is not a clean separation between transformation and validation because there are multiple transformation and validation steps that are interleaved.

from pathlib import Path
from sparcur import schemas as sc
from sparcur import pipelines as pipes
from sparcur.core import DictTransformer as DT


def __apply_step(step, spec, data, **kwargs):
    return step(data, spec, **kwargs)


def popl(data, pops, source_key_optional=False):
    popped = list(DT.pop(data, pops, source_key_optional))
    return data


def simple_add(data, adds):
    pass


def execute_pipeline(pipeline, data):
    for func, *args, kwargs in pipeline:
        # man variable arity is a pain to deal with here
        # where are lambda lists when you need them :/
        # FIXME maybe we can make steps functional instead of mutating?
        if not kwargs:
            kwargs = {}

        func(data, *args, **kwargs)

    return data


def __wd(transformer):  # not needed atm since transformers do in place modification
    def inner(data, *args, **kwargs):
        transformer(data, *args, **kwargs)
        return data


def schema_validate(data, schema, fail=False, pipeline_stage_name=None):
    if isinstance(schema, type):
        # we were passed a class so init it
        # doing it this way makes it easier to
        # use remote schemas that hit the network
        # since the network call doesn't have to
        # happen at the top level but can mutate
        # the class later before we init here
        schema = schema()

    ok, norm_or_error, data = schema.validate(data)
    if not ok:
        if fail:
            logd.error('schema validation has failed and fail=True')
            raise norm_or_error

        if 'errors' not in data:
            data['errors'] = []

        if pipeline_stage_name is None:
            pipeline_stage_name = f'Unknown.checked_by.{schema.__class__.__name__}'

        data['errors'] += norm_or_error.json(pipeline_stage_name)
        # TODO make sure the step is noted even if the schema is the same


simple_moves = (
    [['structure', 'dirs',], ['meta', 'dirs']],  # FIXME not quite right ...
    [['structure', 'files',], ['meta', 'files']],
    [['structure', 'size',], ['meta', 'size']],
    [['remote_dataset_metadata'], ['inputs', 'remote_dataset_metadata']],
    *pipes.SDSPipeline.moves[3:]
)

# function, *args, **kwargs
# functions should accept data as the first argument
core_pipeline = (
    # FIXME validation of dataset_raw is not being done right now
    (DT.copy, pipes.SDSPipeline.copies, dict(source_key_optional=True)),
    (DT.move, simple_moves, dict(source_key_optional=True)),
    # TODO clean has custom behavior
    (popl, pipes.SDSPipeline.cleans, dict(source_key_optional=True)),
    (DT.derive, pipes.SDSPipeline.derives, dict(source_key_optional=True)),
    #(DT.add, pipes.SDSPipeline.adds),  # FIXME lifter issues
    (schema_validate, sc.DatasetOutSchema, None),
    # extras (missing)
    # xml files
    # contributors
    # submission
    (DT.copy, pipes.PipelineExtras.copies, dict(source_key_optional=True)),
    # TODO clean has custom behavior
    (DT.update, pipes.PipelineExtras.updates, dict(source_key_optional=True)),
    (DT.derive, pipes.PipelineExtras.derives, dict(source_key_optional=True)),
    #(DT.add, pipes.PipelineExtras.adds),  # TODO and lots of evil custom behavior here
    # TODO filter_failures
    (schema_validate, sc.DatasetOutSchema, None),
    (pipes.PipelineEnd._indexes, None),  # FIXME this is conditional and in adds
    # TODO pipeline end has a bunch of stuff in here
    (schema_validate, sc.PostSchema, dict(fail=True)),
)


def main(path=Path.cwd(), path_dataset_raw=None, dataset_raw=None, **kwargs):
    # FIXME TODO need to follow the behavior of main in extract
    if dataset_raw is None:
        if path_dataset_raw is None:
            cache = path.cache
            if not cache.is_dataset():
                raise TypeError('can only run on a single dataset')
            from sparcur.simple.extract import main as extract
            dataset_raw = extract(path)
        else:
            from sparcur.utils import path_ir
            dataset_raw = path_ir(path_dataset_raw)

    data = execute_pipeline(core_pipeline, dataset_raw)
    breakpoint()


if __name__ == '__main__':
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=print)
Network resources
Dataset Path Metadata

This is a bit out of order since validation is run after an initial export.

# FIXME this is not really doing what we want yet
from sparcur.paths import Path
from sparcur.simple import path_metadata


def from_path_validated_json_metadata(path):
    tm = path_metadata.from_path_transitive_metadata(path)
    from_blob_validated_json_metadata(tm)
    return tm


def from_blob_validated_json_metadata(blob):
    """ Mutates in place. """
    Path.validate_path_json_metadata(blob)
    # perferred
    # accepted
    # banned
    # known
    # unknown


def main(_entry_point=False, **kwargs):
    # FIXME we want to be able to accept
    # --dataset-id, <json-file-to-validate>, and some other things probably?
    return path_metadata.main(validate=True, _entry_point=_entry_point, **kwargs)
    if export_path:
        from_blob_validate_path_json_metadata
    else:
        from_path_validate_path_json_metadata(path)


if __name__ == '__main__':
    #import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main)#, after=pprint.pprint)


Export

Setup for per-dataset export
import json
from socket import gethostname
import augpathlib as aug
from pyontutils.utils import timeformat_friendly
import sparcur
from sparcur.core import JEncode
from sparcur.paths import Path
from sparcur.utils import loge, symlink_latest
from sparcur.config import auth


def prepare_dataset_export(path, export_path=None):  # FIXME do we need export_base?
    if export_path is None:  # FIXME confusing and breaks w/ convention -> Options maybe?
        export_path = Path(auth.get_path('export-path'))  # FIXME allow alt?

    from sparcur.utils import PennsieveId
    identifier = PennsieveId(path.cache.id)
    uuid = identifier.uuid
    # we don't use cache._fs_safe_id here because we know the
    # identifier type from the folder structure
    # FIXME dataset metadata export setup basically needs to do all of this first
    # set latest run and then latest complete at the end, but complete is kind of arbitrary
    # from the atomic point of view
    tupdated = path.updated_cache_transitive()  # FIXME this causes us to traverse all files twice
    # XXX TODO I think that the dataset updated date is now transitive as well? though the dataset
    # updated timestamp seems to happen a bit before the created/updated date on the file itself?
    # FIXME somehow tupdated can be None !??!?! XXX yes, it happens on empty sparse datasets
    export_dataset_folder = export_path / 'datasets' / uuid
    export_parent = export_dataset_folder / timeformat_friendly(tupdated)
    if not export_parent.exists():
        export_parent.mkdir(parents=True)

    export_latest_run = export_dataset_folder / 'LATEST_RUN'
    symlink_latest(export_parent, export_latest_run)
    # FIXME need to symlink to LATEST
    return export_parent


def export_blob(blob, blob_file_name, time_now=None,
                export_path=None, export_parent_path=None, **kwargs):
    if export_parent_path is None:
        # NOTE if export_parent_path is not None then it is up to the user
        # to make sure that the contents of the dataset directory do not change
        # resulting in confusion about mismatched transitive update dates
        export_parent_path = prepare_dataset_export(path, export_path)
    elif not export_parent_path.exists():
        # safe to mkdir here since outside has a record of the path name
        export_parent_path.mkdir(parents=True)

    export_blob_path = export_parent_path / blob_file_name

    add_prov(blob, time_now)

    with open(export_blob_path, 'wt') as f:
        json.dump(blob, f, indent=2, cls=JEncode)

    loge.info(f'path metadata exported to {export_blob_path}')
    return export_blob_path


def add_prov(blob, time_now):
    """ Mutate blob in place to add prov. """
    # FIXME this will klobber cases where prov was embedded by the pipelines
    blob['prov'] = {'timestamp_export_start': time_now.START_TIMESTAMP,
                    'export_system_identifier': Path.sysid,
                    'export_hostname': gethostname(),
                    'sparcur_version': sparcur.__version__,
                    'sparcur_internal_version': sparcur.__internal_version__,
                    }
    rp = aug.RepoPath(sparcur.core.__file__)
    if rp.working_dir is not None:
        blob['prov']['sparcur_commit'] = rp.repo.active_branch.commit.hexsha


def main(path=Path.cwd(), export_path=None, **kwargs):
    return prepare_dataset_export(path, export_path=export_path)


if __name__ == '__main__':
    import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=pprint.pprint)

Dataset Path Metadata

In principle this could run as part of the dataset metadata export, however since it produces separate files and can run at the same time, it is its own module.

Usage example

find  -maxdepth 1 -type d -not -path '.operations*' -not -path '.' -exec python -m sparcur.simple.path_metadata {} \;

xargs variant.

find  -maxdepth 1 -type d -not -path '.operations*' -not -path '.' -print0 | \
xargs -0 -I{} -P8 -r -n 1 python -m sparcur.simple.path_metadata {}

Alternate example

python -m sparcur.simple.path_metadata \
21957eae-0824-4fb5-b18f-04d6ed12ce18/dataset \
--export-parent-path 21957eae-0824-4fb5-b18f-04d6ed12ce18/exports/
from pathlib import Path


def from_path_transitive_metadata(path):
    tml = path._transitive_metadata()
    # FIXME TODO handle sparse cases
    return {'type': 'path-metadata',
            'data': tml,}


def main(path=Path.cwd(), id=None, time_now=None,
         parent_path=None, invariant_local_path='dataset',
         parent_parent_path=Path.cwd(),
         export_local=False, export_path=None, export_parent_path=None,
         _entry_point=False, validate=False, **kwargs):
    from sparcur.paths import Path
    from sparcur.simple.export import prepare_dataset_export, export_blob

    if path == Path.cwd() and (id is not None or parent_path is not None):
        if parent_path is None:
            uuid = id.uuid
            parent_path = parent_parent_path / uuid

        invariant_path = parent_path / invariant_local_path
        path = invariant_path.expanduser().resolve()
    else:
        parent_parent_path = None

    # TODO path from dataset_id and retrieve conventions? or just pass path from retrieve final output?
    # TODO parallelize if multiple paths
    # This assumes that all retrieve operations have
    # finished successfully for the current dataset
    # FIXME Options calls resolve on all paths but not if Path.cwd slips through ...
    path = Path(path)  # FIXME even here some paths don't have caches ?!
    cache = path.cache  # XXX this should have errored when Path was applied below !?!?!??! pipe_main wat ???
    if not cache.is_dataset():
        raise TypeError('can only run on a single dataset')

    if _entry_point:
        if export_parent_path is None:
            export_parent_path = prepare_dataset_export(path, export_path)

        kwargs.update({'export_path': export_path,
                       'export_parent_path': export_parent_path,
                       'time_now': time_now,})

    tm = from_path_transitive_metadata(path)
    # FIXME TODO validate file formats, which means this also needs to support the remoteless case
    # FIXME TODO process metadata for each timepoint when things enter should go in prov I think
    #    or we need to be collecting prov along the way, we don't have an overseer or conductor
    #    so we can't keep everything embedded
    # FIXME TODO embed the transitive cache updated value that is used in prepare_dataset_export
    if validate:  # FIXME raw vs validated and FIXME pipeline
        from sparcur import schemas as sc
        from sparcur.simple.transform import schema_validate
        Path.validate_path_json_metadata(tm)
        schema_validate(tm, sc.PathTransitiveMetadataSchema)

    if _entry_point:
        export_blob_path = export_blob(tm, 'path-metadata.json', **kwargs)  # FIXME naming for raw vs validated
        return export_blob_path
    else:
        return tm


if __name__ == '__main__':
    #import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main)#, after=pprint.pprint)

Combine

This is equivalent to creating objects that match the summary schema for legacy organization level curation-export.json files.

FIXME it might be the case that individual datasets were exported under different schemas, which means that there would no longer be a single consistent schema for checking the merge, this is why we sometimes need to rerun all datasets when there is a schema change even if ther is not a schema change it very well may be the case that indvidiual datasets will have been run on different versions of the export pipeline, and slightly different images

import json
import rdflib
from pathlib import Path
from dateutil import parser as dateparser
from pyontutils.core import OntResPath
from pyontutils.utils import TZLOCAL, timeformat_friendly
from pyontutils.namespaces import TEMP, rdf, sparc
#from sparcur.utils import fromJson, register_all_types  # FIXME this should also go in sparcron
from sparcur.export.triples import TriplesExportSummary
from sparcur.export.published import _merge_graphs
from sparcur.simple.export import add_prov

tu = 'timestamp_updated'
tuc = 'timestamp_updated_contents'
ip = 'inputs'
rmk = 'remote_dataset_metadata'


class TriplesExportSummaryPublished(TriplesExportSummary):

    @property
    def ontid(self):
        return rdflib.URIRef(super().ontid.replace('ontologies/', 'ontologies/published/'))

    @property
    def header_label(self):
        return rdflib.Literal(f'{self.folder_name} curation export published graph')


def max_dataset_or_contents_updated(datasets_list):
    return max(set.union(
        set([a['meta'][tuc] for a in datasets_list
            if tuc in a['meta'] and a['meta'][tuc]]),
        set([a['meta'][tu] for a in datasets_list
            if tu in a['meta'] and a['meta'][tu]])))


def from_dataset_export_path_snapshot(dataset_export_path, snapshots_path, time_now):
    derefs = [l.resolve() for l in [c / 'LATEST' for c in dataset_export_path.children] if l.exists()]
    snapshot_path = snapshots_path / time_now.START_TIMESTAMP_LOCAL_FRIENDLY
    snapshot_path.mkdir(parents=True)
    [(snapshot_path / d.parts[-2]).symlink_to((snapshot_path / 'asdf').relative_path_to(d)) for d in derefs]
    return snapshot_path


def from_snapshot_path_datasets_lists(snapshot_path):
    alld = []
    pubd = []
    for uuid_path in snapshot_path.children:
        with open(uuid_path / 'curation-export.json', 'rt') as f:
            j = json.load(f)
            # TODO validate the load XXX this should probably
            # happen as a final roundtrip check during export
            # TODO filter by organization
            alld.append(j)
            if ip in j and rmk in j[ip] and 'id_published' in j[ip][rmk]:
                pubd.append(j)

    return alld, pubd


def from_snapshot_path_summary_json(snapshot_path, project_id, time_now):
    l_all, l_pub = from_snapshot_path_datasets_lists(snapshot_path)
    sum_all = from_datasets_list_summary_json(l_all, project_id, time_now)
    sum_pub = from_datasets_list_summary_json(l_pub, project_id, time_now)
    return sum_all, sum_pub


def from_snapshot_path_summary_ttl(snapshot_path, project_id, time_now, blob):
    tes = TriplesExportSummary(blob)
    tesp = TriplesExportSummaryPublished(blob)

    orps = [OntResPath(uuid_path / 'curation-export.ttl')
            for uuid_path in sorted(snapshot_path.children, key=lambda p: p.name)]

    graphs_all = [o.graph for o in orps]
    graphs_pub = [
        g for g, doi in [(g, list(g[ds:TEMP.hasDoi]))
                         for g in graphs_all
                         for ds in g[:rdf.type:sparc.Dataset]]
        if doi]
    merged_all = _merge_graphs(graphs_all)
    merged_all.populate_from_triples(tes.triples_header)
    merged_pub = _merge_graphs(graphs_pub)
    merged_pub.populate_from_triples(tesp.triples_header)
    return merged_all, merged_pub, graphs_all, graphs_pub

def from_snapshot_path_summary_ttl_BAD(snapshot_path, project_id, time_now, blob):
    # this variant is too complex, trying to reuse the published graph as the all graph
    # and the implementation of the OntConjunctiveGraph is not far enough along to do it
    tes = TriplesExportSummary(blob)  # FIXME nasty annoying dep
    graph_meta = OntGraph()
    graph_meta.populate_from_triples(tes._triples_header(tes.ontid, time_now._start_time))
    rev_replace_pairs = _fix_for_pub(graph_meta, graph_meta)
    replace_pairs = tuple([(b, a) for a, b in rev_replace_pairs])

    orps = [OntResPath(uuid_path / 'curation_export.ttl')
            for uuid_path in snapshot_path.children]

    graphs = [o.graph for o in orps]
    # FIXME should use id_published here as well, but that isn't being
    # propagated at the moment
    graphs_pub = []
    graphs_nop = []
    for g, doi in [(g, list(g[ds:TEMP.hasDoi]))
                   for g in graphs for ds in g[:rdf.type:sparc.Dataset]]:
        if doi:
            graphs_pub.append(g)
        else:
            graphs_nop.add(g)
    graph_pub = _merge_graphs(published_graphs)
    graph_pub.populate_from(graph_meta)
    # FIXME this is manually aligned with TriplesExport.triples_header
    graph_pub.asdf
    for g in graphs_nop:
        graph_pub.namespace_manager.populate_from(
            {k:v for k, v in dict(g.namespace_manager).items()
            if k not in ('contributor', 'sample', 'subject')})

    ttl_all = None
    ttl_pub = _populate_published(curation_export, graph)


def from_dataset_export_path_datasets_lists(dataset_export_path):
    dep = dataset_export_path
    alld = []
    pubd = []
    derefs = [l.resolve() for l in [c / 'LATEST' for c in dep.children] if l.exists()]
    # TODO consider creating a folder that is just symlinks before this
    for lp in sorted(derefs, key=lambda p: p.name):
        with open(lp / 'curation-export.json', 'rt') as f:
            j = json.load(f)
            # TODO validate the load XXX this should probably
            # happen as a final roundtrip check during export
            # TODO filter by organization
            alld.append(j)
            if ip in j and rmk in j[ip] and 'id_published' in j[ip][rmk]:
                pubd.append(j)

    return alld, pubd


def from_datasets_list_summary_json(datasets_list, project_id, time_now):
    # XXX FIXME issue with datasets from multiple projects
    fn = Path(datasets_list[0]['prov']['export_project_path']).name
    out = {
        'id': project_id.id,
        'meta': {
            'count': len(datasets_list),
            'folder_name': fn,  # WHAT A RELIEF we don't need network
            'uri_api': project_id.uri_api,
            'uri_human': project_id.uri_human(),
        },
        'datasets': datasets_list,
    }
    add_prov(out, time_now)
    # some potential prov summaries, but lets not do that here
    # most prov stats should be on the single dataset level
    #'export_timestamp_start_min': min(tes),
    #'export_timestamp_start_max': max(tes),
    return out


def from_dataset_export_path_summary_json(dataset_export_path, project_id, time_now):
    l_all, l_pub = from_dataset_export_path_datasets_lists(dataset_export_path)
    #[a['meta']['timestamp_updated'] < a['meta']['timestamp_updated_contents']
    #for a in l_all if a['meta']['timestamp_updated_contents']]
    sum_all = from_datasets_list_summary_json(l_all, project_id, time_now)
    sum_pub = from_datasets_list_summary_json(l_pub, project_id, time_now)
    return sum_all, sum_pub


def main(project_id=None, export_path=None, time_now=None,
         project_id_auth_var='remote-organization',  # FIXME move to clifun
         disco=False, **kwargs):
    from sparcur.paths import Path
    from sparcur.config import auth

    if project_id is None:
        from sparcur.config import auth
        from sparcur.utils import PennsieveId
        project_id = auth.get(project_id_auth_var)
        project_id = PennsieveId(project_id)  # FIXME abstract the hardcoded backend

    if export_path is None:  # XXX see issues mentioned above
        export_path = Path(auth.get_path('export-path'))

    dataset_export_path = export_path / 'datasets'
    snapshots_path = export_path / 'snapshots'
    snapshot_path = from_dataset_export_path_snapshot(
        dataset_export_path, snapshots_path, time_now)
    sum_all, sum_pub = from_snapshot_path_summary_json(
        snapshot_path, project_id, time_now)
    # write symlink LATEST_PARTIAL
    ttl_all, ttl_pub, graphs_all, graphs_pub = from_snapshot_path_summary_ttl(
        snapshot_path, project_id, time_now, sum_all)
    # write symlink LATEST
    maxdt = max_dataset_or_contents_updated(sum_all['datasets'])
    dt_maxdt = dateparser.parse(maxdt)
    dt_maxdt_local = dt_maxdt.astimezone(TZLOCAL())
    friendly_maxdt_local = timeformat_friendly(dt_maxdt_local)
    # FIXME there are some bad assumptions in here that should be refactored out
    # at some point, but for now we implicitly assume that all datasets come from
    # the same organization, which can easily be violated because we don't check 
    # however the existing internal schema requires an id for the summary which is
    # currenty the organization id
    # FIXME summary is a hardcoded path
    # XXX WARNING it is possible to overwrite since maxdt might not change between runs
    # this is desirable behavior for development, but could cause issues in other cases
    pexpath = export_path / 'summary' / project_id.uuid
    latest = pexpath / 'LATEST'
    npath = pexpath / friendly_maxdt_local
    snapshot_link = npath / 'snapshot'
    if not npath.exists():
        npath.mkdir(parents=True)
    else:
        # FIXME not sure if this makes sense?
        if snapshot_link.is_symlink():
            snapshot_link.unlink()

    snapshot_link.symlink_to(snapshot_link.relative_path_to(snapshot_path))

    npath_ce = npath / 'curation-export.json'
    npath_cep = npath / 'curation-export-published.json'
    for path, blob in ((npath_ce, sum_all),
                       (npath_cep, sum_pub)):
        with open(path, 'wt') as f:
            json.dump(blob, f, indent=2)

    npath_ttl = npath / 'curation-export.ttl'
    npath_ttlp = npath / 'curation-export-published.ttl'
    ttl_all.write(npath_ttl)
    ttl_pub.write(npath_ttlp)

    if disco:
        # export xml and tsv for disco
        from sparcur.simple.disco import from_curation_export_json_path_datasets_json_xml_and_disco
        from_curation_export_json_path_datasets_json_xml_and_disco(
            npath_ce, sum_all['datasets'], graphs_all)

    if latest.is_symlink():
        latest.unlink()

    latest.symlink_to(friendly_maxdt_local)

    return npath_ce, sum_all, sum_pub, graphs_all, graphs_pub


if __name__ == '__main__':
    #import pprint
    from sparcur.simple.utils import pipe_main
    # these are really big, don't print them
    # pipe_main(main, after=pprint.pprint)
    pipe_main(main)

disco

Legacy export for disco.

import json
from pyontutils.core import OntGraph
from sparcur import export as ex
from sparcur.utils import PennsieveId
from sparcur.utils import fromJson, register_all_types  # FIXME this should also go in sparcron


def from_curation_export_json_path_xml_and_disco(curation_export_json_path):
    with open(curation_export_json_path, 'rt') as f:
        summary = json.load(f)

    datasets_json = summary['datasets']
    from_curation_export_json_path_datasets_json_xml_and_disco(
        curation_export_json_path, datasets_json)


def from_curation_export_json_path_datasets_json_xml_and_disco(
        curation_export_json_path, datasets_json, graphs=None):
    # FIXME need the snapshot linked somehow, export time started if we are using summary
    # or summary prov timestamp_export_start will give us the snapshot path as well if we
    # parse it back to a date
    if not graphs:
        snapshot_path = curation_export_json_path.parent / 'snapshot'
        paths = [(snapshot_path /
                  PennsieveId(d['id']).uuid /
                  'curation-export.ttl')
                 for d in datasets_json]
        graphs = [OntGraph().parse(path) for path in paths]

    datasets_ir = fromJson(datasets_json)
    ex.export_xml(curation_export_json_path, datasets_ir)
    ex.export_disco(curation_export_json_path, datasets_ir, graphs)
    # XXX not doing jsonld here, it will be combined
    # from single dataset jsonld or similar


def main(path=None, **kwargs):
    register_all_types()
    # FIXME the problem with this approach is that can cannot run
    # multiple downstream steps from the same upstream step, we would
    # need a compositional way to tell each downstream which upstreams
    # we wanted to run in any given situation, all to save additional
    # reads from disk
    if path is None:  # assume the user wants to run combine first
        from sparcur.simple.combine import main as combine
        curation_export_json_path, summary_all, _, graphs_all, _ = combine(**kwargs)
        datasets_json = summary_all['datasets']
        from_curation_export_json_path_datasets_json_xml_and_disco(
            curation_export_json_path, datasets_json, graphs)
    else:
        curation_export_json_path = path
        from_curation_export_json_path_xml_and_disco(curation_export_json_path)


if __name__ == '__main__':
    #import pprint
    from sparcur.simple.utils import pipe_main
    # these are really big, don't print them
    # pipe_main(main, after=pprint.pprint)
    pipe_main(main)

compact

find all export dataset folders that are older than the current snapshot and are not in a snapshot that is in a summary compact them

"""
Examples:
    # look at the second one and then run the first one since it is easier and safer to stop and resume
    # python -m sparcur.simple.compact | xargs -P4 -r -I{} sh -c 'tar -cvJf "{}.tar.xz" "{}" && rm -r "{}"'
    python -m sparcur.simple.compact | xargs -P12 -r -I{} echo tar -cvJf '{}.tar.xz' '{}'
    # python -m sparcur.simple.compact | xargs -P6 -r -I{} echo rm -r '{}'
"""
from sparcur.paths import Path
from sparcur.config import auth


__dep_cache = {}
def latest_snapped(dataset_export_path, snapped):
    if dataset_export_path not in __dep_cache:
        cs = set(c for c in dataset_export_path.children if c.is_dir() and not c.is_symlink())
        csnap = cs.intersection(snapped)
        if not csnap:  # no snap, pretend that latest is snapped
            # this can happen if there is no LATEST because we
            # were e.g. just exporting path metadata and nothing else
            maxsnap = sorted(cs, key=lambda p: p.name)[-1]
        else:
            maxsnap = sorted(csnap, key=lambda p: p.name)[-1]

        __dep_cache[dataset_export_path] = maxsnap.name

    return __dep_cache[dataset_export_path]


def main():
    export_path = Path(auth.get_path('export-path'))
    summary_path = export_path / 'summary'
    snapshots_path = export_path / 'snapshots'
    datasets_path = export_path / 'datasets'

    snap_shotted = [
        dss.resolve()
        for d in summary_path.rchildren_dirs
        for l in d.rchildren if l.is_symlink() and l.name == 'snapshot'
        for dss in l.resolve().children]

    snapped = set(snap_shotted)

    latest_sums = [
        d.resolve()
        for c in summary_path.children
        for d in (c / 'LATEST',) if d.exists()]

    all_builds = [
        build
        for date in datasets_path.children if date.is_dir() and not date.is_symlink()
        for build in date.children if build.is_dir() and not build.is_symlink()]

    older_not_snap = [
        a for a in all_builds
        if a not in snapped and a.name < latest_snapped(a.parent, snapped)]

    assert not snapped.intersection(older_not_snap)

    # newer = set(all_builds) - snapped - set(older_not_snap)

    _ = [print(p) for p in older_not_snap]


if __name__ == '__main__':
    main()

Utility

Init

Utils

from sparcur.config import auth
__doc__ = f"""Common command line options for all sparcur.simple modules
Usage:
    sparcur-simple manifest  [options]  [<path>...]
    sparcur-simple get-uuid  <remote-id>...
    sparcur-simple datasets
    sparcur-simple for-racket
    sparcur-simple check-names
    sparcur-simple git-repos [update] [options]
    sparcur-simple [options] [<path>...]
    sparcur-simple [options] [--dataset-id=<ID>...]
    sparcur-simple [options] [--extension=<EXT>...] [<path>...]

Commands:
    manifest                        generate manifest files for path
    for-racket                      print data for reading into Racket

Options:
    -h --help                       show this

    --hypothesis-group-name=NAME    the hypotheis group name

    --project-id=ID                 the project id
    --dataset-id=<ID>...            one or more datset ids
    --project-id-auth-var=VAR       name of the auth variable holding the project-id

    --project-path=PATH             the project path, will be path if <path>... is empty
    --parent-path=PATH              the parent path where the project will be cloned to
    --parent-parent-path=PATH       parent in which a random tempdir is generated
                                    or the dataset uuid is used as the parent path, don't use this ...
    --invariant-local-path=PATH     path relative to parent path for dataset [default: dataset]
    --export-local                  set export-parent-path to {{:parent-path}}/exports/
    --export-path=PATH              base export path containing the standard path structure [default: {auth.get_path('export-path')}]
    --cleaned-parent-path=PATH      direct parent path into which cleaned paths will be placed
    --cleaned-path=PATH             base cleaned path containing the standard path structure [default: {auth.get_path('cleaned-path')}]
    --export-parent-path=PATH       direct parent path into which exports will be placed
    --extension=<EXT>...            one or more file extensions to fetch

    -j --jobs=N                     number joblib jobs [default: 12]
    --exclude-uploaded              do not pull files from remote marked as uploaded
    --sparse-limit=N                package count that forces a sparse pull [default: {auth.get('sparse-limit')}]
    --symlink-objects-to=PATH       path to an existing objects directory
    --log-level=LEVEL               log level info [default: INFO]
    --open=PROGRAM                  show file with PROGRAM
    --show                          show file with xopen
    --pretend                       show what would be done for update
"""

import os
import sys
from types import GeneratorType
from pyontutils import clifun as clif
from sparcur import exceptions as exc
from sparcur.utils import _find_command
from sparcur.utils import log, logd, loge, GetTimeNow, PennsieveId
from sparcur.paths import Path, PennsieveCache
from sparcur.backends import PennsieveRemote


def backend_pennsieve(project_id=None, Local=Path, Cache=PennsieveCache):  # (def_babf)
    """return a configured pennsieve backend
        calling this is sufficient to get everything set up correclty

        You must call RemotePath.init(project_id) before using
        RemotePath.  Passing the project_id argument to this function
        will do that for you. It is not required because there are
        cases where the project_id may not be known at the time that
        this function is called. """

    RemotePath = PennsieveRemote._new(Local, Cache)

    if project_id is not None:
        RemotePath.init(project_id)

    return RemotePath


class Options(clif.Options):

    @property
    def id(self):
        # dataset_id has priority since project_id can occure without a
        # dataset_id, but dataset_id may sometimes come with a project_id
        # in which case the dataset_id needs priority for functions that
        # branch on the most granular identifier type provided
        return (self.dataset_id[0]
                if self.dataset_id else
                (self.project_id
                 if self.project_id else
                 None))

    @property
    def project_id(self):
        if not hasattr(self, '_cache_project_id'):
            id = self._args['--project-id']
            if id is not None:
                identifier = PennsieveId(id, type='organization')
                self._cache_project_id = identifier
            else:
                return

        return self._cache_project_id

    @property
    def dataset_id(self):
        if not hasattr(self, '_cache_dataset_id'):
            ids = self._args['--dataset-id']
            if ids:
                identifiers = [PennsieveId(id, type='dataset') for id in ids]
                self._cache_dataset_id = identifiers
            else:
                return ids

        return self._cache_dataset_id

    @property
    def remote_id(self):
        if not hasattr(self, '_cache_remote_id'):
            ids = self._args['<remote-id>']
            if ids:
                identifiers = [PennsieveId(id) for id in ids]
                self._cache_remote_id = identifiers
            else:
                return ids

        return self._cache_remote_id

    @property
    def jobs(self):
        return int(self._args['--jobs'])

    n_jobs = jobs  # match internal kwargs conventions

    @property
    def paths(self):
        return [Path(p).expanduser().resolve() for p in self._args['<path>']]

    @property
    def path(self):
        paths = self.paths
        if paths:
            return paths[0]
        elif self.project_path:
            return self.project_path
        else:
            # if no paths were listed default to cwd
            # consistent with how the default kwargs
            # are set on a number of mains
            # this is preferable to allow path=None
            # to be overwritten by the conventions of
            # individual pipeline mains
            return Path.cwd()

    @property
    def project_path(self):
        pp = self._args['--project-path']
        if pp:
            return Path(pp).expanduser().resolve()

    @property
    def parent_parent_path(self):
        ppp = self._args['--parent-parent-path']
        if ppp:
            return Path(ppp).expanduser().resolve()
        else:
            return Path.cwd()

    @property
    def parent_path(self):
        pap = self._args['--parent-path']
        did = self.dataset_id
        if pap:
            return Path(pap).expanduser().resolve()
        elif did:
            id = self.id  # dataset_id is a list so use id which handles that
            uuid = id.uuid
            return (self.parent_parent_path / uuid).expanduser().resolve()

    @property
    def export_path(self):
        ep = self._args['--export-path']
        epp = self.export_parent_path
        if ep and epp:
            raise TypeError('Only one of --export-path and --export-parent-path may be set.')
        elif ep:
            return Path(ep).expanduser().resolve()
        else:
            raise Exception('should not happen')

    @property
    def export_parent_path(self):
        epp = self._args['--export-parent-path']
        el = self.export_local
        pap = self.parent_path
        if epp and el:
            raise TypeError('Only one of --export-local and --export-parent-path may be set.')
        elif epp:
            return Path(epp).expanduser().resolve()
        elif el and pap:
            # it is ok to do this here becuase the TypeError above prevents
            # the case where both epp and el are set, so even though epp
            # is no longer always what was set on the command line, it is
            # it is the case that there won't be conflicting sources
            return pap / 'exports'

    @property
    def cleaned_path(self):
        cp = self._args['--cleaned-path']
        cpp = self.export_parent_path
        if cp and cpp:
            raise TypeError('Only one of --cleaned-path and --cleaned-parent-path may be set.')
        elif cp:
            return Path(cp).expanduser().resolve()
        else:
            raise Exception('should not happen')

    @property
    def cleaned_parent_path(self):
        cpp = self._args['--cleaned-parent-path']
        if cpp:
            return Path(cpp).expanduser().resolve()

    @property
    def extensions(self):
        return self.extension

    @property
    def symlink_objects_to(self):
        slot = self._args['--symlink-objects-to']
        if slot:
            return Path(slot).expanduser()

    @property
    def sparse_limit(self):  # FIXME not being pulled in by asKwargs ??
        return int(self._args['--sparse-limit'])

    @property
    def time_now(self):  # FIXME make it possible to pass this in?
        if not hasattr(self, '_time_now'):
            self._time_now = GetTimeNow()

        return self._time_now

    @property
    def log_level(self):  # FIXME not being pulled in by asKwargs ??
        ll = self._args['--log-level']
        if ll.isdigit() or ll[0] == '-' and ll[1:].isdigit():
            return int(ll)
        else:
            return ll


def pipe_main(main, after=None, argv=None):
    options, args, defaults = Options.setup(__doc__, argv=argv)
    # _entry_point is used as a way to switch behavior when a
    # pipeline main is run directly or actually in a pipeline
    try:
        log.setLevel(options.log_level)
        logd.setLevel(options.log_level)
        loge.setLevel(options.log_level)
        out = main(_entry_point=True, **options.asKwargs())
    except Exception as e:
        log.exception(e)
        log.error(options.path)
        raise e

    if after:
        after(out)

    return out


def rglob(path, pattern):
    """ Hack around the absurd slowness of python's rglob """

    if sys.platform == 'win32':
        log.warning('no findutils on windows, watch out for unexpected files')
        return list(path.rglob(pattern))

    doig = (hasattr(path, 'cache') and
            path.cache and
            path.cache.cache_ignore)
    exclude = ' '.join([f"-not -path './{p}*'" for p in path.cache.cache_ignore]) if doig else ''
    command = f"""{_find_command} {exclude} -name {pattern!r}"""

    with path:
        with os.popen(command) as p:
            string = p.read()

    path_strings = string.split('\n')  # XXX posix path names can contain newlines
    paths = [path / s for s in path_strings if s]
    return paths


def _fetch(cache):  # sigh joblib multiprocessing pickling
    # lambda functions are great right up until you have to handle an
    # error function inside of them ... thanks python for yet another
    # failure to be homogenous >_<
    meta = cache.meta
    try:
        size_mb = meta.size.mb
    except AttributeError as e:
        if meta.errors:
            logd.debug(f'remote errors {meta.errors} for {cache!r}')
            return
        else:
            raise exc.StopTheWorld(cache) from e

    return cache.fetch(size_limit_mb=size_mb + 1)  # FIXME somehow this is not working !?


def _fetch_path(path):  # sigh joblib multiprocessing pickling
    path = Path(path)
    cache = path.cache
    if cache is None:
        raise exc.NoCachedMetadataError(path)

    # do not return to avoid cost of serialization back to the control process
    _fetch(cache)


def fetch_paths_parallel(paths, n_jobs=12, use_async=True):
    if n_jobs <= 1:
        [_fetch_path(path) for path in paths]
    elif use_async:
        from pyontutils.utils import Async, deferred
        Async()(deferred(_fetch_path)(path) for path in paths)
    else:
        import pathlib
        from joblib import Parallel, delayed
        backend = 'multiprocessing' if hasattr(sys, 'pypy_version_info') else None
        # FIXME FIXME FIXME somehow this can result in samples.xlsx being fetched to subjects.xlsx !?!??!!
        # XXX either a race condition on our end or something insane from the remote
        Parallel(n_jobs=n_jobs, backend=backend)(delayed(_fetch_path)(pathlib.Path(path)) for path in paths)
        #Parallel(n_jobs=n_jobs)(delayed(fetch_path)(path) for path in paths)


def combinate(*functions):
    def combinator(*args, **kwargs):
        for f in functions:
            # NOTE no error handling is needed here
            # in no cases should the construction of
            # the python object version of a path fail
            # all failures should happen _after_ construction
            # the way we have implemented this they fail when
            # the data attribute is accessed
            obj = f(*args, **kwargs)
            if isinstance(obj, GeneratorType):
                yield from obj
                # FIXME last one wins, vs yield tuple vs ...?
                # FIXME manifests are completely broken for this
            else:
                yield obj

    return combinator


def multiple(func, merge=None):
    """ combine multiple results """
    def express(*args, **kwargs):
        vs = tuple(func(*args, **kwargs))
        if merge is not None:
            yield merge(vs)
        else:
            yield vs

    return express


def early_failure(path, error, dataset_blob=None):
    # these are the 9999 5555 and 4444 errors
    # TODO match the minimal schema reqs as
    # we did in pipelines
    if dataset_blob is None:
        cache = path.cache
        return {'id': cache.id,
                'meta': {'uri_api': cache.uri_api,
                         'uri_human': cache.uri_human,},
                #'status': 'early_failure',  # XXX note that status is not requried
                # if we cannot compute it, but if there are inputs there should be
                # a status
                'errors': [error],  # FIXME format errro
                }

    else:
        if 'errors' not in datset_blob:
            dataset_blob['errors'] = []

        datset_blob['errors'].append(error)
        return dataset_blob


class DataWrapper:
    # sigh patterns are stupid, move this to elsewhere so it doesn't taint everything
    def __init__(self, data):
        self.data = data


def main(id=None, **kwargs):
    def ik(key):
        return key in kwargs and kwargs[key]

    if id is not None:
        print(id.uuid)

    if ik('get_uuid'):
        for id in kwargs['remote_id']:
            print(id.uuid)

        return

    if (ik('datasets') or ik('for_racket') or ik('check_names')):
        log.setLevel(60)  # logging.CRITICAL = 50
        from sparcur.config import auth
        from sparcur.simple.utils import backend_pennsieve
        if ik('project_id'):
            pass  # project_id from command line
        else:
            project_id = auth.get('remote-organization')

        PennsieveRemote = backend_pennsieve(project_id)
        root = PennsieveRemote(project_id)
        datasets = list(root.children)

    if ik('datasets'):
        print('\n'.join([d.id for d in datasets]))

    if ik('for_racket'):
        import json
        _dsmeta = '\n'.join([f"({json.dumps(d.id)} {json.dumps(d.name)})"
                             for d in datasets])
        dsmeta = f"({_dsmeta})"
        # lab pi last name should be cached in some other way
        print(dsmeta)

    if ik('check_names'):
        # you will want to run sparcur.simple.fetch_remote_metadata_all
        from pathlib import PurePosixPath
        def report(pid, exp, pub):
            pname = pub['name']
            name_mismatch = (
                False if exp['basename'] == pname
                else (exp['basename'], pname))
            # [6:] to strip files/
            ppname = PurePosixPath(pub['path']).name
            pathname_mismatch = (
                False if exp['basename'] == ppname
                else (exp['basename'], ppname))

            eppp = PurePosixPath(exp['dataset_relative_path']).parent.as_posix()
            pppp = PurePosixPath(pub['path'][6:]).parent.as_posix()
            parent_path_mismatch = (
                False if eppp == pppp
                else (eppp, pppp))

            # once we fix things on our end names should match
            # parent paths should match
            # name and pathname might match but do not have to match
            pid = f'  {pid}'
            nsp = '\n        '
            if name_mismatch:
                if pathname_mismatch and pname != ppname:
                    return (f'{pid}    name mismatch and pathname mismatch                '
                            f'{nsp}{nsp.join(name_mismatch)}{nsp}{nsp.join(pathname_mismatch)}')
                else:
                    return (f'{pid}    name mismatch                                      '
                            f'{nsp}{nsp.join(name_mismatch)}')

            if parent_path_mismatch:
                if True:
                    return (f'{pid}                                        parent mismatch'
                            f'{nsp}{nsp.join(parent_path_mismatch)}')

            if True:
                if True:
                    return ''
                    #return (f'{pid} ok                                                    ')

        import json
        from sparcur.backends import PennsieveDatasetData
        export_path = kwargs['export_path']
        epd = export_path / 'datasets'
        for dataset in datasets:
            latest = epd / dataset.identifier.uuid / 'LATEST'
            export_path_metadata =  latest / 'path-metadata.json'
            exported = export_path_metadata.exists()

            # pass the remote not just the id so that bfobject is
            # accessible to the RemoteDatasetData class
            pdd = PennsieveDatasetData(dataset)
            rmeta = pdd.fromCache()
            published = 'id_published' in rmeta
            if exported and published:
                with open(export_path_metadata, 'rt') as f:
                    j = json.load(f)
                epni = {'N:' + o['remote_id']:o for o in j['data']
                        if o['remote_id'].startswith('package:')}
                ppni = pdd._published_package_name_index()
                se, sp = set(epni), set(ppni)
                e_missing = sp - se
                p_missing = se - sp
                s_common = sp & se
                rep = [report(c, epni[c], ppni[c]) for c in s_common]
                repstr = '\n'.join([r for r in rep if r])
                if repstr:
                    print(dataset.id, 'bad')
                    print(repstr)
                else:
                    print(dataset.id, 'ok')
            elif exported:
                print(dataset.id, 'unpublished')
            elif published:
                print(dataset.id, 'unexported')
            else:
                print(dataset.id, 'unpublished and unexported')

    if ik('git_repos'):
        import augpathlib as aug
        import sparcur
        from sparcur.config import auth
        from importlib import metadata
        d = metadata.distribution(sparcur.__package__)
        rps = [p for p in [aug.RepoPath(d._path) for d in d.discover()] if p.working_dir]
        setups = [p for p in [p.parent / 'setup.py' for p in rps] if p.exists()]
        wds = sorted(set([p.working_dir for p in rps]))
        never_update = auth.get('never-update')
        pretend=ik('pretend') or never_update
        if pretend:
            if never_update:
                print(f'never-update: true is set in {auth.user_config._path}')
            print('These are the commands that would be run.')
        def doupdate(rp):
            if pretend:
                print(f'git -C {rp.as_posix()} stash')
                print(f'git -C {rp.as_posix()} pull')
                return
            print(f'Pulling {rp.as_posix()}')
            print(rp.repo.git.stash())
            # TODO checkout to a safety branch and tag for rollback
            print(rp.repo.git.pull())
        if ik('update'):
            for wd in wds:
                doupdate(wd)
            # indescriminately run setup.py with --release set to tangle
            from importlib import util as imu
            oldargv = sys.argv
            try:
                for s in setups:
                    if pretend:
                        print(f'pushd {s.parent.as_posix()}; python setup.py --release; popd')
                        continue
                    sys.argv = ['setup.py', '--release']  # reset every time since most will mutate
                    print(f'Maybe tangling via {s.as_posix()}')
                    spec = imu.spec_from_file_location(f'setup_{s.parent.name}', s)
                    mod = imu.module_from_spec(spec)
                    try:
                        with s.parent:  # ah relative paths
                            spec.loader.exec_module(mod)
                    except SystemExit:
                        pass
                    except Exception as e:
                        log.exception(e)
            finally:
                sys.argv = oldargv

    if ik('manifest'):
        from sparcur.utils import write_manifests
        paths = kwargs['paths']
        if not paths:
            paths = [Path.cwd()]

        manifests_rendered = write_manifests(parents=paths)
        manifests, rendered = zip(*manifests_rendered)
        nl = '\n'
        print(f'generated manifests at:\n{nl.join([m.as_posix() for m in manifests])}')
        if ik('open'):
            cmd = kwargs['open']
            [m.xopen(cmd) for m in manifests]
        elif ik('show'):
            [m.xopen() for m in manifests]


if __name__ == '__main__':
    pipe_main(main)

Test

from sparcur.simple.utils import pipe_main

def test_pipe_main():
    def main(id=None, project_path=None, **kwargs):
        print(id, project_path, kwargs)

    pipe_main(main, argv=['sparcur-simple'])

Clean metadata files

dataset id dataset path dataset relative paths

cleaned file root metadata file objects cleaned objects write objects cleaned files at cleaned file paths

from sparcur.paths import Path
from sparcur import datasets as dat
from sparcur.utils import symlink_latest
from sparcur.config import auth
from sparcur.simple.utils import rglob


def prepare_dataset_cleaned(dataset_path, cleaned_path=None, time_now=None):
    if cleaned_path is None:  # FIXME confusing and breaks w/ convention -> Options maybe?
        cleaned_path = Path(auth.get_path('cleaned-path'))

    from sparcur.utils import PennsieveId
    identifier = PennsieveId(dataset_path.cache.id)
    uuid = identifier.uuid
    cleaned_dataset_folder = cleaned_path / uuid
    cleaned_parent = cleaned_dataset_folder / time_now.START_TIMESTAMP_LOCAL_FRIENDLY
    if not cleaned_parent.exists():
        cleaned_parent.mkdir(parents=True)

    cleaned_latest = cleaned_dataset_folder / 'LATEST'
    # FIXME do we symlink before we know things have succeeded ???
    symlink_latest(cleaned_parent, cleaned_latest)
    return cleaned_parent


def from_dataset_path_metadata_file_paths(dataset_path):
    matches = []
    for candidate in rglob(dataset_path, '*.xlsx'):
        rp = candidate.relative_path_from(dataset_path)
        if not rp.parent.name or 'anifest' in rp.name:
            matches.append(candidate)

    return matches


def from_path_cleaned_object(path):
    t = dat.Tabular(path)
    sheet, wb, sparse_empty_rows = t._openpyxl_fixes()
    return wb


def from_file_paths_objects(paths):
    for path in paths:
        if path.suffix == '.xlsx':
            cleaned = from_path_cleaned_object(path)
            yield cleaned
        else:
            yield None


def from_dataset_path_cleaned_files(dataset_path, cleaned_parent):
    "NOTE this actually does the cleaning"

    paths = from_dataset_path_metadata_file_paths(dataset_path)
    for path, obj in zip(paths, from_file_paths_objects(paths)):
        if obj is not None:
            drp = path.dataset_relative_path
            target = cleaned_parent / drp
            if not target.parent.exists():
                target.parent.mkdir(parents=True)

            obj.save(target)


def main(path=Path.cwd(), id=None, time_now=None,
         parent_path=None, invariant_local_path='dataset',
         parent_parent_path=Path.cwd(),
         cleaned_path=None, cleaned_parent_path=None, **kwargs):
    # setup taken from path_metadata.py::main so check the notes there
    if path == Path.cwd() and (id is not None or parent_path is not None):
        if parent_path is None:
            uuid = id.uuid
            parent_path = parent_parent_path / uuid

        invariant_path = parent_path / invariant_local_path
        path = invariant_path.expanduser().resolve()
    else:
        parent_parent_path = None

    path = Path(path)
    cache = path.cache
    if not cache.is_dataset():
        raise TypeError('can only run on a single dataset')

    cleaned_parent = prepare_dataset_cleaned(path, cleaned_path, time_now)
    from_dataset_path_cleaned_files(path, cleaned_parent)


if __name__ == '__main__':
    import pprint
    from sparcur.simple.utils import pipe_main
    pipe_main(main, after=pprint.pprint)

Date: 2022-12-21T23:47:11-05:00

Author: Tom Gillespie

Created: 2022-12-22 Thu 01:38

Validate