Powerful runtime functions for data exchange, state management, and workflow control across Python, Node.js, and Bash scripts.
Cronium provides a consistent set of runtime helper functions that are automatically available in all scripts. These functions enable powerful workflow capabilities including data exchange, persistent storage, conditional logic, and metadata access.
- cronium.input() and cronium.output() for seamless data exchange between workflow nodes
- cronium.getVariable() and cronium.setVariable() for persistent user data storage
- cronium.setCondition() and cronium.getCondition() for workflow routing control
- cronium.event() for accessing event information, execution stats, and server details
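As a quick orientation, the sketch below touches each of the four helper groups in one script. Outside Cronium the global cronium object does not exist, so the sketch defines a minimal stand-in class (an illustration-only assumption, not part of the Cronium API); inside Cronium the real object is injected automatically and no such stub is needed.

```python
# Minimal stand-in for the injected `cronium` object, so this sketch can run
# anywhere. Inside Cronium, the real global object is provided automatically.
class _CroniumStub:
    def __init__(self):
        self._vars, self._output, self._condition = {}, None, False

    def input(self):
        return {"message": "hello"}        # data from the previous node

    def output(self, value):
        self._output = value               # data for the next node

    def getVariable(self, name):
        return self._vars.get(name)        # persistent per-user storage

    def setVariable(self, name, value):
        self._vars[name] = value

    def setCondition(self, flag):
        self._condition = flag             # drives "On Condition" routing

    def getCondition(self):
        return self._condition

    def event(self):
        return {"name": "demo", "type": "PYTHON"}  # execution metadata

cronium = _CroniumStub()

# One pass through all four helper groups
data = cronium.input()
cronium.setVariable('RUNS', str(int(cronium.getVariable('RUNS') or '0') + 1))
cronium.setCondition(bool(data.get("message")))
cronium.output({"success": True, "runs": cronium.getVariable('RUNS')})
```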
Exchange data between workflow nodes and external systems using cronium.input() and cronium.output(). Input data can come from API calls, previous workflow nodes, or manual triggers.
```python
# cronium is automatically available - no imports needed!

# Access input data
input_data = cronium.input()
print(f"Received: {input_data}")

# Access specific fields with defaults
user_id = input_data.get('user_id', 'unknown')
message = input_data.get('message', 'Hello World')
config = input_data.get('config', {})

# Use input in processing
if message:
    print(f"Processing message: {message}")
```

```python
# cronium is automatically available - no imports needed!

# Create structured output
result = {
    "success": True,
    "message": "Data processed successfully",
    "data": {
        "processed_items": 42,
        "total_time": "2.5s",
        "errors": []
    },
    "next_action": "send_notification"
}

# Set output for next workflow node
cronium.output(result)
# The output will be available as input to the next connected node
```

Store and retrieve persistent data across script executions using cronium.getVariable() and cronium.setVariable(). Variables are scoped per user and persist across all events and workflows.
```python
# cronium is automatically available - no imports needed!

# Get a stored variable
example_api_key = cronium.getVariable('EXAMPLE_API_KEY')
database_url = cronium.getVariable('DATABASE_URL')

# Use variables with defaults
redis_host = cronium.getVariable('REDIS_HOST') or 'localhost'

# Set/update variables
cronium.setVariable('LAST_PROCESSED', '2025-06-17T10:30:00Z')
cronium.setVariable('COUNTER', str(int(cronium.getVariable('COUNTER') or '0') + 1))

# Variables persist across all script executions for the user
print(f"Processing count: {cronium.getVariable('COUNTER')}")
```

Control workflow routing using cronium.setCondition() and cronium.getCondition(). Set boolean conditions to determine which workflow paths are executed based on runtime logic.
```python
# cronium is automatically available - no imports needed!

# Process some data
input_data = cronium.input()
threshold = input_data.get('threshold', 100)
current_value = process_data()  # process_data() stands in for your own logic

# Set condition based on processing result
if current_value > threshold:
    print(f"Value {current_value} exceeds threshold {threshold}")
    cronium.setCondition(True)   # Trigger "On Condition" connections
else:
    print(f"Value {current_value} is within threshold")
    cronium.setCondition(False)  # Don't trigger "On Condition" connections

# Check existing condition (useful for complex logic)
existing_condition = cronium.getCondition()
if existing_condition:
    print("Condition was previously set to True")
```

Access comprehensive information about the current event execution using cronium.event(). This includes event details, execution statistics, and server information.
```python
# cronium is automatically available - no imports needed!

# Access event metadata
event = cronium.event()
print(f"Event ID: {event.get('id')}")
print(f"Event Name: {event.get('name')}")
print(f"Script Type: {event.get('type')}")
print(f"Run Location: {event.get('runLocation')}")

# Check execution statistics
print(f"Success Count: {event.get('successCount', 0)}")
print(f"Failure Count: {event.get('failureCount', 0)}")

# Server information (for remote execution)
server = event.get('server')
if server:
    print(f"Running on server: {server.get('name')} ({server.get('address')})")
else:
    print("Running locally")

# Use metadata for conditional logic
if event.get('type') == 'PYTHON':
    print("Running in Python environment")
```

Comprehensive examples demonstrating how to combine all runtime helpers for powerful workflow automation.
```python
# cronium is automatically available - no imports needed!

# Complete workflow example
def main():
    # Get input data
    input_data = cronium.input()
    event = cronium.event()
    print(f"Processing event: {event.get('name')}")

    # Get configuration from variables
    max_retries = int(cronium.getVariable('MAX_RETRIES') or '3')
    api_endpoint = cronium.getVariable('API_ENDPOINT')

    # Process data
    items = input_data.get('items', [])
    processed_items = []
    errors = []
    for item in items:
        try:
            # process_item() stands in for your own logic
            result = process_item(item, api_endpoint)
            processed_items.append(result)
        except Exception as e:
            errors.append(str(e))

    # Update processing stats
    total_processed = int(cronium.getVariable('TOTAL_PROCESSED') or '0')
    cronium.setVariable('TOTAL_PROCESSED', str(total_processed + len(processed_items)))

    # Set condition for workflow routing
    success_rate = len(processed_items) / len(items) if items else 1
    cronium.setCondition(success_rate > 0.8)  # 80% success threshold

    # Set output for next workflow node
    result = {
        "success": len(errors) == 0,
        "processed_count": len(processed_items),
        "error_count": len(errors),
        "success_rate": success_rate,
        "processed_items": processed_items,
        "errors": errors
    }
    cronium.output(result)
    print(f"Processed {len(processed_items)} items with {len(errors)} errors")

if __name__ == "__main__":
    main()
```

Always provide defaults when accessing input fields to handle missing data gracefully.
Use consistent JSON structures with success flags, data, and error information.
Include error details in output for debugging and monitoring purposes.
Use UPPERCASE for configuration variables, lowercase for runtime state.
Remember variables are stored as strings - convert to needed types when reading.
Always provide fallback values when reading variables that might not exist.
Use meaningful conditions that reflect business logic, not just technical success/failure.
Keep each event focused on one task; use output to pass data to specialized nodes.
Use standard logging methods while leveraging cronium functions for data flow.
Don't overuse variables for temporary data - use output for workflow data passing.
Use getCondition() to build complex logic without unnecessary processing.
Cache event() calls if you need to access metadata multiple times in a script.
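Several of the tips above (variables are strings, always read with a fallback, use a consistent output shape) can be wrapped in small helpers. The functions below are hypothetical conveniences, not part of the Cronium API; the cronium object they wrap is stubbed minimally here only so the sketch runs outside Cronium.

```python
# Minimal stub of the injected `cronium` object, for standalone illustration.
class _Stub:
    _vars = {'MAX_RETRIES': '5'}
    def getVariable(self, name):
        return self._vars.get(name)

cronium = _Stub()

def get_int_variable(name, default):
    """Variables are stored as strings: convert on read, with a fallback."""
    raw = cronium.getVariable(name)
    try:
        return int(raw) if raw is not None else default
    except ValueError:
        return default

def make_output(success, data=None, errors=None):
    """Consistent output shape: success flag, data payload, error details."""
    return {"success": success, "data": data or {}, "errors": errors or []}

print(get_int_variable('MAX_RETRIES', 3))  # stored value, converted to int
print(get_int_variable('MISSING', 3))      # fallback when the variable is unset
print(make_output(True, {"count": 2}))
```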