How to ingest premium market data with Zipline Reloaded
This article explains how to build the two Python scripts you need to turn premium market data into a custom Zipline Reloaded data bundle.
Step 1: Subscribe to premium data
By now you should already have an account with Nasdaq Data Link. If not, head over to https://data.nasdaq.com and set one up.
You’re looking for QuoteMedia End of Day US Stock Prices. This product offers end-of-day prices, dividends, adjustments, and splits for US publicly traded stocks, with history back to 1996. Prices are provided both adjusted and unadjusted, and the product covers all stocks with a primary listing on NASDAQ, AMEX, NYSE, and ARCA.
You can find the page to subscribe here: https://data.nasdaq.com/databases/EOD/data
Once subscribed, you’ll be able to access the data with your API key.
Step 2: Create/Edit extension.py
Now we’ll create the two files needed to build the bundle.
For Windows users
The .zipline directory is where you’ll store the extension.py file, which tells Zipline about the custom data bundle.
- Open the File Explorer and navigate to your home directory. You should find the .zipline folder there. If you're not sure where your home directory is, it's usually C:\Users\[YourUsername].
- Open the .zipline folder.
- Right-click within the folder, select New, then choose Text Document. Rename the newly created file to extension.py. Make sure you change the file extension from .txt to .py.
Note: If you can't see file extensions in your File Explorer, you'll need to enable them. To do this, click on the View tab in File Explorer, and then check the box for File name extensions.
For Mac/Linux/Unix users
- Open Terminal: You can do this by searching for "Terminal" using Spotlight (Cmd + Space) on Mac or by accessing it from the Applications folder.
- Navigate to .zipline Directory: By default, the terminal opens in your home directory. To ensure you're in the home directory and then navigate to the .zipline directory, you can use the following commands:
cd ~
cd .zipline
- Create/Edit the extension.py File:
- If the file doesn't exist: Create it with the touch command, then open it in a text editor of your choice.
touch extension.py
- If the file already exists: Simply open it with a text editor.
For all users
In your editor, add the following content to the file:
import sys
from pathlib import Path
sys.path.append(Path("~", ".zipline").expanduser().as_posix())

from zipline.data.bundles import register

from daily_us_equities import daily_us_equities_bundle

register("quotemedia", daily_us_equities_bundle, calendar_name="XNYS")
Save and close the file
Step 3: Create the code to build the bundle
Following the same steps as above, create a second file in the .zipline directory called daily_us_equities.py.
In the file, add the following code exactly as is (do not alter!):
import time
from io import BytesIO
from zipfile import ZipFile

import numpy as np
import pandas as pd
import requests
from click import progressbar
from logbook import Logger
from six import iteritems
from six.moves.urllib.parse import urlencode

log = Logger(__name__)

DATA_START_DATE = "2000-01-01"
ONE_MEGABYTE = 1024 * 1024
DATALINK_DATA_URL = "https://data.nasdaq.com/api/v3/datatables/QUOTEMEDIA/PRICES"
MAX_DOWNLOAD_TRIES = 5


def format_metadata_url(api_key):
    """Build the query URL for the QuoteMedia end-of-day prices table."""
    columns = ",".join(
        [
            "ticker",
            "date",
            "open",
            "high",
            "low",
            "close",
            "volume",
            "dividend",
            "split",
        ]
    )

    query_params = [
        ("date.gte", DATA_START_DATE),
        ("api_key", api_key),
        ("qopts.export", "true"),
        ("qopts.columns", columns),
    ]
    return f"{DATALINK_DATA_URL}?{urlencode(query_params)}"


def fetch_download_link(table_url, max_download_tries=MAX_DOWNLOAD_TRIES):
    """Poll Nasdaq Data Link until the bulk export is ready, then return its link."""
    log.info("Attempting to fetch the bulk download link...")

    status = None
    cnt = 0

    while status != "fresh" and cnt < max_download_tries:
        log.info("Fetching download link...")
        cnt += 1
        try:
            resp = requests.get(table_url)
            resp.raise_for_status()
        except requests.RequestException:
            log.info("Failed to get download link from Nasdaq Data Link")
            time.sleep(10)
            continue

        payload = resp.json()

        status = payload["datatable_bulk_download"]["file"]["status"]

        if status == "fresh":
            link = payload["datatable_bulk_download"]["file"]["link"]
            log.info(f"Status is {status}. Returning download link: {link}")
            return link

        log.info(f"Status is {status}. Retrying in 10 seconds...")

        time.sleep(10)

    raise ValueError(
        f"Failed to fetch a fresh download link after {max_download_tries} attempts."
    )


def load_data_table(file, index_col=None):
    """Load the data table from the zip file provided by Nasdaq Data Link."""
    with ZipFile(file) as zip_file:
        file_names = zip_file.namelist()
        assert len(file_names) == 1, "Expected a single file in the archive."
        eod_prices = file_names.pop()
        with zip_file.open(eod_prices) as table_file:
            log.info("Parsing raw data.")
            data_table = pd.read_csv(
                table_file,
                header=0,
                names=[
                    "ticker",
                    "date",
                    "open",
                    "high",
                    "low",
                    "close",
                    "volume",
                    "dividend",
                    "split",
                ],
                parse_dates=["date"],
                index_col=index_col,
                usecols=[
                    "ticker",
                    "date",
                    "open",
                    "high",
                    "low",
                    "close",
                    "volume",
                    "dividend",
                    "split",
                ],
            ).rename(
                columns={
                    "ticker": "symbol",
                    "dividend": "ex_dividend",
                    "split": "split_ratio",
                }
            )

    return data_table


def fetch_data_table(api_key):
    """Fetch the QuoteMedia EOD prices table from Nasdaq Data Link."""
    log.info("Fetching data table...")

    table_url = format_metadata_url(api_key)
    download_link = fetch_download_link(table_url)
    raw_file = download_with_progress(download_link, chunk_size=ONE_MEGABYTE)

    return load_data_table(file=raw_file)


def gen_asset_metadata(data, show_progress):
    """Build per-symbol metadata: first/last trade dates, exchange, auto-close date."""
    if show_progress:
        log.info("Generating asset metadata.")

    data = data.groupby(by="symbol").agg({"date": ["min", "max"]})
    data.reset_index(inplace=True)
    data["start_date"] = data.date.min(axis=1)
    data["end_date"] = data.date.max(axis=1)
    del data["date"]
    data.columns = data.columns.get_level_values(0)

    data["exchange"] = "QUOTEMEDIA"
    data["auto_close_date"] = data["end_date"].values + pd.Timedelta(days=1)
    return data


def parse_splits(data, show_progress):
    """Convert raw split factors into the price ratios Zipline expects."""
    if show_progress:
        log.info("Parsing split data.")

    data["split_ratio"] = 1.0 / data.split_ratio
    data.rename(
        columns={"split_ratio": "ratio", "date": "effective_date"},
        inplace=True,
        copy=False,
    )
    return data


def parse_dividends(data, show_progress):
    """Rename dividend columns and stub out the dates Zipline expects but the feed lacks."""
    if show_progress:
        log.info("Parsing dividend data.")

    data["record_date"] = data["declared_date"] = data["pay_date"] = pd.NaT
    data.rename(
        columns={"ex_dividend": "amount", "date": "ex_date"}, inplace=True, copy=False
    )
    return data


def parse_pricing_and_vol(data, sessions, symbol_map):
    """Yield (sid, OHLCV frame) pairs aligned to the trading calendar sessions."""
    for asset_id, symbol in iteritems(symbol_map):
        asset_data = (
            data.xs(symbol, level=1).reindex(sessions.tz_localize(None)).fillna(0.0)
        )
        yield asset_id, asset_data


def daily_us_equities_bundle(
    environ,
    asset_db_writer,
    minute_bar_writer,
    daily_bar_writer,
    adjustment_writer,
    calendar,
    start_session,
    end_session,
    cache,
    show_progress,
    output_dir,
):
    """
    daily_us_equities_bundle builds a daily dataset using QuoteMedia
    end of day equities data. For more information on the QuoteMedia
    data see here: https://data.nasdaq.com/databases/EOD
    """
    api_key = environ.get("DATALINK_API_KEY")
    if api_key is None:
        raise ValueError(
            "Please set your DATALINK_API_KEY environment variable and retry."
        )

    raw_data = fetch_data_table(api_key)

    start_session, end_session = raw_data.date.min(), raw_data.date.max()
    asset_metadata = gen_asset_metadata(raw_data[["symbol", "date"]], show_progress)

    exchanges = pd.DataFrame(
        data=[["QUOTEMEDIA", "QUOTEMEDIA", "US"]],
        columns=["exchange", "canonical_name", "country_code"],
    )
    asset_db_writer.write(equities=asset_metadata, exchanges=exchanges)

    symbol_map = asset_metadata.symbol
    sessions = calendar.sessions_in_range(start_session, end_session)

    raw_data.set_index(["date", "symbol"], inplace=True)
    daily_bar_writer.write(
        parse_pricing_and_vol(raw_data, sessions, symbol_map),
        show_progress=show_progress,
    )

    raw_data.reset_index(inplace=True)
    raw_data["symbol"] = raw_data["symbol"].astype("category")
    raw_data["sid"] = raw_data.symbol.cat.codes
    adjustment_writer.write(
        splits=parse_splits(
            raw_data[["sid", "date", "split_ratio"]].loc[raw_data.split_ratio != 1],
            show_progress=show_progress,
        ),
        dividends=parse_dividends(
            raw_data[["sid", "date", "ex_dividend"]].loc[raw_data.ex_dividend != 0],
            show_progress=show_progress,
        ),
    )


def download_with_progress(url, chunk_size, **progress_kwargs):
    """
    Download streaming data from a URL, printing progress information to the
    terminal.

    Parameters
    ----------
    url : str
        A URL that can be understood by ``requests.get``.
    chunk_size : int
        Number of bytes to read at a time from requests.
    **progress_kwargs
        Forwarded to click.progressbar.

    Returns
    -------
    data : BytesIO
        A BytesIO containing the downloaded data.
    """
    resp = requests.get(url, stream=True)
    resp.raise_for_status()

    total_size = int(resp.headers["content-length"])
    data = BytesIO()
    with progressbar(length=total_size, **progress_kwargs) as pbar:
        for chunk in resp.iter_content(chunk_size=chunk_size):
            data.write(chunk)
            pbar.update(len(chunk))

    data.seek(0)
    return data
The format_metadata_url function constructs the URL for querying Nasdaq Data Link with a provided API key and selects the specific columns to retrieve: ticker, date, OHLC prices, volume, dividends, and splits.
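As a quick sanity check, you can print the URL the function builds. A minimal sketch, run from the .zipline directory so daily_us_equities.py is importable; the API key is a placeholder and the wrapped output shown in the comment is really a single line:

from daily_us_equities import format_metadata_url

print(format_metadata_url("YOUR_API_KEY"))
# https://data.nasdaq.com/api/v3/datatables/QUOTEMEDIA/PRICES?date.gte=2000-01-01
#     &api_key=YOUR_API_KEY&qopts.export=true&qopts.columns=ticker%2Cdate%2C...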
The fetch_download_link function retrieves the actual data download link from Nasdaq Data Link. The bulk export is generated on demand, so the function polls the endpoint until the file's status is "fresh" and a download link is available. If the export isn't ready yet, it waits 10 seconds and polls again, up to MAX_DOWNLOAD_TRIES attempts.
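For reference, this is the shape of the JSON the function inspects. Only the nested keys the code actually reads are shown, and the link value is a made-up placeholder rather than a real export URL:

# Sketch of the response fetch_download_link polls.
payload = {
    "datatable_bulk_download": {
        "file": {
            "status": "fresh",  # anything other than "fresh" triggers another poll
            "link": "https://example.com/QUOTEMEDIA_PRICES_export.zip",  # placeholder
        }
    }
}
status = payload["datatable_bulk_download"]["file"]["status"]
link = payload["datatable_bulk_download"]["file"]["link"]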
load_data_table extracts and processes data from a downloaded ZIP file. It assumes the ZIP file contains a single CSV file, from which data is read into a Pandas DataFrame. The columns are renamed to be compatible with Zipline's naming conventions.
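To make the expected layout concrete, here is a one-row sketch of the frame the function returns. The column names come from the rename above; the values are placeholders, not real quotes:

import pandas as pd

# Illustrative only: the column layout load_data_table produces.
sample = pd.DataFrame(
    [
        {
            "symbol": "AAPL",
            "date": pd.Timestamp("2020-06-01"),
            "open": 100.0,        # placeholder prices, not real data
            "high": 101.0,
            "low": 99.0,
            "close": 100.5,
            "volume": 1_000_000,
            "ex_dividend": 0.0,   # cash dividend on the ex-date, 0 otherwise
            "split_ratio": 1.0,   # raw split factor from the feed, 1.0 = no split
        }
    ]
)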
In the fetch_data_table function, the data table is fetched by constructing the appropriate metadata URL and then downloading the data, leveraging the previously mentioned functions.
Subsequent functions like gen_asset_metadata, parse_splits, parse_dividends, and parse_pricing_and_vol provide parsing and transformation capabilities to process the raw data into a format suitable for Zipline. They generate asset metadata, handle stock split and dividend data, and parse pricing and volume data, respectively.
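One detail worth calling out is the inversion in parse_splits: the feed reports a 2-for-1 split as 2.0, while Zipline's adjustment writer expects the multiplier applied to prices before the effective date, which is 0.5. A small sketch of that transformation, using a placeholder sid and date:

import pandas as pd

# A 2-for-1 split arrives from the feed as split_ratio == 2.0 and is stored as
# ratio == 0.5, the factor Zipline multiplies into prices before the effective date.
splits = pd.DataFrame(
    {"sid": [0], "date": [pd.Timestamp("2021-06-01")], "split_ratio": [2.0]}
)
splits["split_ratio"] = 1.0 / splits.split_ratio
splits.rename(columns={"split_ratio": "ratio", "date": "effective_date"}, inplace=True)
print(splits)  # sid=0, effective_date=2021-06-01, ratio=0.5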
The core function, daily_us_equities_bundle, integrates all of these pieces to fetch and prepare the QuoteMedia End of Day US Stock Prices dataset for Zipline's consumption. It checks for the required API key, fetches the raw data table, processes it, and writes the formatted data to disk. This is the function Zipline invokes when you ingest the bundle registered in extension.py.
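With both files saved in the .zipline directory, you trigger this function by ingesting the bundle, either with the Zipline CLI (zipline ingest -b quotemedia, after exporting DATALINK_API_KEY in your shell) or programmatically. A minimal programmatic sketch, assuming the default ~/.zipline/extension.py location and a placeholder key:

import os

from zipline.data import bundles
from zipline.utils.run_algo import load_extensions

os.environ["DATALINK_API_KEY"] = "YOUR_API_KEY"  # placeholder key

# Load ~/.zipline/extension.py so the "quotemedia" bundle is registered,
# then run the ingestion.
load_extensions(default=True, extensions=[], strict=True, environ=os.environ)
bundles.ingest("quotemedia", show_progress=True)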
Lastly, the download_with_progress function handles the actual data download, displaying a progress bar while streaming the file. It returns the downloaded data as a BytesIO object, making it easy to process or store afterwards.
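As a standalone usage sketch: the URL below is a placeholder (in the bundle it comes from fetch_download_link), the label keyword is simply forwarded to click.progressbar, and the script is assumed to run from the .zipline directory so the module is importable:

from daily_us_equities import ONE_MEGABYTE, download_with_progress, load_data_table

# Hypothetical usage outside the bundle flow; the URL is a placeholder.
raw_file = download_with_progress(
    "https://example.com/QUOTEMEDIA_PRICES_export.zip",
    chunk_size=ONE_MEGABYTE,
    label="Downloading QuoteMedia EOD prices",
)
data_table = load_data_table(file=raw_file)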