Usage with Python3 Asyncio + DataLoader

Facebook created DataLoader library for batching and caching purposes to be used with any kind of service. Read more on https://github.com/facebook/dataloader

The above package is written for JavaScript. There is a port of DataLoader for Python too (https://github.com/syrusakbary/aiodataloader) which avails the new async/await syntax provided by Python3.5 for asyncio.

Flash has got support for making cache query using async/await syntax, recently.

await syntax

E.g. following code registers some cache classes

class User(Model):
    ...
    class CacheMeta:
        get_key_fields_list = [
            ('id',),
        ]
        filter_key_fields_list = [
            ('first_name',),
        ]

class EventCacheOnSlug(InstanceCache):
    model = Event
    key_fields = ('slug',)

Here is the syntax to use above classes with asyncio.

user = await User.cache.get_async(id=user_id)
users = await User.cache.filter_async(first_name=first_name)
event = await EventCacheOnSlug(event_slug).resolve_async()

If we hadn’t used asyncio, the code would have looked like

user = User.cache.get(id=user_id)
users = User.cache.filter(first_name=first_name)
event = EventCacheOnSlug(event_slug).resolve()

Like Model.cache.get_or_none() and Model.cache.get_or_404(), you can use Model.cache.get_async_or_none() and Model.cache.get_async_or_404() with asyncio.

The results get locally cached too for same query with await syntax.

user_id = 42
user1 = await User.cache.get_async(id=user_id)
user2 = await User.cache.get_async(id=user_id)

The result will get locally cached for first query. And while resolving second query it will be used from local cache instead of making network call.

Batching multiple queries

In earlier section, we had seen how to batch queries using BatchCacheQuery. With asyncio and dataloaders you need to use asyncio.gather function. Let’s say you have a list of user_ids and you want corresponding User instances then you can get them at once by

from asyncio import gather

users = await gather(*[
    User.cache.get_async(id=user_id) for user_id in user_ids
])

Multiple independent cache queries can also be gathered/batched together. E.g.

user, event = await gather(
    User.cache.get_async(id=user_id),
    EventCacheOnSlug(slug=event_slug).resolve_async()
)

The above code results in one network call for both queries.

While using gather, it will raise exception if any one of the cache query raises an exception (E.g. User.DoesNotExist). If you manually want to handle exceptions for individual queries, then pass return_exceptions=True while calling gather(). In this case, the result objects can be exception objects too. ( Read more on https://docs.python.org/3/library/asyncio-task.html#asyncio.gather ) E.g.

user_result, event_result = await gather(
    User.cache.get_async(id=user_id),
    EventCacheOnSlug(slug=event_slug).resolve_async(),
    return_exceptions=True,
)

if isinstance(user_result, Exception):
    # handle

if isinstance(event_result, Exception):
    # hanlde

Event loop

The async/await syntax works with coroutine functions only. And coroutines can be used inside other coroutines only. So it is advised to write your Django view as a coroutine function and apply run_in_async_loop decorator, that would be the starting point.

E.g. If your view was like

def my_view(request, event_slug, ...):
    ...
    data = get_data(event_slug)
    ...

def get_data(event_slug):
    event = EventCacheOnSlug(event_slug).resolve()
    return {
        'event': serialize_event(event),
    }

Change it too

from core.utils.asyncio import run_in_async_loop

@run_in_async_loop
async def my_view(request, event_slug, ...):
    ...
    data = await get_data(event_slug)
    ...

async def get_data(event_slug):
    event = await EventCacheOnSlug(event_slug).resolve_async()
    return {
        'event': serialize_event(event),
    }

If you are running workers with while loop, you can put this decorator on loop function (or callback method) and call that function inside loop.

The decorator is defined like:

import asyncio
from functools import wraps

from thread_context.dataloader_context import DataLoadersFactory


def run_in_async_loop(coroutine_func):
    @wraps(coroutine_func)
    def wrappped_func(*args, **kwargs):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop = asyncio.get_event_loop()
        DataLoadersFactory.reset()
        try:
            return loop.run_until_complete(
                coroutine_func(*args, **kwargs))
        finally:
            loop.close()
    return wrappped_func

You might have noticed that we haven’t used DataLoders explicilty. This is because we are using it implicitly using DataLoadersFactory. DataLoadersFactory.get_loader_for(DataLoaderSubclass) is used to get the same instance of defined DataLoaderSubclass for current thread. And it is necessary to call DataLoadersFactory.reset() otherwise locally cached results for any DataLoader subclass won’t ever get removed.

Dependencies

Flash has peer dependencies on packages aiodataloader and he-thread-context. So host project should install these python packages to use Flash’s asyncio functionality.