Source code for beancount_gocardless.importer

import logging
from datetime import date, timedelta
from os import path
from typing import Any, Dict, List, Optional, Tuple, cast

import beangulp
import yaml
from beancount.core import amount, data, flags
from beancount.core.number import D

from .client import GoCardlessClient, CacheOptions
from .models import AccountConfig, BankTransaction, GoCardlessConfig

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Public names exported by ``from <module> import *``.
__all__ = ["GoCardlessImporter", "ReferenceDuplicatesComparator"]


class ReferenceDuplicatesComparator:
    """Compare two Beancount transactions for duplicate detection.

    Two entries are considered duplicates if they share at least one common
    value among the specified metadata reference keys.

    Args:
        refs: Metadata keys to compare (default: ``["ref"]``).
    """

    def __init__(self, refs: Optional[List[str]] = None) -> None:
        """Store the metadata keys used for comparison.

        Args:
            refs: Metadata keys to compare. ``None`` selects ``["ref"]``.
        """
        # A fresh list is built for every instance: a literal list default
        # would be shared by all instances, and aliasing the caller's list
        # would let later mutations leak into this comparator.
        self.refs: List[str] = ["ref"] if refs is None else list(refs)

    def __call__(self, entry1: data.Transaction, entry2: data.Transaction) -> bool:
        """Return ``True`` if the two entries share any reference value."""
        first = self._reference_values(entry1)
        second = self._reference_values(entry2)
        return not first.isdisjoint(second)

    def _reference_values(self, entry: Any) -> set:
        """Collect the values stored under the configured keys in ``entry.meta``."""
        # Entries without metadata simply contribute no reference values.
        meta = getattr(entry, "meta", None) or {}
        return {meta[key] for key in self.refs if key in meta}
class GoCardlessImporter(beangulp.Importer):
    """GoCardless API importer for Beancount.

    Fetches transactions from the GoCardless Bank Account Data API and converts
    them into Beancount directives.

    Attributes:
        config: Configuration loaded from YAML.
        _client: GoCardless API client instance.
    """

    # Separator placed between the individual narration parts.
    NARRATION_SEPARATOR: str = " "

    # Output metadata key -> dotted path resolved on the GoCardless transaction.
    DEFAULT_METADATA_FIELDS: Dict[str, str] = {
        "nordref": "transactionId",
        "creditorName": "creditorName",
        "debtorName": "debtorName",
        "bookingDate": "bookingDate",
    }

    def __init__(self) -> None:
        """Initialize the GoCardlessImporter."""
        logger.debug("Initializing GoCardlessImporter")
        self.config: Optional[GoCardlessConfig] = None
        self._client: Optional[GoCardlessClient] = None

    @property
    def client(self) -> GoCardlessClient:
        """Lazily initialize and return the GoCardless API client.

        Returns:
            The initialized GoCardless API client.

        Raises:
            ValueError: If config is not loaded.
        """
        if not self._client:
            if not self.config:
                raise ValueError("Config not loaded. Call load_config() first.")
            cache_options = (
                cast(CacheOptions, self.config.cache_options)
                if self.config.cache_options
                else None
            )
            self._client = GoCardlessClient(
                self.config.secret_id,
                self.config.secret_key,
                cache_options=cache_options,
            )
        return self._client

    def identify(self, filepath: str) -> bool:
        """Identify if the given file is a GoCardless configuration file.

        Args:
            filepath: The path to the file.

        Returns:
            True if the file name ends with ``gocardless.yaml``.
        """
        result = path.basename(filepath).endswith("gocardless.yaml")
        logger.debug("Identifying file %s: %s", filepath, result)
        return result

    def account(self, filepath: str) -> str:
        """Return an empty string as account (derived from config instead).

        Args:
            filepath: The path to the file (unused apart from logging).

        Returns:
            An empty string.
        """
        logger.debug("Returning account for %s: ''", filepath)
        return ""  # We get the account from the config file

    def load_config(self, filepath: str) -> Optional[GoCardlessConfig]:
        """Load configuration from the specified YAML file.

        Environment variable references in the file content are expanded with
        ``os.path.expandvars`` before the YAML is parsed.

        Args:
            filepath: The path to the YAML configuration file.

        Returns:
            The loaded configuration. Also sets ``self.config``.

        Raises:
            ValueError: If the YAML document is not a mapping (for example an
                empty file).
        """
        logger.debug("Loading config from %s", filepath)
        # Explicit encoding so the result does not depend on the locale.
        with open(filepath, "r", encoding="utf-8") as f:
            raw_config = f.read()
        expanded_config = path.expandvars(raw_config)
        parsed = yaml.safe_load(expanded_config)
        if not isinstance(parsed, dict):
            # ``GoCardlessConfig(**parsed)`` would otherwise fail with an
            # opaque TypeError for an empty or non-mapping document.
            raise ValueError(
                f"Config file {filepath} must contain a YAML mapping, "
                f"got {type(parsed).__name__}"
            )
        self.config = GoCardlessConfig(**parsed)
        return self.config

    def get_all_transactions(
        self, transactions_dict: Dict[str, List[BankTransaction]], types: List[str]
    ) -> List[Tuple[BankTransaction, str]]:
        """Combine transactions of specified types and sort them by date.

        Args:
            transactions_dict: Transactions grouped by type.
            types: Types to include.

        Returns:
            Sorted list of (transaction, type) tuples.
        """
        all_transactions = [
            (tx, tx_type)
            for tx_type in types
            if tx_type in transactions_dict
            for tx in transactions_dict[tx_type]
        ]
        # Transactions without any date sort first (empty-string key).
        return sorted(
            all_transactions,
            key=lambda x: x[0].value_date or x[0].booking_date or "",
        )

    def add_metadata(
        self,
        transaction: BankTransaction,
        custom_metadata: Dict[str, Any],
        account_config: Optional[AccountConfig] = None,
    ) -> Dict[str, Any]:
        """Build the metadata dict for a Beancount transaction entry.

        Merges default metadata fields, per-account custom fields from
        ``account_config``, and any ``custom_metadata`` from the YAML config.
        Keys listed in ``account_config.exclude_default_metadata`` are removed
        from the merged field mapping before values are resolved.

        This method can be overridden in subclasses to add extra metadata.

        Args:
            transaction: The source GoCardless transaction.
            custom_metadata: Static metadata dict from the account YAML config.
            account_config: Account-level configuration controlling field
                inclusion.

        Returns:
            A dict of metadata key-value pairs to attach to the Beancount entry.
        """
        metakv: Dict[str, Any] = {}
        exclude_fields: List[str] = []
        custom_fields: Dict[str, str] = {}
        if account_config is not None:
            exclude_fields = account_config.exclude_default_metadata or []
            custom_fields = account_config.metadata_fields or {}

        # Start with defaults, merge with custom fields
        fields = dict(self.DEFAULT_METADATA_FIELDS)
        fields.update(custom_fields)

        # Remove excluded fields
        for key in exclude_fields:
            fields.pop(key, None)

        for out_key, gcl_path in fields.items():
            if gcl_path is None:
                continue
            val = self._get_gcl_path(transaction, gcl_path)
            if val is None:
                continue
            if (
                out_key == "original"
                and hasattr(val, "currency")
                and hasattr(val, "amount")
            ):
                # Amount-like objects are flattened to "<currency> <amount>".
                metakv[out_key] = f"{val.currency} {val.amount}"
            else:
                metakv[out_key] = val

        # Static metadata is applied last, so it overrides resolved fields.
        metakv.update(custom_metadata)
        return metakv

    def get_narration(self, transaction: BankTransaction) -> str:
        """Extract the narration from a transaction.

        This method can be overridden in subclasses to customize narration
        extraction.

        Args:
            transaction: The transaction data from the API.

        Returns:
            The extracted narration (empty if no remittance information is
            present).
        """
        parts = []
        if transaction.remittance_information_unstructured:
            parts.append(transaction.remittance_information_unstructured)
        if transaction.remittance_information_unstructured_array:
            parts.append(
                " ".join(transaction.remittance_information_unstructured_array)
            )
        if not parts:
            logger.debug(
                "Transaction %s has no remittance information fields; "
                "narration will be empty",
                transaction.transaction_id,
            )
        return self.NARRATION_SEPARATOR.join(parts)

    def get_payee(self, transaction: BankTransaction) -> str:
        """Extract the payee from a transaction.

        Override in subclasses to customize payee extraction. The default
        implementation returns an empty string.

        Args:
            transaction: The transaction data from the API.

        Returns:
            The extracted payee string (empty by default).
        """
        return ""

    def get_transaction_date(self, transaction: BankTransaction) -> Optional[date]:
        """Extract the transaction date.

        Prefers value_date, falls back to booking_date. A malformed date string
        is logged and skipped rather than raised, so the caller's
        "invalid date" skip path handles it.

        This method can be overridden in subclasses to customize date
        extraction.

        Args:
            transaction: The transaction data from the API.

        Returns:
            The extracted transaction date, or None if no usable date is found.
        """
        for date_str in (transaction.value_date, transaction.booking_date):
            if not date_str:
                continue
            try:
                return date.fromisoformat(date_str)
            except ValueError:
                logger.warning(
                    "Transaction %s has malformed date %r",
                    transaction.transaction_id,
                    date_str,
                )
        return None

    def get_transaction_status(
        self,
        transaction: BankTransaction,
        status: str,
        metakv: Dict[str, Any],
        tx_amount: amount.Amount,
        asset_account: str,
    ) -> str:
        """Determine the Beancount flag for a transaction.

        Override in subclasses to customize flag assignment. The default
        returns ``FLAG_OKAY`` for booked transactions and ``FLAG_WARNING``
        for any other status.

        Args:
            transaction: The transaction data from the API.
            status: Transaction status (``"booked"`` or ``"pending"``).
            metakv: Transaction metadata dict.
            tx_amount: Transaction amount.
            asset_account: The Beancount asset account string.

        Returns:
            A Beancount flag character.
        """
        return flags.FLAG_OKAY if status == "booked" else flags.FLAG_WARNING

    def create_transaction_entry(
        self,
        transaction: BankTransaction,
        status: str,
        asset_account: str,
        custom_metadata: Dict[str, Any],
        account_config: Optional[AccountConfig] = None,
    ) -> Optional[data.Transaction]:
        """Create a Beancount transaction entry from a GoCardless transaction.

        Override in subclasses for full control over entry creation.

        Args:
            transaction: The GoCardless transaction data.
            status: Transaction status (``"booked"`` or ``"pending"``).
            asset_account: The Beancount asset account string.
            custom_metadata: Static metadata dict from the account YAML config.
            account_config: Account-level configuration for metadata options.

        Returns:
            A Beancount ``Transaction`` directive, or ``None`` if the
            transaction has no valid date or amount.
        """
        logger.debug(
            "Creating entry for transaction %s (%s)", transaction.transaction_id, status
        )
        metakv = self.add_metadata(transaction, custom_metadata, account_config)
        meta = data.new_metadata("", 0, metakv)

        trx_date = self.get_transaction_date(transaction)
        if trx_date is None:
            logger.debug(
                "Skipping transaction %s with invalid date", transaction.transaction_id
            )
            return None

        narration = self.get_narration(transaction)
        payee = self.get_payee(transaction)

        # Get transaction amount
        if transaction.transaction_amount is None:
            logger.debug(
                "Skipping transaction %s with no amount", transaction.transaction_id
            )
            return None

        # Fall back to the configured currency, then to EUR.
        currency = transaction.transaction_amount.currency or (
            self.config.currency if self.config else "EUR"
        )
        tx_amount = amount.Amount(
            D(str(transaction.transaction_amount.amount)),
            currency,
        )
        flag = self.get_transaction_status(
            transaction, status, metakv, tx_amount, asset_account
        )

        # Single-legged transaction: only the asset-account posting is emitted.
        return data.Transaction(
            meta,
            trx_date,
            flag,
            payee,
            narration,
            data.EMPTY_SET,
            data.EMPTY_SET,
            [
                data.Posting(
                    asset_account,
                    tx_amount,
                    None,
                    None,
                    None,
                    None,
                ),
            ],
        )

    def extract(
        self,
        filepath: str,
        existing: Optional[data.Entries] = None,
    ) -> data.Entries:
        """Extract Beancount entries from GoCardless transactions.

        For every configured account the transactions are fetched, converted to
        entries, and followed by one balance assertion when the API reports at
        least one balance. Duplicate detection is handled by the beangulp base
        class using :attr:`cmp`.

        Args:
            filepath: The path to the YAML configuration file.
            existing: Previously extracted entries (not used by this method).

        Returns:
            A list of Beancount entries (transactions and balance assertions).

        Raises:
            ValueError: If no config could be loaded, or a selected balance has
                no currency.
        """
        logger.info("Starting extraction from %s", filepath)
        self.load_config(filepath)
        if not self.config:
            raise ValueError("No config loaded from YAML file")

        entries: data.Entries = []
        accounts = self.config.accounts
        total_transactions = 0
        logger.info("Processing %d accounts", len(accounts))

        for account in accounts:
            account_id = account.id
            asset_account = account.asset_account
            custom_metadata = account.metadata
            days_back = getattr(account, "days_back", 180)

            logger.debug("Fetching transactions for account %s", account_id)
            account_transactions = self.client.get_account_transactions(
                account_id, days_back=days_back
            )
            transactions_dict = account_transactions.transactions
            all_transactions = self.get_all_transactions(
                transactions_dict, account.transaction_types
            )

            booked_count = len(transactions_dict.get("booked", []))
            pending_count = len(transactions_dict.get("pending", []))
            logger.debug(
                "Fetched %d booked and %d pending transactions for account %s",
                booked_count,
                pending_count,
                account_id,
            )
            total_transactions += sum(
                len(transactions_dict.get(t, [])) for t in account.transaction_types
            )

            skipped = 0
            for transaction, status in all_transactions:
                entry = self.create_transaction_entry(
                    transaction, status, asset_account, custom_metadata, account
                )
                if entry is not None:
                    entries.append(entry)
                else:
                    skipped += 1
            if skipped > 0:
                logger.warning(
                    "Skipped %d invalid transactions for account %s",
                    skipped,
                    account_id,
                )

            # Add balance assertion at the end of the account's transactions
            balance_entry = self._create_balance_entry(account)
            if balance_entry is not None:
                entries.append(balance_entry)

        logger.info(
            "Processed %d total transactions across %d accounts, "
            "created %d entries",
            total_transactions,
            len(accounts),
            len(entries),
        )
        return entries

    def _create_balance_entry(self, account: AccountConfig) -> Optional[data.Balance]:
        """Build the balance assertion for one account, if a balance exists.

        Args:
            account: The account configuration entry being processed.

        Returns:
            A ``Balance`` directive, or ``None`` when the API reports no
            balances for the account.

        Raises:
            ValueError: If the selected balance has no currency.
        """
        account_id = account.id
        asset_account = account.asset_account

        balances = self.client.get_account_balances(account_id)
        logger.debug(
            "Available balances for account %s: %s",
            account_id,
            [
                (b.balance_type, b.balance_amount.amount, b.balance_amount.currency)
                for b in balances.balances
            ],
        )

        # Prioritized balance selection
        priority = {
            "expected": 0,
            "closingBooked": 1,
            "interimBooked": 2,
            "interimAvailable": 3,
            "openingBooked": 4,
        }
        if account.preferred_balance_type:
            priority[account.preferred_balance_type] = -1

        # Sort balances based on priority, with unknown types at the end
        sorted_balances = sorted(
            balances.balances, key=lambda b: priority.get(b.balance_type, 99)
        )
        if not sorted_balances:
            return None

        selected_balance = sorted_balances[0]
        currency = selected_balance.balance_amount.currency
        if currency is None:
            # Explicit check instead of ``assert``: assertions are stripped
            # when Python runs with ``-O``.
            raise ValueError("Currency should not be None")
        balance_amount = amount.Amount(
            D(str(selected_balance.balance_amount.amount)),
            currency,
        )

        # Determine balance date: the day after the reference date, or
        # tomorrow when the reference date is missing or malformed.
        if selected_balance.reference_date:
            try:
                balance_date = date.fromisoformat(
                    selected_balance.reference_date
                ) + timedelta(days=1)
            except ValueError:
                balance_date = date.today() + timedelta(days=1)
        else:
            balance_date = date.today() + timedelta(days=1)

        # Collect all distinct balance values for metadata
        balance_meta: Dict[str, Any] = {}
        distinct_details = []
        seen_values = set()
        for b in sorted_balances:
            val_str = f"{b.balance_amount.amount} {b.balance_amount.currency}"
            if val_str not in seen_values:
                distinct_details.append(f"{b.balance_type}: {val_str}")
                seen_values.add(val_str)
        balance_meta["detail"] = " / ".join(distinct_details)

        # Include custom metadata from config for consistency with transactions
        balance_meta.update(account.metadata)

        meta = data.new_metadata("", 0, balance_meta)
        balance_entry = data.Balance(
            meta=meta,
            date=balance_date,
            account=asset_account,
            amount=balance_amount,
            tolerance=None,
            diff_amount=None,
        )
        logger.debug(
            "Added balance assertion for account %s using %s balance: %s %s",
            account_id,
            selected_balance.balance_type,
            balance_amount,
            balance_date,
        )
        return balance_entry

    def _get_gcl_path(self, root: Any, dotted: str) -> Any:
        """Resolve a dotted path against a nested object/dict structure.

        Supports traversal of objects by attribute name, Pydantic-style models
        by field alias, plain dicts, and lists (by numeric index).

        Args:
            root: The root object to traverse.
            dotted: A dot-separated path string (e.g. ``"creditorAccount.iban"``).

        Returns:
            The resolved value, or ``None`` if any segment cannot be resolved
            or the final value is a dict/list.
        """
        cur: Any = root
        for seg in dotted.split("."):
            if cur is None:
                return None

            if isinstance(cur, list):
                # Lists are only addressable by a non-negative, in-range index.
                if not seg.isdigit():
                    return None
                idx = int(seg)
                if idx >= len(cur):
                    return None
                cur = cur[idx]
                continue

            if isinstance(cur, dict):
                cur = cur.get(seg)
                continue

            if hasattr(cur, seg):
                cur = getattr(cur, seg)
                continue

            if hasattr(type(cur), "model_fields"):
                # The segment may be a field alias rather than the field name.
                model_fields = type(cur).model_fields
                name = next(
                    (n for n, f in model_fields.items() if f.alias == seg), None
                )
                if name and hasattr(cur, name):
                    cur = getattr(cur, name)
                    continue

            return None

        # Containers are not returned as metadata values.
        if isinstance(cur, (dict, list)):
            return None
        return cur

    # Comparator referenced by the ``extract`` docstring for duplicate
    # detection: entries match when their ``nordref`` metadata is equal.
    cmp = ReferenceDuplicatesComparator(["nordref"])