synapse package

The synapse intelligence analysis framework.

Subpackages

Submodules

synapse.axon module

class synapse.axon.Axon[source]

Bases: synapse.lib.cell.Cell

cellapi

alias of synapse.axon.AxonApi

confdefs = {'http:proxy': {'description': 'An aiohttp-socks compatible proxy URL to use in the wget API.', 'type': 'string'}, 'max:bytes': {'description': 'The maximum number of bytes that can be stored in the Axon.', 'hidecmdl': True, 'minimum': 1, 'type': 'integer'}, 'max:count': {'description': 'The maximum number of files that can be stored in the Axon.', 'hidecmdl': True, 'minimum': 1, 'type': 'integer'}, 'tls:ca:dir': {'description': 'An optional directory of CAs which are added to the TLS CA chain for wget and wput APIs.', 'type': 'string'}}
async del_(sha256)[source]

Remove the given bytes from the Axon by sha256.

Parameters

sha256 (bytes) – The sha256, in bytes, to remove from the Axon.

Returns

True if the file is removed; false if the file is not present.

Return type

boolean

async dels(sha256s)[source]

Given a list of sha256 hashes, delete the files from the Axon.

Parameters

sha256s (list) – A list of sha256 hashes in bytes form.

Returns

A list of booleans, indicating if the file was deleted or not.

Return type

list

async get(sha256)[source]

Get bytes of a file.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Examples

Get the bytes from an Axon and process them:

buf = b''
async for bytz in axon.get(sha256):
    buf += bytz

await dostuff(buf)
Yields

bytes – Chunks of the file bytes.

Raises

synapse.exc.NoSuchFile – If the file does not exist.
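
For large files, repeatedly concatenating bytes can be costly. The following is a minimal sketch of collecting the yielded chunks in a list and joining them once. It does not use a real Axon: fake_get is a hypothetical stand-in for an async generator such as axon.get(sha256).

```python
import asyncio

async def fake_get(chunks):
    # Hypothetical stand-in for axon.get(sha256): yields the file in chunks.
    for chunk in chunks:
        yield chunk

async def read_all(genr):
    # Collect the chunks and join them once at the end.
    parts = []
    async for byts in genr:
        parts.append(byts)
    return b''.join(parts)

data = asyncio.run(read_all(fake_get([b'hello ', b'world'])))
```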

async has(sha256)[source]

Check if the Axon has a file.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns

True if the Axon has the file; false otherwise.

Return type

boolean

async hashes(offs, wait=False, timeout=None)[source]

Yield hash rows for files that exist in the Axon in added order starting at an offset.

Parameters
  • offs (int) – The index offset.

  • wait (boolean) – Wait for new results and yield them in realtime.

  • timeout (int) – Max time to wait for new results.

Yields

(int, (bytes, int)) – An index offset and the file SHA-256 and size.

Note

If the same hash was deleted and then added back, the same hash will be yielded twice.

async hashset(sha256)[source]

Calculate additional hashes for a file in the Axon.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns

A dictionary containing hashes of the file.

Return type

dict

async history(tick, tock=None)[source]

Yield hash rows for files that exist in the Axon after a given point in time.

Parameters
  • tick (int) – The starting time (in epoch milliseconds).

  • tock (int) – The ending time to stop iterating at (in epoch milliseconds).

Yields

(int, (bytes, int)) – A tuple containing the time the hash was added, and the file SHA-256 and size.

holdHashLock(hashbyts)[source]

A context manager that synchronizes edit access to a blob.

Parameters

hashbyts (bytes) – The blob to hold the lock for.

async iterMpkFile(sha256)[source]

Yield items from a MsgPack (.mpk) file in the Axon.

Parameters

sha256 (str) – The sha256 hash of the file as a string.

Yields

Unpacked items from the bytes.

async jsonlines(sha256)[source]
async metrics()[source]

Get the runtime metrics of the Axon.

Returns

A dictionary of runtime data about the Axon.

Return type

dict

async postfiles(fields, url, params=None, headers=None, method='POST', ssl=True, timeout=None)[source]

Send files from the axon as fields in a multipart/form-data HTTP request.

Parameters
  • fields (list) – List of dicts containing the fields to add to the request as form-data.

  • url (str) – The URL to retrieve.

  • params (dict) – Additional parameters to add to the URL.

  • headers (dict) – Additional HTTP headers to add in the request.

  • method (str) – The HTTP method to use.

  • ssl (bool) – Perform SSL verification.

  • timeout (int) – The timeout of the request, in seconds.

Notes

The dictionaries in the fields list may contain the following values:

{
    'name': <str> - Name of the field.
    'sha256': <str> - SHA256 hash of the file to submit for this field.
    'value': <str> - Value for the field. Ignored if a sha256 has been specified.
    'filename': <str> - Optional filename for the field.
    'content_type': <str> - Optional content type for the field.
    'content_transfer_encoding': <str> - Optional content-transfer-encoding header for the field.
}

The dictionary returned by this may contain the following values:

{
    'ok': <boolean> - False if there were exceptions retrieving the URL.
    'err': <str> - An error message if there was an exception when retrieving the URL.
    'url': <str> - The URL retrieved (which could have been redirected)
    'code': <int> - The response code.
    'body': <bytes> - The response body.
    'headers': <dict> - The response headers as a dictionary.
}
Returns

An information dictionary containing the results of the request.

Return type

dict
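
As an illustration of the fields structure described in the notes above, the sketch below builds a two-field list and checks its shape. The helper names, the field names, and the rule that each field carries either a sha256 or a value are assumptions made for this example, not part of the Axon API.

```python
def make_fields(file_sha256, filename):
    # One plain form value, and one file taken from the Axon by its sha256.
    return [
        {'name': 'comment', 'value': 'uploaded for analysis'},
        {'name': 'file', 'sha256': file_sha256, 'filename': filename,
         'content_type': 'application/octet-stream'},
    ]

def check_fields(fields):
    # Assumed rule: every field has a str name plus a sha256 or a value.
    for field in fields:
        if not isinstance(field.get('name'), str):
            return False
        if 'sha256' not in field and 'value' not in field:
            return False
    return True

fields = make_fields('a' * 64, 'sample.bin')
```

A list built this way would be passed as the fields argument, for example await axon.postfiles(fields, url).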

async put(byts)[source]

Store bytes in the Axon.

Parameters

byts (bytes) – The bytes to store in the Axon.

Notes

This API should not be used for files greater than 128 MiB in size.

Returns

A tuple with the file size and sha256 hash of the bytes.

Return type

tuple(int, bytes)
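
The shape of the returned tuple can be reproduced locally with the standard library. This is a sketch for illustration only; size_and_sha256 is a hypothetical helper and does not store anything in an Axon.

```python
import hashlib

def size_and_sha256(byts):
    # Return (size in bytes, sha256 digest as raw bytes).
    return len(byts), hashlib.sha256(byts).digest()

size, sha256 = size_and_sha256(b'visi')
# The digest is 32 raw bytes; sha256.hex() gives the printable form.
```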

async puts(files)[source]

Store a set of bytes in the Axon.

Parameters

files (list) – A list of bytes to store in the Axon.

Notes

This API should not be used for storing more than 128 MiB of bytes at once.

Returns

A list containing tuples of file size and sha256 hash of the saved bytes.

Return type

list(tuple(int, bytes))

async readlines(sha256)[source]
async save(sha256, genr)[source]

Save a generator of bytes to the Axon.

Parameters
  • sha256 (bytes) – The sha256 hash of the file in bytes.

  • genr – The bytes generator function.

Returns

The size of the bytes saved.

Return type

int

async size(sha256)[source]

Get the size of a file in the Axon.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns

The size of the file, in bytes. If not present, None is returned.

Return type

int

async upload()[source]

Get an Upload object.

Notes

The UpLoad object should be used to manage uploads greater than 128 MiB in size.

Examples

Use an UpLoad object to upload a file to the Axon:

async with await axon.upload() as upfd:
    # Assumes bytsgenerator yields bytes
    async for byts in bytsgenerator():
        await upfd.write(byts)
    await upfd.save()

Use a single UpLoad object to save multiple files:

async with await axon.upload() as upfd:
    for fp in file_paths:
        # Assumes bytsgenerator yields bytes
        async for byts in bytsgenerator(fp):
            await upfd.write(byts)
        await upfd.save()
Returns

An Upload manager object.

Return type

UpLoad

async wants(sha256s)[source]

Get a list of sha256 values the axon does not have from an input list.

Parameters

sha256s (list) – A list of sha256 values as bytes.

Returns

A list of bytes containing the sha256 hashes the Axon does not have.

Return type

list

async wget(url, params=None, headers=None, json=None, body=None, method='GET', ssl=True, timeout=None)[source]

Stream a file download directly into the Axon.

Parameters
  • url (str) – The URL to retrieve.

  • params (dict) – Additional parameters to add to the URL.

  • headers (dict) – Additional HTTP headers to add in the request.

  • json – A JSON body which is included with the request.

  • body – The body to be included in the request.

  • method (str) – The HTTP method to use.

  • ssl (bool) – Perform SSL verification.

  • timeout (int) – The timeout of the request, in seconds.

Notes

The response body will be stored regardless of the response code. The ok value in the response only indicates whether an exception occurred; it does not reflect that a status code, such as a 404, was encountered when retrieving the URL.

The dictionary returned by this may contain the following values:

{
    'ok': <boolean> - False if there were exceptions retrieving the URL.
    'url': <str> - The URL retrieved (which could have been redirected)
    'code': <int> - The response code.
    'mesg': <str> - An error message if there was an exception when retrieving the URL.
    'headers': <dict> - The response headers as a dictionary.
    'size': <int> - The size in bytes of the response body.
    'hashes': {
        'md5': <str> - The MD5 hash of the response body.
        'sha1': <str> - The SHA1 hash of the response body.
        'sha256': <str> - The SHA256 hash of the response body.
        'sha512': <str> - The SHA512 hash of the response body.
    }
}
Returns

An information dictionary containing the results of the request.

Return type

dict
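
Because ok does not reflect the HTTP status, callers usually need to inspect code as well. The sketch below shows one way to interpret the returned dictionary. summarize_wget is a hypothetical helper, and treating only 2xx codes as success is an assumption of this example.

```python
def summarize_wget(resp):
    # 'ok' is False only when an exception occurred during retrieval.
    if not resp.get('ok'):
        return 'error: %s' % (resp.get('mesg'),)

    code = resp.get('code')
    sha256 = resp.get('hashes', {}).get('sha256')

    # The body is stored even for codes such as 404, so check the code too.
    if code is not None and 200 <= code < 300:
        return 'stored %s (%d bytes)' % (sha256, resp.get('size'))

    return 'stored %s, but server returned HTTP %s' % (sha256, code)
```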

async wput(sha256, url, params=None, headers=None, method='PUT', ssl=True, timeout=None, filename=None, filemime=None)[source]

Stream a blob from the axon as the body of an HTTP request.

class synapse.axon.AxonApi[source]

Bases: synapse.lib.cell.CellApi, synapse.lib.share.Share

async del_(sha256)[source]

Remove the given bytes from the Axon by sha256.

Parameters

sha256 (bytes) – The sha256, in bytes, to remove from the Axon.

Returns

True if the file is removed; false if the file is not present.

Return type

boolean

async dels(sha256s)[source]

Given a list of sha256 hashes, delete the files from the Axon.

Parameters

sha256s (list) – A list of sha256 hashes in bytes form.

Returns

A list of booleans, indicating if the file was deleted or not.

Return type

list

async get(sha256)[source]

Get bytes of a file.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Examples

Get the bytes from an Axon and process them:

buf = b''
async for bytz in axon.get(sha256):
    buf += bytz

await dostuff(buf)
Yields

bytes – Chunks of the file bytes.

Raises

synapse.exc.NoSuchFile – If the file does not exist.

async has(sha256)[source]

Check if the Axon has a file.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns

True if the Axon has the file; false otherwise.

Return type

boolean

async hashes(offs, wait=False, timeout=None)[source]

Yield hash rows for files that exist in the Axon in added order starting at an offset.

Parameters
  • offs (int) – The index offset.

  • wait (boolean) – Wait for new results and yield them in realtime.

  • timeout (int) – Max time to wait for new results.

Yields

(int, (bytes, int)) – An index offset and the file SHA-256 and size.

async hashset(sha256)[source]

Calculate additional hashes for a file in the Axon.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns

A dictionary containing hashes of the file.

Return type

dict

async history(tick, tock=None)[source]

Yield hash rows for files that exist in the Axon after a given point in time.

Parameters
  • tick (int) – The starting time (in epoch milliseconds).

  • tock (int) – The ending time to stop iterating at (in epoch milliseconds).

Yields

(int, (bytes, int)) – A tuple containing the time the hash was added, and the file SHA-256 and size.

async iterMpkFile(sha256)[source]

Yield items from a MsgPack (.mpk) file in the Axon.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Yields

Unpacked items from the bytes.

async jsonlines(sha256)[source]

Yield JSON objects from JSONL (JSON lines) file.

Parameters

sha256 (bytes) – The sha256 hash of the file.

Yields

object – Decoded JSON objects.

async metrics()[source]

Get the runtime metrics of the Axon.

Returns

A dictionary of runtime data about the Axon.

Return type

dict

async postfiles(fields, url, params=None, headers=None, method='POST', ssl=True, timeout=None)[source]
async put(byts)[source]

Store bytes in the Axon.

Parameters

byts (bytes) – The bytes to store in the Axon.

Notes

This API should not be used for files greater than 128 MiB in size.

Returns

A tuple with the file size and sha256 hash of the bytes.

Return type

tuple(int, bytes)

async puts(files)[source]

Store a set of bytes in the Axon.

Parameters

files (list) – A list of bytes to store in the Axon.

Notes

This API should not be used for storing more than 128 MiB of bytes at once.

Returns

A list containing tuples of file size and sha256 hash of the saved bytes.

Return type

list(tuple(int, bytes))

async readlines(sha256)[source]

Yield lines from a multi-line text file in the axon.

Parameters

sha256 (bytes) – The sha256 hash of the file.

Yields

str – Lines of text

async size(sha256)[source]

Get the size of a file in the Axon.

Parameters

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns

The size of the file, in bytes. If not present, None is returned.

Return type

int

async upload()[source]

Get an Upload object.

Notes

The UpLoad object should be used to manage uploads greater than 128 MiB in size.

Examples

Use an UpLoad object to upload a file to the Axon:

async with await axonProxy.upload() as upfd:
    # Assumes bytsgenerator yields bytes
    async for byts in bytsgenerator():
        await upfd.write(byts)
    await upfd.save()

Use a single UpLoad object to save multiple files:

async with await axonProxy.upload() as upfd:
    for fp in file_paths:
        # Assumes bytsgenerator yields bytes
        async for byts in bytsgenerator(fp):
            await upfd.write(byts)
        await upfd.save()
Returns

An Upload manager object.

Return type

UpLoadShare

async wants(sha256s)[source]

Get a list of sha256 values the axon does not have from an input list.

Parameters

sha256s (list) – A list of sha256 values as bytes.

Returns

A list of bytes containing the sha256 hashes the Axon does not have.

Return type

list

async wget(url, params=None, headers=None, json=None, body=None, method='GET', ssl=True, timeout=None)[source]

Stream a file download directly into the Axon.

Parameters
  • url (str) – The URL to retrieve.

  • params (dict) – Additional parameters to add to the URL.

  • headers (dict) – Additional HTTP headers to add in the request.

  • json – A JSON body which is included with the request.

  • body – The body to be included in the request.

  • method (str) – The HTTP method to use.

  • ssl (bool) – Perform SSL verification.

  • timeout (int) – The timeout of the request, in seconds.

Notes

The response body will be stored regardless of the response code. The ok value in the response only indicates whether an exception occurred; it does not reflect that a status code, such as a 404, was encountered when retrieving the URL.

The dictionary returned by this may contain the following values:

{
    'ok': <boolean> - False if there were exceptions retrieving the URL.
    'url': <str> - The URL retrieved (which could have been redirected)
    'code': <int> - The response code.
    'mesg': <str> - An error message if there was an exception when retrieving the URL.
    'headers': <dict> - The response headers as a dictionary.
    'size': <int> - The size in bytes of the response body.
    'hashes': {
        'md5': <str> - The MD5 hash of the response body.
        'sha1': <str> - The SHA1 hash of the response body.
        'sha256': <str> - The SHA256 hash of the response body.
        'sha512': <str> - The SHA512 hash of the response body.
    }
}
Returns

An information dictionary containing the results of the request.

Return type

dict

async wput(sha256, url, params=None, headers=None, method='PUT', ssl=True, timeout=None)[source]
class synapse.axon.AxonHttpBySha256V1(application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)[source]

Bases: synapse.lib.httpapi.Handler

async delete(sha256)[source]
async get(sha256)[source]
class synapse.axon.AxonHttpDelV1(application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)[source]

Bases: synapse.lib.httpapi.Handler

async post()[source]
class synapse.axon.AxonHttpHasV1(application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)[source]

Bases: synapse.lib.httpapi.Handler

async get(sha256)[source]
class synapse.axon.AxonHttpUploadV1(application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)[source]

Bases: synapse.lib.httpapi.StreamHandler

async data_received(chunk)[source]

Implement this method to handle streamed request data.

Requires the stream_request_body decorator.

May be a coroutine for flow control.

on_connection_close()[source]

Called in async handlers if the client closed the connection.

Override this to clean up resources associated with long-lived connections. Note that this method is called only if the connection was closed during asynchronous processing; if you need to do cleanup after every request override on_finish instead.

Proxies may keep a connection open for a time (perhaps indefinitely) after the client has gone away, so this method may not be called promptly after the end user closes their connection.

on_finish()[source]

Called after the end of a request.

Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare. on_finish may not produce any output, as it is called after the response has been sent to the client.

async post()[source]

Called after all data has been read.

async prepare()[source]

Called at the beginning of a request before get/post/etc.

Override this method to perform common initialization regardless of the request method.

Asynchronous support: Use async def or decorate this method with gen.coroutine to make it asynchronous. If this method returns an Awaitable, execution will not proceed until the Awaitable is done.

New in version 3.1: Asynchronous support.

async put()[source]
class synapse.axon.UpLoad[source]

Bases: synapse.lib.base.Base

An object used to manage uploads to the Axon.

async save()[source]

Save the currently uploaded bytes to the Axon.

Notes

This resets the Upload object, so it can be reused.

Returns

A tuple of the size in bytes and the sha256 hash of the saved file.

Return type

tuple(int, bytes)

async write(byts)[source]

Write bytes to the Upload object.

Parameters

byts (bytes) – Bytes to write to the current Upload object.

Returns

Returns None.

Return type

(None)

class synapse.axon.UpLoadProxy[source]

Bases: synapse.lib.share.Share

async save()[source]
async write(byts)[source]
class synapse.axon.UpLoadShare[source]

Bases: synapse.axon.UpLoad, synapse.lib.share.Share

typename = 'upload'

synapse.cells module

synapse.common module

class synapse.common.NoValu[source]

Bases: object

class synapse.common.aclosing(thing)[source]

Bases: contextlib.AbstractAsyncContextManager

Async context manager for safely finalizing an asynchronously cleaned-up resource such as an async generator, calling its aclose() method.

Code like this:

async with aclosing(<module>.fetch(<arguments>)) as agen:
    <block>

is equivalent to this:

agen = <module>.fetch(<arguments>)
try:
    <block>
finally:
    await agen.aclose()
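
The effect is easiest to see when a loop exits early. The sketch below defines a minimal local aclosing with the behavior described above (it is not the synapse implementation) and shows that the generator's cleanup has run by the time the async with block exits. The names numbers, first_two and events are illustrative.

```python
import asyncio
import contextlib

class aclosing(contextlib.AbstractAsyncContextManager):
    # Minimal local equivalent: call aclose() on exit.
    def __init__(self, thing):
        self.thing = thing

    async def __aenter__(self):
        return self.thing

    async def __aexit__(self, *exc):
        await self.thing.aclose()

events = []

async def numbers():
    try:
        for i in range(10):
            yield i
    finally:
        # Runs when the generator is closed or exhausted.
        events.append('closed')

async def first_two():
    out = []
    async with aclosing(numbers()) as agen:
        async for i in agen:
            out.append(i)
            if len(out) == 2:
                break  # leave early; aclosing still finalizes the generator
    return out, list(events)

result = asyncio.run(first_two())
```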
async synapse.common.agen(*items)[source]
async synapse.common.aspin(genr)[source]

Async version of spin

synapse.common.buid(valu=None)[source]

A binary GUID like sequence of 32 bytes.

Parameters

valu (object) – Optional, if provided, the hash of the msgpack encoded form of the object is returned. This can be used to create stable buids.

Notes

By default, this returns a random 32 byte value.

Returns

A 32 byte value.

Return type

bytes

synapse.common.chunks(item, size)[source]

Divide an iterable into chunks.

Parameters
  • item – Item to slice

  • size (int) – Maximum chunk size.

Notes

This supports Generator objects and objects which support calling the __getitem__() method with a slice object.

Yields

Slices of the item containing up to “size” number of items.
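
A minimal sketch of this kind of chunking, assuming generators are consumed with itertools.islice and everything else is sliced. iter_chunks is a hypothetical name, and the real function may differ in details such as its handling of empty input.

```python
import itertools
import types

def iter_chunks(item, size):
    if isinstance(item, types.GeneratorType):
        # Generators cannot be sliced, so pull up to size items at a time.
        while True:
            chunk = tuple(itertools.islice(item, size))
            if not chunk:
                return
            yield chunk
    else:
        # Anything supporting __getitem__ with a slice (list, bytes, str...).
        for offs in range(0, len(item), size):
            yield item[offs:offs + size]
```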

synapse.common.config(conf, confdefs)[source]

Initialize a config dict using the given confdef tuples.

synapse.common.debase64(b)[source]
synapse.common.deprecated(name, curv='2.x', eolv='3.0.0')[source]
synapse.common.ehex(byts)[source]

Encode a bytes variable to a string using binascii.hexlify.

Parameters

byts (bytes) – Bytes to encode.

Returns

A string representing the bytes.

Return type

str
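
The encoding can be illustrated with binascii directly. The helpers below are a sketch with hypothetical names (to_hex, from_hex) showing the round trip between raw bytes, such as a sha256 digest, and their hex string form.

```python
import binascii

def to_hex(byts):
    # hexlify returns bytes, so decode to get a str.
    return binascii.hexlify(byts).decode('utf8')

def from_hex(text):
    # The inverse operation: hex string back to raw bytes.
    return binascii.unhexlify(text)
```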

synapse.common.enbase64(b)[source]
synapse.common.envbool(name, defval='false')[source]

Resolve an environment variable to a boolean value.

Parameters
  • name (str) – Environment variable to resolve.

  • defval (str) – Default string value to resolve as.

Notes

The strings “0” and “false” (after lowercasing) are considered false values.

Returns

True if the envar is set, false if it is set to a false value.

Return type

boolean
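
The resolution rule in the notes can be sketched as follows. env_bool and the DEMO_FLAG variable are hypothetical names used for illustration, and the sketch assumes every value other than “0” or “false” counts as true.

```python
import os

def env_bool(name, defval='false'):
    # Fall back to defval when unset, then compare after lowercasing.
    valu = os.getenv(name, defval).lower()
    return valu not in ('0', 'false')

os.environ['DEMO_FLAG'] = 'False'
flag = env_bool('DEMO_FLAG')
```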

synapse.common.err(e, fulltb=False)[source]
synapse.common.errinfo(name, mesg)[source]
synapse.common.excinfo(e)[source]

Populate err,errmsg,errtrace info from exc.

synapse.common.firethread(f)[source]

A decorator for making a function fire a thread.

synapse.common.gendir(*paths, **opts)[source]

Return the absolute path of the joining of the arguments, creating a directory at the resulting path if one does not exist.

Performs home directory(~) and environment variable expansion.

Parameters
  • *paths ([str,...]) – A list of path elements

  • **opts – arguments as kwargs to os.makedirs

synapse.common.genfile(*paths)[source]

Create or open (for read/write) a file path join.

Parameters

*paths – A list of paths to join together to make the file.

Notes

If the file already exists, the fd returned is opened in r+b mode. Otherwise, the fd is opened in w+b mode.

The file position is set to the start of the file. The user is responsible for truncating (fd.truncate()) if the existing file contents are not desired, or seeking to the end (fd.seek(0, 2)) to append.

Returns

A file-object which can be read from and written to.

Return type

io.BufferedRandom

synapse.common.genpath(*paths)[source]

Return an absolute path of the joining of the arguments as path elements

Performs home directory(~) and environment variable expansion on the joined path

Parameters

*paths ([str,...]) – A list of path elements

Note

All paths used by Synapse operations (i.e. everything but the data) shall use this function or one of its callers before storing as object properties.
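
The join, expand and absolutize steps can be sketched with os.path. gen_path and the DEMO_SUBDIR variable are hypothetical names, and the order of the expansion steps is an assumption of this example.

```python
import os

def gen_path(*paths):
    path = os.path.join(*paths)
    path = os.path.expanduser(path)   # expand a leading ~
    path = os.path.expandvars(path)   # expand $VAR style references
    return os.path.abspath(path)

os.environ['DEMO_SUBDIR'] = 'data'
path = gen_path('base', '$DEMO_SUBDIR', 'file.txt')
```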

synapse.common.getDirSize(*paths)[source]

Get the size of a directory.

Parameters

*paths (str) – A list of path elements.

Notes

This is equivalent to du -B 1 -s and du -bs.

Returns

Tuple of total real and total apparent size of all normal files and directories underneath *paths plus *paths itself.

Return type

tuple

synapse.common.getSslCtx(cadir, purpose=Purpose.SERVER_AUTH)[source]

Create an SSL Context and load certificates from a given directory.

Parameters
  • cadir (str) – Path to load certificates from.

  • purpose – SSLContext purposes flags.

Returns

An SSL Context object.

Return type

ssl.SSLContext

synapse.common.getSynDir(*paths)[source]
synapse.common.getSynPath(*paths)[source]
synapse.common.getTempDir()[source]
synapse.common.getbytes(*paths, **opts)[source]
synapse.common.getfile(*paths, **opts)[source]

Return a file at the path resulting from joining of the arguments, or None if the file does not exist.

Parameters
  • *paths ([str,...]) – A list of path elements

  • **opts – arguments as kwargs to io.open

Returns

A file-object which can be read from and written to.

Return type

io.BufferedRandom

synapse.common.guid(valu=None)[source]

Get a 16 byte guid value.

By default, this is a random guid value.

Parameters

valu – Object used to construct the guid valu from. This must be able to be msgpack’d.

Returns

32 character, lowercase ascii string.

Return type

str

synapse.common.hugeadd(x, y)[source]

Add two decimal.Decimal with proper precision to support synapse hugenums.

synapse.common.hugenum(valu)[source]

Return a decimal.Decimal with proper precision for use as a synapse hugenum.

synapse.common.hugeround(x)[source]

Round a decimal.Decimal with proper precision for synapse hugenums.

synapse.common.hugesub(x, y)[source]

Subtract two decimal.Decimal with proper precision to support synapse hugenums.

synapse.common.int64en(i)[source]

Encode an unsigned 64-bit int into 8 byte big-endian bytes

synapse.common.int64un(b)[source]

Decode an unsigned 64-bit int from 8 byte big-endian
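
One way to produce this encoding with the standard library is struct with the '>Q' format. Whether synapse uses struct internally is not stated here, so treat the helpers below (u64_encode, u64_decode) as an illustrative sketch. A useful property of fixed-width big-endian encoding is that the byte strings sort in the same order as the integers.

```python
import struct

def u64_encode(i):
    # '>Q' is a big-endian unsigned 64-bit integer (8 bytes).
    return struct.pack('>Q', i)

def u64_decode(b):
    return struct.unpack('>Q', b)[0]

encoded = u64_encode(1)
```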

synapse.common.intify(x)[source]

Ensure (or coerce) a value into being an integer or None.

Parameters

x (obj) – An object to intify

Returns

The int value (or None)

Return type

(int)

synapse.common.isbuidhex(text)[source]
synapse.common.isguid(text)[source]
synapse.common.iterfd(fd, size=10000000)[source]

Generator which yields bytes from a file descriptor.

Parameters
  • fd (file) – A file-like object to read bytes from.

  • size (int) – Size, in bytes, of the number of bytes to read from the fd at a given time.

Notes

If the first read call on the file descriptor is an empty bytestring, that zero length bytestring will be yielded and the generator will then be exhausted. This behavior is intended to allow the yielding of contents of a zero byte file.

Yields

bytes – Bytes from the file descriptor.

synapse.common.iterzip(*args, fillvalue=None)[source]
synapse.common.jslines(*paths)[source]
synapse.common.jsload(*paths)[source]
synapse.common.jsonsafe_nodeedits(nodeedits)[source]

Hexlify the buid of each node:edits

synapse.common.jssave(js, *paths)[source]
synapse.common.listdir(*paths, glob=None)[source]

List the (optionally glob filtered) full paths from a dir.

Parameters
  • *paths ([str,...]) – A list of path elements

  • glob (str) – An optional fnmatch glob str

synapse.common.lockfile(path)[source]

A file lock with-block helper.

Parameters

path (str) – A path to a lock file.

Examples

Get the lock on a file and dostuff while having the lock:

path = '/hehe/haha.lock'
with lockfile(path):
    dostuff()

Notes

This is currently based on fcntl.lockf(), and as such, it is purely advisory locking. If multiple processes are attempting to obtain a lock on the same file, this will block until the process which has the current lock releases it.

Yields

None
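
A minimal sketch of an advisory lock helper built on fcntl.lockf(), for Unix-like systems only. lock_file and demo are hypothetical names, and opening the file in append mode is a choice made for this example rather than a description of the real helper.

```python
import contextlib
import fcntl
import os
import tempfile

@contextlib.contextmanager
def lock_file(path):
    # Append mode creates the file if needed without truncating it.
    with open(path, 'ab') as fd:
        fcntl.lockf(fd, fcntl.LOCK_EX)  # blocks until the lock is free
        try:
            yield None
        finally:
            fcntl.lockf(fd, fcntl.LOCK_UN)

def demo():
    with tempfile.TemporaryDirectory() as dirn:
        path = os.path.join(dirn, 'demo.lock')
        with lock_file(path):
            # Work done here is serialized across cooperating processes.
            return os.path.exists(path)
```

Because the lock is advisory, it only protects against other processes that also take the lock before touching the shared resource.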

synapse.common.makedirs(path, mode=511)[source]
async synapse.common.merggenr(genrs, cmprkey)[source]

Iterate multiple sorted async generators and yield their results in order.

Parameters
  • genrs (Sequence[AsyncGenerator[T]]) – a sequence of async generators that each yield sorted items

  • cmprkey (Callable[T, T, bool]) – a comparison function over the items yielded

Note

If the genrs yield increasing items, cmprkey should return True if the first parameter is less than the second parameter, e.g lambda x, y: x < y.
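
The merge can be sketched by holding the current head item of each generator and repeatedly yielding the one that cmprkey ranks first. This is an illustrative implementation under that assumption, not the synapse code. merge_sorted, items_genr and collect are hypothetical names.

```python
import asyncio

async def merge_sorted(genrs, cmprkey):
    # Prime each generator with its first item; skip empty ones.
    heads = []
    for genr in genrs:
        try:
            heads.append([await genr.__anext__(), genr])
        except StopAsyncIteration:
            pass

    while heads:
        # Pick the head that cmprkey orders before all others.
        best = heads[0]
        for head in heads[1:]:
            if cmprkey(head[0], best[0]):
                best = head
        yield best[0]

        # Advance the generator we just consumed from.
        try:
            best[0] = await best[1].__anext__()
        except StopAsyncIteration:
            heads = [head for head in heads if head is not best]

async def items_genr(*items):
    for item in items:
        yield item

async def collect():
    genrs = [items_genr(1, 4, 7), items_genr(2, 3, 9), items_genr()]
    return [x async for x in merge_sorted(genrs, lambda x, y: x < y)]

merged = asyncio.run(collect())
```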

async synapse.common.merggenr2(genrs, cmprkey=None, reverse=False)[source]

Optimized version of merggenr based on heapq.merge

synapse.common.normLogLevel(valu)[source]

Norm a log level value to an integer.

Parameters

valu – The value to norm ( a string or integer ).

Returns

A valid Logging log level.

Return type

int

synapse.common.now()[source]

Get the current epoch time in milliseconds.

This relies on time.time(), which is system-dependent in terms of resolution.

Examples

Get the current time and make a row for a Cortex:

tick = now()
row = (someiden, 'foo:prop', 1, tick)
core.addRows([row])
Returns

Epoch time in milliseconds.

Return type

int
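
A sketch of the same computation using time.time(), along with the reverse conversion, which is handy when reading tick values. now_millis and millis_to_datetime are hypothetical helper names.

```python
import datetime
import time

def now_millis():
    # time.time() returns float seconds; scale to integer milliseconds.
    return int(time.time() * 1000)

def millis_to_datetime(tick):
    # Convert epoch milliseconds back to an aware UTC datetime.
    return datetime.datetime.fromtimestamp(tick / 1000, tz=datetime.timezone.utc)

tick = now_millis()
```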

synapse.common.reqbytes(*paths)[source]
synapse.common.reqdir(*paths)[source]

Return the absolute path of the joining of the arguments, raising an exception if a directory does not exist at the resulting path.

Performs home directory(~) and environment variable expansion.

Parameters

*paths ([str,...]) – A list of path elements

synapse.common.reqfile(*paths, **opts)[source]

Return a file at the path resulting from joining of the arguments, raising an exception if the file does not exist.

Parameters
  • *paths ([str,...]) – A list of path elements

  • **opts – arguments as kwargs to io.open

Returns

A file-object which can be read from and written to.

Return type

io.BufferedRandom

synapse.common.reqjsonsafe(item)[source]

Returns None if item is json serializable, otherwise raises an exception

synapse.common.reqpath(*paths)[source]

Return the absolute path of the joining of the arguments, raising an exception if a file doesn’t exist at resulting path

Parameters

*paths ([str,...]) – A list of path elements

synapse.common.result(retn)[source]

Return a value or raise an exception from a retn tuple.

synapse.common.retnexc(e)[source]

Construct a retn tuple for the given exception.

synapse.common.setlogging(mlogger, defval=None, structlog=None)[source]

Configure synapse logging.

Parameters
  • mlogger (logging.Logger) – Reference to a logging.Logger()

  • defval (str) – Default log level. May be an integer.

  • structlog (bool) – Enabled structured (jsonl) logging output.

Notes

This calls logging.basicConfig and should only be called once per process.

Returns

None

synapse.common.signedint64en(i)[source]

Encode a signed 64-bit int into 8 byte big-endian bytes

synapse.common.signedint64un(b)[source]

Decode a signed 64-bit int from 8 byte big-endian

synapse.common.spin(genr)[source]

Crank through a generator but discard the yielded values.

Parameters

genr – Any generator or iterable valu.

Notes

This generator is exhausted via the collections.deque() constructor with a maxlen=0, which will quickly exhaust an iterator staying in C code as much as possible.

Returns

None
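
The technique in the notes can be shown in a few lines. drain and work are hypothetical names. The generator's side effects still happen even though no yielded values are kept.

```python
import collections

def drain(genr):
    # A deque with maxlen=0 consumes the iterable and stores nothing.
    collections.deque(genr, maxlen=0)

seen = []

def work():
    for i in range(3):
        seen.append(i)  # side effect we care about
        yield i

drain(work())
```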

synapse.common.switchext(*paths, ext)[source]

Return an absolute path of the joining of the arguments with the extension replaced.

If an extension does not exist, it will be added.

Parameters
  • *paths ([str,...]) – A list of path elements

  • ext (str) – A file extension (e.g. ‘.txt’). It should begin with a period.

synapse.common.todo(_todoname, *args, **kwargs)[source]

Construct and return a todo tuple of (name, args, kwargs).

Note: the odd name for the first parameter is to avoid collision with keys in kwargs.
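
A sketch of the tuple shape and of why the first parameter has an unusual name: a caller can still pass a keyword argument called name without a collision. make_todo, run_todo and Greeter are hypothetical and only illustrate how such a tuple might be built and dispatched.

```python
def make_todo(_todoname, *args, **kwargs):
    return (_todoname, args, kwargs)

def run_todo(obj, todo):
    # Look up the named method on obj and call it with the stored arguments.
    name, args, kwargs = todo
    return getattr(obj, name)(*args, **kwargs)

class Greeter:
    def greet(self, greeting, name='world'):
        return '%s %s' % (greeting, name)

todo = make_todo('greet', 'hello', name='visi')
result = run_todo(Greeter(), todo)
```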

synapse.common.tuplify(obj)[source]

Convert a nested set of python primitives into tupleized forms via msgpack.

synapse.common.uhex(text)[source]

Decode a hex string into bytes.

Parameters

text (str) – Text to decode.

Returns

The decoded bytes.

Return type

bytes

synapse.common.unjsonsafe_nodeedits(nodeedits)[source]
synapse.common.verstr(vtup)[source]

Convert a version tuple to a string.

synapse.common.vertup(vstr)[source]

Convert a version string to a tuple.

Example

ver = vertup('1.3.30')
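
The two conversions are inverses of each other. The sketch below assumes purely numeric, dot-separated versions. ver_tuple and ver_string are hypothetical names, not the synapse functions.

```python
def ver_tuple(vstr):
    # '1.3.30' -> (1, 3, 30)
    return tuple(int(part) for part in vstr.split('.'))

def ver_string(vtup):
    # (1, 3, 30) -> '1.3.30'
    return '.'.join(str(part) for part in vtup)

ver = ver_tuple('1.3.30')
```

Comparing tuples rather than strings gives the expected ordering, since (1, 3, 30) > (1, 3, 4) while '1.3.30' < '1.3.4'.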

synapse.common.worker(meth, *args, **kwargs)[source]
synapse.common.yamlload(*paths)[source]
synapse.common.yamlmod(obj, *paths)[source]

Create or update a yaml file by combining its existing contents with obj. Both obj and the file contents must be maps/dicts, or empty.

synapse.common.yamlsave(obj, *paths)[source]

synapse.cortex module

class synapse.cortex.CoreApi[source]

Bases: synapse.lib.cell.CellApi

The CoreApi is exposed when connecting to a Cortex over Telepath.

Many CoreApi methods operate on packed nodes consisting of primitive data structures which can be serialized with msgpack/json.

An example of a packaged Node:

( (<form>, <valu>), {

    "props": {
        <name>: <valu>,
        ...
    },
    "tags": {
        "foo": <time>,
        "foo.bar": <time>,
    },
})
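
For illustration, a packed node can be written and taken apart with plain Python. The form name, property name and tag values below are hypothetical placeholders, and unpack_node and has_tag are helper names invented for this sketch.

```python
# ((form, valu), info) with hypothetical form, props and tags.
node = (
    ('test:str', 'woot'),
    {
        'props': {'tick': 1234},
        'tags': {'foo': (None, None), 'foo.bar': (None, None)},
    },
)

def unpack_node(node):
    # Split a packed node into its form, value, props and tags.
    (form, valu), info = node
    return form, valu, info.get('props', {}), info.get('tags', {})

def has_tag(node, tag):
    return tag in node[1].get('tags', {})

form, valu, props, tags = unpack_node(node)
```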
async addCronJob(cdef)[source]

Add a cron job to the cortex

A cron job is a persistently-stored item that causes storm queries to be run in the future. The specification for the times that the queries run can be one-shot or recurring.

Parameters
  • query (str) – The storm query to execute in the future

  • reqs (Union[Dict[str, Union[int, List[int]]], List[Dict[...]]]) – Either a dict of the fixed time fields or a list of such dicts. The keys are in the set (‘year’, ‘month’, ‘dayofmonth’, ‘dayofweek’, ‘hour’, ‘minute’). The values must be positive integers, except for ‘dayofmonth’, which may also be a negative integer representing the number of days from the end of the month, with -1 representing the last day of the month. All values may also be lists of valid values.

  • incunit (Optional[str]) – A member of the same set as above, with an additional member ‘day’. If None (default), then the appointment is one-shot and will not recur.

  • incvals (Union[int, List[int]]) – An integer or a list of integers of the number of units

Returns (bytes):

An iden that can be used to later modify, query, and delete the job.

Notes

Either reqs must have fields present or incunit must not be None (or both). If incunit is not None, it must be larger in unit size than all the keys in all reqs elements.
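
The constraints on reqs described above can be sketched as a client-side pre-check. This is only an illustration: check_reqs and REQ_KEYS are hypothetical names, the Cortex performs its own validation, and treating zero as acceptable (for example minute 0) is an assumption.

```python
REQ_KEYS = {'year', 'month', 'dayofmonth', 'dayofweek', 'hour', 'minute'}


def check_reqs(reqs):
    """Return a list of problems found in a reqs dict or a list of such dicts."""
    problems = []
    reqdicts = [reqs] if isinstance(reqs, dict) else list(reqs)
    for reqdict in reqdicts:
        for key, valu in reqdict.items():
            if key not in REQ_KEYS:
                problems.append(f'unknown key: {key}')
                continue
            # Every value may be a single integer or a list of integers.
            vals = valu if isinstance(valu, (list, tuple)) else [valu]
            for item in vals:
                if isinstance(item, bool) or not isinstance(item, int):
                    problems.append(f'{key}: {item!r} is not an integer')
                elif item < 0 and key != 'dayofmonth':
                    # Only dayofmonth may count backwards from month end.
                    problems.append(f'{key}: {item} may not be negative')
    return problems


print(check_reqs({'dayofmonth': -1, 'hour': 3, 'minute': [0, 30]}))
print(check_reqs({'hour': -1}))
```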

async addFeedData(name, items, *, viewiden=None)[source]
async addForm(formname, basetype, typeopts, typeinfo)[source]

Add an extended form to the data model.

Extended forms must begin with _

async addFormProp(form, prop, tdef, info)[source]

Add an extended property to the given form.

Extended properties must begin with _

async addNode(form, valu, props=None)[source]

Deprecated in 2.0.0.

async addNodeTag(iden, tag, valu=(None, None))[source]

Add a tag to a node specified by iden.

Parameters
  • iden (str) – A hex encoded node BUID.

  • tag (str) – A tag string.

  • valu (tuple) – A time interval tuple or (None, None).

async addNodes(nodes)[source]

Add a list of packed nodes to the cortex.

Parameters

nodes (list) – [ ( (form, valu), {‘props’:{}, ‘tags’:{}}), … ]

Yields

(tuple) – Packed node tuples ((form,valu), {‘props’: {}, ‘tags’:{}})

Deprecated in 2.0.0

addStormDmon(ddef)[source]
async addStormPkg(pkgdef)[source]
async addTagProp(name, tdef, info)[source]

Add a tag property to record data about tags on nodes.

async addUnivProp(name, tdef, info)[source]

Add an extended universal property.

Extended properties must begin with _

addUserNotif(useriden, mesgtype, mesgdata=None)[source]
bumpStormDmon(iden)[source]
async callStorm(text, opts=None)[source]

Return the value expressed in a return() statement within storm.

cloneLayer(iden, ldef=None)[source]
async count(text, opts=None)[source]

Count the number of nodes which result from a storm query.

Parameters
  • text (str) – Storm query text.

  • opts (dict) – Storm query options.

Returns

The number of nodes resulting from the query.

Return type

(int)
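
A minimal sketch of calling count() for several queries. FakeCore is a hypothetical stand-in for a connected CoreApi proxy so the example runs without a Cortex; only the count(text, opts=None) signature is taken from this page, and the query strings are illustrative.

```python
import asyncio


class FakeCore:
    """Hypothetical stand-in for a CoreApi proxy with canned counts."""

    def __init__(self, counts):
        self.counts = counts

    async def count(self, text, opts=None):
        return self.counts.get(text, 0)


async def count_many(core, queries, opts=None):
    """Run count() once per query and return {query: count}."""
    results = {}
    for text in queries:
        results[text] = await core.count(text, opts=opts)
    return results


core = FakeCore({'inet:fqdn': 3})
print(asyncio.run(count_many(core, ['inet:fqdn', 'inet:ipv4'])))
```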

async delCronJob(iden)[source]

Delete a cron job

Parameters

iden (bytes) – The iden of the cron job to be deleted

async delForm(formname)[source]

Remove an extended form from the data model.

async delFormProp(form, name)[source]

Remove an extended property from the given form.

async delNodeProp(iden, name)[source]

Delete a property from a single node. Deprecated in 2.0.0.

async delNodeTag(iden, tag)[source]

Delete a tag from the node specified by iden. Deprecated in 2.0.0.

Parameters
  • iden (str) – A hex encoded node BUID.

  • tag (str) – A tag string.

async delStormCmd(name)[source]

Remove a pure storm command from the cortex.

delStormDmon(iden)[source]
async delStormPkg(iden)[source]
async delTagProp(name)[source]

Remove a previously added tag property.

async delUnivProp(name)[source]

Remove an extended universal property.

delUserNotif(indx)[source]
async disableCronJob(iden)[source]

Disable a cron job

Parameters

iden (bytes) – The iden of the cron job to be changed

disableMigrationMode()[source]
disableStormDmon(iden)[source]
async editCronJob(iden, name, valu)[source]

Update a value in a cron definition.

async enableCronJob(iden)[source]

Enable a cron job

Parameters

iden (bytes) – The iden of the cron job to be changed

enableMigrationMode()[source]
enableStormDmon(iden)[source]
async eval(text, opts=None)[source]

Evaluate a storm query and yield packed nodes.

NOTE: This API is deprecated as of 2.0.0 and will be removed in 3.0.0

async exportStorm(text, opts=None)[source]

Execute a storm query and package nodes for export/import.

NOTE: This API yields nodes after an initial complete lift in order to limit exported edges.

async feedFromAxon(sha256, opts=None)[source]

Import a msgpack .nodes file from the axon.

async getAxonBytes(sha256)[source]
async getAxonUpload()[source]
getCoreInfo()[source]

Return static generic information about the cortex including model definition

async getCoreInfoV2()[source]

Return static generic information about the cortex including model definition

getCoreMods()[source]
async getFeedFuncs()[source]

Get a list of Cortex feed functions.

Notes

Each feed dictionary has the name of the feed function, the full docstring for the feed function, and the first line of the docstring broken out in their own keys for easy use.

Returns

A tuple of dictionaries.

Return type

tuple

async getModelDefs()[source]
async getModelDict()[source]

Return a dictionary which describes the data model.

Returns

A model description dictionary.

Return type

(dict)

async getPermDef(perm)[source]

Return a perm definition if it is present in getPermDefs() output.

async getPermDefs()[source]

Return a non-comprehensive list of perm definitions.

async getPropNorm(prop, valu)[source]

Get the normalized property value based on the Cortex data model.

Parameters
  • prop (str) – The property to normalize.

  • valu – The value to normalize.

Returns

A two item tuple, containing the normed value and the info dictionary.

Return type

(tuple)

Raises
  • s_exc.NoSuchProp – If the prop does not exist.

  • s_exc.BadTypeValu – If the value fails to normalize.

getProvStack(iden: str)[source]

Return the provenance stack associated with the given iden.

Parameters

iden (str) – the iden of the provenance stack

Note: the iden appears on each splice entry as the ‘prov’ property

getStormDmon(iden)[source]
getStormDmonLog(iden)[source]
getStormDmons()[source]
getStormPkg(name)[source]
getStormPkgs()[source]
async getStormVar(name, default=None)[source]
async getTypeNorm(name, valu)[source]

Get the normalized type value based on the Cortex data model.

Parameters
  • name (str) – The type to normalize.

  • valu – The value to normalize.

Returns

A two item tuple, containing the normed value and the info dictionary.

Return type

(tuple)

Raises
  • s_exc.NoSuchType – If the type does not exist.

  • s_exc.BadTypeValu – If the value fails to normalize.

getUserNotif(indx)[source]
async iterFormRows(layriden, form, stortype=None, startvalu=None)[source]

Yields buid, valu tuples of nodes of a single form, optionally (re)starting at startvalu.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • form (str) – A form name

  • stortype (Optional[int]) – a STOR_TYPE_* integer representing the type of form:prop

  • startvalu (Any) – The value to start at. May only be not None if stortype is not None.

Returns

AsyncIterator[Tuple(buid, valu)]

async iterPropRows(layriden, form, prop, stortype=None, startvalu=None)[source]

Yields buid, valu tuples of nodes with a particular secondary property, optionally (re)starting at startvalu.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • form (str) – A form name.

  • prop (str) – A secondary property name.

  • stortype (Optional[int]) – a STOR_TYPE_* integer representing the type of form:prop

  • startvalu (Any) – The value to start at. May only be not None if stortype is not None.

Returns

AsyncIterator[Tuple(buid, valu)]

async iterTagPropRows(layriden, tag, prop, form=None, stortype=None, startvalu=None)[source]

Yields (buid, valu) that match a tag:prop, optionally (re)starting at startvalu.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • tag (str) – tag name

  • prop (str) – prop name

  • form (Optional[str]) – optional form name

  • stortype (Optional[int]) – a STOR_TYPE_* integer representing the type of form:prop

  • startvalu (Any) – The value to start at. May only be not None if stortype is not None.

Returns

AsyncIterator[Tuple(buid, valu)]

async iterTagRows(layriden, tag, form=None, starttupl=None)[source]

Yields (buid, (valu, form)) values that match a tag and optional form, optionally (re)starting at starttupl.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • tag (str) – the tag to match

  • form (Optional[str]) – if present, only yields buids of nodes that match the form.

  • starttupl (Optional[Tuple[buid, form]]) – if present, (re)starts the stream of values there.

Returns

AsyncIterator[Tuple(buid, (valu, form))]

Note

This yields (buid, (tagvalu, form)) instead of just buid, valu in order to allow resuming an interrupted call by feeding the last value retrieved into starttupl
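
The resume pattern described in the note can be sketched as follows. FlakyCore is a hypothetical stand-in that drops the stream once; collect_tag_rows is likewise hypothetical. Whether a restart at starttupl repeats the row it names is not stated here, so the sketch skips a repeated row to cover both cases.

```python
import asyncio


class FlakyCore:
    """Hypothetical stand-in: serves canned rows and drops the stream once."""

    def __init__(self, rows, dropafter):
        self.rows = rows            # [(buid, (valu, form)), ...]
        self.dropafter = dropafter  # rows served before the simulated drop
        self.dropped = False

    async def iterTagRows(self, layriden, tag, form=None, starttupl=None):
        start = 0
        if starttupl is not None:
            keys = [(buid, rform) for buid, (valu, rform) in self.rows]
            start = keys.index(tuple(starttupl))
        for served, row in enumerate(self.rows[start:]):
            if not self.dropped and served == self.dropafter:
                self.dropped = True
                raise ConnectionError('simulated disconnect')
            yield row


async def collect_tag_rows(core, layriden, tag, form=None, retries=3):
    """Collect all rows, resuming from the last (buid, form) after a drop."""
    rows = []
    last = None
    for _ in range(retries + 1):
        try:
            async for buid, (valu, rform) in core.iterTagRows(
                    layriden, tag, form=form, starttupl=last):
                if (buid, rform) == last:
                    continue  # already collected before the interruption
                rows.append((buid, (valu, rform)))
                last = (buid, rform)
            return rows
        except ConnectionError:
            continue
    raise ConnectionError('gave up after retries')


rows = [(b'\x01', ((None, None), 'inet:fqdn')),
        (b'\x02', ((None, None), 'inet:fqdn')),
        (b'\x03', ((None, None), 'inet:fqdn'))]
core = FlakyCore(rows, dropafter=2)
print(len(asyncio.run(collect_tag_rows(core, 'layr', 'foo.bar'))))
```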

async iterUnivRows(layriden, prop, stortype=None, startvalu=None)[source]

Yields buid, valu tuples of nodes with a particular universal property, optionally (re)starting at startvalu.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • prop (str) – A universal property name.

  • stortype (Optional[int]) – a STOR_TYPE_* integer representing the type of form:prop

  • startvalu (Any) – The value to start at. May only be not None if stortype is not None.

Returns

AsyncIterator[Tuple(buid, valu)]

iterUserNotifs(useriden, size=None)[source]
async listCronJobs()[source]

Get information about all the cron jobs accessible to the current user

async popStormVar(name, default=None)[source]
provStacks(offs, size)[source]

Return stream of (iden, provenance stack) tuples at the given offset.

async reqValidStorm(text, opts=None)[source]

Parse a Storm query to validate it.

Parameters
  • text (str) – The text of the Storm query to parse.

  • opts (dict) – A Storm options dictionary.

Returns

If the query is valid.

Return type

True

Raises

BadSyntaxError – If the query is invalid.

saveLayerNodeEdits(layriden, edits, meta)[source]
async setNodeProp(iden, name, valu)[source]

Set a property on a single node. Deprecated in 2.0.0.

async setStormCmd(cdef)[source]

Set the definition of a pure storm command in the cortex.

async setStormVar(name, valu)[source]
async spliceHistory()[source]

Yield splices backwards from the end of the splice log.

Will only return the user’s own splices unless they are an admin.

splices(offs=None, size=None, layriden=None)[source]

Return the list of splices at the given offset.

splicesBack(offs=None, size=None)[source]

Return the list of splices backwards from the given offset.

stat()[source]
async storm(text, opts=None)[source]

Evaluate a storm query and yield result messages.

Yields

((str,dict)) – Storm messages.
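
A sketch of consuming the (str, dict) messages yielded by storm(). FakeCore is a hypothetical stand-in for a CoreApi proxy, and the message types and payloads it emits are illustrative placeholders rather than a list of what a real Cortex sends.

```python
import asyncio


class FakeCore:
    """Hypothetical stand-in that replays canned (type, info) messages."""

    def __init__(self, mesgs):
        self.mesgs = mesgs

    async def storm(self, text, opts=None):
        for mesg in self.mesgs:
            yield mesg


async def storm_by_type(core, text, opts=None):
    """Group the info dict of every message by its message type."""
    bytype = {}
    async for mtype, info in core.storm(text, opts=opts):
        bytype.setdefault(mtype, []).append(info)
    return bytype


# Placeholder messages used only to exercise the grouping logic.
core = FakeCore([('init', {}), ('print', {'mesg': 'hello'}), ('fini', {})])
print(asyncio.run(storm_by_type(core, '$lib.print(hello)')))
```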

async syncIndexEvents(matchdef, offsdict=None, wait=True)[source]
async syncLayerNodeEdits(offs, layriden=None, wait=True)[source]

Yield (indx, mesg) nodeedit sets for the given layer beginning at offset.

Once caught up, this API will begin yielding nodeedits in real-time. The generator will only terminate on network disconnect or if the consumer falls behind the max window size of 10,000 nodeedit messages.

async syncLayersEvents(offsdict=None, wait=True)[source]
async updateCronJob(iden, query)[source]

Change an existing cron job’s query

Parameters

iden (bytes) – The iden of the cron job to be changed

async watch(wdef)[source]

Hook cortex/view/layer watch points based on a specified watch definition.

Example

wdef = { 'tags': [ 'foo.bar', 'baz.*' ] }

async for mesg in core.watch(wdef):
    dostuff(mesg)

watchAllUserNotifs(offs=None)[source]
class synapse.cortex.Cortex[source]

Bases: synapse.lib.cell.Cell

A Cortex implements the synapse hypergraph.

The bulk of the Cortex API lives on the Snap() object which can be obtained by calling Cortex.snap() in a with block. This allows callers to manage transaction boundaries explicitly and dramatically increases performance.

async addCoreQueue(name, info)[source]
async addCronJob(cdef)[source]

Add a cron job to the cortex. Convenience wrapper around agenda.add

A cron job is a persistently-stored item that causes storm queries to be run in the future. The specification for the times that the queries run can be one-shot or recurring.

Parameters
  • query (str) – The storm query to execute in the future

  • reqs (Union[Dict[str, Union[int, List[int]]], List[Dict[...]]]) – Either a dict of the fixed time fields or a list of such dicts. The keys are in the set (‘year’, ‘month’, ‘dayofmonth’, ‘dayofweek’, ‘hour’, ‘minute’). The values must be positive integers, except for the key ‘dayofmonth’, which may also be a negative integer representing the number of days from the end of the month, with -1 representing the last day of the month. All values may also be lists of valid values.

  • incunit (Optional[str]) – A member of the same set as above, with an additional member ‘day’. If it is None (default), then the appointment is one-shot and will not recur.

  • incvals (Union[int, List[int]]) – An integer or a list of integers of the number of units.

Returns (bytes):

An iden that can be used to later modify, query, and delete the job.

Notes

Either reqs must have fields present or incunit must not be None (or both). If incunit is not None, it must be larger in unit size than all the keys in all reqs elements. Non-recurring jobs may also have a req of ‘now’, which will cause the job to also execute immediately.

async addFeedData(name, items, *, viewiden=None)[source]

Add data using a feed/parser function.

Parameters
  • name (str) – The name of the feed record format.

  • items (list) – A list of items to ingest.

  • viewiden (str) – The iden of a view to use. If a view is not specified, the default view is used.

async addForm(formname, basetype, typeopts, typeinfo)[source]
async addFormProp(form, prop, tdef, info)[source]
async addLayer(ldef=None, nexs=True)[source]

Add a Layer to the cortex.

Parameters
  • ldef (Optional[Dict]) – layer configuration

  • nexs (bool) – whether to record a nexus transaction (internal use only)

async addLayrPull(layriden, pdef)[source]
async addLayrPush(layriden, pdef)[source]
async addNode(user, form, valu, props=None)[source]
async addNodeTag(user, iden, tag, valu=(None, None))[source]

Add a tag to a node specified by iden.

Parameters
  • iden (str) – A hex encoded node BUID.

  • tag (str) – A tag string.

  • valu (tuple) – A time interval tuple or (None, None).

async addNodes(nodedefs, view=None)[source]

Quickly add/modify a list of nodes from node definition tuples. This API is the simplest/fastest way to add nodes, set node props, and add tags to nodes remotely.

Parameters

nodedefs (list) – A list of node definition tuples. See below.

A node definition tuple is defined as:

( (form, valu), {'props':{}, 'tags':{}})

The “props” or “tags” keys may be omitted.
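
Building node definition tuples in the shape above might look like the sketch below. make_nodedef is a hypothetical helper, and the form, property, and tag names are illustrative; the list it produces is what would be passed as nodedefs.

```python
def make_nodedef(form, valu, props=None, tags=None):
    """Build a ((form, valu), info) tuple, omitting empty props/tags keys."""
    info = {}
    if props:
        info['props'] = dict(props)
    if tags:
        info['tags'] = dict(tags)
    return ((form, valu), info)


nodedefs = [
    make_nodedef('inet:fqdn', 'example.com', tags={'foo.bar': (None, None)}),
    make_nodedef('inet:fqdn', 'example.org'),
]
print(nodedefs)
```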

addRuntLift(prop, func)[source]

Register a runt lift helper for a given prop.

Parameters
  • prop (str) – Full property name for the prop to register the helper for.

  • func

Returns

None.

Return type

None

addRuntPropDel(full, func)[source]

Register a prop del helper for a runt form

addRuntPropSet(full, func)[source]

Register a prop set helper for a runt form

addStormCmd(ctor)[source]

Add a synapse.lib.storm.Cmd class to the cortex.

async addStormDmon(ddef)[source]

Add a storm dmon task.

addStormLib(path, ctor)[source]
async addStormPkg(pkgdef)[source]

Add the given storm package to the cortex.

This will store the package for future use.

async addStormSvc(sdef)[source]

Add a registered storm service to the cortex.

async addTagProp(name, tdef, info)[source]
async addUnivProp(name, tdef, info)[source]
async addUserNotif(useriden, mesgtype, mesgdata=None)[source]
async addView(vdef, nexs=True)[source]
async bumpStormDmon(iden)[source]
async callStorm(text, opts=None)[source]
cellapi

alias of synapse.cortex.CoreApi

async cloneLayer(iden, ldef=None)[source]

Make a copy of a Layer in the cortex.

Parameters
  • iden (str) – Layer iden to clone

  • ldef (Optional[Dict]) – Layer configuration overrides

Note

This should only be called with a reasonably static Cortex due to possible races.

confbase = {'_log_conf': {'description': 'Opaque structure used for logging by spawned processes.', 'hideconf': True, 'type': 'object'}, 'aha:admin': {'description': 'An AHA client certificate CN to register as a local admin user.', 'type': 'string'}, 'aha:leader': {'description': 'The AHA service name to claim as the active instance of a storm service.', 'type': 'string'}, 'aha:name': {'description': 'The name of the cell service in the aha service registry.', 'type': 'string'}, 'aha:network': {'description': 'The AHA service network. This makes aha:name/aha:leader relative names.', 'type': 'string'}, 'aha:registry': {'description': 'The telepath URL of the aha service registry.', 'items': {'type': 'string'}, 'type': ['string', 'array']}, 'aha:svcinfo': {'description': 'An AHA svcinfo object. If set, this overrides self discovered Aha service information.', 'properties': {'urlinfo': {'properties': {'host': {'type': 'string'}, 'port': {'type': 'integer'}, 'schema': {'type': 'string'}}, 'required': ('host', 'port', 'scheme'), 'type': 'object'}}, 'required': ('urlinfo',), 'type': 'object'}, 'auth:anon': {'description': 'Allow anonymous telepath access by mapping to the given user name.', 'type': 'string'}, 'auth:conf': {'description': 'Extended configuration to be used by an alternate auth constructor.', 'hideconf': True, 'type': 'object'}, 'auth:ctor': {'description': 'Allow the construction of the cell auth object to be hooked at runtime.', 'hideconf': True, 'type': 'string'}, 'auth:passwd': {'description': 'Set to <passwd> (local only) to bootstrap the root user password.', 'type': 'string'}, 'backup:dir': {'description': 'A directory outside the service directory where backups will be saved. Defaults to ./backups in the service storage directory.', 'type': 'string'}, 'cell:ctor': {'description': 'An optional python path to the Cell class.  
Used by stemcell.', 'type': 'string'}, 'cell:guid': {'description': 'An optional hard-coded GUID to store as the permanent GUID for the cell.', 'type': 'string'}, 'dmon:listen': {'description': 'A config-driven way to specify the telepath bind URL.', 'type': ['string', 'null']}, 'https:headers': {'description': 'Headers to add to all HTTPS server responses.', 'hidecmdl': True, 'type': 'object'}, 'https:port': {'description': 'A config-driven way to specify the HTTPS port.', 'type': ['integer', 'null']}, 'inaugural': {'defs': {'role': {'additionalProperties': False, 'properties': {'name': {'pattern': '^(?!all$).+$', 'type': 'string'}, 'rules': {'items': {'$ref': '#/properties/inaugural/defs/rule'}, 'type': 'array'}}, 'required': ['name'], 'type': 'object'}, 'rule': {'items': [{'type': 'boolean'}, {'type': 'array', 'items': {'type': 'string'}}], 'maxItems': 2, 'minItems': 2, 'type': 'array'}, 'user': {'additionalProperties': False, 'properties': {'admin': {'default': False, 'type': 'boolean'}, 'email': {'type': 'string'}, 'name': {'pattern': '^(?!root$).+$', 'type': 'string'}, 'roles': {'items': {'type': 'string'}, 'type': 'array'}, 'rules': {'items': {'$ref': '#/properties/inaugural/defs/rule'}, 'type': 'array'}}, 'required': ['name'], 'type': 'object'}}, 'description': 'Data used to drive configuration of the Cell upon first startup.', 'properties': {'roles': {'items': {'$ref': '#/properties/inaugural/defs/role'}, 'type': 'array'}, 'users': {'items': {'$ref': '#/properties/inaugural/defs/user'}, 'type': 'array'}}, 'type': 'object'}, 'mirror': {'description': 'A telepath URL for our upstream mirror (we must be a backup!).', 'hidecmdl': False, 'hidedocs': False, 'type': 'string'}, 'nexslog:async': {'default': False, 'description': '(Experimental) Map the nexus log LMDB instance with map_async=True.', 'type': 'boolean'}, 'nexslog:en': {'default': True, 'description': 'Record all changes to the cell.  Required for mirroring (on both sides).', 'type': 'boolean'}}
confdefs = {'axon': {'description': 'A telepath URL for a remote axon.', 'type': 'string'}, 'cron:enable': {'default': True, 'description': 'Enable cron jobs running.', 'type': 'boolean'}, 'http:proxy': {'description': 'An aiohttp-socks compatible proxy URL to use storm HTTP API.', 'type': 'string'}, 'jsonstor': {'description': 'A telepath URL for a remote jsonstor.', 'type': 'string'}, 'layer:lmdb:map_async': {'default': True, 'description': 'Set the default lmdb:map_async value in LMDB layers.', 'type': 'boolean'}, 'layer:lmdb:max_replay_log': {'default': 10000, 'description': 'Set the max size of the replay log for all layers.', 'type': 'integer'}, 'layers:lockmemory': {'default': False, 'description': 'Should new layers lock memory for performance by default.', 'type': 'boolean'}, 'layers:logedits': {'default': True, 'description': 'Whether nodeedits are logged in each layer.', 'type': 'boolean'}, 'max:nodes': {'description': 'Maximum number of nodes which are allowed to be stored in a Cortex.', 'hidecmdl': True, 'minimum': 1, 'type': 'integer'}, 'modules': {'default': [], 'description': 'A list of module classes to load.', 'type': 'array'}, 'provenance:en': {'default': False, 'description': 'Enable provenance tracking for all writes.', 'type': 'boolean'}, 'storm:interface:scrape': {'default': True, 'description': 'Enable Storm scrape interfaces when using $lib.scrape APIs.', 'type': 'boolean'}, 'storm:interface:search': {'default': True, 'description': 'Enable Storm search interfaces for lookup mode.', 'type': 'boolean'}, 'storm:log': {'default': False, 'description': 'Log storm queries via system logger.', 'type': 'boolean'}, 'storm:log:level': {'default': 'INFO', 'description': 'Logging log level to emit storm logs at.', 'type': ['integer', 'string']}, 'tls:ca:dir': {'description': 'An optional directory of CAs which are added to the TLS CA chain for Storm HTTP API calls.', 'type': 'string'}, 'trigger:enable': {'default': True, 'description': 'Enable 
triggers running.', 'type': 'boolean'}}
async coreQueueCull(name, offs)[source]
async coreQueueGet(name, offs=0, cull=True, wait=False)[source]
async coreQueueGets(name, offs=0, cull=True, wait=False, size=None)[source]
async coreQueuePop(name, offs)[source]
async coreQueuePuts(name, items)[source]
async coreQueueSize(name)[source]
async count(text, opts=None)[source]
async delCoreQueue(name)[source]
async delCronJob(iden)[source]

Delete a cron job

Parameters

iden (bytes) – The iden of the cron job to be deleted

async delForm(formname)[source]
async delFormProp(form, prop)[source]
async delJsonObj(path)[source]
async delJsonObjProp(path, prop)[source]
async delLayer(iden)[source]
async delLayrPull(layriden, pulliden)[source]
async delLayrPush(layriden, pushiden)[source]
async delNodeTag(user, iden, tag)[source]

Delete a tag from the node specified by iden.

Parameters
  • iden (str) – A hex encoded node BUID.

  • tag (str) – A tag string.

async delStormCmd(name)[source]

Remove a previously set pure storm command.

async delStormDmon(iden)[source]

Stop and remove a storm dmon.

async delStormPkg(name)[source]
async delStormSvc(iden)[source]
async delTagModel(tagname)[source]

Delete all the model specification properties for a tag.

Parameters

tagname (str) – The name of the tag.

async delTagProp(name)[source]
async delUnivProp(prop)[source]
async delUserNotif(indx)[source]
async delView(iden)[source]
async disableCronJob(iden)[source]

Disable a cron job

Parameters

iden (bytes) – The iden of the cron job to be changed

async disableStormDmon(iden)[source]
async editCronJob(iden, name, valu)[source]

Modify a cron job definition.

async enableCronJob(iden)[source]

Enable a cron job

Parameters

iden (bytes) – The iden of the cron job to be changed

async enableStormDmon(iden)[source]
async eval(text, opts=None)[source]

Evaluate a storm query and yield packed nodes.

NOTE: This API is deprecated as of 2.0.0 and will be removed in 3.0.0

async exportStorm(text, opts=None)[source]
async exportStormToAxon(text, opts=None)[source]
async feedFromAxon(sha256, opts=None)[source]
async getAxon()[source]
async getCellApi(link, user, path)[source]
getCoreInfo()[source]
async getCoreInfoV2()[source]
getCoreMod(name)[source]
getCoreMods()[source]
async getCoreQueue(name)[source]
getDataModel()[source]
async getDeprLocks()[source]

Return a dictionary of deprecated properties and their lock status.

getFeedFunc(name)[source]

Get a data ingest function.

async getFeedFuncs()[source]
async getFormCounts()[source]

Return total form counts for all existing layers

async getJsonObj(path)[source]
async getJsonObjProp(path, prop)[source]
async getJsonObjs(path)[source]
getLayer(iden=None)[source]

Get a Layer object.

Parameters

iden (str) – The layer iden to retrieve.

Returns

A Layer object.

Return type

Layer

async getLayerDef(iden=None)[source]
async getLayerDefs()[source]
async getModelDefs()[source]
async getModelDict()[source]
async getNodeByNdef(ndef, view=None)[source]

Return a single Node() instance by (form,valu) tuple.

async getPermDef(perm)[source]
async getPermDefs()[source]
async getPropNorm(prop, valu)[source]

Get the normalized property value based on the Cortex data model.

Parameters
  • prop (str) – The property to normalize.

  • valu – The value to normalize.

Returns

A two item tuple, containing the normed value and the info dictionary.

Return type

(tuple)

Raises
  • s_exc.NoSuchProp – If the prop does not exist.

  • s_exc.BadTypeValu – If the value fails to normalize.

getStormCmd(name)[source]
getStormCmds()[source]
async getStormDmon(iden)[source]
async getStormDmonLog(iden)[source]
async getStormDmons()[source]
async getStormDocs()[source]

Get a struct containing the Storm Types documentation.

Returns

A Dictionary of storm documentation information.

Return type

dict

async getStormIfaces(name)[source]
getStormLib(path)[source]
async getStormMod(name, reqvers=None)[source]
async getStormMods()[source]
async getStormPkg(name)[source]
async getStormPkgs()[source]
async getStormQuery(text, mode='storm')[source]
getStormRuntime(query, opts=None)[source]
getStormSvc(name)[source]
getStormSvcs()[source]
async getStormVar(name, default=None)[source]
async getTagModel(tagname)[source]

Retrieve the tag model specification for a tag.

Returns

The tag model specification or None.

Return type

(dict)

async getTagPrune(tagname)[source]
async getTypeNorm(name, valu)[source]

Get the normalized type value based on the Cortex data model.

Parameters
  • name (str) – The type to normalize.

  • valu – The value to normalize.

Returns

A two item tuple, containing the normed value and the info dictionary.

Return type

(tuple)

Raises
  • s_exc.NoSuchType – If the type does not exist.

  • s_exc.BadTypeValu – If the value fails to normalize.

async getUserNotif(indx)[source]
getView(iden=None, user=None)[source]

Get a View object.

Parameters

iden (str) – The View iden to retrieve.

Returns

A View object.

Return type

View

async getViewDef(iden)[source]
async getViewDefs(deporder=False)[source]
async hasJsonObj(path)[source]
hiveapi

alias of synapse.lib.hive.HiveApi

async initServiceActive()[source]
async initServicePassive()[source]
async initServiceRuntime()[source]
async initServiceStorage()[source]
isTagValid(tagname)[source]

Check if a tag name is valid according to tag model regular expressions.

Returns

True if the tag is valid.

Return type

(bool)

async itemsStormVar()[source]
async iterFormRows(layriden, form, stortype=None, startvalu=None)[source]

Yields buid, valu tuples of nodes of a single form, optionally (re)starting at startvalu.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • form (str) – A form name.

  • stortype (Optional[int]) – a STOR_TYPE_* integer representing the type of form:prop

  • startvalu (Any) – The value to start at. May only be not None if stortype is not None.

Returns

AsyncIterator[Tuple(buid, valu)]

async iterPropRows(layriden, form, prop, stortype=None, startvalu=None)[source]

Yields buid, valu tuples of nodes with a particular secondary property, optionally (re)starting at startvalu.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • form (str) – A form name.

  • prop (str) – A secondary property name.

  • stortype (Optional[int]) – a STOR_TYPE_* integer representing the type of form:prop

  • startvalu (Any) – The value to start at. May only be not None if stortype is not None.

Returns

AsyncIterator[Tuple(buid, valu)]

async iterTagPropRows(layriden, tag, prop, form=None, stortype=None, startvalu=None)[source]

Yields (buid, valu) that match a tag:prop, optionally (re)starting at startvalu.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • tag (str) – tag name

  • prop (str) – prop name

  • form (Optional[str]) – optional form name

  • stortype (Optional[int]) – a STOR_TYPE_* integer representing the type of form:prop

  • startvalu (Any) – The value to start at. May only be not None if stortype is not None.

Returns

AsyncIterator[Tuple(buid, valu)]

async iterTagRows(layriden, tag, form=None, starttupl=None)[source]

Yields (buid, (valu, form)) values that match a tag and optional form, optionally (re)starting at starttupl.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • tag (str) – the tag to match

  • form (Optional[str]) – if present, only yields buids of nodes that match the form.

  • starttupl (Optional[Tuple[buid, form]]) – if present, (re)starts the stream of values there.

Returns

AsyncIterator[Tuple(buid, (valu, form))]

Note

This yields (buid, (tagvalu, form)) instead of just buid, valu in order to allow resuming an interrupted call by feeding the last value retrieved into starttupl

async iterUnivRows(layriden, prop, stortype=None, startvalu=None)[source]

Yields buid, valu tuples of nodes with a particular universal property, optionally (re)starting at startvalu.

Parameters
  • layriden (str) – Iden of the layer to retrieve the nodes

  • prop (str) – A universal property name.

  • stortype (Optional[int]) – a STOR_TYPE_* integer representing the type of form:prop

  • startvalu (Any) – The value to start at. May only be not None if stortype is not None.

Returns

AsyncIterator[Tuple(buid, valu)]

async iterUserNotifs(useriden, size=None)[source]
layerapi

alias of synapse.lib.layer.LayerApi

async classmethod layrctor(*args, **kwargs)
async listCoreQueues()[source]
async listCronJobs()[source]

Get information about all the cron jobs accessible to the current user

listLayers()[source]
async listTagModel()[source]

Retrieve a list of the tag model specifications.

Returns

A list of tag model specification tuples.

Return type

([(str, dict), …])

listViews()[source]
async loadCoreModule(ctor, conf=None)[source]

Load a single cortex module with the given ctor and conf.

Parameters
  • ctor (str) – The python module class path

  • conf (dict) – Config dictionary for the module

async loadStormPkg(pkgdef)[source]

Load a storm package into the storm library for this cortex.

NOTE: This will not persist the package (allowing service dynamism).

async moveCronJob(useriden, croniden, viewiden)[source]
async nodes(text, opts=None)[source]

A simple non-streaming way to return a list of nodes.

offTagAdd(name, func)[source]

Unregister a callback for tag addition.

Parameters
  • name (str) – The name of the tag or tag glob.

  • func (function) – The callback func(node, tagname, tagval).

offTagDel(name, func)[source]

Unregister a callback for tag deletion.

Parameters
  • name (str) – The name of the tag or tag glob.

  • func (function) – The callback func(node, tagname, tagval).

onTagAdd(name, func)[source]

Register a callback for tag addition.

Parameters
  • name (str) – The name of the tag or tag glob.

  • func (function) – The callback func(node, tagname, tagval).

onTagDel(name, func)[source]

Register a callback for tag deletion.

Parameters
  • name (str) – The name of the tag or tag glob.

  • func (function) – The callback func(node, tagname, tagval).
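
The callback signature func(node, tagname, tagval) used by the four methods above can be sketched with a recorder object. FakeCortex is a hypothetical stand-in that matches tag names exactly (no globs), and fireTagAdd exists only in the stand-in to simulate a tag being added.

```python
class FakeCortex:
    """Hypothetical stand-in: exact tag-name matching only, no globs."""

    def __init__(self):
        self.addfuncs = {}

    def onTagAdd(self, name, func):
        self.addfuncs.setdefault(name, []).append(func)

    def offTagAdd(self, name, func):
        funcs = self.addfuncs.get(name, [])
        if func in funcs:
            funcs.remove(func)

    def fireTagAdd(self, node, tagname, tagval):
        # Stand-in only: simulates the cortex invoking registered callbacks.
        for func in list(self.addfuncs.get(tagname, [])):
            func(node, tagname, tagval)


class TagLog:
    """Collects (tagname, tagval) pairs seen by the callback."""

    def __init__(self):
        self.events = []

    def onadd(self, node, tagname, tagval):
        self.events.append((tagname, tagval))


core = FakeCortex()
log = TagLog()
core.onTagAdd('foo.bar', log.onadd)
core.fireTagAdd(None, 'foo.bar', (None, None))
core.offTagAdd('foo.bar', log.onadd)
core.fireTagAdd(None, 'foo.bar', (None, None))  # no longer recorded
print(log.events)
```

Unregistering requires passing the same function object that was registered, which is why the bound method is kept on a long-lived object here.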

async popStormVar(name, default=None)[source]
async popTagModel(tagname, name)[source]

Pop a property from the model specification of a tag.

Parameters
  • tagname (str) – The name of the tag.

  • name (str) – The name of the specification property.

Returns

The current value of the property.

Return type

(object)

async reqValidStorm(text, opts=None)[source]

Parse a storm query to validate it.

Parameters
  • text (str) – The text of the Storm query to parse.

  • opts (dict) – A Storm options dictionary.

Returns

If the query is valid.

Return type

True

Raises

BadSyntaxError – If the query is invalid.

async runLayrPull(layr, pdef)[source]
async runLayrPush(layr, pdef)[source]
async runRuntLift(full, valu=None, cmpr=None, view=None)[source]

Execute a runt lift function.

Parameters
  • full (str) – Property to lift by.

  • valu

  • cmpr

Returns

Yields (bytes, list) tuples where the list contains a series of key/value pairs which are used to construct a Node object.

Return type

bytes, list

async runRuntPropDel(node, prop)[source]
async runRuntPropSet(node, prop, valu)[source]
async runStormDmon(iden, ddef)[source]
async runStormSvcEvent(iden, name)[source]
async saveLayerNodeEdits(layriden, edits, meta)[source]
async setDeprLock(name, locked)[source]
setFeedFunc(name, func)[source]

Set a data ingest function.

def func(snap, items):
    loaditems…

async setJsonObj(path, item)[source]
async setJsonObjProp(path, prop, item)[source]
async setStormCmd(cdef)[source]
async setStormSvcEvents(iden, edef)[source]

Set the event callbacks for a storm service. Extends the sdef dict.

Parameters
  • iden (str) – The service iden.

  • edef (dict) – The events definition.

Notes

The edef is formatted like the following:

{
    <name> : {
        'storm': <storm>
    }
}

where name is one of the following items:

add

Run the given storm before the service is first added (a la service.add), but not on a reconnect.

del

Run the given storm after the service is removed (a la service.del), but not on a disconnect.

Returns

An updated storm service definition dictionary.

Return type

dict
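As an illustration of the edef layout described in the Notes, the following sketch builds an events definition from a mapping of event names to Storm text. The helper name make_edef and the two Storm strings are hypothetical placeholders and are not part of the Cortex API; only the 'add' and 'del' event names and the nested 'storm' key come from the description above.

```python
ALLOWED_EVENTS = ('add', 'del')

def make_edef(storms):
    """Build an edef dict from a {event name: storm text} mapping.

    Hypothetical helper for illustration; it only enforces the two
    event names listed in the Notes above.
    """
    edef = {}
    for name, storm in storms.items():
        if name not in ALLOWED_EVENTS:
            raise ValueError(f'unknown event name: {name}')
        edef[name] = {'storm': storm}
    return edef

# Placeholder Storm text; a real service would supply its own queries.
edef = make_edef({
    'add': '$lib.print("service added")',
    'del': '$lib.print("service removed")',
})
```

The resulting dict is the kind of value that would be passed as the edef argument alongside the service iden.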

async setStormVar(name, valu)[source]
async setTagModel(tagname, name, valu)[source]

Set a model specification property for a tag.

Parameters
  • tagname (str) – The name of the tag.

  • name (str) – The name of the property.

  • valu (object) – The value of the property.

Tag Model Properties:

  • regex - A list of None or regular expression strings to match each tag level.

  • prune - A number that determines how many levels of pruning are desired.

Examples

await core.setTagModel("cno.cve", "regex", (None, None, "[0-9]{4}", "[0-9]{5}"))
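To make the regex property more concrete, here is a minimal sketch of per-level matching. It assumes each dotted tag level is compared against the pattern at the same position and that None leaves a level unconstrained. The helper name tag_matches_model is hypothetical, and the Cortex's own enforcement may differ in detail.

```python
import re

def tag_matches_model(tagname, regexes):
    """Check each dotted tag level against the pattern at the same index.

    A pattern of None, or a level past the end of the list, is treated
    as unconstrained. This is an assumption made for illustration.
    """
    for level, part in enumerate(tagname.split('.')):
        if level >= len(regexes):
            break
        patt = regexes[level]
        if patt is not None and re.fullmatch(patt, part) is None:
            return False
    return True

model = (None, None, "[0-9]{4}", "[0-9]{5}")
ok_tag = tag_matches_model("cno.cve.2021.12345", model)   # four-digit year level
bad_tag = tag_matches_model("cno.cve.21.12345", model)    # two-digit year level
```

Under these assumptions the first tag satisfies the model and the second does not, because its third level is not exactly four digits.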

async setUserLocked(iden, locked)[source]
async setViewLayers(layers, iden=None)[source]
Parameters
  • layers ([str]) – A top-down list of layer guids

  • iden (str) – The view iden (defaults to default view).

async snap(user=None, view=None)[source]

Return a transaction object for the default view.

Parameters
  • user (str) – The user to get the snap for.

  • view (View) – View object to use when making the snap.

Notes

This must be used as an asynchronous context manager.

Returns

A Snap object for the view.

Return type

s_snap.Snap

async spliceHistory(user)[source]

Yield splices backwards from the end of the nodeedit log.

Will only return user’s own splices unless they are an admin.

async stat()[source]
async storm(text, opts=None)[source]
async stormlist(text, opts=None)[source]
async syncIndexEvents(matchdef, offsdict=None, wait=True)[source]

Yield (offs, layriden, <STYPE>, <item>) tuples from the nodeedit logs of all layers starting from the given nexus/layer offset (they are synchronized). Only edits that match the filter in matchdef will be yielded, plus EDIT_PROGRESS (see layer.syncIndexEvents) messages.

The format of the 4th element of the tuple depends on STYPE. STYPE is one of the following constants:

  • SYNC_LAYR_ADD: item is an empty tuple ()

  • SYNC_LAYR_DEL: item is an empty tuple ()

  • SYNC_NODEEDIT: item is (buid, form, ETYPE, VALS, META) or (None, None, s_layer.EDIT_PROGRESS, (), ())

For edits in the past, events are yielded in offset order across all layers. For current data (wait=True), events across different layers may be emitted slightly out of offset order.

Note

Will not yield any values from layers created with logedits disabled

Parameters
  • matchdef (Dict[str, Sequence[str]]) – a dict describing which events are yielded. See layer.syncIndexEvents for matchdef specification.

  • offsdict (Optional(Dict[str,int])) – starting nexus/editlog offset by layer iden. Defaults to 0 for unspecified layers or if offsdict is None.

  • wait (bool) – whether to pend and stream value until this layer is fini’d

async syncLayerNodeEdits(iden, offs, wait=True)[source]

Yield (offs, mesg) tuples for nodeedits in a layer.

async syncLayersEvents(offsdict=None, wait=True)[source]

Yield (offs, layriden, STYP, item, meta) tuples for nodeedits for all layers, interspersed with add/del layer messages.

STYP is one of the following constants:

  • SYNC_NODEEDITS: item is a nodeedits (buid, form, edits)

  • SYNC_LAYR_ADD: A layer was added (item and meta are empty)

  • SYNC_LAYR_DEL: A layer was deleted (item and meta are empty)

Parameters
  • offsdict (Optional(Dict[str,int])) – starting nexus/editlog offset by layer iden. Defaults to 0 for unspecified layers or if offsdict is None.

  • wait (bool) – whether to pend and stream value until this layer is fini’d
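A consumer of these tuples typically branches on STYP. The sketch below shows one way that dispatch could look, run against a hypothetical in-memory event source. The constant values, the fake_events generator, and the sample identifiers are stand-ins for illustration only; real code would use the constants defined by synapse and iterate the actual API.

```python
import asyncio

# Stand-in values for illustration only, not the real synapse constants.
SYNC_NODEEDITS = 'nodeedits'
SYNC_LAYR_ADD = 'layr:add'
SYNC_LAYR_DEL = 'layr:del'

async def fake_events():
    """Hypothetical source shaped like (offs, layriden, STYP, item, meta)."""
    yield (0, 'layr0', SYNC_LAYR_ADD, (), {})
    yield (1, 'layr0', SYNC_NODEEDITS, ('buid0', 'inet:fqdn', ()), {'user': 'root'})
    yield (2, 'layr0', SYNC_LAYR_DEL, (), {})

async def consume(events):
    """Track live layers, the last offset seen per layer, and nodeedit count."""
    lastoffs = {}
    live = set()
    edits = 0
    async for offs, layriden, styp, item, meta in events:
        if styp == SYNC_LAYR_ADD:
            live.add(layriden)
        elif styp == SYNC_LAYR_DEL:
            live.discard(layriden)
        elif styp == SYNC_NODEEDITS:
            edits += 1
        lastoffs[layriden] = offs
    return lastoffs, live, edits

lastoffs, live, edits = asyncio.run(consume(fake_events()))
```

Keeping the last offset per layer iden gives a dict with the same shape as offsdict, which is one possible starting point for resuming a stream later.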

async updateCronJob(iden, query)[source]

Change an existing cron job’s query.

Parameters

iden (bytes) – The iden of the cron job to be changed

viewapi

alias of synapse.lib.view.ViewApi

async classmethod viewctor(*args, **kwargs)
async waitStormSvc(name, timeout=None)[source]
async watch(wdef)[source]

Hook cortex/view/layer watch points based on a specified watch definition (see CoreApi.watch() docs for details).

async watchAllUserNotifs(offs=None)[source]
watcher(wdef)[source]
synapse.cortex.cmprkey_buid(x)[source]
synapse.cortex.cmprkey_indx(x)[source]
synapse.cortex.getTempCortex(mods=None)[source]

Get a proxy to a cortex backed by a temporary directory.

Parameters

mods (list) – A list of modules which are loaded into the cortex.

Notes

The cortex and temporary directory are torn down on exit. This should only be called from synchronous code.

Returns

Proxy to the cortex.

synapse.cortex.stormlogger = <Logger synapse.storm (WARNING)>

A Cortex implements the synapse hypergraph object.

async synapse.cortex.wrap_liftgenr(iden, genr)[source]

synapse.cryotank module

class synapse.cryotank.CryoApi[source]

Bases: synapse.lib.cell.CellApi

The CryoCell API as seen by a telepath proxy.

This is the API to reference for remote CryoCell use.

delete(name)[source]
async init(name, conf=None)[source]
async last(name)[source]
async list()[source]
async metrics(name, offs, size=None)[source]
async offset(name, iden)[source]
async puts(name, items, seqn=None)[source]
async rows(name, offs, size, iden=None)[source]
async slice(name, offs, size=None, iden=None)[source]
class synapse.cryotank.CryoCell[source]

Bases: synapse.lib.cell.Cell

cellapi

alias of synapse.cryotank.CryoApi

async delete(name)[source]
async getCellApi(link, user, path)[source]
classmethod getEnvPrefix()[source]
async init(name, conf=None)[source]

Generate a new CryoTank with a given name or get a reference to an existing CryoTank.

Parameters

name (str) – Name of the CryoTank.

Returns

A CryoTank instance.

Return type

CryoTank

async list()[source]

Get a list of (name, info) tuples for the CryoTanks.

Returns

A list of tufos.

Return type

list

tankapi

alias of synapse.cryotank.TankApi

class synapse.cryotank.CryoTank[source]

Bases: synapse.lib.base.Base

A CryoTank implements a stream of structured data.

getOffset(iden)[source]
async iden()[source]
async info()[source]

Returns information about the CryoTank instance.

Returns

A dict containing items and metrics indexes.

Return type

dict

last()[source]

Return an (offset, item) tuple for the last element in the tank (or None).

async metrics(offs, size=None)[source]

Yield metrics rows starting at offset.

Parameters
  • offs (int) – The index offset.

  • size (int) – The maximum number of records to yield.

Yields

((int, dict)) – An index offset, info tuple for metrics.

async puts(items, seqn=None)[source]

Add the structured data from items to the CryoTank.

Parameters
  • items (list) – A list of objects to store in the CryoTank.

  • seqn (iden, offs) – An iden / offset pair to record.

Returns

The ending offset of the items or seqn.

Return type

int

async rows(offs, size=None, iden=None)[source]

Yield a number of raw items from the CryoTank starting at a given offset.

Parameters
  • offs (int) – The index of the desired datum (starts at 0)

  • size (int) – The max number of items to yield.

Yields

((indx, bytes)) – Index and msgpacked bytes.

setOffset(iden, offs)[source]
async slice(offs, size=None, iden=None)[source]

Yield a number of items from the CryoTank starting at a given offset.

Parameters
  • offs (int) – The index of the desired datum (starts at 0)

  • size (int) – The max number of items to yield.

Yields

((index, object)) – Index and item values.
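The offset and size semantics of slice can be pictured with a small in-memory stand-in. The ListTank class below is hypothetical and only mimics the (index, item) shape described above; it is synchronous, keeps everything in a Python list, and does not model iden offsets, metrics, or persistence.

```python
class ListTank:
    """An in-memory stand-in that mimics offset-based puts and slice."""

    def __init__(self):
        self._items = []

    def puts(self, items):
        # Append items in order; indexes are simply list positions.
        self._items.extend(items)

    def slice(self, offs, size=None):
        # Yield (index, item) pairs starting at offs, at most size of them.
        stop = None if size is None else offs + size
        for indx, item in enumerate(self._items[offs:stop], start=offs):
            yield indx, item

tank = ListTank()
tank.puts(['a', 'b', 'c', 'd'])
window = list(tank.slice(1, size=2))
tail = list(tank.slice(2))
```

Here window holds the two items starting at index 1, and tail holds everything from index 2 onward because no size was given.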

class synapse.cryotank.TankApi[source]

Bases: synapse.lib.cell.CellApi

async iden()[source]
async metrics(offs, size=None)[source]
async offset(iden)[source]
async puts(items, seqn=None)[source]
async slice(offs, size=None, iden=None)[source]

synapse.daemon module

class synapse.daemon.AsyncGenr[source]

Bases: synapse.lib.share.Share

typename = 'genr'
class synapse.daemon.Daemon[source]

Bases: synapse.lib.base.Base

async getSessInfo()[source]
async listen(url, **opts)[source]

Bind and listen on the given host/port with possible SSL.

Parameters
  • host (str) – A hostname or IP address.

  • port (int) – The TCP port to bind.

async setReady(ready)[source]
share(name, item)[source]

Share an object via the telepath protocol.

Parameters
  • name (str) – Name of the shared object

  • item (object) – The object to share over telepath.

class synapse.daemon.Genr[source]

Bases: synapse.lib.share.Share

typename = 'genr'
class synapse.daemon.Sess[source]

Bases: synapse.lib.base.Base

getSessItem(name)[source]
pack()[source]
popSessItem(name)[source]
setSessItem(name, item)[source]
async synapse.daemon.t2call(link, meth, args, kwargs)[source]

Call the given meth(*args, **kwargs) and handle the response to provide telepath task v2 events to the given link.

synapse.datamodel module

An API to assist with the creation and enforcement of cortex data models.

class synapse.datamodel.Form(modl, name, info)[source]

Bases: object

The Form class implements data model logic for a node form.

delProp(name)[source]
getFormDef()[source]
getRefsOut()[source]
getStorNode(form)[source]
offAdd(func)[source]

Unregister a callback for adding this type of node.

Parameters

func (function) – The callback func(node)

onAdd(func)[source]

Add a callback for adding this type of node.

The callback is executed after node construction.

Parameters

func (function) – A callback func(node)

def func(node):
    dostuff()

onDel(func)[source]
pack()[source]
prop(name: str)[source]

Return a secondary property for this form by relative prop name.

Parameters

name (str) – The relative property name.

Returns

The property or None.

Return type

(synapse.datamodel.Prop)

setProp(name, prop)[source]
async wasAdded(node)[source]

Fire the onAdd() callbacks for node creation.

async wasDeleted(node)[source]

Fire the onDel() callbacks for node deletion.

class synapse.datamodel.Model[source]

Bases: object

The data model used by a Cortex hypergraph.

addBaseType(item)[source]

Add a Type instance to the data model.

addDataModels(mods)[source]

Add a list of (name, mdef) tuples.

A model definition (mdef) is structured as follows:

{
    "ctors":(
        ('name', 'class.path.ctor', {}, {'doc': 'The foo thing.'}),
    ),

    "types":(
        ('name', ('basetype', {typeopts}), {info}),
    ),

    "forms":(
        (formname, (typename, typeopts), {info}, (
            (propname, (typename, typeopts), {info}),
        )),
    ),
    "univs":(
        (propname, (typename, typeopts), {info}),
    ),
    "tagprops":(
        (tagpropname, (typename, typeopts), {info}),
    ),
    "interfaces":(
        (ifacename, {
            'props': ((propname, (typename, typeopts), {info}),),
            'doc': docstr,
            'interfaces': (ifacename,)
        }),
    )
}
Parameters

mods (list) – The list of tuples.

Returns

None
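As a small illustration of the (name, mdef) layout, the sketch below fills in only the "types" and "tagprops" sections and checks that no unexpected top-level section names are present. The names 'ex', 'ex:name', and 'score', the chosen type options, and the unknown_sections helper are all hypothetical; the remaining sections would follow the shapes listed above.

```python
KNOWN_SECTIONS = {'ctors', 'types', 'forms', 'univs', 'tagprops', 'interfaces'}

# Hypothetical model content, used purely to show the nesting.
mdef = {
    'types': (
        ('ex:name', ('str', {'lower': True}), {'doc': 'An example name type.'}),
    ),
    'tagprops': (
        ('score', ('int', {}), {'doc': 'An example tag property.'}),
    ),
}
mods = [('ex', mdef)]

def unknown_sections(mods):
    """Return (model name, section name) pairs not listed in KNOWN_SECTIONS."""
    return [
        (name, section)
        for name, mdef in mods
        for section in mdef
        if section not in KNOWN_SECTIONS
    ]

problems = unknown_sections(mods)
```

A check like this is only a convenience for catching misspelled section names before handing the list to addDataModels; it does not validate the entries themselves.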

addForm(formname, forminfo, propdefs)[source]
addFormProp(formname, propname, tdef, info)[source]
addIface(name, info)[source]
addTagProp(name, tdef, info)[source]
addType(typename, basename, typeopts, typeinfo)[source]
addUnivProp(name, tdef, info)[source]
delForm(formname)[source]
delFormProp(formname, propname)[source]
delTagProp(name)[source]
delType(typename)[source]
delUnivProp(propname)[source]
form(name)[source]
getModelDefs()[source]
Returns

A list of one model definition compatible with addDataModels that represents the current data model

getModelDict()[source]
getProps()[source]
getPropsByType(name)[source]
getTagProp(name)[source]
getTypeClone(typedef)[source]
prop(name)[source]
tagprop(name)[source]
type(name)[source]

Return a synapse.lib.types.Type by name.

univ(name)[source]
class synapse.datamodel.Prop(modl, form, name, typedef, info)[source]

Bases: object

The Prop class represents a property defined within the data model.

getCompOffs()[source]

Return the offset of this field within the compound primary prop or None.

getPropDef()[source]
getStorNode(form)[source]
onDel(func)[source]

Add a callback for deleting this property.

The callback is executed after the property is deleted.

Parameters

func (function) – A prop del callback.

The callback is called within the current transaction, with the node, and the old property value (or None).

def func(node, oldv):
    dostuff()

onSet(func)[source]

Add a callback for setting this property.

The callback is executed after the property is set.

Parameters

func (function) – A prop set callback.

The callback is called within the current transaction, with the node, and the old property value (or None).

def func(node, oldv):
    dostuff()

pack()[source]
async wasDel(node, oldv)[source]
async wasSet(node, oldv)[source]

Fire the onSet() handlers for this property.

Parameters
  • node (synapse.lib.node.Node) – The node whose property was set.

  • oldv (obj) – The previous value of the property.

class synapse.datamodel.TagProp(model, name, tdef, info)[source]

Bases: object

getStorNode(form)[source]
getTagPropDef()[source]
pack()[source]

synapse.exc module

Exceptions used by synapse, all inheriting from SynErr

exception synapse.exc.AuthDeny(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BackupAlreadyRunning(*args, **info)[source]

Bases: synapse.exc.SynErr

Only one backup may be running at a time

exception synapse.exc.BadArg(*args, **info)[source]

Bases: synapse.exc.SynErr

Improper function arguments

exception synapse.exc.BadCast(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadCertHost(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadCmdName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadCmprType(*args, **info)[source]

Bases: synapse.exc.SynErr

Attempt to compare two incomparable values

exception synapse.exc.BadCmprValu(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadConfValu(*args, **info)[source]

Bases: synapse.exc.SynErr

The configuration value provided is not valid.

This should contain the config name, valu and mesg.

exception synapse.exc.BadCoreStore(*args, **info)[source]

Bases: synapse.exc.SynErr

The storage layer has encountered an error

exception synapse.exc.BadCtorType(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadEccExchange(*args, **info)[source]

Bases: synapse.exc.CryptoErr

Raised when there is an issue doing an ECC key exchange.

exception synapse.exc.BadFileExt(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadFormDef(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadHivePath(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadIndxValu(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadJsonText(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadLiftValu(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadMesgFormat(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadMesgVers(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadOperArg(*args, **info)[source]

Bases: synapse.exc.SynErr

Improper storm function arguments

exception synapse.exc.BadOptValu(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadPkgDef(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadPropDef(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadStorageVersion(*args, **info)[source]

Bases: synapse.exc.SynErr

Stored persistent data is incompatible with running software

exception synapse.exc.BadSyntax(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadTag(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadTime(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadTypeDef(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadTypeValu(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadUrl(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.BadVersion(*args, **info)[source]

Bases: synapse.exc.SynErr

Generic Bad Version exception.

exception synapse.exc.CantDelCmd(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.CantDelForm(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.CantDelNode(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.CantDelProp(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.CantDelType(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.CantDelUniv(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.CantMergeView(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.CantRevLayer(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.CliFini(*args, **info)[source]

Bases: synapse.exc.SynErr

Raised when the CLI is to exit.

exception synapse.exc.CryptoErr(*args, **info)[source]

Bases: synapse.exc.SynErr

Raised when there is a synapse.lib.crypto error.

exception synapse.exc.DataAlreadyExists(*args, **info)[source]

Bases: synapse.exc.SynErr

Cannot copy data to a location that already contains data

exception synapse.exc.DbOutOfSpace(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DmonSpawn(*args, **info)[source]

Bases: synapse.exc.SynErr

Raised by a dispatched telepath method that has answered the call using a spawned process (control flow that is compatible with aborting standard calls, generators, and async generators).

exception synapse.exc.DupFileName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupFormName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupIden(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupIndx(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupPropName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupRoleName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupStormSvc(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupTagPropName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.DupUserName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.FatalErr(*args, **info)[source]

Bases: synapse.exc.SynErr

Raised when a fatal error has occurred which an application cannot recover from.

exception synapse.exc.FeatureNotSupported(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.FileExists(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.HitLimit(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.InconsistentStorage(*args, **info)[source]

Bases: synapse.exc.SynErr

Stored persistent data is inconsistent

exception synapse.exc.IsDeprLocked(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.IsFini(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.IsReadOnly(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.IsRuntForm(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.LayerInUse(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.LinkErr(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.LinkShutDown(*args, **info)[source]

Bases: synapse.exc.LinkErr

exception synapse.exc.LmdbLock(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.ModAlreadyLoaded(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.MustBeJsonSafe(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NeedConfValu(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoCertKey(*args, **info)[source]

Bases: synapse.exc.SynErr

Raised when a Cert object requires an RSA Private Key to perform an operation and the key is not present.

exception synapse.exc.NoSuchAbrv(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchAct(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchAuthGate(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchCert(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchCmd(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchCmpr(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchCond(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchCtor(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchDecoder(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchDir(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchDyn(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchEncoder(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchFile(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchForm(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchFunc(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchIden(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchImpl(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchIndx(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchLayer(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchLift(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchMeth(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchName(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchObj(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchOpt(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchPath(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchPivot(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchPkg(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchProp(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchRole(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchStor(name)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchStormSvc(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchTagProp(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchType(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchUniv(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchUser(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchVar(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NoSuchView(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NotANumberCompared(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NotMsgpackSafe(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.NotReady(*args, **info)[source]

Bases: synapse.exc.Retry

exception synapse.exc.ParserExit(*args, **info)[source]

Bases: synapse.exc.SynErr

Raised by synapse.lib.cmd.Parser on Parser exit()

exception synapse.exc.PathExists(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.ReadOnlyLayer(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.ReadOnlyProp(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.RecursionLimitHit(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.Retry(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.SchemaViolation(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.SlabAlreadyOpen(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.SlabInUse(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.SpawnExit(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.StepTimeout(*args, **info)[source]

Bases: synapse.exc.SynErr

Raised when a TestStep.wait() call times out.

exception synapse.exc.StormPkgConflicts(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.StormPkgRequires(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.StormRaise(name, mesg, info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.StormRuntimeError(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.StormVarListError(*args, **info)[source]

Bases: synapse.exc.StormRuntimeError

exception synapse.exc.SynErr(*args, **info)[source]

Bases: Exception

get(name, defv=None)[source]

Return a value from the errinfo dict.

Example

try:
    foothing()
except SynErr as e:
    blah = e.get('blah')
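The pattern of passing keyword info at raise time and reading it back with get can be tried without synapse installed. The SketchErr class below is a simplified stand-in written for this illustration, not the SynErr implementation; it assumes the keyword arguments are stored in a dict and looked up by name.

```python
class SketchErr(Exception):
    """A simplified stand-in for an exception that carries keyword info."""

    def __init__(self, *args, **info):
        super().__init__(*args)
        self.errinfo = info

    def get(self, name, defv=None):
        """Return a value from the errinfo dict, or defv if it is absent."""
        return self.errinfo.get(name, defv)

def foothing():
    # Hypothetical failing function that attaches context as keywords.
    raise SketchErr(mesg='something went wrong', blah=10)

try:
    foothing()
except SketchErr as e:
    blah = e.get('blah')
    missing = e.get('nope', defv='fallback')
```

The defv argument lets a handler ask for optional context without a second try/except around the lookup.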

items()[source]
exception synapse.exc.TeleRedir(*args, **info)[source]

Bases: synapse.exc.SynErr

exception synapse.exc.TimeOut(*args, **info)[source]

Bases: synapse.exc.SynErr

synapse.glob module

synapse.glob.iAmLoop()[source]
synapse.glob.initloop()[source]
synapse.glob.setGreedCoro(loop: asyncio.events.AbstractEventLoop)[source]
synapse.glob.sync(coro, timeout=None)[source]

Schedule a coroutine to run on the global loop and return its result.

Parameters

coro (coroutine) – The coroutine instance.

Notes

This API is thread safe and should only be called by non-loop threads.

synapse.glob.synchelp(f)[source]

The synchelp decorator allows the transparent execution of a coroutine using the global loop from a thread other than the event loop. In both use cases, the actual work is done by the global event loop.

Examples

Use as a decorator:

@s_glob.synchelp
async def stuff(x, y):
    await dostuff()

Calling the stuff function as regular async code using the standard await syntax:

valu = await stuff(x, y)

Calling the stuff function as regular sync code outside of the event loop thread:

valu = stuff(x, y)
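The dual behaviour described above can be approximated with the standard library alone. The sketch below is a simplified stand-in and not the synapse implementation: it assumes a single background thread running one event loop, returns the coroutine when called on that thread, and otherwise blocks on asyncio.run_coroutine_threadsafe. The names synchelp_sketch, i_am_loop, and from_async are hypothetical.

```python
import asyncio
import functools
import threading

# One "global" loop running forever on a daemon thread (an assumption
# of this sketch).
_loop = asyncio.new_event_loop()
_thread = threading.Thread(target=_loop.run_forever, daemon=True)
_thread.start()

def i_am_loop():
    """Return True when the caller is running on the loop thread."""
    return threading.current_thread() is _thread

def synchelp_sketch(f):
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        coro = f(*args, **kwargs)
        if i_am_loop():
            # On the loop thread: hand back the coroutine to be awaited.
            return coro
        # Any other thread: schedule on the loop and block for the result.
        return asyncio.run_coroutine_threadsafe(coro, _loop).result()
    return wrap

@synchelp_sketch
async def stuff(x, y):
    await asyncio.sleep(0)
    return x + y

async def from_async():
    # Runs on the loop thread, so stuff() returns an awaitable here.
    return await stuff(1, 2)

sync_valu = stuff(3, 4)  # called from the main (non-loop) thread
async_valu = asyncio.run_coroutine_threadsafe(from_async(), _loop).result()
```

In both calls the coroutine body executes on the background loop; only the way the caller obtains the result differs.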

synapse.mindmeld module

synapse.telepath module

An RMI framework for synapse.

class synapse.telepath.Aware[source]

Bases: object

The telepath.Aware mixin allows shared objects to handle individual links managed by the Daemon.

async getTeleApi(link, mesg, path)[source]

Return a shared object for this link.

Parameters
  • link (synapse.lib.link.Link) – A network link.

  • mesg ((str,dict)) – The tele:syn handshake message.

onTeleShare(dmon, name)[source]
class synapse.telepath.Client[source]

Bases: synapse.lib.base.Base

A Telepath client object which reconnects and allows waiting for link up.

Notes

The conf data allows changing parameters such as timeouts, retry period, and link pool size. The default conf data can be seen below:

conf = {
    'timeout': 10,
    'retrysleep': 0.2,
    'link_poolsize': 4,
}
async proxy(timeout=10)[source]
async task(todo, name=None)[source]
async waitready(timeout=10)[source]
class synapse.telepath.Genr[source]

Bases: synapse.telepath.Share

class synapse.telepath.GenrIter(proxy, todo, share)[source]

Bases: object

An object to help delay a telepath call until iteration.

async list()[source]
class synapse.telepath.GenrMethod(proxy, name, share=None)[source]

Bases: synapse.telepath.Method

class synapse.telepath.Method(proxy, name, share=None)[source]

Bases: object

The telepath Method is used to provide proxy method calls.

class synapse.telepath.Pipeline[source]

Bases: synapse.lib.base.Base

class synapse.telepath.Proxy[source]

Bases: synapse.lib.base.Base

A telepath Proxy is used to call remote APIs on a shared object.

Example

import synapse.telepath as s_telepath

# open the "foo" object shared in a dmon on localhost:3344

async def doFooThing():
    proxy = await s_telepath.openurl('tcp://127.0.0.1:3344/foo')
    valu = await proxy.getFooValu(x, y)

The proxy (and openurl function) may also be used from sync code:

proxy = s_telepath.openurl('tcp://127.0.0.1:3344/foo')
valu = proxy.getFooValu(x, y)

async call(methname, *args, **kwargs)[source]

Call a remote method by name.

Parameters
  • methname (str) – The name of the remote method.

  • *args – Arguments to the method call.

  • **kwargs – Keyword arguments to the method call.

Most use cases will likely use the proxy methods directly. The following two are effectively the same:

valu = proxy.getFooBar(x, y)
valu = proxy.call('getFooBar', x, y)
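One way to picture why the two spellings are equivalent is attribute lookup that forwards to a call-by-name method. The SketchProxy class below is a hypothetical stand-in that echoes its arguments instead of talking to a remote object; it is not how the telepath Proxy is implemented, only an illustration of the forwarding idea.

```python
import asyncio

class SketchProxy:
    """A stand-in proxy whose unknown attributes forward to call()."""

    async def call(self, methname, *args, **kwargs):
        # A real proxy would send this to the remote side; here we echo it.
        return (methname, args, kwargs)

    def __getattr__(self, name):
        # Only invoked for attributes that are not found normally,
        # so call() itself is unaffected.
        async def meth(*args, **kwargs):
            return await self.call(name, *args, **kwargs)
        return meth

async def main():
    proxy = SketchProxy()
    direct = await proxy.getFooBar(1, 2)
    byname = await proxy.call('getFooBar', 1, 2)
    return direct, byname

direct, byname = asyncio.run(main())
```

Both paths end up in call() with the same method name and arguments, which is the sense in which they are effectively the same.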

async getPipeline(genr, name=None)[source]

Construct a proxy API call pipeline in order to make multiple telepath API calls while minimizing round trips.

Parameters
  • genr (async generator) – An async generator that yields todo tuples.

  • name (str) – The name of the shared object on the daemon.

Example

async def genr():
    yield s_common.todo('getFooByBar', 10)
    yield s_common.todo('getFooByBar', 20)

async for retn in proxy.getPipeline(genr()):
    valu = s_common.result(retn)

async handshake(auth=None)[source]
async task(todo, name=None)[source]
async taskv2(todo, name=None)[source]
class synapse.telepath.Share[source]

Bases: synapse.lib.base.Base

The telepath client side of a dynamically shared object.

class synapse.telepath.Task[source]

Bases: object

A telepath Task is used to internally track calls/responses.

reply(retn)[source]
async result()[source]
async synapse.telepath.addAhaUrl(url)[source]

Add (incref) an aha registry URL.

NOTE: You may also add a list of redundant URLs.

synapse.telepath.alias(name)[source]

Resolve a telepath alias via ~/.syn/aliases.yaml

Parameters

name (str) – Name of the alias to resolve.

Notes

An exact match against the aliases will always be returned first. If no exact match is found and the name contains a ‘/’ in it, the value before the slash is looked up and the remainder of the path is joined to any result. This is done to support dynamic Telepath share names.

Returns

The url string, if present in the alias. None will be returned if there are no matches.

Return type

str
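The lookup order in the Notes can be sketched against a plain dict instead of the aliases.yaml file. The function resolve_alias and the sample alias entry are hypothetical; the sketch assumes the name is split at the first '/' and that the remainder is appended to the resolved URL with a '/' separator.

```python
def resolve_alias(aliases, name):
    """Resolve name against an alias dict: exact match first, then prefix."""
    valu = aliases.get(name)
    if valu is not None:
        return valu

    if '/' not in name:
        return None

    # Look up the part before the first slash and re-attach the rest.
    base, rest = name.split('/', 1)
    url = aliases.get(base)
    if url is None:
        return None
    return '/'.join((url, rest))

# Hypothetical alias table for illustration.
aliases = {'core': 'tcp://127.0.0.1:3344/cortex'}

exact = resolve_alias(aliases, 'core')
dynamic = resolve_alias(aliases, 'core/view/1234')
nomatch = resolve_alias(aliases, 'nope/view')
```

With this table, 'core/view/1234' resolves through the 'core' entry and keeps 'view/1234' as the trailing share path, while an unknown prefix yields None.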

synapse.telepath.chopurl(url, **opts)[source]
async synapse.telepath.delAhaUrl(url)[source]

Remove (decref) an aha registry URL.

NOTE: You may also remove a list of redundant URLs.

async synapse.telepath.disc_consul(info)[source]

Support for updating a URL info dictionary which came from a protocol+consul:// URL.

Notes

This updates the info-dictionary in place, placing the host value into an original_host key, and updating host and port.

By default we pull the host value from the catalog Address value, and the port from the ServicePort value.

The following HTTP parameters are supported:

  • consul: This is the consul host (schema, fqdn and port) to connect to.

  • consul_tag: If set, iterate through the catalog results until a result is found which matches the tag value. This is a case sensitive match.

  • consul_tag_address: If set, prefer the TaggedAddresses from the catalog.

  • consul_service_tag_address: If set, prefer the associated value from the ServiceTaggedAddresses field.

  • consul_nosslverify: If set, disables SSL verification.

async synapse.telepath.getAhaProxy(urlinfo)[source]

Return a telepath proxy by looking up a host from an aha registry.

async synapse.telepath.loadTeleEnv(path)[source]
synapse.telepath.mergeAhaInfo(info0, info1)[source]
async synapse.telepath.openinfo(info)[source]
synapse.telepath.zipurl(info)[source]

Reconstruct a URL string from a parsed telepath info dict.