Amazon SP-API: Get Inventory and Prices with Python
The Amazon Seller Partner API [1] is the official way to retrieve data from Amazon Seller accounts, offering numerous methods and official documentation. For building analytics on products and sales from an Amazon Seller account, one of the most basic and initial actions is to retrieve Inventory — a list of products and their quantities in stock, along with current prices. Let’s try to implement this.
To get the product list and quantities in stock (inventory), we need to use the Amazon FBA Inventory API [2]. To get current prices, we need to use the Amazon Product Pricing API Amazon Product Pricing API [3].
How will we retrieve the data?
There is a decent Python library called python-amazon-sp-api
, but I prefer working directly with the API using the requests
library. Why? The code might be a bit longer, but the Seller Central API often changes, and the library doesn’t get updated as frequently as one might wish. If you have a serious project, it’s better to use direct requests.
How to connect to the SP API?
I won’t go into detail about this in the article, but you need an Amazon Seller Central account and an application for a developer account [4]. You need to fill out the form and wait for support to respond. Usually, if you need a developer account to access only your seller account data, this happens quickly.
After the application is validated, you will receive a SECRET_KEY
and be able to create an application and get its APP ID
. Next, the application needs to be linked to your client. After linking, you will receive a REFRESH_TOKEN
. Using these three keys, you can make a request to obtain an Access Token
, which should be used in all subsequent requests and is valid for only one hour. If you work with multiple Amazon Sellers, you will have a unique REFRESH_TOKEN
for each seller, while the APP ID
and Client Secret
remain the same.
Authorization in SP API and Obtaining an Access Token
How do you get an ACCESS TOKEN
? To do this, you need to make a request to https://api.amazon.com/auth/o2/token
. If all your data is correct, you will receive a token.
LWA_APP_ID = '...'
LWA_CLIENT_SECRET = '...'
REFRESH_TOKEN = '...'
response = requests.post(
"https://api.amazon.com/auth/o2/token",
data = {
"grant_type": "refresh_token",
"refresh_token": REFRESH_TOKEN,
"client_id": LWA_APP_ID,
"client_secret": LWA_CLIENT_SECRET
}
)
if(response.status_code!=200):
print(f'Something wrong..')
data = json.loads(response.content)
print(data)
else:
access_token = response.json()['access_token']
print('access_token=', access_token)
To avoid requesting an access_token
with every request, let’s create a universal function that will cache it in a file and load it from there. If the file’s date is older than 50 minutes, it will load a new token. We’ll cache it in the data
folder, which you should create before running the script.
LWA_APP_ID = '...'
LWA_CLIENT_SECRET = '...'
REFRESH_TOKEN = '...'
def get_lwa_access_token():
#Define path to file with cached token
access_token_path = os.path.join(os.getcwd(), 'data', 'access_token')
#Check cached file existing and time of modifying
if os.path.isfile(access_token_path):
modify_time = datetime.fromtimestamp(os.path.getctime(access_token_path))
if modify_time >= (datetime.now() - timedelta(minutes=50)):
with open(access_token_path, "r") as f:
access_token = f.read()
print(f'Load access_token from file...')
return access_token
#If token not valid -- load a new
print('Loading access_token from API...')
response = requests.post(
"https://api.amazon.com/auth/o2/token",
data={
"grant_type": "refresh_token",
"refresh_token": REFRESH_TOKEN,
"client_id": LWA_APP_ID,
"client_secret": LWA_CLIENT_SECRET
}
)
if response.status_code != 200:
print('Unable to load access_token.')
return False
#Save new token to file
access_token = response.json()['access_token']
with open(access_token_path, "w") as f:
f.write(access_token)
print('Access_token loaded and saved to file')
return access_token
Let’s check the function:
access_token = get_lwa_access_token()
print('access_token=', access_token)
Running our code, on the first execution, the function will request a code and save the token to a file.
Loading access_token from API...
Access_token loaded and saved to file
Atza|IwEBILqHYhAPme....
Upon subsequent executions, within 50 minutes, it will request the token from the file.
Load access_token from file...
Atza|IwEBILqHYhAPme…
Great, now that we have authorization, let’s move on.
Retrieving Inventory Data
After we’ve implemented authorization, we can retrieve raw Inventory data from the SP-API. To get the Inventory, we need to use the FBA Inventory API v1 [2].
Let’s import the necessary libraries and define the Endpoint
and Marketplace_id
. Each Amazon marketplace has its unique identifier [5]. For example, for the US, it’s ATVPDKIKX0DER
. Endpoints also differ by region (different URLs for North America, Europe, and the East) [6].
import os, time
import requests, urllib.parse, json
#Define endpoint and marketplace ID
ENDPOINT = "https://sellingpartnerapi-na.amazon.com"
MARKETPLACE_ID = 'ATVPDKIKX0DER' #US
Next, we’ll set the parameters for the request. We want to request all products within the specified marketplace with all detailed data and request an access_token
using our function above.
#Set Request params
request_params = {
"granularityType": "Marketplace",
"granularityId": MARKETPLACE_ID,
"marketplaceIds": MARKETPLACE_ID,
"details": "true"
}
To make a request to Amazon SP API, we need to perform a request to /fba/inventory/v1/summaries
, pass the parameters as a GET string, and our access_token
as a parameter in the header. The requests
library is perfect for this. The data will be in JSON format, and to convert it from JSON to a Python structure, we’ll use the json
library.
#Get access_token
access_token = get_lwa_access_token()
#Call request
response = requests.get(
ENDPOINT
+ "/fba/inventory/v1/summaries" + "?"
+ urllib.parse.urlencode(request_params),
headers = {
"x-amz-access-token": access_token
}
)
print(response)
print(f'Response code={response.status_code}')
if(response.status_code==200):
data = json.loads(response.content)
print(data)
When we execute the code, we will get a JSON response from the API.
{
"pagination": {"nextToken": "ZXlKdWRXMWlaWE..."},
"payload": {
"granularity": {"granularityType": "Marketplace", "granularityId": "ATVPDKIKX0DER"},
"inventorySummaries": [
{
"asin": "B035B6E5A3",
"fnSku": "B0F5CEED8D",
"sellerSku": "C1-76735-97AA",
"condition": "NewItem",
"inventoryDetails": {
"fulfillableQuantity": 946,
"inboundWorkingQuantity": 0,
"inboundShippedQuantity": 0,
"inboundReceivingQuantity": 1,
"reservedQuantity": {
"totalReservedQuantity": 456,
"pendingCustomerOrderQuantity": 255,
"pendingTransshipmentQuantity": 180,
"fcProcessingQuantity": 21
},
"researchingQuantity": {
"totalResearchingQuantity": 4,
"researchingQuantityBreakdown": [
{"name": "researchingQuantityInShortTerm", "quantity": 4},
{"name": "researchingQuantityInMidTerm", "quantity": 0},
{"name": "researchingQuantityInLongTerm", "quantity": 0}
]
},
"unfulfillableQuantity": {
"totalUnfulfillableQuantity": 3,
"customerDamagedQuantity": 0,
"warehouseDamagedQuantity": 3,
"distributorDamagedQuantity": 0,
"carrierDamagedQuantity": 0,
"defectiveQuantity": 0,
"expiredQuantity": 0
},
"futureSupplyQuantity": {
"reservedFutureSupplyQuantity": 0,
"futureSupplyBuyableQuantity": 70
}
},
"lastUpdatedTime": "2024-08-03T16:24:49Z",
"productName": "Avila-Carlson Blender for Adults & Kids, 2 in 1",
"totalQuantity": 1406
},
...
]
}
}
It looks good. The product data list will be in data['payload']['inventorySummaries']
, but the problem is that one request returns only 50 products. If we want to request the next 50, we need to make a similar request but specify data['pagination']['nextToken']
from the data and perform requests one after another.
Next, I’ll add saving the original data to a JSON file. I always separate the data retrieval process and the data processing process as much as possible. First, we get the data and save everything in raw form as is; then we’ll write a script that processes our raw and saved data into the format we need. These can be different functions, different files, or even different DAGs in Airflow, but the more these processes are separated, the more nerves it will save you in the long run. Also, get used to saving everything; often, you can find a lot of interesting things in historical data and use it for analytics.
Here’s the entire script for retrieving raw data from Inventory and saving it to a JSON file. I moved the get_lwa_access_token
function to the sp_api_functions.py
file and can load it in the file through import
.
import os, time
import requests, urllib.parse, json
from sp_api_functions import get_lwa_access_token
print('//// LOAD INVENTORY //// \n')
#Define endpoint and marketplace ID
ENDPOINT = "https://sellingpartnerapi-na.amazon.com"
MARKETPLACE_ID = 'ATVPDKIKX0DER' #US
#Set Request params
request_params = {
"granularityType": "Marketplace",
"granularityId": MARKETPLACE_ID,
"marketplaceIds": MARKETPLACE_ID,
"details": "true"
}
#Define structure to collect inventory data
INVENTORY = []
i = 0
nextToken = None
while True:
print(f'/// CALL {i}')
#Add nextToken to request
_request_params = request_params.copy()
if nextToken:
_request_params['nextToken'] = nextToken
#Get access token
access_token = get_lwa_access_token()
#Call FBA Inventory API
response = requests.get(
ENDPOINT
+ "/fba/inventory/v1/summaries"
+ "?"
+ urllib.parse.urlencode(_request_params),
headers = {
"x-amz-access-token": access_token
}
)
print(f'Response code={response.status_code}')
if(response.status_code!=200):
print(f'-- Wrong response code -- exit!')
# Show error
data = json.loads(response.content)
print(data)
break
else:
data = json.loads(response.content)
loaded_qty = len(data['payload']['inventorySummaries'])
print(f'-- Loaded {loaded_qty} records')
#Process every inventory item and add to structure
for item in data['payload']['inventorySummaries']:
INVENTORY.append(item)
if 'pagination' in data:
nextToken = data['pagination']['nextToken']
print(f'-- nextToken found! {nextToken}')
else:
print(f'-- nextToken not found - exit!')
break
i += 1
time.sleep(1)
print(f'\n/// SAVING {len(INVENTORY)} inventory items to json file...')
json_file_path = os.path.join(os.getcwd(), 'data', 'inventory.json')
with open(json_file_path, "w") as f:
json.dump(INVENTORY, f, ensure_ascii=False, indent=2)
Let’s run the script:
//// LOAD INVENTORY ////
/// CALL 0
Load access_token from file...
Response code=200
-- Loaded 50 records
-- nextToken found! ZXlKdWRXMWlaWEpQWm....
/// CALL 1
Load access_token from file...
Response code=200
-- Loaded 50 records
-- nextToken found! YTEwZWNhYjgtNjk3Mi0...
/// CALL 2
Load access_token from file...
<Response [200]>
Response code=200
-- Loaded 33 records
-- nextToken not found - exit!
/// SAVING 133 inventory items to json file...
Great, our script made three requests and saved 133 inventory items to a JSON file.
Retrieving Price Data
Now let’s retrieve prices. For this, we’ll use the Amazon Product Pricing API [3].
Unlike the Inventory request, which returns all products, to request current prices, we need to specify exactly which ASIN
(Amazon ID of the product) we want to request prices for. Therefore, we first need to get all the asin
of the previously loaded products from inventory.json
.
Then, using the requests
library, we’ll make a request to the Amazon API /products/pricing/v0/price
and load the prices. You can pass up to 20 asin
in one request. Otherwise, the process is similar to retrieving Inventory data. All received raw data will be saved in prices.json
.
import os, time
import requests, urllib.parse, json
from sp_api_functions import get_lwa_access_token
print('//// LOAD PRICES //// \n')
#Define endpoint and marketplace ID
ENDPOINT = "https://sellingpartnerapi-na.amazon.com"
MARKETPLACE_ID = 'ATVPDKIKX0DER' #US
print('// Load asin list from Inventory')
asin_list = []
file_path = os.path.join(os.getcwd(), 'data', 'inventory.json')
with open(file_path, "r") as f:
data = json.load(f)
for item in data:
asin = item["asin"]
asin_list.append(asin)
print(f'// {len(asin_list)} asins loaded...\n')
#Get PRICES
#Define params
request_params = {
"MarketplaceId": MARKETPLACE_ID,
"ItemType": "Asin"
}
#Define structure to collect prices data
PRICES = []
#function to separate asin list to chunks with 20 asins
def chunk_list(data, chunk_size=20):
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
for chunk in chunk_list(asin_list):
print(f'/// CALL asins={chunk}')
_request_params = request_params.copy()
_request_params['Asins'] = ','.join(chunk)
#Get access token
access_token = get_lwa_access_token()
#Call SP API to get prices
response = requests.get(
ENDPOINT
+ "/products/pricing/v0/price"
+ "?"
+ urllib.parse.urlencode(_request_params),
headers = {
"x-amz-access-token": access_token,
"Content-Type": "application/json"
}
)
print(f'Response code={response.status_code}')
if(response.status_code!=200):
print(f'-- Wrong response code -- exit!')
data = json.loads(response.content)
print(data)
break
else:
data = json.loads(response.content)
#print(data)
loaded_qty = len(data['payload'])
print(f'-- Loaded {loaded_qty} records')
for item in data['payload']:
PRICES.append(item)
time.sleep(2)
print(f'\nSAVING prices, {len(PRICES)} items saved...')
json_file_path = os.path.join(os.getcwd(), 'data', 'prices.json')
with open(json_file_path, "w") as f:
json.dump(PRICES, f, ensure_ascii=False, indent=2)
When we run the script, we will see how the script requests 20 asins at a time and saves all the received price data to a file.
//// LOAD PRICES ////
// Load asin list from Inventory
// 133 asins loaded...
/// CALL asins=['B0F5CEED8D', 'B04EAB5272', ...]
Load access_token from file...
Response code=200
-- Loaded 17 records
/// CALL asins=['B0C7BD0A09', 'B035B6E5A3', ...]
Load access_token from file...
Response code=200
-- Loaded 18 records
....
SAVING prices, 114 saved...
Processing the Retrieved Data in a Pandas DataFrame
Now let’s create a script that will load raw data from inventory.json
and prices.json
, merge them by product, and save the necessary values in a Pandas DataFrame, which can be saved to CSV, opened in Excel or Google Spreadsheet, sent to a database, combined with other data, and so on.
With properly processed data in the form of a DataFrame, you can work comfortably!
import os, json
import pandas as pd
print('/// LOAD PRICES\n')
PRICES = {}
#Read and cache Prices
file_path = os.path.join(os.getcwd(), 'data', 'prices.json')
if not os.path.isfile(file_path):
print('-- prices file not found!\n')
else:
with open(file_path, "r") as f:
data = json.load(f)
for item in data:
asin = item['ASIN']
price = item.get('Product', {}).get('Offers', [{}])[0].get('RegularPrice', {}).get('Amount', 0)
PRICES[asin] = price
print(f'-- Load {len(PRICES)} prices records\n')
print
#Read and process Inventory
inv_data = []
file_path = os.path.join(os.getcwd(), 'data', f'inventory.json')
if not os.path.isfile(file_path):
print('-- inventory file not found!\n')
else:
with open(file_path, "r") as f:
data = json.load(f)
for item in data:
asin = item["asin"]
sellerSku = item["sellerSku"]
productName = item['productName']
condition = item['condition']
lastUpdatedTime = item['lastUpdatedTime']
fulfillableQuantity = item['inventoryDetails']['fulfillableQuantity']
reservedQuantity = item['inventoryDetails']['reservedQuantity']['totalReservedQuantity']
unfulfillableQuantity = item['inventoryDetails']['unfulfillableQuantity']['totalUnfulfillableQuantity']
totalQuantity = item['totalQuantity']
price = PRICES[asin] if asin in PRICES else 0
inv_data.append(dict(
asin = asin,
sellerSku = sellerSku,
productName = productName,
condition = condition,
fulfillableQuantity = fulfillableQuantity,
reservedQuantity = reservedQuantity,
unfulfillableQuantity = unfulfillableQuantity,
totalQuantity = totalQuantity,
price = price
))
print('/// Creating frame...')
frame = pd.DataFrame(inv_data)
print(frame)
print('/// Saving frame...')
processed_file_path = os.path.join(os.getcwd(), 'data', 'processed_inventory.csv')
frame.to_csv(processed_file_path, index=False)
print('-- saved!')
Let’s execute:
/// LOAD PRICES
-- Load 97 prices records
/// Creating frame...
asin sellerSku ... totalQuantity price
0 B0F5CEED8D C1-76735-97AA ... 1405 21.63
1 B04EAB5272 CF-19223-CCDF ... 0 0.00
2 B0C7BD0A09 A7-A5A8A-9FAF ... 0 0.00
3 B035B6E5A3 F7-68F04-7A58 ... 0 0.00
4 B051E0BB8A ED-030B1-D115 ... 0 0.00
.. ... ... ... ... ...
128 B0010DF4C5 D1-3269D-E8E4 ... 222 28.99
129 B0FC065705 22-0AC0A-4E8C ... 3 21.63
130 B0F9E739E8 D6-B4747-8278 ... 0 0.00
131 B09895C104 22-DE626-B99A ... 0 0.00
132 B0A94E648B F7-AB433-5465 ... 354 37.23
[133 rows x 9 columns]
/// Saving frame...
-- saved
The saved file processed_inventory.csv
can be opened in Excel or Google Spreadsheet and used for analytics!
Note:
If you have a lot of products/prices, you might encounter the error:
{'errors': [
{'code': 'QuotaExceeded',
'message': 'You exceeded your quota for the requested resource.',
'details': ''}
]}
This means you are making requests too frequently. The simplest way to solve this is to slightly increase the pause between requests. Increase the pause in the loop time.sleep(1)
from one second to a few seconds.
Links:
1. Amazon SP API: https://developer-docs.amazon.com/sp-api/
2. Amazon FBA Inventory API: https://developer-docs.amazon.com/sp-api/docs/fbainventory-api-v1-reference
3. Amazon Product Pricing API: https://developer-docs.amazon.com/sp-api/docs/product-pricing-api-v0-reference
4. Amazon SP API Registration Overview: https://developer-docs.amazon.com/sp-api/docs/sp-api-registration-overview
5. Amazon Marketplaces IDs: https://developer-docs.amazon.com/sp-api/docs/marketplace-ids
6. Amazon SP API Endpoints: https://developer-docs.amazon.com/sp-api/docs/sp-api-endpoints