Schedule Functions to run automatically at specific times using cron expressions.

Overview

Function scheduling allows you to:
  • Run automations on a regular schedule (daily, weekly, hourly, etc.)
  • Execute tasks at specific times
  • Automate recurring workflows
  • Monitor websites continuously
Schedules are configured in the Notte Console and run automatically on Notte’s infrastructure.

Creating a Schedule

Via Console

  1. Go to console.notte.cc/workflows
  2. Select your Function
  3. Click “Schedule”
  4. Enter cron expression
  5. Set parameters (variables)
  6. Save schedule

Cron Expression Format

Cron expressions define when Functions run:
* * * * *
│ │ │ │ │
│ │ │ │ └─── Day of week (0-6, Sunday=0)
│ │ │ └───── Month (1-12)
│ │ └─────── Day of month (1-31)
│ └───────── Hour (0-23)
└─────────── Minute (0-59)
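
To make the field layout concrete, here is a minimal sketch that splits an expression into its five fields and expands each one into the values it matches. It is an illustration written for this page, not Notte's own parser: `parse_cron` and `expand_field` are hypothetical names, and only the `*`, list, range, and `/step` syntax used on this page is supported.

```python
# Hypothetical helper names; this is not part of the Notte SDK.
FIELD_RANGES = {
    "minute": (0, 59),
    "hour": (0, 23),
    "day_of_month": (1, 31),
    "month": (1, 12),
    "day_of_week": (0, 6),  # Sunday = 0
}


def expand_field(spec: str, low: int, high: int) -> set[int]:
    """Expand one cron field into the set of values it matches."""
    values: set[int] = set()
    for part in spec.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step < 1:
            raise ValueError(f"step must be positive in {part!r}")
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)
        if not (low <= start <= end <= high):
            raise ValueError(f"{part!r} is outside the range {low}-{high}")
        values.update(range(start, end + 1, step))
    return values


def parse_cron(expression: str) -> dict[str, set[int]]:
    """Split a five-field cron expression and expand every field."""
    fields = expression.split()
    if len(fields) != len(FIELD_RANGES):
        raise ValueError(f"expected 5 fields, got {len(fields)}")
    return {
        name: expand_field(spec, low, high)
        for (name, (low, high)), spec in zip(FIELD_RANGES.items(), fields)
    }
```

For example, `parse_cron("0 18 * * 1-5")["day_of_week"]` evaluates to `{1, 2, 3, 4, 5}`, which is Monday through Friday.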

Common Schedules

Every Minute

* * * * *
Run function every minute. Use for: Real-time monitoring, high-frequency updates

Every Hour

0 * * * *
Run at the start of every hour (e.g., 1:00, 2:00, 3:00). Use for: Hourly data collection, periodic checks

Every Day at 9 AM

0 9 * * *
Run every day at 9:00 AM. Use for: Daily reports, morning data sync

Every Monday at 8 AM

0 8 * * 1
Run every Monday at 8:00 AM. Use for: Weekly reports, start-of-week tasks

First Day of Month

0 0 1 * *
Run at midnight on the 1st of every month. Use for: Monthly reports, billing cycles

Every 15 Minutes

*/15 * * * *
Run every 15 minutes. Use for: Frequent monitoring, regular updates

Weekdays at 6 PM

0 18 * * 1-5
Run at 6:00 PM Monday through Friday. Use for: End-of-day business tasks

Multiple Times Per Day

0 9,12,15,18 * * *
Run at 9 AM, 12 PM, 3 PM, and 6 PM every day. Use for: Regular business hours monitoring
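
Before saving one of the schedules above, it can help to preview when it would fire. The sketch below scans forward minute by minute and returns the next matching times. It is a rough local check under stated assumptions, not Notte's scheduler: `next_runs` is a hypothetical helper, times are treated as naive datetimes, and it assumes classic cron behaviour where a day matches if either day field matches when both are restricted.

```python
from datetime import datetime, timedelta


def _expand(spec: str, low: int, high: int) -> set[int]:
    """Expand one cron field (*, lists, ranges, /step) into matching values."""
    values: set[int] = set()
    for part in spec.split(","):
        base, _, step = part.partition("/")
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start, end = (int(x) for x in base.split("-", 1))
        else:
            start = end = int(base)
        values.update(range(start, end + 1, int(step or 1)))
    return values


def next_runs(expression: str, start: datetime, count: int = 3) -> list[datetime]:
    """Return up to `count` run times strictly after `start` (searches one year)."""
    minute, hour, dom, month, dow = expression.split()
    minutes, hours = _expand(minute, 0, 59), _expand(hour, 0, 23)
    days, months = _expand(dom, 1, 31), _expand(month, 1, 12)
    weekdays = _expand(dow, 0, 6)
    both_day_fields_restricted = dom != "*" and dow != "*"

    found: list[datetime] = []
    candidate = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    limit = start + timedelta(days=366)
    while candidate <= limit and len(found) < count:
        # cron counts Sunday as 0; datetime.weekday() counts Monday as 0
        cron_weekday = (candidate.weekday() + 1) % 7
        dom_ok = candidate.day in days
        dow_ok = cron_weekday in weekdays
        day_ok = (dom_ok or dow_ok) if both_day_fields_restricted else (dom_ok and dow_ok)
        if (
            day_ok
            and candidate.month in months
            and candidate.hour in hours
            and candidate.minute in minutes
        ):
            found.append(candidate)
        candidate += timedelta(minutes=1)
    return found
```

Calling `next_runs("0 8 * * 1", datetime(2024, 1, 1, 8, 0), 2)` starts from a Monday at 8:00 and returns the following two Mondays at 8:00 (January 8 and January 15, 2024).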

Use Cases

1. Daily Price Monitoring

Monitor competitor pricing every day:
# price_monitor.py
from datetime import datetime
from notte_sdk import NotteClient

def run(competitor_urls: list[str]):
    """Monitor competitor prices daily."""
    client = NotteClient()
    prices = []

    for url in competitor_urls:
        with client.Session() as session:
            session.execute(type="goto", url=url)
            price = session.scrape(
                instructions="Extract product price"
            )
            prices.append({"url": url, "price": price})

    return {"prices": prices, "checked_at": datetime.now().isoformat()}
Schedule: 0 9 * * * (Every day at 9 AM)

2. Hourly Data Collection

Collect data every hour:
# data_collector.py
from datetime import datetime
from notte_sdk import NotteClient

def run(source_url: str):
    """Collect data every hour."""
    client = NotteClient()

    with client.Session() as session:
        session.execute(type="goto", url=source_url)
        data = session.scrape()

    return {"data": data, "timestamp": datetime.now().isoformat()}
Schedule: 0 * * * * (Every hour)

3. Weekly Reports

Generate reports every Monday:
# weekly_report.py
from datetime import datetime
from notte_sdk import NotteClient

def run():
    """Generate weekly report."""
    client = NotteClient()

    # Collect data from multiple sources
    report_data = []

    sources = [
        "https://analytics.example.com",
        "https://dashboard.example.com"
    ]

    for url in sources:
        with client.Session() as session:
            session.execute(type="goto", url=url)
            data = session.scrape(
                instructions="Extract weekly metrics"
            )
            report_data.append(data)

    return {"report": report_data, "week": datetime.now().isocalendar()[1]}
Schedule: 0 8 * * 1 (Every Monday at 8 AM)

4. Continuous Monitoring

Monitor website availability every 5 minutes:
# uptime_monitor.py
from datetime import datetime
from notte_sdk import NotteClient

def run(target_url: str, expected_text: str):
    """Monitor site uptime."""
    client = NotteClient()

    try:
        with client.Session(timeout_minutes=2) as session:
            session.execute(type="goto", url=target_url)
            content = session.scrape()

            is_up = expected_text in content

        return {
            "status": "up" if is_up else "down",
            "url": target_url,
            "checked_at": datetime.now().isoformat()
        }

    except Exception as e:
        return {
            "status": "error",
            "url": target_url,
            "error": str(e),
            "checked_at": datetime.now().isoformat()
        }
Schedule: */5 * * * * (Every 5 minutes)

Schedule Parameters

Static Parameters

Set fixed parameters for scheduled runs:
Schedule: 0 9 * * *
Parameters:
  url: "https://example.com"
  max_results: 10
Every scheduled run uses these parameters.

Dynamic Parameters

Compute values at run time from environment variables or the current date:
import os
from datetime import datetime

def run():
    # Access environment variables
    api_key = os.getenv("API_KEY")
    webhook_url = os.getenv("WEBHOOK_URL")

    # Use current date
    today = datetime.now().strftime("%Y-%m-%d")

    # Your automation
    pass

Managing Schedules

View Scheduled Runs

Check upcoming scheduled executions in the Console:
  1. Go to Function details
  2. View “Scheduled Runs” section
  3. See next execution time
  4. View execution history

Pause Schedule

Temporarily disable a schedule:
  1. Go to Function schedules
  2. Toggle schedule off
  3. Resume later without losing configuration

Delete Schedule

Remove a schedule permanently:
  1. Go to Function schedules
  2. Delete schedule
  3. Function can still be invoked manually

Monitoring Scheduled Functions

Execution History

View past scheduled runs:
from notte_sdk import NotteClient

client = NotteClient()

# List recent runs
runs = client.functions.list_runs(
    function_id="function_abc123",
    only_active=False  # Include completed runs
)

for run in runs.runs:
    print(f"Run {run.function_run_id}:")
    print(f"  Status: {run.status}")
    print(f"  Created: {run.created_at}")
    print(f"  Updated: {run.updated_at}")
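
Once the runs are listed, a short summary is often more useful than the raw printout. The sketch below counts runs by status and computes a failure rate. It assumes only that each run object exposes a `status` attribute, as in the listing above; `summarize_runs` is a hypothetical helper, and the status value `"closed"` in the sample data is illustrative rather than a documented value.

```python
from collections import Counter
from types import SimpleNamespace


def summarize_runs(runs) -> dict:
    """Count runs by status and report the share that failed."""
    counts = Counter(run.status for run in runs)
    total = sum(counts.values())
    failed = counts.get("failed", 0)
    return {
        "total": total,
        "by_status": dict(counts),
        "failure_rate": failed / total if total else 0.0,
    }


# Stand-in objects for illustration; in practice pass the `runs.runs` list.
sample_runs = [
    SimpleNamespace(status="closed"),
    SimpleNamespace(status="failed"),
    SimpleNamespace(status="closed"),
    SimpleNamespace(status="closed"),
]
summary = summarize_runs(sample_runs)
print(summary)
```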

Failed Runs

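Handle failures in scheduled runs:
# In your function
# perform_automation, notify_success, and notify_failure are placeholders for your own code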
def run():
    try:
        # Your automation
        result = perform_automation()

        # Send success notification
        notify_success(result)

        return result

    except Exception as e:
        # Send failure alert
        notify_failure(str(e))

        # Return error info
        return {
            "success": False,
            "error": str(e)
        }

Alerts

Set up alerts for failures:
import os
import requests

def run():
    try:
        # Your automation
        result = perform_automation()
        return result

    except Exception as e:
        # Send to Slack/Discord/Email
        requests.post(
            os.getenv("WEBHOOK_URL"),
            json={
                "text": f"Scheduled function failed: {str(e)}",
                "function_id": "function_abc123"
            }
        )

        raise  # Re-raise to mark run as failed

Best Practices

1. Use Appropriate Frequency

Match schedule to data update frequency:
# Data updates hourly
0 * * * *  ✅ Good

# Data updates hourly, but checking every minute
* * * * *  ❌ Wasteful
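
One way to keep the schedule in step with the data is to derive the expression from the update interval. The helper below is a minimal sketch with a hypothetical name, `cron_for_interval`. It only accepts intervals that divide an hour or a day evenly, because a step such as `*/45` would fire at :00 and :45 and leave uneven gaps.

```python
def cron_for_interval(minutes: int) -> str:
    """Return a cron expression that fires every `minutes` minutes."""
    if minutes < 1:
        raise ValueError("interval must be at least one minute")
    if minutes == 1:
        return "* * * * *"
    if minutes < 60 and 60 % minutes == 0:
        return f"*/{minutes} * * * *"
    if minutes % 60 == 0:
        hours = minutes // 60
        if hours == 1:
            return "0 * * * *"
        if hours == 24:
            return "0 0 * * *"
        if 24 % hours == 0:
            return f"0 */{hours} * * *"
    raise ValueError(f"{minutes} minutes does not divide an hour or a day evenly")


print(cron_for_interval(60))   # data updates hourly
print(cron_for_interval(360))  # data updates every six hours
```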

2. Add Error Handling

Scheduled functions should handle errors gracefully:
def run():
    try:
        result = perform_automation()
        return {"success": True, "data": result}
    except Exception as e:
        # Log error, send alert
        return {"success": False, "error": str(e)}
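
If several functions share this pattern, the try/except wrapper can be factored into a decorator. This is a sketch of one possible approach, not a Notte feature: `report_errors` is a hypothetical name, and the example `run` body is a stand-in for real automation.

```python
import functools
import logging

logger = logging.getLogger("scheduled_function")


def report_errors(func):
    """Wrap a function so it always returns a success/error dictionary."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return {"success": True, "data": func(*args, **kwargs)}
        except Exception as exc:
            # Log the full traceback, then return a structured error
            logger.exception("Scheduled run failed")
            return {"success": False, "error": str(exc)}
    return wrapper


@report_errors
def run(value: int):
    """Stand-in automation: fails when value is zero."""
    return 10 / value
```

Note the trade-off: a function that returns an error dictionary completes normally, so if the run should be marked as failed, re-raise the exception instead of returning.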

3. Set Reasonable Timeouts

from notte_sdk import NotteClient

def run():
    client = NotteClient()

    # Set timeout appropriate for schedule
    with client.Session(timeout_minutes=10) as session:
        # Your automation
        pass

4. Monitor Schedule Health

Check that schedules are running:
# daily_health_check.py
def run():
    """Check if all schedules are healthy."""
    from notte_sdk import NotteClient

    client = NotteClient()

    # List all functions, then inspect the runs of each one
    workflows = client.functions.list()

    health_report = []
    for workflow in workflows.workflows:
        runs = client.functions.list_runs(
            function_id=workflow.function_id
        )

        # Check recent runs
        recent_failures = [
            r for r in runs.runs
            if r.status == "failed"
        ]

        health_report.append({
            "function_id": workflow.function_id,
            "recent_failures": len(recent_failures)
        })

    return health_report
Schedule: 0 0 * * * (Daily health check)

5. Use Idempotent Operations

Design functions to handle duplicate runs:

def already_processed(date: str) -> bool:
    """Check if date has already been processed."""
    # TODO: Implement logic to check if date has already been processed
    return False

def process_data(date: str) -> str:
    """Process data for the given date."""
    # TODO: Implement logic to process data for the given date
    return "processed"

def mark_processed(date: str) -> None:
    """Mark date as processed."""
    # TODO: Implement logic to mark date as processed
    pass

def run(date: str):
    """Idempotent function - safe to run multiple times."""
    # Use date as unique identifier
    # Skip if already processed
    if already_processed(date):
        return {"status": "already_processed", "date": date}

    # Process data
    result = process_data(date)

    # Mark as processed
    mark_processed(date)

    return {"status": "processed", "date": date, "result": result}
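
The placeholder helpers above need somewhere to record which dates are done. The sketch below fills them in with a SQLite table from Python's standard library. It is only one possible approach and rests on an assumption: that the store survives between runs. Local files may not persist between scheduled runs, so in practice the same queries would likely target an external database. `open_store` and `run_once` are hypothetical names.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the table that records processed dates."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS processed (date TEXT PRIMARY KEY)")
    return conn


def already_processed(conn: sqlite3.Connection, date: str) -> bool:
    row = conn.execute("SELECT 1 FROM processed WHERE date = ?", (date,)).fetchone()
    return row is not None


def mark_processed(conn: sqlite3.Connection, date: str) -> None:
    # INSERT OR IGNORE keeps a repeated mark from raising on the primary key
    conn.execute("INSERT OR IGNORE INTO processed (date) VALUES (?)", (date,))
    conn.commit()


def run_once(conn: sqlite3.Connection, date: str, process) -> dict:
    """Run `process(date)` only if this date has not been handled yet."""
    if already_processed(conn, date):
        return {"status": "already_processed", "date": date}
    result = process(date)
    mark_processed(conn, date)
    return {"status": "processed", "date": date, "result": result}


conn = open_store()
first = run_once(conn, "2024-01-01", lambda d: f"report for {d}")
second = run_once(conn, "2024-01-01", lambda d: f"report for {d}")
print(first["status"], second["status"])
```

The date is marked only after processing succeeds, so a run that fails partway is retried on the next invocation.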

Timezone Considerations

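Schedules run in UTC by default, so convert your local time to UTC before writing the expression:
# If you want 9 AM EST (UTC-5), use:
# 0 14 * * * (9 AM + 5 hours = 14:00 UTC)

# If you want 9 AM PST (UTC-8), use:
# 0 17 * * * (9 AM + 8 hours = 17:00 UTC)

# During daylight saving time (EDT is UTC-4, PDT is UTC-7) these become
# 0 13 * * * and 0 16 * * *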
Or handle timezone in function:
from datetime import datetime
import pytz

def run():
    # Get current time in specific timezone
    tz = pytz.timezone("America/New_York")
    current_time = datetime.now(tz)

    # Your automation
    pass
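
Because UTC offsets shift with daylight saving time, a fixed UTC expression drifts by an hour in local terms twice a year. The sketch below computes the daily UTC expression for a local time on a given date, using the standard-library `zoneinfo` module (Python 3.9+, with a time zone database available on the system). `utc_cron_for_local_time` is a hypothetical helper, and it only covers daily schedules: if the conversion crosses midnight, day-of-week or day-of-month fields would need shifting as well.

```python
from datetime import date, datetime, timezone
from zoneinfo import ZoneInfo


def utc_cron_for_local_time(hour: int, minute: int, tz_name: str, on_date: date) -> str:
    """Return the daily UTC cron expression for a local wall-clock time."""
    local = datetime(
        on_date.year, on_date.month, on_date.day, hour, minute,
        tzinfo=ZoneInfo(tz_name),
    )
    utc = local.astimezone(timezone.utc)
    return f"{utc.minute} {utc.hour} * * *"


# 9 AM in New York, in winter (EST) and in summer (EDT)
winter = utc_cron_for_local_time(9, 0, "America/New_York", date(2024, 1, 15))
summer = utc_cron_for_local_time(9, 0, "America/New_York", date(2024, 7, 15))
print(winter)
print(summer)
```

The winter call yields `0 14 * * *` and the summer call yields `0 13 * * *`, so a schedule that must track local time needs updating when the offset changes, or the function can check the local time itself as shown above.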

Next Steps