Documentation Index
Fetch the complete documentation index at: https://docs.linkup.so/llms.txt
Use this file to discover all available pages before exploring further.
This page covers when to reach for Tasks over the synchronous endpoints,
how to size and poll batches, and how to handle errors, mixed-endpoint
submissions, and the per-task cost model.
When to use Tasks
Tasks is an asynchronous wrapper. It does not make individual calls
cheaper or faster than the synchronous endpoints. Reach for it when:
- Bulk workloads: CRM enrichment, backfills, batch research over hundreds of queries.
- Long-running jobs: submit and poll later, instead of holding an HTTP connection open.
- Scheduled pipelines: kick off a nightly batch, retrieve results in the morning.
- Mixed batches: combine Search, Fetch, and Research calls in one submission.
- Concurrency overflow: the workload exceeds the synchronous concurrency budget.
For interactive single-shot calls (chat UIs, agent steps), call the
synchronous endpoints directly. End-to-end latency via Tasks is identical
or slightly higher, plus the polling round-trip cost.
Batch sizing
The hard limit is 100 tasks per submission. For larger workloads, submit
multiple batches in parallel. There is no penalty for doing so.
# 500 queries → 5 batches of 100
batches = [queries[i:i+100] for i in range(0, len(queries), 100)]
task_ids = []
for batch in batches:
    response = client.tasks.create([
        {"type": "search", "input": {"q": q, "depth": "standard", "outputType": "searchResults"}}
        for q in batch
    ])
    task_ids.extend(t.id for t in response)
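The loop above submits batches one at a time. Since parallel submission carries no penalty, the same split can be pushed through a thread pool. A minimal sketch; the stub client below is a stand-in so the pattern is runnable on its own, and should be replaced by the real configured client (which this page otherwise calls as client.tasks.create):

```python
from concurrent.futures import ThreadPoolExecutor

# Stub standing in for the real client; swap in your configured client.
class StubTasks:
    def create(self, payloads):
        # The real endpoint returns the created tasks; here we fabricate ids.
        return [type("Task", (), {"id": f"task-{i}"})() for i, _ in enumerate(payloads)]

class StubClient:
    tasks = StubTasks()

client = StubClient()
queries = [f"query {n}" for n in range(500)]

# 500 queries → 5 batches of 100, submitted concurrently.
batches = [queries[i:i + 100] for i in range(0, len(queries), 100)]

def submit(batch):
    response = client.tasks.create([
        {"type": "search", "input": {"q": q, "depth": "standard",
                                     "outputType": "searchResults"}}
        for q in batch
    ])
    return [t.id for t in response]

with ThreadPoolExecutor(max_workers=5) as pool:
    task_ids = [tid for ids in pool.map(submit, batches) for tid in ids]
```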
Polling strategy
Two strategies are appropriate, depending on the workload:
Per-task polling (when results must be processed as they arrive):
import time

for tid in task_ids:
    while True:
        result = client.tasks.get(tid)
        if result.status in ("completed", "failed"):
            break
        time.sleep(2)
    handle(result)
Bulk polling (when the workload can wait for the entire batch):
import time

remaining = set(task_ids)
while remaining:
    all_tasks = client.tasks.list()
    for t in all_tasks:
        if t.id in remaining and t.status in ("completed", "failed"):
            handle(t)
            remaining.discard(t.id)
    if remaining:
        time.sleep(5)
Bulk polling consumes fewer API calls when the batch is large; per-task polling delivers a lower latency to first result.
Polling cadence:
- Mostly "search" ("fast"/"standard") and "fetch": 1–2 second intervals.
- Mostly "research": 5-second intervals, with backoff to 30 seconds for long-running batches.
- Mixed: start at 2 seconds, back off to 10.
Maximum poll rate is 1 request per second. Higher rates trigger rate limits
without reducing time-to-completion.
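One way to encode the cadence above is a backoff schedule that starts at the workload's base interval, grows toward its ceiling, and never drops below the 1-second floor implied by the poll-rate cap. A sketch; the helper name and doubling factor are ours, the interval choices mirror the guidance above:

```python
def poll_interval(attempt, base, ceiling, factor=2.0):
    """Sleep interval (seconds) before the given poll attempt.

    Doubles the base interval each attempt until it reaches the ceiling,
    and never returns less than 1 second (the maximum poll rate is 1 rps).
    """
    interval = base * (factor ** attempt)
    return max(1.0, min(interval, ceiling))

# Mostly "research": start at 5 s, back off to 30 s.
[poll_interval(a, base=5, ceiling=30) for a in range(4)]   # 5.0, 10.0, 20.0, 30.0
# Mixed batches: start at 2 s, back off to 10 s.
[poll_interval(a, base=2, ceiling=10) for a in range(4)]   # 2.0, 4.0, 8.0, 10.0
```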
Error handling
Each task in a batch completes or fails independently; a failure in one task does not fail the batch. Inspect the error field on individual tasks and retry only the failures.
retry_inputs = []
for t in completed:
    if t.status == "failed":
        retry_inputs.append(t.input)
    else:
        handle(t.output)

if retry_inputs:
    client.tasks.create([{"type": "search", "input": i} for i in retry_inputs])
No credit is deducted for failed tasks (consistent with the synchronous
endpoints).
Mixed-endpoint batches
A common pattern combines Search to find URLs and Fetch to read them in one
batch.
[
{ "type": "search", "input": { "q": "Datadog pricing", "depth": "standard", "outputType": "searchResults" } },
{ "type": "fetch", "input": { "url": "https://www.datadoghq.com/pricing/", "renderJs": true } },
{ "type": "fetch", "input": { "url": "https://docs.datadoghq.com/account_management/billing/usage_metrics/", "renderJs": false } }
]
Submission order does not constrain execution order. Tasks run in parallel.
For dependent work (search results feeding fetch URLs), submit the second
batch after the first completes.
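For the dependent case, the glue is a small transform from completed search outputs to fetch inputs for the second batch. A sketch, assuming a searchResults output exposes a list of entries with a url field (the exact result shape may differ, and the helper name is ours):

```python
def fetch_batch_from_search(search_outputs, max_urls_per_query=3):
    """Build a fetch task batch from completed search outputs.

    Takes the top URLs from each search result, deduplicates across
    queries, and emits one fetch task per URL.
    """
    seen, tasks = set(), []
    for output in search_outputs:
        for entry in output["results"][:max_urls_per_query]:
            url = entry["url"]
            if url not in seen:
                seen.add(url)
                tasks.append({"type": "fetch", "input": {"url": url, "renderJs": False}})
    return tasks

outputs = [
    {"results": [{"url": "https://example.com/a"}, {"url": "https://example.com/b"}]},
    {"results": [{"url": "https://example.com/b"}]},  # duplicate, dropped
]
batch = fetch_batch_from_search(outputs)
# Submit as the second batch once the search batch has completed:
# client.tasks.create(batch)
```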
Cost behavior
Tasks does not change pricing. Each task is billed exactly as a direct synchronous call to its endpoint. Use Tasks for its workflow benefits, not for cost reduction.
Cost reduction is achieved by selecting the appropriate depth ("standard"
over "deep" when the workload allows), using includeDomains to reduce
work, and designing schemas to be shallow.
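These knobs can be folded into a helper that builds cost-conscious search inputs. A sketch; the helper and its defaults are ours, while the depth, includeDomains, and outputType parameters are the ones used elsewhere on this page:

```python
def cheap_search_task(query, trusted_domains=None):
    """Build a search task tuned for cost: "standard" depth rather than
    "deep", with an optional domain allowlist to reduce work."""
    task_input = {"q": query, "depth": "standard", "outputType": "sourcedAnswer"}
    if trusted_domains:
        task_input["includeDomains"] = trusted_domains
    return {"type": "search", "input": task_input}

TRUSTED_DOMAINS = ["datadoghq.com", "docs.datadoghq.com"]
tasks = [cheap_search_task(q, TRUSTED_DOMAINS)
         for q in ["Datadog pricing", "Datadog billing"]]
```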
Result lifetime
Completed task results are retrievable for a bounded period. For
long-running pipelines, persist results to durable storage as soon as they
complete rather than relying on Tasks for storage.
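Persisting on completion can be as simple as writing each result into a local table as the poll loop observes it. A sketch using sqlite3, with fabricated task dicts standing in for polled results (the persist helper and table layout are ours):

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")  # use a file path in a real pipeline
db.execute("CREATE TABLE IF NOT EXISTS results (task_id TEXT PRIMARY KEY, output TEXT)")

def persist(task):
    """Write a completed task's output to durable storage (idempotent)."""
    db.execute("INSERT OR REPLACE INTO results VALUES (?, ?)",
               (task["id"], json.dumps(task["output"])))
    db.commit()

# Inside the poll loop, persist as soon as each task completes:
for task in [{"id": "t1", "output": {"answer": "..."}},
             {"id": "t2", "output": {"answer": "..."}}]:
    persist(task)

stored = db.execute("SELECT COUNT(*) FROM results").fetchone()[0]
```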
Common pitfalls
Bad → Fix pairs grounded in the documented behavior of Tasks (per-task
billing identical to direct calls, 1 rps poll cap, bounded result
lifetime).
Polling each task individually when the batch is large. Wastes API budget against the 1 rps cap.

Bad:

for tid in task_ids:
    while True:
        result = client.tasks.get(tid)
        if result.status in ("completed", "failed"):
            break
        time.sleep(0.2)

Fix:

remaining = set(task_ids)
while remaining:
    for t in client.tasks.list():
        if t.id in remaining and t.status in ("completed", "failed"):
            handle(t)
            remaining.discard(t.id)
    if remaining:
        time.sleep(5)
Expecting batching to reduce per-task cost. Pricing is per task, identical to direct synchronous calls.

Bad:

client.tasks.create([
    {"type": "research", "input": {"q": q, "outputType": "sourcedAnswer"}}
    for q in queries
])

Fix:

client.tasks.create([
    {"type": "search", "input": {"q": q, "depth": "standard", "outputType": "sourcedAnswer",
                                 "includeDomains": TRUSTED_DOMAINS}}
    for q in queries
])
Persisting taskIds late and losing results to the bounded result lifetime.

Bad:

task_ids = [t.id for t in client.tasks.create(payload)]
# ... process unrelated work for hours ...
results = [client.tasks.get(tid) for tid in task_ids]

Fix:

task_ids = [t.id for t in client.tasks.create(payload)]
db.save_task_ids(task_ids)
# poll on a schedule, write each completed result to durable storage
Resources