# -*- coding: utf-8 -*- from web3 import Web3 import time # 设置 RPC URL rpc_url = "rpc" # 初始化 web3.py 提供器 w3 = Web3(Web3.HTTPProvider(rpc_url)) # 检查是否连接成功 def check_connection(): try: network_id = w3.eth.chain_id print(f"已成功连接到链节点,网络ID为: {network_id}。正在查询钱包余额...") return True except Exception as e: print("无法连接到链节点,请检查 URL 是否正确") return False # 钱包地址列表 wallet_addresses = [ 'key' ] def query_balances(): if not check_connection(): return wallet_num = 1 for wallet_address in wallet_addresses: retry_count = 0 max_retries = 3 delay = 2 # 延迟时间,单位为秒 result_message = "" result_balance = -1 # 转换为校验和地址 try: checksum_address = w3.to_checksum_address(wallet_address) except Exception as e: print(f"钱包 {wallet_address} 地址格式无效: {str(e)}") wallet_num += 1 continue while retry_count < max_retries: try: # 查询余额 balance = w3.eth.get_balance(checksum_address) # 将余额从 Wei 转换为 Ether balance_eth = w3.from_wei(balance, 'ether') result_message = f"Wallet address: {checksum_address} {wallet_num}, balance: {balance_eth} Token" result_balance = balance_eth print(result_message) wallet_num += 1 break except Exception as e: print(f"查询钱包 {checksum_address} {wallet_num} 余额时发生错误: {str(e)}") retry_count += 1 print(f"正在重试...(第 {retry_count} 次)") time.sleep(delay) if retry_count == max_retries: result_message = f"钱包 {checksum_address} 查询余额失败,已达到最大重试次数。" print(result_message) if __name__ == "__main__": query_balances()