Documentation Index

Fetch the complete documentation index at: https://docs.linkup.so/llms.txt

Use this file to discover all available pages before exploring further.

This page covers when to reach for Tasks over the synchronous endpoints, how to size and poll batches, and how to handle errors, mixed-endpoint submissions, and the per-task cost model.

When to use Tasks

Tasks is an asynchronous wrapper. It does not make individual calls cheaper or faster than the synchronous endpoints. Reach for it when:
  • Bulk workloads: CRM enrichment, backfills, batch research over hundreds of queries.
  • Long-running jobs: submit and poll later, instead of holding an HTTP connection open.
  • Scheduled pipelines: kick off a nightly batch, retrieve results in the morning.
  • Mixed batches: combine Search, Fetch, and Research calls in one submission.
  • Concurrency overflow: the workload exceeds the synchronous concurrency budget.
For interactive single-shot calls (chat UIs, agent steps), call the synchronous endpoints directly. End-to-end latency via Tasks is identical or slightly higher, plus the polling round-trip cost.

Batch sizing

The hard limit is 100 tasks per submission. For larger workloads, submit multiple batches in parallel. There is no penalty for doing so.
# 500 queries → 5 batches of 100
batches = [queries[i:i+100] for i in range(0, len(queries), 100)]
task_ids = []
for batch in batches:
    response = client.tasks.create([
        {"type": "search", "input": {...}} for q in batch
    ])
    task_ids.extend(t.id for t in response)
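The loop above submits batches sequentially. Since there is no penalty for parallel submission, the same workload can be pushed through a thread pool. A sketch assuming the `client.tasks.create` call shown above; the `chunk` helper and the thread pool are illustrative, not part of the SDK:

```python
from concurrent.futures import ThreadPoolExecutor

def chunk(items, size=100):
    """Split a list into consecutive batches of at most `size` items
    (100 is the hard per-submission limit)."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def submit_all(client, queries, workers=5):
    """Submit every batch in parallel and collect all task ids."""
    def submit(batch):
        response = client.tasks.create(
            [{"type": "search", "input": {"q": q}} for q in batch]
        )
        return [t.id for t in response]

    task_ids = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for ids in pool.map(submit, chunk(queries)):
            task_ids.extend(ids)
    return task_ids
```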

Polling strategy

Two strategies are appropriate, depending on the workload.
Per-task polling (when results must be processed as they arrive):
import time
for tid in task_ids:
    while True:
        result = client.tasks.get(tid)
        if result.status in ("completed", "failed"):
            break
        time.sleep(2)
    handle(result)
Bulk polling (when the workload can wait for the entire batch):
import time
remaining = set(task_ids)
while remaining:
    all_tasks = client.tasks.list()
    for t in all_tasks:
        if t.id in remaining and t.status in ("completed", "failed"):
            handle(t)
            remaining.discard(t.id)
    if remaining:
        time.sleep(5)
Bulk polling consumes fewer API calls when the batch is large; per-task polling delivers the first result sooner.
Polling cadence:
  • Mostly "search" ("fast"/"standard") and "fetch": 1–2 second intervals.
  • Mostly "research": 5 second intervals, with backoff to 30 seconds for long-running batches.
  • Mixed: start at 2 seconds, back off to 10.
Maximum poll rate is 1 request per second. Higher rates trigger rate limits without reducing time-to-completion.
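The cadence guidance above can be wrapped in a small backoff helper. A minimal sketch built on the `client.tasks.list` call from the bulk-polling example; `backoff_intervals` and `poll_with_backoff` are illustrative helpers, not part of the SDK, and the 1-second floor matches the documented maximum poll rate:

```python
import time

def backoff_intervals(start=2.0, cap=10.0, factor=2.0):
    """Yield poll delays: start, then doubling up to cap.
    Never below 1 second, respecting the 1 request/second poll ceiling."""
    delay = max(start, 1.0)
    while True:
        yield delay
        delay = min(delay * factor, cap)

def poll_with_backoff(client, task_ids, handle, start=2.0, cap=10.0):
    """Bulk-poll until every task is finished, backing off between rounds."""
    remaining = set(task_ids)
    for delay in backoff_intervals(start, cap):
        for t in client.tasks.list():
            if t.id in remaining and t.status in ("completed", "failed"):
                handle(t)
                remaining.discard(t.id)
        if not remaining:
            return
        time.sleep(delay)
```

For research-heavy batches, raise `cap` to 30 to match the cadence table; the defaults correspond to the mixed-batch guidance (start at 2 seconds, back off to 10).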

Error handling

Each task in a batch completes or fails independently; a failure in one task does not fail the batch. Inspect the error on individual tasks and retry only the failures.
for t in completed:
    if t.status == "failed":
        retry_inputs.append(t.input)
    else:
        handle(t.output)

if retry_inputs:
    client.tasks.create([{"type": "search", "input": i} for i in retry_inputs])
No credit is deducted for failed tasks (consistent with the synchronous endpoints).

Mixed-endpoint batches

A common pattern combines Search to find URLs and Fetch to read them in one batch.
[
  { "type": "search", "input": { "q": "Datadog pricing", "depth": "standard", "outputType": "searchResults" } },
  { "type": "fetch",  "input": { "url": "https://www.datadoghq.com/pricing/", "renderJs": true } },
  { "type": "fetch",  "input": { "url": "https://docs.datadoghq.com/account_management/billing/usage_metrics/", "renderJs": false } }
]
Submission order does not constrain execution order. Tasks run in parallel. For dependent work (search results feeding fetch URLs), submit the second batch after the first completes.
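The dependent two-phase pattern can be sketched as a small helper that turns phase-1 search results into the phase-2 fetch batch. The `build_fetch_batch` name and the result-dict shape (a `url` key per result, as with outputType "searchResults") are assumptions for illustration:

```python
def build_fetch_batch(search_results, render_js=False, limit=10):
    """Turn phase-1 search results into a phase-2 batch of fetch tasks."""
    return [
        {"type": "fetch", "input": {"url": r["url"], "renderJs": render_js}}
        for r in search_results[:limit]
    ]
```

Submit the search batch, poll it to completion (see Polling strategy), then pass the collected results through `build_fetch_batch` and submit the fetch batch.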

Cost behavior

Tasks does not change pricing: each task is billed exactly as a direct synchronous call to its endpoint. Use Tasks for its workflow benefits, not for cost reduction. To reduce cost, select the appropriate depth ("standard" over "deep" when the workload allows), use includeDomains to narrow the work, and keep schemas shallow.

Result lifetime

Completed task results are retrievable for a bounded period. For long-running pipelines, persist results to durable storage as soon as they complete rather than relying on Tasks for storage.

Common pitfalls

Each pitfall below pairs a Bad example with its Fix, grounded in the documented behavior of Tasks: per-task billing identical to direct calls, the 1 rps poll cap, and the bounded result lifetime.
Polling each task individually when the batch is large. This wastes API budget against the 1 rps cap.
Bad
for tid in task_ids:
    while True:
        result = client.tasks.get(tid)
        if result.status in ("completed", "failed"):
            break
        time.sleep(0.2)
Fix
remaining = set(task_ids)
while remaining:
    for t in client.tasks.list():
        if t.id in remaining and t.status in ("completed", "failed"):
            handle(t)
            remaining.discard(t.id)
    if remaining:
        time.sleep(5)
Expecting batching to reduce per-task cost. Pricing is per-task identical to direct synchronous calls.
Bad
client.tasks.create([
    {"type": "research", "input": {"q": q, "outputType": "sourcedAnswer"}}
    for q in queries
])
Fix
client.tasks.create([
    {"type": "search", "input": {"q": q, "depth": "standard", "outputType": "sourcedAnswer",
                                   "includeDomains": TRUSTED_DOMAINS}}
    for q in queries
])
Persisting taskIds late and losing results to the bounded result lifetime.
Bad
task_ids = [t.id for t in client.tasks.create(payload)]
# ... process unrelated work for hours ...
results = [client.tasks.get(tid) for tid in task_ids]
Fix
task_ids = [t.id for t in client.tasks.create(payload)]
db.save_task_ids(task_ids)
# poll on a schedule, write each completed result to durable storage

Resources