March cohort is now open: How to secure your spot:

How to ingest premium market data with Zipline Reloaded

How to ingest premium market data with Zipline Reloaded

This article explains how to build the two Python scripts you need to use premium data to create a custom data bundle using Zipline Reloaded.

Step 1: Subscribe to premium data

By now you should already have an account with Nasdaq Data Link. If not, head over to https://data.nasdaq.com and set one up.

You’re looking for QuoteMedia End of Day US Stock Prices. This product offers end-of-day prices, dividends, adjustments and splits for US publicly traded stocks with history to 1996. Prices are provided both adjusted and unadjusted. The product covers all stocks with primary listing on NASDAQ, AMEX, NYSE, and ARCA.

You can find the page to subscribe here: https://data.nasdaq.com/databases/EOD/data

Once subscribed, you’ll be able to use it through your API key.

Step 2: Create/Edit extension.py

Now we’ll create the two files we need to create the bundle.

For Windows users

In the .zipline directory, you will store the extension.py file, which informs Zipline about the custom data bundle.

  1. Open the File Explorer and navigate to your home directory. You should find the .zipline folder there. If you’re not sure where your home directory is, it’s usually C:\Users\[YourUsername].
  2. Open the .zipline folder.
  3. Right-click within the folder, select New, then choose Text Document. Rename the newly created file to extension.py. Make sure you change the file extension from .txt to .py.

Note: If you can’t see file extensions in your File Explorer, you’ll need to enable them. To do this, click on the View tab in File Explorer, and then check the box for File name extensions.

For Mac/Linux/Unix users

  1. Open Terminal: You can do this by searching for “Terminal” using Spotlight (Cmd + Space) on Mac or by accessing it from the Applications folder.
  2. Navigate to .zipline Directory: By default, the terminal opens in your home directory. To ensure you’re in the home directory and then navigate to the .zipline directory, you can use the following commands:cd ~ cd .zipline
  3. Create/Edit the extension.py File:
    • If the file doesn’t exist: You can create it using the touch command followed by opening it with a text editor of your choice. touch extension.py
    • If the file already exists: Simply open it with a text editor.

For all users

Within the editor, you can now proceed to input or edit the necessary content. In the file, add the following content:

import sys
from pathlib import Path
sys.path.append(Path("~", ".zipline").expanduser().as_posix())

from zipline.data.bundles import register

from daily_us_equities import daily_us_equities_bundle

register("quotemedia", daily_us_equities_bundle, calendar_name="XNYS")

Save and close the file

Step 3: Create the code to build the bundle

Use the instructions above to create a file called daily_us_equities.py.

In the file, add the following code exactly as is (do not alter!):

import time
from io import BytesIO
from zipfile import ZipFile

import numpy as np
import pandas as pd
import requests
from click import progressbar
from logbook import Logger
from six import iteritems
from six.moves.urllib.parse import urlencode

log = Logger(__name__)

DATA_START_DATE = "2000-01-01"
ONE_MEGABYTE = 1024 * 1024
DATALINK_DATA_URL = "https://data.nasdaq.com/api/v3/datatables/QUOTEMEDIA/PRICES"
MAX_DOWNLOAD_TRIES = 5


def format_metadata_url(api_key):
    """Build the query URL for Quandl WIKI Prices metadata."""
    columns = ",".join(
        [
            "ticker",
            "date",
            "open",
            "high",
            "low",
            "close",
            "volume",
            "dividend",
            "split",
        ]
    )

    query_params = [
        ("date.gte", DATA_START_DATE),
        ("api_key", api_key),
        ("qopts.export", "true"),
        ("qopts.columns", columns),
    ]
    return f"{DATALINK_DATA_URL}?{urlencode(query_params)}"


def fetch_download_link(table_url, max_download_tries=MAX_DOWNLOAD_TRIES):
    log.info(f"Attempting to fetch download link with ...")

    status = None
    cnt = 0

    while status != "fresh" and cnt < max_download_tries:
        log.info(f"Fetching download link...")
        try:
            resp = requests.get(table_url)
            resp.raise_for_status()
        except:
            log.info("Failed to get download link from Quandl")

        payload = resp.json()

        status = payload["datatable_bulk_download"]["file"]["status"]

        if status == "fresh":
            link = payload["datatable_bulk_download"]["file"]["link"]
            log.info(f"Status is {status}. Returning download link: {link}")
            return link

        log.info(f"Status is {status}. Retrying in 10 seconds...")

        time.sleep(10)


def load_data_table(file, index_col=None):
    """Load data table from zip file provided by Quandl."""
    with ZipFile(file) as zip_file:
        file_names = zip_file.namelist()
        assert len(file_names) == 1, "Expected a single file from Quandl."
        eod_prices = file_names.pop()
        with zip_file.open(eod_prices) as table_file:
            log.info("Parsing raw data.")
            data_table = pd.read_csv(
                table_file,
                header=0,
                names=[
                    "ticker",
                    "date",
                    "open",
                    "high",
                    "low",
                    "close",
                    "volume",
                    "dividend",
                    "split",
                ],
                parse_dates=["date"],
                index_col=index_col,
                usecols=[
                    "ticker",
                    "date",
                    "open",
                    "high",
                    "low",
                    "close",
                    "volume",
                    "dividend",
                    "split",
                ],
            ).rename(
                columns={
                    "ticker": "symbol",
                    "dividend": "ex_dividend",
                    "split": "split_ratio",
                }
            )

    return data_table


def fetch_data_table(api_key):
    """Fetch WIKI Prices data table from Quandl"""
    log.info(f"Fetching data table...")

    table_url = format_metadata_url(api_key)
    download_link = fetch_download_link(table_url)
    raw_file = download_with_progress(download_link, chunk_size=ONE_MEGABYTE)

    return load_data_table(file=raw_file)


def gen_asset_metadata(data, show_progress):
    if show_progress:
        log.info("Generating asset metadata.")

    data = data.groupby(by="symbol").agg({"date": ["min", "max"]})
    data.reset_index(inplace=True)
    data["start_date"] = data.date.min(axis=1)
    data["end_date"] = data.date.max(axis=1)
    del data["date"]
    data.columns = data.columns.get_level_values(0)

    data["exchange"] = "QUOTEMEDIA"
    data["auto_close_date"] = data["end_date"].values + pd.Timedelta(days=1)
    return data


def parse_splits(data, show_progress):
    if show_progress:
        log.info("Parsing split data.")

    data["split_ratio"] = 1.0 / data.split_ratio
    data.rename(
        columns={"split_ratio": "ratio", "date": "effective_date"},
        inplace=True,
        copy=False,
    )
    return data


def parse_dividends(data, show_progress):
    if show_progress:
        log.info("Parsing dividend data.")

    data["record_date"] = data["declared_date"] = data["pay_date"] = pd.NaT
    data.rename(
        columns={"ex_dividend": "amount", "date": "ex_date"}, inplace=True, copy=False
    )
    return data


def parse_pricing_and_vol(data, sessions, symbol_map):
    for asset_id, symbol in iteritems(symbol_map):
        asset_data = (
            data.xs(symbol, level=1).reindex(sessions.tz_localize(None)).fillna(0.0)
        )
        yield asset_id, asset_data


def daily_us_equities_bundle(
    environ,
    asset_db_writer,
    minute_bar_writer,
    daily_bar_writer,
    adjustment_writer,
    calendar,
    start_session,
    end_session,
    cache,
    show_progress,
    output_dir,
):
    """
    daily_us_equities_bundle builds a daily dataset using Quotemedia
    end of day equities data. For more information on the Quotemedia
    data see here: https://data.nasdaq.com/databases/EOD
    """
    api_key = environ.get("DATALINK_API_KEY")
    if api_key is None:
        raise ValueError(
            "Please set your DATALINK_API_KEY environment variable and retry."
        )

    raw_data = fetch_data_table(api_key)

    start_session, end_session = raw_data.date.min(), raw_data.date.max()
    asset_metadata = gen_asset_metadata(raw_data[["symbol", "date"]], show_progress)

    exchanges = pd.DataFrame(
        data=[["QUOTEMEDIA", "QUOTEMEDIA", "US"]],
        columns=["exchange", "canonical_name", "country_code"],
    )
    asset_db_writer.write(equities=asset_metadata, exchanges=exchanges)

    symbol_map = asset_metadata.symbol
    sessions = calendar.sessions_in_range(start_session, end_session)

    raw_data.set_index(["date", "symbol"], inplace=True)
    daily_bar_writer.write(
        parse_pricing_and_vol(raw_data, sessions, symbol_map),
        show_progress=show_progress,
    )

    raw_data.reset_index(inplace=True)
    raw_data["symbol"] = raw_data["symbol"].astype("category")
    raw_data["sid"] = raw_data.symbol.cat.codes
    adjustment_writer.write(
        splits=parse_splits(
            raw_data[["sid", "date", "split_ratio"]].loc[raw_data.split_ratio != 1],
            show_progress=show_progress,
        ),
        dividends=parse_dividends(
            raw_data[["sid", "date", "ex_dividend"]].loc[raw_data.ex_dividend != 0],
            show_progress=show_progress,
        ),
    )


def download_with_progress(url, chunk_size, **progress_kwargs):
    """
    Download streaming data from a URL, printing progress information to the
    terminal.
    Parameters
    ----------
    url : str
        A URL that can be understood by ``requests.get``.
    chunk_size : int
        Number of bytes to read at a time from requests.
    **progress_kwargs
        Forwarded to click.progressbar.
    Returns
    -------
    data : BytesIO
        A BytesIO containing the downloaded data.
    """
    resp = requests.get(url, stream=True)
    resp.raise_for_status()

    total_size = int(resp.headers["content-length"])
    data = BytesIO()
    with progressbar(length=total_size, **progress_kwargs) as pbar:
        for chunk in resp.iter_content(chunk_size=chunk_size):
            data.write(chunk)
            pbar.update(len(chunk))

    data.seek(0)
    return data

The format_metadata_url function constructs the URL for querying Nasdaq Data Link based on a provided API key and selects specific columns of data to retrieve, including ticker information, date, and price metrics.

The fetch_download_link function attempts to retrieve the actual data download link from Nasdaq Data Link. This link is dynamic and can change, so the function continually checks the status of the data until it is ready for download. If the data isn’t ready after a certain number of tries (defined by MAX_DOWNLOAD_TRIES), the function waits for a set interval before trying again.

load_data_table extracts and processes data from a downloaded ZIP file. It assumes the ZIP file contains a single CSV file, from which data is read into a Pandas DataFrame. The columns are renamed to be compatible with Zipline’s naming conventions.

In the fetch_data_table function, the data table is fetched by constructing the appropriate metadata URL and then downloading the data, leveraging the previously mentioned functions.

Subsequent functions like gen_asset_metadata, parse_splits, parse_dividends, and parse_pricing_and_vol provide parsing and transformation capabilities to process the raw data into a format suitable for Zipline. They generate asset metadata, handle stock split and dividend data, and parse pricing and volume data, respectively.

The core function, daily_us_equities_bundle, integrates all the functionalities to fetch and prepare the QuoteMedia End of Day US Stock Prices dataset for Zipline’s consumption. It checks for the required API key, fetches the raw data table, processes it, and writes the formatted data to disk. This function is the primary interface that a user or system might call to get Quandl data into Zipline’s bundle format.

Lastly, the download_with_progress functions facilitate the actual data download. The function provides a visual progress bar for tracking download progress. It returns the downloaded data as a BytesIO object, making it easier to subsequently process or store the data.