什么网站免费做简历模板,福田蒙派克油耗是多少,运动网站建设,广州模板建站多少钱亚马逊SP-API实战#xff1a;5分钟搞定订单数据抓取#xff08;Python版#xff09; 如果你正在为如何将亚马逊店铺的订单数据自动、实时地同步到自己的ERP或数据库而头疼#xff0c;那么这篇文章就是为你准备的。作为技术负责人或开发者#xff0c;你可能已经厌倦了手动…亚马逊SP-API实战5分钟搞定订单数据抓取Python版如果你正在为如何将亚马逊店铺的订单数据自动、实时地同步到自己的ERP或数据库而头疼那么这篇文章就是为你准备的。作为技术负责人或开发者你可能已经厌倦了手动导出CSV报告或者被MWS API的迁移搞得焦头烂额。亚马逊的Selling Partner APISP-API是官方指定的未来但其复杂的OAuth 2.0流程和AWS SigV4签名常常让第一个“Hello World”式的调用都变得遥不可及。今天我们不谈冗长的理论不搞复杂的架构设计就聚焦一个目标用最短的时间、最少的代码跑通一个能真实抓取到订单列表的Python脚本。我们将绕过那些容易让人迷失的文档细节直击核心步骤并提供可以直接复用的代码块让你在5分钟内看到数据返回的结果。1. 环境准备与核心概念速览在动手写代码之前我们需要快速理清几个关键概念并准备好“弹药”。SP-API的认证体系可以看作两道安全门第一道是OAuth 2.0用于确认你的应用有权限访问某个卖家数据第二道是AWS Signature Version 4 (SigV4)用于对每一次API请求进行加密签名确保请求的完整性和来源可信。你需要准备以下四组凭证它们缺一不可IAM用户凭证在AWS IAM控制台创建的用户访问密钥Access Key ID和Secret Access Key。这个用户需要关联一个能够调用SP-API的角色。应用凭证在亚马逊开发者中心创建SP-API应用后获得的Client ID和Client Secret。刷新令牌这是通过OAuth授权流程获得的一个长期有效的令牌用于换取短期访问令牌。对于卖家自用最快的方式是使用开发者中心的“自我授权”功能生成。商城ID你要抓取订单的亚马逊站点标识符例如北美站是ATVPDKIKX0DER。注意所有凭证尤其是Client Secret和Refresh Token必须像保护数据库密码一样保护起来。绝对不要硬编码在脚本中或提交到版本控制系统。本文示例中使用变量仅为演示生产环境请务必使用环境变量或密钥管理服务。为了简化SigV4签名的复杂性我们不会从头造轮子。本文将使用一个非常轻量级的第三方库amazon-sp-api它封装了认证、签名、请求和错误处理的全过程。首先我们创建独立的Python虚拟环境并安装依赖。# 创建并激活虚拟环境推荐 python -m venv spapi_env source spapi_env/bin/activate # Linux/macOS # spapi_env\Scripts\activate # Windows # 安装核心库 pip install amazon-sp-api python-dotenv这里我们额外安装了python-dotenv用于方便地从.env文件加载环境变量。接下来在你的项目根目录创建一个.env文件用于安全地存储凭证# .env 文件 - 请替换为你的真实凭证 SP_API_REFRESH_TOKEN你的刷新令牌 SP_API_CLIENT_ID你的客户端ID SP_API_CLIENT_SECRET你的客户端密钥 AWS_ACCESS_KEY_ID你的IAM用户Access Key AWS_SECRET_ACCESS_KEY你的IAM用户Secret Key SP_API_REGIONus-east-1 # SP-API网关区域通常固定为此 SP_API_MARKETPLACE_IDATVPDKIKX0DER # 你的目标商城ID2. 授权流程的极简实践与令牌获取很多教程在授权这一步就卡住了因为它涉及在开发者中心进行多次点击配置。我们化繁为简专注于最直接有效的“自我授权”路径这适用于应用只访问自己卖家账户数据的场景。登录到亚马逊开发者中心进入你的SP-API应用设置页面。你需要确保应用已配置了正确的权限。对于基础订单抓取至少需要sellingpartnerapi::orders:read这个权限范围。配置完成后找到“生成授权链接”或“自我授权”的选项。点击后系统会生成一个URL。你用你的卖家账户登录并授权页面跳转后URL的参数中会包含一个code授权码。你需要用这个code去交换refresh_token。虽然可以手动用curl命令完成但更简单的方法是使用amazon-sp-api库自带的工具方法或者直接使用开发者中心提供的“交换令牌”功能页面直接输入code获取refresh_token。一旦你获得了refresh_token并存入.env文件后续的令牌刷新就完全自动化了。库内部会帮你处理用refresh_token换取一小时有效的access_token的过程。下面这段代码演示了如何初始化客户端它会自动完成令牌获取和刷新# sp_api_demo.py import os from dotenv import load_dotenv from sp_api.api import Orders from sp_api.base import Marketplaces, SellingApiException # 1. 加载环境变量 load_dotenv() # 2. 初始化订单API客户端 try: orders_client Orders( marketplaceMarketplaces.US, # 对应北美市场 refresh_tokenos.getenv(SP_API_REFRESH_TOKEN), accountdefault, # 账户别名用于区分多店铺配置 credentials{ refresh_token: os.getenv(SP_API_REFRESH_TOKEN), client_id: os.getenv(SP_API_CLIENT_ID), client_secret: os.getenv(SP_API_CLIENT_SECRET), access_key: os.getenv(AWS_ACCESS_KEY_ID), secret_key: os.getenv(AWS_SECRET_ACCESS_KEY), role_arn: None # 自用场景下通常不需要显式指定role_arn } ) print(SP-API客户端初始化成功) except Exception as e: print(f客户端初始化失败: {e}) exit(1)如果上述代码没有报错恭喜你最复杂的认证和签名环节已经由库底层搞定。你可以看到我们完全不需要手动构造x-amz-access-token头也不需要关心SigV4签名的具体算法。3. 核心调用抓取订单列表与详情客户端准备就绪后调用API获取数据就变得异常简单。getOrders接口是获取订单列表的入口支持按创建时间、更新时间、订单状态等多种条件筛选。一个常见的需求是获取最近更新的待处理订单。3.1 获取订单列表我们首先尝试获取过去24小时内创建的所有订单。这里有一个关键点SP-API的时间参数要求是UTC时间的ISO 8601格式。from datetime import datetime, timedelta, timezone def fetch_recent_orders(client, hours_back24): 获取指定小时数内创建的订单 # 计算时间点 created_after (datetime.now(timezone.utc) - timedelta(hourshours_back)).isoformat(timespecseconds) print(f正在查询 {created_after} 之后创建的订单...) try: # 调用 getOrders API response client.get_orders( MarketplaceIds[os.getenv(SP_API_MARKETPLACE_ID)], CreatedAftercreated_after, OrderStatuses[Unshipped, PartiallyShipped], # 可选筛选特定状态 MaxResultsPerPage50 # 每页最大数量可选默认50 ) # 解析响应 if response.payload and Orders in response.payload: orders response.payload[Orders] print(f成功获取到 {len(orders)} 个订单。) for order in orders: print(f 订单ID: {order[AmazonOrderId]}, f状态: {order[OrderStatus]}, f金额: {order.get(OrderTotal, {}).get(Amount, N/A)} {order.get(OrderTotal, {}).get(CurrencyCode, )}) # 检查是否有下一页 next_token response.payload.get(NextToken) if next_token: print(f存在更多订单下一页令牌: {next_token[:30]}...) # 这里可以递归或循环调用 get_orders(NextTokennext_token) 获取全部 return orders, next_token else: print(未查询到符合条件的订单。) return [], None except SellingApiException as e: print(fAPI调用异常: {e.error}, 详情: {e.message}) return [], None # 执行查询 orders_list, next_page_token fetch_recent_orders(orders_client)这段代码已经是一个可工作的最小实现。它处理了基本的请求、响应解析和错误捕获。SellingApiException是库封装的异常类包含了亚马逊返回的错误代码和信息对于调试非常有用。3.2 处理分页与获取订单商品详情订单列表可能很长因此分页是必须处理的。getOrders接口的响应中如果包含NextToken就意味着还有更多数据。我们需要一个循环来获取所有页面。def fetch_all_orders(client, initial_created_after): 获取从某个时间点开始的所有订单自动处理分页 all_orders [] next_token None created_after initial_created_after while True: try: if next_token: # 使用NextToken获取下一页 response client.get_orders(NextTokennext_token) else: # 首次请求使用时间筛选 response client.get_orders( MarketplaceIds[os.getenv(SP_API_MARKETPLACE_ID)], CreatedAftercreated_after, MaxResultsPerPage100 # 每页最多100条 ) page_orders response.payload.get(Orders, []) all_orders.extend(page_orders) print(f已获取 {len(page_orders)} 个订单累计 {len(all_orders)} 个。) # 检查是否还有下一页 next_token response.payload.get(NextToken) if not next_token: break # 建议在分页请求间加入短暂延迟以示友好 import time time.sleep(0.5) except SellingApiException as e: print(f分页查询过程中出错: {e}) break return all_orders获取到订单列表后我们通常需要每个订单包含的商品明细SKU、数量、价格等。这需要调用另一个接口getOrderItems。好消息是amazon-sp-api库也为其提供了便捷的方法。def fetch_order_items(client, amazon_order_id): 根据订单ID获取商品明细 try: items_response client.get_order_items(amazon_order_id) order_items items_response.payload.get(OrderItems, []) print(f订单 {amazon_order_id} 包含 {len(order_items)} 个商品。) for item in order_items: print(f - ASIN: {item.get(ASIN)}, fSKU: {item.get(SellerSKU, N/A)}, f标题: {item.get(Title, )[:50]}..., f数量: {item.get(QuantityOrdered)}, f价格: {item.get(ItemPrice, {}).get(Amount, N/A)}) return order_items except SellingApiException as e: print(f获取订单 {amazon_order_id} 商品失败: {e}) return [] # 示例获取第一个订单的商品 if orders_list: first_order_id orders_list[0][AmazonOrderId] fetch_order_items(orders_client, first_order_id)4. 生产级加固错误处理、限流与数据持久化一个能用于生产环境的脚本绝不能仅仅在理想状态下运行。SP-API有严格的速率限制网络可能不稳定数据也需要落地存储。本章节我们为脚本穿上“盔甲”。4.1 速率限制与优雅重试SP-API的限流策略通常包含一个“恢复速率”和一个“突发配额”。当请求过快时你会收到HTTP 429状态码。粗暴地不断重试会导致账户被进一步限制。正确的做法是实现带有指数退避的智能重试。import time from requests.exceptions import RequestException def make_api_call_with_retry(api_call_func, max_retries5, initial_delay1): 封装API调用实现带指数退避的重试机制 retries 0 delay initial_delay while retries max_retries: try: response api_call_func() # 检查是否为429错误库可能已将其抛出为SellingApiException return response except SellingApiException as e: if e.status_code 429: # 速率限制 retries 1 if retries max_retries: print(f达到最大重试次数({max_retries})放弃请求。) raise print(f触发速率限制(429)第{retries}次重试等待{delay}秒...) time.sleep(delay) delay * 2 # 指数退避 continue else: # 其他API错误直接抛出 raise except RequestException as e: # 网络错误 retries 1 if retries max_retries: print(f网络错误达到最大重试次数放弃请求。) raise print(f网络请求异常({e})第{retries}次重试等待{delay}秒...) time.sleep(delay) delay * 2 continue # 理论上不会执行到这里 return None # 使用示例用重试包装器调用API def safe_get_orders(client, **kwargs): def _call(): return client.get_orders(**kwargs) return make_api_call_with_retry(_call) # 这样调用就具备了自动重试能力 orders_response safe_get_orders(orders_client, MarketplaceIds[os.getenv(SP_API_MARKETPLACE_ID)], CreatedAftercreated_after)4.2 数据持久化与增量同步将数据抓取下来后我们需要将其存储到数据库或文件中。一个常见的模式是使用SQLite或MySQL并记录每次同步的“水印”最后抓取到的订单创建时间或更新时间以实现增量同步避免重复抓取和历史数据负担。下面是一个使用SQLite的简单示例包含创建表、插入数据和更新水印的逻辑import sqlite3 from contextlib import closing def init_database(db_pathamazon_orders.db): 初始化数据库表 with closing(sqlite3.connect(db_path)) as conn: cursor conn.cursor() # 创建订单表 cursor.execute( CREATE TABLE IF NOT EXISTS orders ( amazon_order_id TEXT PRIMARY KEY, purchase_date TEXT, order_status TEXT, marketplace_id TEXT, order_total_amount REAL, order_total_currency TEXT, fulfillment_channel TEXT, last_updated_time TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) ) # 创建订单商品表 cursor.execute( CREATE TABLE IF NOT EXISTS order_items ( id INTEGER PRIMARY KEY AUTOINCREMENT, amazon_order_id TEXT, asin TEXT, seller_sku TEXT, title TEXT, quantity_ordered INTEGER, item_price_amount REAL, item_price_currency TEXT, FOREIGN KEY (amazon_order_id) REFERENCES orders (amazon_order_id) ) ) # 创建同步状态表记录最后一次成功抓取的时间点 cursor.execute( CREATE TABLE IF NOT EXISTS sync_watermark ( id INTEGER PRIMARY KEY CHECK (id 1), last_synced_after TEXT ) ) conn.commit() print(数据库表初始化完成。) def save_order_and_items(conn, order_data, items_data): 保存订单及商品数据到数据库 cursor conn.cursor() try: # 插入或更新订单 cursor.execute( INSERT OR REPLACE INTO orders (amazon_order_id, purchase_date, order_status, marketplace_id, order_total_amount, order_total_currency, fulfillment_channel, last_updated_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?) , ( order_data[AmazonOrderId], order_data.get(PurchaseDate), order_data.get(OrderStatus), order_data.get(MarketplaceId), order_data.get(OrderTotal, {}).get(Amount), order_data.get(OrderTotal, {}).get(CurrencyCode), order_data.get(FulfillmentChannel), order_data.get(LastUpdateDate) # 使用最后更新时间作为增量依据更可靠 )) # 插入商品明细先删除旧的再插入新的确保数据最新 cursor.execute(DELETE FROM order_items WHERE amazon_order_id ?, (order_data[AmazonOrderId],)) for item in items_data: cursor.execute( INSERT INTO order_items (amazon_order_id, asin, seller_sku, title, quantity_ordered, item_price_amount, item_price_currency) VALUES (?, ?, ?, ?, ?, ?, ?) , ( order_data[AmazonOrderId], item.get(ASIN), item.get(SellerSKU), item.get(Title), item.get(QuantityOrdered), item.get(ItemPrice, {}).get(Amount), item.get(ItemPrice, {}).get(CurrencyCode) )) conn.commit() except sqlite3.Error as e: conn.rollback() print(f保存订单 {order_data[AmazonOrderId]} 数据时出错: {e}) raise4.3 完整的自动化脚本框架将以上所有部分组合起来我们可以形成一个具备错误恢复、增量同步和持久化能力的完整脚本骨架。# sync_orders.py import os import time import sqlite3 from datetime import datetime, timezone from dotenv import load_dotenv from sp_api.api import Orders from sp_api.base import Marketplaces, SellingApiException # ... (此处包含之前定义的 make_api_call_with_retry, init_database, save_order_and_items 函数) ... def get_last_synced_time(conn): 从数据库获取上一次同步的时间点 cursor conn.cursor() cursor.execute(SELECT last_synced_after FROM sync_watermark WHERE id 1) row cursor.fetchone() if row and row[0]: # 使用上次记录的时间可以稍微提前一点避免漏掉边界订单 return row[0] else: # 首次运行默认同步过去2小时的订单 first_run (datetime.now(timezone.utc) - timedelta(hours2)).isoformat(timespecseconds) return first_run def update_sync_watermark(conn, new_watermark): 更新同步水印 cursor conn.cursor() cursor.execute( INSERT OR REPLACE INTO sync_watermark (id, last_synced_after) VALUES (1, ?) , (new_watermark,)) conn.commit() def main(): load_dotenv() init_database() # 初始化客户端 client Orders( marketplaceMarketplaces.US, refresh_tokenos.getenv(SP_API_REFRESH_TOKEN), credentials{ refresh_token: os.getenv(SP_API_REFRESH_TOKEN), client_id: os.getenv(SP_API_CLIENT_ID), client_secret: os.getenv(SP_API_CLIENT_SECRET), access_key: os.getenv(AWS_ACCESS_KEY_ID), secret_key: os.getenv(AWS_SECRET_ACCESS_KEY), } ) with sqlite3.connect(amazon_orders.db) as conn: # 确定本次同步的起始时间 last_synced get_last_synced_time(conn) print(f本次同步将从 {last_synced} 之后开始查询。) # 使用增量同步基于LastUpdatedAfter更可靠 # 注意这里为了演示仍使用CreatedAfter。生产环境建议使用LastUpdatedAfter并存储其水印。 all_orders [] next_token None latest_order_time last_synced try: while True: # 使用带重试的封装进行调用 def _call(): if next_token: return client.get_orders(NextTokennext_token) else: return client.get_orders( MarketplaceIds[os.getenv(SP_API_MARKETPLACE_ID)], CreatedAfterlast_synced, MaxResultsPerPage50 ) response make_api_call_with_retry(_call) orders_batch response.payload.get(Orders, []) for order in orders_batch: # 获取商品详情 items fetch_order_items(client, order[AmazonOrderId]) # 保存到数据库 save_order_and_items(conn, order, items) # 更新本次同步遇到的最晚订单时间 order_time order.get(PurchaseDate) or order.get(LastUpdateDate) if order_time and order_time latest_order_time: latest_order_time order_time all_orders.extend(orders_batch) print(f已处理 {len(orders_batch)} 个订单累计 {len(all_orders)} 个。) next_token response.payload.get(NextToken) if not next_token: break time.sleep(0.5) # 分页间延迟 except Exception as e: print(f同步过程发生严重错误: {e}) # 可以选择记录错误日志并尝试在下次运行时恢复 finally: # 无论成功与否更新水印为本次尝试的起始时间或最新时间 # 更稳健的做法是只在完全成功时更新为latest_order_time update_sync_watermark(conn, latest_order_time) print(f同步流程结束。共处理 {len(all_orders)} 个订单。最新水印更新为: {latest_order_time}) if __name__ __main__: main()这个脚本框架已经具备了生产环境使用的雏形。你可以将其设置为定时任务如使用cron或systemd timer每隔几分钟或几小时运行一次从而实现订单数据的准实时同步。在实际部署中我强烈建议先从沙盒环境开始测试。amazon-sp-api库支持通过设置sandboxTrue参数来切换到沙盒环境那里有模拟的订单数据不会影响你的实际店铺。把所有的流程在沙盒中跑通、跑稳处理完所有边界情况后再切换到生产环境这是最稳妥的路径。