or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

auth.md · bom.md · config.md · filestore.md · index.md · inference.md · metadata.md · policy.md

docs/auth.md

0

# Authentication and JWT Handling

1

2

Enterprise-grade security framework with JWT token management, Java keystore integration, and configurable authentication providers. The authentication system supports both token validation and generation capabilities with policy decision point integration for comprehensive authorization workflows.

3

4

## Capabilities

5

6

### Authentication Configuration

7

8

Comprehensive configuration management for authentication and authorization systems including public key paths, Java keystore settings, and policy decision point server integration.

9

10

```python { .api }

11

class AuthConfig:
    """
    Configurations for authentication and authorization.

    API stub: every method body is elided (``...``); only the signatures and
    the documented return values are specified here.
    """

    def __init__(self) -> None:
        """Initialize with auth.properties"""
        ...

    def public_key_path(self) -> str:
        """Returns path of the public key"""
        ...

    def jks_path(self) -> str:
        """Returns path of the keystore"""
        ...

    def jks_password(self) -> str:
        """Returns keystore password"""
        ...

    def jks_key_alias(self) -> str:
        """Returns key alias in keystore"""
        ...

    def pdp_host_url(self) -> str:
        """Returns host URL for policy decision point server"""
        ...

    def is_authorization_enabled(self) -> bool:
        """Returns whether authorization is enabled"""
        ...

43

```

44

45

### JWT Token Utilities

46

47

Comprehensive JWT token management utilities for parsing, validation, and generation with Java keystore integration for enterprise security requirements.

48

49

```python { .api }

50

class JsonWebTokenUtil:
    """
    Utility for parsing JWT for authentication.

    API stub: every method body is elided (``...``).
    """

    def __init__(self) -> None:
        """Constructor"""
        ...

    def parse_token(self, token: str):
        """
        Parses JWT token using public key from auth.properties.

        Parameters:
        - token: str - JWT token string to parse

        Returns:
        Parsed token payload
        """
        # NOTE(review): the usage examples in this document call ``.get(...)``
        # on the result, so it is presumably a mapping of claims - confirm
        # against the implementation before annotating the return type.
        ...

    def create_token(self) -> str:
        """
        Convenience method to generate valid token with hardcoded subject.

        Returns:
        Generated JWT token string
        """
        ...

    def validate_token(self, token: str) -> None:
        """
        Validates token and raises AissembleSecurityException on failure.

        Parameters:
        - token: str - JWT token to validate

        Raises:
        AissembleSecurityException - If token validation fails
        """
        ...

    def get_sign_key(self) -> str:
        """
        Retrieves signing key from Java keystore.

        Returns:
        str - Signing key for token generation
        """
        ...

100

```

101

102

### Security Exception Handling

103

104

Custom exception class for handling aiSSEMBLE security-related errors with specific error contexts and appropriate error handling patterns.

105

106

```python { .api }

107

class AissembleSecurityException(Exception):
    """
    Custom exception for aiSSEMBLE security errors.

    Adds no behavior of its own; it only provides a distinct type to catch.
    """
    pass

112

```

113

114

## Usage Examples

115

116

### Basic JWT Token Validation

117

118

```python

119

from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException

120

from aissembleauth.auth_config import AuthConfig

121

122

# Initialize JWT utility

123

jwt_util = JsonWebTokenUtil()

124

125

def authenticate_request(authorization_header: str) -> dict:
    """
    Authenticate incoming request using JWT token.

    Parameters:
    - authorization_header: str - Raw ``Authorization`` header value, expected
      in the form ``"Bearer <token>"``

    Returns:
    Parsed token payload as returned by ``jwt_util.parse_token``

    Raises:
    AissembleSecurityException - If the header is missing or malformed, or if
    validating/parsing the token fails for any reason
    """
    bearer_prefix = "Bearer "

    if not authorization_header or not authorization_header.startswith(bearer_prefix):
        raise AissembleSecurityException("Missing or invalid authorization header")

    # Slice by the prefix length rather than a magic number so the check above
    # and the extraction here cannot drift apart.
    token = authorization_header[len(bearer_prefix):]

    try:
        # Validate token
        jwt_util.validate_token(token)

        # Parse token for user information
        parsed_token = jwt_util.parse_token(token)

        print(f"Authentication successful for user: {parsed_token.get('sub', 'unknown')}")
        return parsed_token

    except AissembleSecurityException as e:
        print(f"Authentication failed: {e}")
        raise
    except Exception as e:
        print(f"Unexpected authentication error: {e}")
        # Chain the original error so its traceback survives the re-wrap.
        raise AissembleSecurityException(f"Token validation failed: {e}") from e

150

151

# Usage example

152

try:
    # Placeholder header value (note the trailing "..."); substitute a real token.
    auth_header = "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
    user_info = authenticate_request(auth_header)
    print(f"Authenticated user: {user_info}")
except AissembleSecurityException as e:
    # authenticate_request reports every failure through this exception type.
    print(f"Access denied: {e}")

158

```

159

160

### JWT Token Generation and Management

161

162

```python

163

from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException

164

from aissembleauth.auth_config import AuthConfig

165

from datetime import datetime, timedelta

166

import jwt

167

168

class TokenManager:
    """Comprehensive token management for authentication workflows"""

    def __init__(self):
        self.jwt_util = JsonWebTokenUtil()
        self.auth_config = AuthConfig()

    def generate_user_token(self, user_id: str, roles: list, expires_in_hours: int = 24) -> str:
        """
        Generate JWT token for user with roles and expiration.

        Parameters:
        - user_id: str - Value stored in the "sub" claim
        - roles: list - Value stored in the "roles" claim
        - expires_in_hours: int - Lifetime of the token, counted from now

        Returns:
        str - Token produced by ``jwt.encode`` with the RS256 algorithm

        Raises:
        AissembleSecurityException - If fetching the key or encoding fails
        """
        try:
            # Get signing key from keystore
            signing_key = self.jwt_util.get_sign_key()

            # Create token payload
            now = datetime.utcnow()
            payload = {
                "sub": user_id,
                "roles": roles,
                "iat": now,
                "exp": now + timedelta(hours=expires_in_hours),
                "iss": "aissemble-auth-service",
                "aud": "aissemble-platform"
            }

            # Generate token using RS256 algorithm
            token = jwt.encode(payload, signing_key, algorithm="RS256")

            print(f"Generated token for user {user_id} with roles: {roles}")
            return token

        except Exception as e:
            # Chain the cause so the underlying failure stays in the traceback.
            raise AissembleSecurityException(f"Token generation failed: {e}") from e

    def refresh_token(self, existing_token: str) -> str:
        """
        Refresh existing valid token with new expiration.

        Parameters:
        - existing_token: str - Token to validate and re-issue

        Returns:
        str - New token carrying the same "sub" and "roles", valid for 24 hours

        Raises:
        AissembleSecurityException - If validation, parsing or re-issuing fails
        """
        try:
            # Validate existing token
            self.jwt_util.validate_token(existing_token)

            # Parse existing token
            parsed = self.jwt_util.parse_token(existing_token)

            # Generate new token with same claims but new expiration
            return self.generate_user_token(
                user_id=parsed.get("sub"),
                roles=parsed.get("roles", []),
                expires_in_hours=24
            )

        except Exception as e:
            raise AissembleSecurityException(f"Token refresh failed: {e}") from e

    def extract_user_context(self, token: str) -> dict:
        """
        Extract user context from valid token.

        Parameters:
        - token: str - Token to validate and parse

        Returns:
        dict with keys "user_id", "roles", "issued_at", "expires_at" and
        "issuer", taken from the "sub", "roles", "iat", "exp" and "iss" claims

        Raises:
        AissembleSecurityException - If validation or parsing fails
        """
        try:
            self.jwt_util.validate_token(token)
            parsed = self.jwt_util.parse_token(token)

            return {
                "user_id": parsed.get("sub"),
                "roles": parsed.get("roles", []),
                "issued_at": parsed.get("iat"),
                "expires_at": parsed.get("exp"),
                "issuer": parsed.get("iss")
            }

        except Exception as e:
            raise AissembleSecurityException(f"Context extraction failed: {e}") from e

    def is_token_expired(self, token: str) -> bool:
        """
        Check if token is expired without full validation.

        The signature is NOT verified here, so the answer must never be used
        as proof that the token is authentic.

        Parameters:
        - token: str - Token whose "exp" claim is inspected

        Returns:
        bool - True if the token is past its "exp" time, has no "exp" claim,
        or cannot be decoded at all
        """
        try:
            parsed = jwt.decode(token, options={"verify_signature": False})
            exp_timestamp = parsed.get("exp")

            if exp_timestamp:
                # Compare epoch seconds with epoch seconds. The previous
                # utcnow() > fromtimestamp(exp) mixed a naive UTC value with a
                # naive local-time value and was wrong by the UTC offset.
                return datetime.now().timestamp() > exp_timestamp
            return True

        except Exception:
            # Deliberate best-effort: an undecodable token is treated as expired.
            return True

249

250

# Usage example

251

# Constructing the manager instantiates JsonWebTokenUtil and AuthConfig.
token_manager = TokenManager()

# Generate token for user
user_token = token_manager.generate_user_token(
    user_id="john.doe@company.com",
    roles=["data_scientist", "ml_engineer"],
    expires_in_hours=8
)

# Extract user context
user_context = token_manager.extract_user_context(user_token)
print(f"User context: {user_context}")

# Check token expiration
is_expired = token_manager.is_token_expired(user_token)
print(f"Token expired: {is_expired}")

# Refresh token if needed
if not is_expired:
    # refresh_token always re-issues with a 24-hour lifetime.
    refreshed_token = token_manager.refresh_token(user_token)
    print("Token refreshed successfully")

272

```

273

274

### Role-Based Access Control Integration

275

276

```python

277

from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException

278

from aissembleauth.auth_config import AuthConfig

279

from functools import wraps

280

from typing import List

281

282

class RoleBasedAccessControl:
    """Role-based access control using JWT tokens"""

    def __init__(self):
        self.jwt_util = JsonWebTokenUtil()
        self.auth_config = AuthConfig()

    def require_roles(self, required_roles: List[str]):
        """
        Decorator for enforcing role-based access control.

        The wrapped function must receive the token either as the keyword
        argument ``auth_token`` or as its first positional argument.

        Parameters:
        - required_roles: List[str] - Access is granted when the token's
          "roles" claim contains at least one of these

        Returns:
        A decorator that raises AissembleSecurityException when the token is
        missing or invalid, or when none of the required roles is present
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Extract token from request context (simplified).
                # The parentheses matter: without them the expression parses
                # as ``(kwargs.get(...) or args[0]) if args else None`` and a
                # keyword-only call is rejected as unauthenticated.
                token = kwargs.get('auth_token') or (args[0] if args else None)

                if not token:
                    raise AissembleSecurityException("Authentication token required")

                try:
                    # Validate and parse token
                    self.jwt_util.validate_token(token)
                    parsed_token = self.jwt_util.parse_token(token)

                    # Extract user roles
                    user_roles = parsed_token.get("roles", [])

                    # Check if user has required roles
                    if not any(role in user_roles for role in required_roles):
                        raise AissembleSecurityException(
                            f"Access denied. Required roles: {required_roles}, User roles: {user_roles}"
                        )

                    print(f"Access granted for user: {parsed_token.get('sub')}")
                    return func(*args, **kwargs)

                except AissembleSecurityException:
                    raise
                except Exception as e:
                    # Chain the cause so the underlying failure stays visible.
                    raise AissembleSecurityException(f"Authorization failed: {e}") from e

            return wrapper
        return decorator

    def check_permissions(self, token: str, resource: str, action: str) -> bool:
        """
        Check if user has permissions for specific resource and action.

        Parameters:
        - token: str - Token to validate and read the "roles" claim from
        - resource: str - Resource name looked up in the permission table
        - action: str - Action requested on that resource

        Returns:
        bool - True if any role grants the action, directly or through a "*"
        wildcard. False otherwise, including when validation or parsing raises
        (the error is printed, not propagated).
        """
        try:
            self.jwt_util.validate_token(token)
            parsed_token = self.jwt_util.parse_token(token)

            user_roles = parsed_token.get("roles", [])

            # Define role-based permissions (in practice, this would be configurable)
            permissions = {
                "admin": {"*": ["*"]},  # Admin has all permissions
                "data_scientist": {
                    "datasets": ["read", "analyze"],
                    "models": ["read", "train", "evaluate"]
                },
                "ml_engineer": {
                    "models": ["read", "deploy", "monitor"],
                    "infrastructure": ["read", "configure"]
                },
                "viewer": {
                    "*": ["read"]
                }
            }

            # Check permissions
            for role in user_roles:
                # Roles missing from the table grant nothing.
                role_perms = permissions.get(role, {})

                # Check wildcard permissions
                if "*" in role_perms and ("*" in role_perms["*"] or action in role_perms["*"]):
                    return True

                # Check specific resource permissions
                if resource in role_perms and (action in role_perms[resource] or "*" in role_perms[resource]):
                    return True

            return False

        except Exception as e:
            # Fail closed: any error while checking means "no access".
            print(f"Permission check failed: {e}")
            return False

366

367

# Usage examples

368

# Shared instance: the decorators and access_resource below all use it.
rbac = RoleBasedAccessControl()

# Decorator usage
@rbac.require_roles(["data_scientist", "ml_engineer"])
def train_model(auth_token: str, model_config: dict):
    """Function that requires data scientist or ML engineer role"""
    print("Training model with configuration:", model_config)
    return {"status": "training_started"}

@rbac.require_roles(["admin"])
def delete_dataset(auth_token: str, dataset_id: str):
    """Function that requires admin role"""
    print(f"Deleting dataset: {dataset_id}")
    return {"status": "deleted"}

# Direct permission checking
def access_resource(token: str, resource: str, action: str):
    """
    Check access to specific resource and action.

    Prints the outcome and returns True when rbac.check_permissions grants
    the action, False otherwise.
    """
    if rbac.check_permissions(token, resource, action):
        print(f"Access granted to {action} on {resource}")
        return True
    else:
        print(f"Access denied to {action} on {resource}")
        return False

# Example usage
user_token = "eyJhbGciOiJSUzI1NiJ9..."  # Valid JWT token

try:
    # Use decorated functions
    result = train_model(auth_token=user_token, model_config={"algorithm": "RandomForest"})
    print("Training result:", result)

    # Check specific permissions
    can_read_datasets = access_resource(user_token, "datasets", "read")
    can_deploy_models = access_resource(user_token, "models", "deploy")

except AissembleSecurityException as e:
    # Raised by the require_roles wrapper; check_permissions returns False instead.
    print(f"Security error: {e}")

407

```

408

409

### Configuration-Driven Security Setup

410

411

```python

412

from aissembleauth.auth_config import AuthConfig

413

from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException

414

import os

415

416

class SecurityConfigurationManager:
    """Manage security configuration across different environments"""

    def __init__(self):
        self.auth_config = AuthConfig()
        self.jwt_util = JsonWebTokenUtil()

    def validate_configuration(self) -> dict:
        """
        Validate authentication configuration.

        Returns:
        dict of boolean flags: "public_key_accessible", "keystore_accessible",
        "authorization_enabled", "pdp_configured" and "configuration_valid".
        If a check raises, the error is printed and every flag not yet
        evaluated keeps its initial value of False.
        """
        status = dict.fromkeys(
            (
                "public_key_accessible",
                "keystore_accessible",
                "authorization_enabled",
                "pdp_configured",
                "configuration_valid",
            ),
            False,
        )

        try:
            # Public key: a path must be configured and must exist on disk
            key_file = self.auth_config.public_key_path()
            status["public_key_accessible"] = bool(key_file and os.path.exists(key_file))

            # Keystore: same rule as the public key
            keystore_file = self.auth_config.jks_path()
            status["keystore_accessible"] = bool(keystore_file and os.path.exists(keystore_file))

            # Authorization flag is reported exactly as configured
            status["authorization_enabled"] = self.auth_config.is_authorization_enabled()

            # PDP counts as configured whenever a host URL is set
            status["pdp_configured"] = bool(self.auth_config.pdp_host_url())

            # Overall status depends only on the two key files
            status["configuration_valid"] = (
                status["public_key_accessible"] and status["keystore_accessible"]
            )
        except Exception as e:
            print(f"Configuration validation error: {e}")

        return status

    def get_security_summary(self) -> dict:
        """
        Get comprehensive security configuration summary.

        Returns:
        dict with the configured paths, key alias, authorization flag and PDP
        host, plus the result of validate_configuration() under
        "configuration_status"
        """
        summary = {}
        summary["public_key_path"] = self.auth_config.public_key_path()
        summary["keystore_path"] = self.auth_config.jks_path()
        summary["keystore_alias"] = self.auth_config.jks_key_alias()
        summary["authorization_enabled"] = self.auth_config.is_authorization_enabled()
        summary["pdp_host"] = self.auth_config.pdp_host_url()
        summary["configuration_status"] = self.validate_configuration()
        return summary

    def test_jwt_operations(self) -> dict:
        """
        Test JWT token operations.

        Returns:
        dict of boolean flags: "token_generation", "token_validation",
        "token_parsing" and "signing_key_access". A failing step leaves its
        flag False.
        """
        outcome = {
            "token_generation": False,
            "token_validation": False,
            "token_parsing": False,
            "signing_key_access": False
        }

        try:
            # Signing key access
            if self.jwt_util.get_sign_key():
                outcome["signing_key_access"] = True

            # Token generation
            sample_token = self.jwt_util.create_token()
            if sample_token:
                outcome["token_generation"] = True

            # Token validation: a security exception only means "not validated"
            try:
                self.jwt_util.validate_token(sample_token)
            except AissembleSecurityException:
                pass
            else:
                outcome["token_validation"] = True

            # Token parsing: any failure only means "not parsed"
            try:
                if self.jwt_util.parse_token(sample_token):
                    outcome["token_parsing"] = True
            except Exception:
                pass

        except Exception as e:
            print(f"JWT operations test error: {e}")

        return outcome

    def initialize_security_system(self) -> bool:
        """
        Initialize and validate entire security system.

        Returns:
        bool - True only when the configuration is valid and every JWT
        operation check passed; progress and failures are printed
        """
        print("Initializing aiSSEMBLE security system...")

        # Configuration must be valid before JWT operations are exercised
        if not self.validate_configuration()["configuration_valid"]:
            print("Security configuration validation failed")
            return False

        # Every JWT check has to succeed
        jwt_checks = self.test_jwt_operations()
        if not all(jwt_checks.values()):
            print("JWT operations test failed")
            return False

        print("Security system initialized successfully")
        return True

532

533

# Usage example

534

# Constructing the manager instantiates AuthConfig and JsonWebTokenUtil.
security_manager = SecurityConfigurationManager()

# Get security summary
summary = security_manager.get_security_summary()
print("Security Configuration Summary:")
for key, value in summary.items():
    print(f" {key}: {value}")

# Initialize security system
# (runs validate_configuration and test_jwt_operations internally)
if security_manager.initialize_security_system():
    print("Ready to handle authentication requests")
else:
    print("Security system initialization failed")

# Validate configuration in different environments
config_validation = security_manager.validate_configuration()
print(f"Configuration validation: {config_validation}")

# Test JWT operations
jwt_tests = security_manager.test_jwt_operations()
print(f"JWT operations test: {jwt_tests}")

555

```

556

557

### Policy Decision Point Integration

558

559

```python

560

from aissembleauth.auth_config import AuthConfig

561

from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException

562

import requests

563

import json

564

565

class PolicyDecisionPointClient:
    """Client for integrating with Policy Decision Point (PDP) server"""

    def __init__(self):
        self.auth_config = AuthConfig()
        self.jwt_util = JsonWebTokenUtil()
        self.pdp_url = self.auth_config.pdp_host_url()

    def authorize_action(self, token: str, resource: str, action: str, context: dict = None) -> bool:
        """
        Check authorization through PDP server.

        Parameters:
        - token: str - JWT token identifying the caller
        - resource: str - Resource identifier forwarded to the PDP
        - action: str - Action forwarded to the PDP
        - context: dict - Optional extra attributes; sent as {} when omitted

        Returns:
        True without contacting the PDP (and without validating the token)
        when authorization is disabled. Otherwise the "permit" value of the
        PDP's JSON reply, or False on a non-200 reply or any raised error.
        """
        if not self.auth_config.is_authorization_enabled():
            print("Authorization is disabled, allowing access")
            return True

        try:
            # The token must be valid before its claims are forwarded
            self.jwt_util.validate_token(token)
            claims = self.jwt_util.parse_token(token)

            # Body of the authorization request
            request_body = {
                "subject": claims.get("sub"),
                "resource": resource,
                "action": action,
                "context": context or {},
                "roles": claims.get("roles", [])
            }

            # Ask the PDP for a decision
            reply = requests.post(
                f"{self.pdp_url}/authorize",
                json=request_body,
                headers={"Content-Type": "application/json"},
                timeout=5
            )

            if reply.status_code != 200:
                print(f"PDP error: {reply.status_code}")
                return False

            return reply.json().get("permit", False)

        except AissembleSecurityException:
            print("Token validation failed")
            return False
        except Exception as e:
            print(f"Authorization check failed: {e}")
            return False

    def get_user_permissions(self, token: str) -> list:
        """
        Get all permissions for authenticated user.

        Parameters:
        - token: str - JWT token identifying the caller

        Returns:
        The "permissions" value of the PDP's JSON reply, or [] on a non-200
        reply or any raised error (the error is printed)
        """
        try:
            self.jwt_util.validate_token(token)
            claims = self.jwt_util.parse_token(token)

            # Subject and roles are all the PDP receives for this lookup
            request_body = {
                "subject": claims.get("sub"),
                "roles": claims.get("roles", [])
            }

            reply = requests.post(
                f"{self.pdp_url}/permissions",
                json=request_body,
                headers={"Content-Type": "application/json"},
                timeout=5
            )

            if reply.status_code != 200:
                return []
            return reply.json().get("permissions", [])

        except Exception as e:
            print(f"Permission retrieval failed: {e}")
            return []

642

643

class SecureAPIEndpoint:
    """Example of secure API endpoint with PDP integration"""

    def __init__(self):
        self.pdp_client = PolicyDecisionPointClient()

    def secure_endpoint(self, token: str, resource_id: str, action: str, **kwargs):
        """
        Generic secure endpoint with PDP authorization.

        Parameters:
        - token: str - JWT token passed to the PDP client
        - resource_id: str - Identifier appended to "api/resource/"
        - action: str - Action to authorize
        - **kwargs - "client_ip", "user_agent" and "timestamp" are forwarded
          as request context; missing ones are sent as None

        Returns:
        str - Confirmation message naming the action and resource

        Raises:
        AissembleSecurityException - If the PDP client does not authorize
        """
        resource_path = f"api/resource/{resource_id}"
        request_context = {
            "ip_address": kwargs.get("client_ip"),
            "user_agent": kwargs.get("user_agent"),
            "timestamp": kwargs.get("timestamp")
        }

        # Check authorization through PDP
        permitted = self.pdp_client.authorize_action(
            token=token,
            resource=resource_path,
            action=action,
            context=request_context
        )

        if permitted:
            # Proceed with authorized action
            return f"Successfully executed {action} on resource {resource_id}"

        raise AissembleSecurityException(f"Access denied for {action} on resource {resource_id}")

667

668

# Usage example

669

pdp_client = PolicyDecisionPointClient()
# SecureAPIEndpoint builds its own PolicyDecisionPointClient in __init__.
secure_api = SecureAPIEndpoint()

# Example authorization check
user_token = "eyJhbGciOiJSUzI1NiJ9..."  # Valid JWT token

try:
    # Check specific authorization
    can_access = pdp_client.authorize_action(
        token=user_token,
        resource="ml_models/fraud_detection",
        action="deploy",
        context={"environment": "production", "approval_id": "DEPLOY-001"}
    )

    if can_access:
        print("User authorized to deploy model")
    else:
        print("User not authorized to deploy model")

    # Get user permissions
    permissions = pdp_client.get_user_permissions(user_token)
    print(f"User permissions: {permissions}")

    # Use secure endpoint
    # (only client_ip is supplied; user_agent and timestamp are sent as None)
    result = secure_api.secure_endpoint(
        token=user_token,
        resource_id="dataset_001",
        action="read",
        client_ip="192.168.1.100"
    )
    print(f"API result: {result}")

except AissembleSecurityException as e:
    # Only secure_endpoint raises here; the two pdp_client calls return False / [] on failure.
    print(f"Security error: {e}")

704

```

705

706

## Best Practices

707

708

### Token Management

709

- Use appropriate token expiration times based on security requirements

710

- Implement token refresh mechanisms for long-running sessions

711

- Store tokens securely on client side (HttpOnly cookies, secure storage)

712

- Implement proper token revocation mechanisms

713

714

### Security Configuration

715

- Use strong cryptographic keys for token signing

716

- Regularly rotate signing keys and certificates

717

- Implement proper keystore security and access controls

718

- Use environment-specific configuration management

719

720

### Authorization Patterns

721

- Implement role-based access control consistently

722

- Use Policy Decision Points for complex authorization logic

723

- Audit and log all authentication and authorization events

724

- Implement proper error handling without information leakage

725

726

### Production Considerations

727

- Use HTTPS for all authentication endpoints

728

- Implement rate limiting for authentication attempts

729

- Monitor for suspicious authentication patterns

730

- Regular security audits and penetration testing