Skip to main content

安装

pip install l402kit crewai

完整示例

import os
from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from l402kit import L402Client
from l402kit.wallets import BlinkWallet

# Set up wallet
wallet = BlinkWallet(api_key=os.environ["BLINK_API_KEY"])
client = L402Client(wallet=wallet, budget_sats=1000)

# Wrap L402Client as a CrewAI tool
class L402FetchTool(BaseTool):
    name: str = "l402_fetch"
    description: str = "Fetch data from a paid API using Bitcoin Lightning micropayment"

    def _run(self, url: str) -> str:
        import asyncio
        response = asyncio.run(client.get(url))
        return response.text

l402_tool = L402FetchTool()

# Define agent with L402 capability
researcher = Agent(
    role="API Researcher",
    goal="Fetch data from paid APIs and return structured results",
    backstory="""You are a research agent with access to premium data APIs.
    When an API requires payment (HTTP 402), you use the l402_fetch tool
    to pay automatically with Bitcoin Lightning and retrieve the data.
    Budget: 1000 sats per session.""",
    tools=[l402_tool],
    verbose=True
)

# Define task
fetch_task = Task(
    description="Fetch Bitcoin price data from https://l402kit.com/api/demo",
    expected_output="JSON response with the fetched data",
    agent=researcher
)

# Run the crew
crew = Crew(
    agents=[researcher],
    tasks=[fetch_task],
    process=Process.sequential,
    verbose=True
)

result = crew.kickoff()
print(result)
print(f"Sats spent: {client.spent_sats}")

预算控制

from l402kit import L402Client, BudgetExceededError

client = L402Client(wallet=wallet, budget_sats=500)

class SafeL402Tool(BaseTool):
    name: str = "l402_fetch"
    description: str = "Fetch from paid API (budget: 500 sats)"

    def _run(self, url: str) -> str:
        try:
            import asyncio
            return asyncio.run(client.get(url)).text
        except BudgetExceededError:
            return "Budget exceeded. Cannot fetch more data this session."

多智能体配置

# Orchestrator delegates to sub-agents with separate budgets
scout = Agent(
    role="Scout",
    goal="Identify which paid APIs are needed",
    backstory="You plan API calls but don't execute them.",
    tools=[]
)

fetcher = Agent(
    role="Fetcher",
    goal="Execute paid API calls",
    backstory="You fetch data from paid APIs using l402_fetch.",
    tools=[L402FetchTool()]
)

crew = Crew(
    agents=[scout, fetcher],
    tasks=[plan_task, fetch_task],
    process=Process.sequential
)

环境变量

export BLINK_API_KEY="your-blink-api-key"
# or
export ALBY_TOKEN="your-alby-token"
请参阅 钱包快速入门,60 秒内完成钱包配置。