Amazon Buy Box Tracking with Keepa and Python
The Buy Box is a section on the Amazon product page where the customer sees the direct purchase button. Statistics show that over 80% of sales are made through the Buy Box, so it’s crucial for sellers on Amazon to be present in the Buy Box to ensure successful sales.
Amazon uses a complex algorithm to determine who gets the Buy Box, with price being an important factor, but also seller rating, reviews, and order processing speed playing significant roles.
For Amazon sellers, it is essential to track their presence in the Buy Box for their products and quickly react if another seller takes over the Buy Box.
Let’s develop a Buy Box tracker that will monitor which seller is in the Buy Box for a list of products and signal if another seller takes over the Buy Box.
We will use the Keepa.com service API to get product data. Keepa constantly scans all Amazon products and provides convenient access to this data through its API. We will use the keepa Python library and pandas.
First, install the necessary libraries using the terminal:
pip install keepa pandas
And import the required libraries:
import os, keepa
import pandas as pd
from datetime import datetime, timedelta
Create a list of ASINs to track and a structure with “good” sellers data (ID and name) so we can distinguish when the seller changes to an unknown seller and back.
# Define the list of ASINs to check
asins_list = ['B071JM699P','B074DZ6NJB','B073Q9PSWD']
# Define the known sellers so we can tell our own seller_id values from unknown ones
good_sellers = {
    'ATVPDKIKX0DER': 'Amazon Seller',
    'SellerId2': 'Amazon Seller2'
}
Connect to the Keepa API and make a request. I always include a free request to determine the number of Keepa tokens we have left.
KEEPA_API_KEY = '[Insert your Keepa API key here]'
#Define Keepa API and get tokens quantity
api = keepa.Keepa(KEEPA_API_KEY)
print('Tokens Left:', api.tokens_left)
Depending on the subscription level, you will have a different number of tokens and different token recovery speeds. In other words, tokens limit how quickly and how often you can fetch product data from Keepa. If you have up to 100 products and need data once an hour, the basic Keepa plan with 300 tokens should suffice.
Next, make a request to Keepa with the buybox=True option.
# ASIN query with buybox
products = api.query(
    asins_list,
    progress_bar = False,
    buybox = True
)
Iterate over the products and get the buy box history object.
for product in products:
    asin = product['asin']
    title = product['title']
    # Just show info about the product
    print(f"{asin} | {title}")
    # Show the Buy Box history
    buyboxhistory = product['buyBoxSellerIdHistory']
    print(buyboxhistory)
It returns a structure like this:
['3711124', '-2', '3773244', 'ATVPDKIKX0DER', '3937220', '-1', '3939788', 'ATVPDKIKX0DER', '3980420', '-1', '3980832', 'ATVPDKIKX0DER', '4838214', '-1', '4842500', 'ATVPDKIKX0DER', '4852416', '-1', '4854432', 'ATVPDKIKX0DER']
The Buy Box history from Keepa is a flat list of strings that should be read in pairs. In each pair, the first value is the time (in Keepa's own time format, counted in minutes) and the second is the seller_id, including the special values -1 and -2.
So, we define two functions: one to convert Keepa time to Unix time and another to split the list into pairs.
# Transform a flat Keepa history list into (UTC datetime, value) pairs
def transformKeepaHistoryList(history):
    return [(datetime(1970, 1, 1) + timedelta(milliseconds=keepaTimeMinutesToUnixTime(keepaMinutes)), val)
            for keepaMinutes, val in zip(history[::2], history[1::2])]
# Convert Keepa time (minutes) to Unix time in milliseconds
def keepaTimeMinutesToUnixTime(keepaMinutes):
    return (21564000 + int(keepaMinutes)) * 60000
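As a quick sanity check on the conversion: the offset 21564000 is the number of minutes between the Unix epoch and 2011-01-01 00:00 UTC, so Keepa time can also be read as "minutes since 2011-01-01". The sketch below relies on that reading and decodes the first two pairs of the sample list shown earlier. The helper names `keepa_minutes_to_datetime` and `pair_history` are made up for this illustration and are not part of the keepa library.

```python
from datetime import datetime, timedelta

# Assumption: Keepa minutes count from 2011-01-01 00:00 UTC,
# which is what the 21564000-minute offset works out to.
KEEPA_EPOCH = datetime(2011, 1, 1)

def keepa_minutes_to_datetime(keepa_minutes):
    """Convert Keepa minutes (int or str) to a naive UTC datetime."""
    return KEEPA_EPOCH + timedelta(minutes=int(keepa_minutes))

def pair_history(flat):
    """Split [time, value, time, value, ...] into (datetime, value) tuples."""
    return [(keepa_minutes_to_datetime(t), v) for t, v in zip(flat[::2], flat[1::2])]

# First two pairs of the sample Buy Box history from the article
sample = ['3711124', '-2', '3773244', 'ATVPDKIKX0DER']
for when, seller in pair_history(sample):
    print(when, seller)
# 2018-01-21 04:04:00 -2
# 2018-03-05 07:24:00 ATVPDKIKX0DER
```

Both forms give the same dates; the epoch form just avoids the intermediate millisecond value.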
Using these functions, we can easily parse the Buy Box history list and convert it to a pandas dataframe.
for product in products:
    asin = product['asin']
    title = product['title']
    # Just show info about the product
    print(f"{asin} | {title}")
    # Show the Buy Box history
    buyboxhistory = transformKeepaHistoryList(product['buyBoxSellerIdHistory'])
    df_buyboxhistory = pd.DataFrame(buyboxhistory, columns=['date', 'seller'])
    print(df_buyboxhistory)
We have a good-looking dataframe:
date seller
0 2018-01-21 04:04:00 -2
1 2018-03-05 07:24:00 ATVPDKIKX0DER
2 2018-06-27 04:20:00 -1
3 2018-06-28 23:08:00 ATVPDKIKX0DER
4 2018-07-27 04:20:00 -1
.. ... ...
319 2023-10-06 21:08:00 ATVPDKIKX0DER
320 2023-10-16 23:56:00 -1
321 2023-10-17 00:48:00 ATVPDKIKX0DER
322 2023-10-17 13:36:00 -1
323 2023-10-17 15:00:00 ATVPDKIKX0DER
Now we can take the last date from the dataframe, check the date, and print a message depending on which seller has taken the Buy Box.
messages = []
for product in products:
    asin = product['asin']
    title = product['title']
    # Just show info about the product
    print(f"{asin} | {title}")
    # Parse the Buy Box history
    buyboxhistory = transformKeepaHistoryList(product['buyBoxSellerIdHistory'])
    df_buyboxhistory = pd.DataFrame(buyboxhistory, columns=['date', 'seller'])
    if not df_buyboxhistory.empty:
        last_row = df_buyboxhistory.iloc[-1]
        last_date = last_row['date']
        if last_date > last_processing_time:
            # Seller IDs are strings, so the special cases are '-1' and '-2'
            seller_id = last_row['seller']
            if seller_id == '-1':
                message = f'{asin} | no seller in buy box on {last_date}'
            elif seller_id == '-2':
                message = f'{asin} | asin out of stock on {last_date}'
            else:
                if seller_id in good_sellers:
                    seller_name = good_sellers[seller_id]
                else:
                    seller_name = f'UNKNOWN SELLER ({seller_id})'
                message = f'{asin} | buybox seller changed to {seller_name} on {last_date}'
            messages.append(message)
# Result messages
print(messages)
What have we done here? First, we added a messages array to store the final messages. Then we process the seller_id: if it's -1, we output a message that there is no seller for the Buy Box. If it's -2, then the ASIN is out of stock. Next, we check if the seller_id is in the list of known "good" sellers and, depending on this, save a message about who took the Buy Box.
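The branching described above can also be pulled out into a small pure function, which makes it easy to test without calling Keepa. This is only a sketch: `describe_buybox_event` is a hypothetical name, and the sample seller dictionary reuses the placeholder values from this article. Note that the history values are strings, so the special cases are compared as '-1' and '-2'.

```python
from datetime import datetime

NO_BUYBOX = '-1'     # per the article: no seller holds the Buy Box
OUT_OF_STOCK = '-2'  # per the article: the ASIN is out of stock

def describe_buybox_event(asin, seller_id, when, good_sellers):
    """Build the notification text for the latest Buy Box entry."""
    seller_id = str(seller_id)  # tolerate ints as well as strings
    if seller_id == NO_BUYBOX:
        return f'{asin} | no seller in buy box on {when}'
    if seller_id == OUT_OF_STOCK:
        return f'{asin} | asin out of stock on {when}'
    # Known sellers get their name, everyone else is flagged as unknown
    seller_name = good_sellers.get(seller_id, f'UNKNOWN SELLER ({seller_id})')
    return f'{asin} | buybox seller changed to {seller_name} on {when}'

good_sellers = {'ATVPDKIKX0DER': 'Amazon Seller'}
when = datetime(2023, 10, 17, 15, 0)
print(describe_buybox_event('B071JM699P', 'ATVPDKIKX0DER', when, good_sellers))
print(describe_buybox_event('B071JM699P', 'A1EXAMPLE', when, good_sellers))
print(describe_buybox_event('B071JM699P', '-1', when, good_sellers))
```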
What is last_processing_time? It is the time of the last run, the point from which we want to track changes. The simplest way is to take the last day:
from datetime import timezone
last_processing_time = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=1)  # Keepa history dates are UTC
But since we want to make a tracker that can be run regularly with different frequencies, it should remember the last check time and on the next run check after this date.
To do this, insert code at the beginning of the script that will read the last run time from a file (or by default check for the last day).
from datetime import timezone
# Get the current time as naive UTC, to match the UTC dates in the Keepa history
current_time = datetime.now(timezone.utc).replace(tzinfo=None)
# Load the last processing time
last_time_filepath = 'buybox_last_time_processing.txt'
if os.path.isfile(last_time_filepath):
    with open(last_time_filepath, 'r') as file:
        date_str = file.read().strip()
    last_processing_time = datetime.fromisoformat(date_str)
else:
    last_processing_time = current_time - timedelta(days=1)
And at the end of the script, update the current run date in the same file.
# Save the time of this run to the file
print('Update Last Processing Time...')
with open(last_time_filepath, 'w') as file:
    file.write(current_time.isoformat())
print(f'-- Set new time={current_time}')
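The read and write steps can be wrapped in two small helpers. The sketch below is one possible variant, not the article's exact code: the function names are hypothetical, it stores naive UTC timestamps (on the assumption that the parsed Keepa dates are UTC), it falls back to the one-day default when the file is missing or unreadable, and it writes through a temporary file so an interrupted run does not leave a half-written timestamp.

```python
import os
import tempfile
from datetime import datetime, timedelta, timezone

def utc_now():
    """Current time as a naive UTC datetime."""
    return datetime.now(timezone.utc).replace(tzinfo=None)

def load_last_run(path, default_lookback=timedelta(days=1)):
    """Read the saved timestamp; fall back to 'now minus lookback' if missing or unreadable."""
    try:
        with open(path, 'r') as f:
            return datetime.fromisoformat(f.read().strip())
    except (FileNotFoundError, ValueError):
        return utc_now() - default_lookback

def save_last_run(path, when):
    """Write to a temporary file first, then swap it into place."""
    tmp_path = path + '.tmp'
    with open(tmp_path, 'w') as f:
        f.write(when.isoformat())
    os.replace(tmp_path, path)

# Round trip in a throwaway directory
with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, 'buybox_last_time_processing.txt')
    started = utc_now()
    save_last_run(path, started)
    print(load_last_run(path) == started)  # True
```

Capturing the time at the start of the run and saving it only at the end, as the article does, means a change that arrives while the script is running is still picked up on the next run.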
Assemble all the code together:
import os, keepa
import pandas as pd
from datetime import datetime, timedelta, timezone

# Define the list of ASINs to check
asins_list = ['B071JM699P','B074DZ6NJB','B073Q9PSWD']

# Define the known sellers so we can tell our own seller_id values from unknown ones
good_sellers = {
    'ATVPDKIKX0DER': 'Amazon Seller',
    'SellerId2': 'Amazon Seller2'
}

KEEPA_API_KEY = '[Insert your Keepa API key here]'

# Connect to the Keepa API and show the remaining tokens
api = keepa.Keepa(KEEPA_API_KEY)
print('Tokens Left:', api.tokens_left)

# Define functions
# Transform a flat Keepa history list into (UTC datetime, value) pairs
def transformKeepaHistoryList(history):
    return [(datetime(1970, 1, 1) + timedelta(milliseconds=keepaTimeMinutesToUnixTime(keepaMinutes)), val)
            for keepaMinutes, val in zip(history[::2], history[1::2])]

# Convert Keepa time (minutes) to Unix time in milliseconds
def keepaTimeMinutesToUnixTime(keepaMinutes):
    return (21564000 + int(keepaMinutes)) * 60000

# Get the current time as naive UTC, to match the UTC dates in the Keepa history
current_time = datetime.now(timezone.utc).replace(tzinfo=None)

# Load the last processing time
last_time_filepath = 'buybox_last_time_processing.txt'
if os.path.isfile(last_time_filepath):
    with open(last_time_filepath, 'r') as file:
        date_str = file.read().strip()
    last_processing_time = datetime.fromisoformat(date_str)
else:
    last_processing_time = current_time - timedelta(days=1)

messages = []

# ASIN query with buybox
products = api.query(
    asins_list,
    progress_bar = False,
    buybox = True
)

for product in products:
    asin = product['asin']
    title = product['title']
    # Just show info about the product
    print(f"{asin} | {title}")
    # Parse the Buy Box history
    buyboxhistory = transformKeepaHistoryList(product['buyBoxSellerIdHistory'])
    df_buyboxhistory = pd.DataFrame(buyboxhistory, columns=['date', 'seller'])
    if not df_buyboxhistory.empty:
        last_row = df_buyboxhistory.iloc[-1]
        last_date = last_row['date']
        if last_date > last_processing_time:
            # Seller IDs are strings, so the special cases are '-1' and '-2'
            seller_id = last_row['seller']
            if seller_id == '-1':
                message = f'{asin} | no seller in buy box on {last_date}'
            elif seller_id == '-2':
                message = f'{asin} | asin out of stock on {last_date}'
            else:
                if seller_id in good_sellers:
                    seller_name = good_sellers[seller_id]
                else:
                    seller_name = f'UNKNOWN SELLER ({seller_id})'
                message = f'{asin} | buybox seller changed to {seller_name} on {last_date}'
            messages.append(message)

# Result messages
print(messages)

# Save the time of this run to the file
print('Update Last Processing Time...')
with open(last_time_filepath, 'w') as file:
    file.write(current_time.isoformat())
print(f'-- Set new time={current_time}')
Great, I would also add sending messages to Telegram using a Telegram bot. I often use Telegram to send status notifications of my scripts to groups (either my own or for my clients).
Install the python-telegram-bot library:
pip install python-telegram-bot
And add the following code:
from telegram import Bot
import asyncio
TELEGRAM_BOT_TOKEN = '[Insert Telegram Bot token here]'
CHAT_ID = '[Insert Chat id here]'
print('//// Send Messages to TELEGRAM...')
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Send one text message to the given chat
async def send_message(text, chat_id):
    async with bot:
        await bot.send_message(text=text, chat_id=chat_id)
# Join all messages into a single text and send it
async def run_bot(messages):
    text = '\n'.join(messages)
    await send_message(text, CHAT_ID)
if messages:
    asyncio.run(run_bot(messages))
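One thing to watch when joining all messages into a single text: the Telegram Bot API limits a text message to 4096 characters, so a run with many changes may not fit. A minimal sketch of splitting the messages into batches that each stay under the limit is shown below. `batch_messages` is a hypothetical helper, and the simple character count is an approximation of how Telegram measures length.

```python
TELEGRAM_TEXT_LIMIT = 4096  # maximum text length of one Telegram message

def batch_messages(messages, limit=TELEGRAM_TEXT_LIMIT):
    """Greedily join messages with newlines into texts of at most `limit` characters."""
    batches, current = [], ''
    for msg in messages:
        if not msg:
            continue  # skip empty messages
        # A single oversized message is cut into limit-sized pieces
        pieces = [msg[i:i + limit] for i in range(0, len(msg), limit)]
        for piece in pieces:
            candidate = piece if not current else current + '\n' + piece
            if len(candidate) <= limit:
                current = candidate
            else:
                batches.append(current)
                current = piece
    if current:
        batches.append(current)
    return batches

# Small limit used here only to make the splitting visible
demo = ['B071JM699P | no seller in buy box', 'B074DZ6NJB | asin out of stock']
for text in batch_messages(demo, limit=40):
    print(repr(text))
```

Each returned text could then be passed to send_message in turn instead of sending one large joined string.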
Now we can set up a cron job on the server to run the script regularly (or, even better, create a DAG in Airflow).
A few more additions:
If we have many products, it is better to divide them into smaller chunks for processing each time:
step = 10
for i in range(0, len(asins_list), step):
    asins_chunk = asins_list[i:i + step]
    print(f'Asins selected to process {len(asins_chunk)}')
    # Keepa query for this chunk
    products = api.query(
        asins_chunk,
        progress_bar = False,
        buybox = True
    )
    for product in products:
        ...
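The slicing logic can also be kept in a small generator so the main loop only deals with Keepa calls. This is just a sketch; `chunked` is a hypothetical helper name, and the ASINs are the ones used in this article.

```python
def chunked(items, size):
    """Yield consecutive slices of `items`, each holding at most `size` elements."""
    if size <= 0:
        raise ValueError('size must be positive')
    for start in range(0, len(items), size):
        yield items[start:start + size]

asins_list = ['B071JM699P', 'B074DZ6NJB', 'B073Q9PSWD']
for asins_chunk in chunked(asins_list, 2):
    print(f'Asins selected to process {len(asins_chunk)}: {asins_chunk}')
# Asins selected to process 2: ['B071JM699P', 'B074DZ6NJB']
# Asins selected to process 1: ['B073Q9PSWD']
```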
If we have a very large number of products (hundreds or thousands), retrieving data from Keepa can become costly. With the buybox=True option, each request consumes 5 tokens instead of 1 token per request without this option.
products = api.query(
    asins_list,
    progress_bar = False,
    buybox = True
)
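To get a feel for the budget, the arithmetic can be sketched as below. All numbers are assumptions for illustration: the cost of 5 tokens per product is the figure quoted in this article, while the available balance and the refill rate are made-up values that depend on your Keepa plan. The function names are hypothetical.

```python
import math

def tokens_for_run(n_products, tokens_per_product):
    """Total tokens one full pass over the product list would consume."""
    return n_products * tokens_per_product

def minutes_to_refill(tokens_needed, tokens_available, refill_per_minute):
    """Minutes to wait until enough tokens have accumulated (0 if already enough)."""
    deficit = max(0, tokens_needed - tokens_available)
    return math.ceil(deficit / refill_per_minute)

# Assumed figures: 100 products, 5 tokens each, 300 tokens in hand, 5 tokens refilled per minute
needed = tokens_for_run(100, 5)
print(needed)                             # 500
print(minutes_to_refill(needed, 300, 5))  # 40
```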
There is an alternative way to get data without buybox=True: the COUNT_NEW parameter (new offer count history). This parameter tracks the number of marketplace merchants selling the product as new.
Monitoring changes in the number of sellers for a product can help detect new sellers. This method does not guarantee exact Buy Box tracking (as the Buy Box can be awarded to a different seller without changing the list), but it is often necessary to react to the appearance of a new seller.
products = api.query(
    asins_list,
    progress_bar = False,
)
for product in products:
    ...
    asin = product['asin']
    # Assumes the product object carries a 'brand' field; falls back to an empty string
    brandname = product.get('brand') or ''
    # csv index 11 holds the COUNT_NEW history; it can be empty for some products
    offers = transformKeepaHistoryList(product['csv'][11] or [])
    df_offers = pd.DataFrame(offers, columns=['date', 'offer_qty'])
    if not df_offers.empty:
        last_row = df_offers.iloc[-1]
        last_date = last_row['date']
        if last_date > last_processing_time:
            new_offer_qty = last_row['offer_qty']
            if len(df_offers) >= 2:
                previous_row = df_offers.iloc[-2]
                old_offer_qty = previous_row['offer_qty']
                grow = 'increased' if new_offer_qty > old_offer_qty else 'decreased'
                message = f'{brandname} | https://www.amazon.com/dp/{asin} | offers qty {grow} from {old_offer_qty} to {new_offer_qty} on {last_date}'
            else:
                message = f'{brandname} | https://www.amazon.com/dp/{asin} | offers qty changed to {new_offer_qty} on {last_date}'
            messages.append(message)
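The same check can be written without pandas, which makes the logic easier to test in isolation. In the sketch below, `last_offer_change` is a hypothetical helper, the history values are invented sample data, and Keepa time is assumed to be minutes since 2011-01-01 00:00 UTC (which is what the 21564000-minute offset used earlier corresponds to).

```python
from datetime import datetime, timedelta

KEEPA_EPOCH = datetime(2011, 1, 1)  # assumed start of Keepa time, in UTC

def last_offer_change(flat_history, since):
    """Return (date, old_qty, new_qty) for the newest entry if it is later than `since`, else None."""
    if not flat_history or len(flat_history) < 2:
        return None
    pairs = list(zip(flat_history[::2], flat_history[1::2]))
    minutes, new_qty = pairs[-1]
    when = KEEPA_EPOCH + timedelta(minutes=int(minutes))
    if when <= since:
        return None
    # With a single entry there is no previous quantity to compare against
    old_qty = pairs[-2][1] if len(pairs) >= 2 else None
    return when, old_qty, new_qty

# Invented sample: 3 new offers, later 4 new offers
history = [3711124, 3, 3773244, 4]
print(last_offer_change(history, since=datetime(2018, 1, 1)))
print(last_offer_change(history, since=datetime(2019, 1, 1)))  # None, nothing new since then
```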
This approach ensures that your Buy Box tracker is robust and can handle varying product loads efficiently, with the ability to notify you of any significant changes promptly.
About Author
The author of this article is the Founder and CTO of SellerFlux. We empower e-commerce with data-driven solutions and specialize in software built specifically for e-commerce businesses. If you need expert help in building, optimizing, or automating your e-commerce analytics and data processes, don't hesitate to contact us.
Useful Links:
1. Keepa API package: https://github.com/akaszynski/keepa
2. Keepa products object format: https://keepa.com/#!discuss/t/product-object/116