Skip to content
Merged
132 changes: 93 additions & 39 deletions src/apify/_charging.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from __future__ import annotations

import json
import math
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import TYPE_CHECKING, Protocol
from typing import TYPE_CHECKING, Any, Protocol

from pydantic import TypeAdapter

from crawlee._utils.context import ensure_context

from apify._models import ActorRun, PricingModel
from apify._models import (
ActorRun,
FlatPricePerMonthActorPricingInfo,
FreeActorPricingInfo,
PayPerEventActorPricingInfo,
PricePerDatasetItemActorPricingInfo,
PricingModel,
)
from apify._utils import docs_group
from apify.log import logger
from apify.storages import Dataset
Expand Down Expand Up @@ -115,20 +123,12 @@ class ChargingManagerImplementation(ChargingManager):

def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> None:
self._max_total_charge_usd = configuration.max_total_charge_usd or Decimal('inf')
self._configuration = configuration
self._is_at_home = configuration.is_at_home
self._actor_run_id = configuration.actor_run_id
self._purge_charging_log_dataset = configuration.purge_on_start
self._pricing_model: PricingModel | None = None

if configuration.test_pay_per_event:
if self._is_at_home:
raise ValueError(
'Using the ACTOR_TEST_PAY_PER_EVENT environment variable is only supported '
'in a local development environment'
)

self._pricing_model = 'PAY_PER_EVENT'

self._client = client
self._charging_log_dataset: Dataset | None = None

Expand All @@ -140,41 +140,47 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No

async def __aenter__(self) -> None:
"""Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually."""
self.active = True

if self._is_at_home:
# Running on the Apify platform - fetch pricing info for the current run.

if self._actor_run_id is None:
raise RuntimeError('Actor run ID not found even though the Actor is running on Apify')
# Validate config
if self._configuration.test_pay_per_event and self._is_at_home:
raise ValueError(
'Using the ACTOR_TEST_PAY_PER_EVENT environment variable is only supported '
'in a local development environment'
)

run = run_validator.validate_python(await self._client.run(self._actor_run_id).get())
if run is None:
raise RuntimeError('Actor run not found')
self.active = True

if run.pricing_info is not None:
self._pricing_model = run.pricing_info.pricing_model
# Retrieve pricing information from env vars or API
pricing_data = await self._fetch_pricing_info()
pricing_info = pricing_data['pricing_info']
charged_event_counts = pricing_data['charged_event_counts']
max_total_charge_usd = pricing_data['max_total_charge_usd']

if run.pricing_info.pricing_model == 'PAY_PER_EVENT':
for event_name, event_pricing in run.pricing_info.pricing_per_event.actor_charge_events.items():
self._pricing_info[event_name] = PricingInfoItem(
price=event_pricing.event_price_usd,
title=event_pricing.event_title,
)
# Set pricing model
if self._configuration.test_pay_per_event:
self._pricing_model = 'PAY_PER_EVENT'
else:
self._pricing_model = pricing_info.pricing_model if pricing_info else None

# Load per-event pricing information
if pricing_info and pricing_info.pricing_model == 'PAY_PER_EVENT':
for event_name, event_pricing in pricing_info.pricing_per_event.actor_charge_events.items():
self._pricing_info[event_name] = PricingInfoItem(
price=event_pricing.event_price_usd,
title=event_pricing.event_title,
)

self._max_total_charge_usd = run.options.max_total_charge_usd or self._max_total_charge_usd
self._max_total_charge_usd = max_total_charge_usd

for event_name, count in (run.charged_event_counts or {}).items():
price = self._pricing_info.get(event_name, PricingInfoItem(Decimal(), title='')).price
self._charging_state[event_name] = ChargingStateItem(
charge_count=count,
total_charged_amount=count * price,
)
# Load charged event counts
for event_name, count in charged_event_counts.items():
price = self._pricing_info.get(event_name, PricingInfoItem(Decimal(), title='')).price
self._charging_state[event_name] = ChargingStateItem(
charge_count=count,
total_charged_amount=count * price,
)

# Set up charging log dataset for local development
if not self._is_at_home and self._pricing_model == 'PAY_PER_EVENT':
# We are not running on the Apify platform, but PPE is enabled for testing - open a dataset that
Comment thread
janbuchar marked this conversation as resolved.
# will contain a log of all charge calls for debugging purposes.

if self._purge_charging_log_dataset:
dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)
await dataset.drop()
Expand Down Expand Up @@ -328,6 +334,54 @@ def get_charged_event_count(self, event_name: str) -> int:
def get_max_total_charge_usd(self) -> Decimal:
return self._max_total_charge_usd

async def _fetch_pricing_info(self) -> dict[str, Any]:
Comment thread
janbuchar marked this conversation as resolved.
Outdated
"""Fetch pricing information from environment variables or API."""
# Check if pricing info is available via environment variables
if self._configuration.actor_pricing_info and self._configuration.charged_event_counts:
Comment thread
Mantisus marked this conversation as resolved.
Outdated
charged_counts = json.loads(self._configuration.charged_event_counts)

# Validate pricing info with proper discriminator support
pricing_info_adapter: TypeAdapter[
FreeActorPricingInfo
| FlatPricePerMonthActorPricingInfo
| PricePerDatasetItemActorPricingInfo
| PayPerEventActorPricingInfo
] = TypeAdapter(
FreeActorPricingInfo
| FlatPricePerMonthActorPricingInfo
| PricePerDatasetItemActorPricingInfo
| PayPerEventActorPricingInfo
)
pricing_info = pricing_info_adapter.validate_json(self._configuration.actor_pricing_info)
Comment thread
janbuchar marked this conversation as resolved.
Outdated

return {
'pricing_info': pricing_info,
'charged_event_counts': charged_counts,
'max_total_charge_usd': self._configuration.max_total_charge_usd or Decimal('inf'),
}

# Fall back to API call
if self._is_at_home:
if self._actor_run_id is None:
raise RuntimeError('Actor run ID not found even though the Actor is running on Apify')

run = run_validator.validate_python(await self._client.run(self._actor_run_id).get())
if run is None:
raise RuntimeError('Actor run not found')

return {
'pricing_info': run.pricing_info,
'charged_event_counts': run.charged_event_counts or {},
'max_total_charge_usd': run.options.max_total_charge_usd or Decimal('inf'),
}

# Local development without environment variables
return {
'pricing_info': None,
'charged_event_counts': {},
'max_total_charge_usd': self._configuration.max_total_charge_usd or Decimal('inf'),
}


@dataclass
class ChargingStateItem:
Expand Down
16 changes: 16 additions & 0 deletions src/apify/_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,22 @@ class Configuration(CrawleeConfiguration):
),
] = None

actor_pricing_info: Annotated[
str | None,
Field(
alias='apify_actor_pricing_info',
description='JSON string with prising info of the actor',
),
] = None

charged_event_counts: Annotated[
str | None,
Field(
alias='apify_charged_actor_event_counts',
description='Counts of events that were charged for the actor',
),
] = None

@model_validator(mode='after')
def disable_browser_sandbox_on_platform(self) -> Self:
"""Disable the browser sandbox mode when running on the Apify platform.
Expand Down
Loading