Examples
Code examples can be found in the examples directory on GitHub
Also take a look at the unit tests in the tests directory.
Get all calling users
Source: calling_users.py
#!/usr/bin/env python
"""
Example script
Get all calling users within the org
"""

from dotenv import load_dotenv

from wxc_sdk import WebexSimpleApi

# read environment variables (e.g. the access token) from .env
load_dotenv(override=True)

api = WebexSimpleApi()

# using wxc_sdk.people.PeopleApi.list to iterate over persons
# Parameter calling_data needs to be set to true to get calling specific information
# calling users have the attribute location_id set
calling_users = [user for user in api.people.list(calling_data=True)
                 if user.location_id]
print(f'{len(calling_users)} users:')
print('\n'.join(user.display_name for user in calling_users))
Get all calling users (async variant)
Source: calling_users_async.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org using the (experimental) async API
5"""
6import asyncio
7import time
8
9from dotenv import load_dotenv
10
11from wxc_sdk.as_api import AsWebexSimpleApi
12
13load_dotenv(override=True)
14
15
async def get_calling_users():
    """
    Get details of all calling enabled users by:
    1) collecting the IDs of all calling licenses
    2) collecting all users that have at least one of these licenses assigned
    3) getting details for all of these users
    """
    async with AsWebexSimpleApi(concurrent_requests=40) as api:
        print('Collecting calling licenses')
        calling_license_ids = {lic.license_id
                               for lic in await api.licenses.list()
                               if lic.webex_calling}

        # get users with a calling license
        calling_users = [user async for user in api.people.list_gen()
                         if any(lic_id in calling_license_ids for lic_id in user.licenses)]
        print(f'{len(calling_users)} users:')
        print('\n'.join(user.display_name for user in calling_users))

        # get details for all users; all requests are scheduled at once and awaited together
        start = time.perf_counter()
        details = await asyncio.gather(*[api.people.details(person_id=user.person_id, calling_data=True)
                                         for user in calling_users])
        elapsed = time.perf_counter() - start
        print(f'Got details for {len(details)} users in {elapsed * 1000:.3f} ms')
40
41
if __name__ == '__main__':
    # run the coroutine to completion on a fresh event loop
    asyncio.run(get_calling_users())
Get all users without phones
Source: users_wo_devices.py
1#!/usr/bin/env python
2"""
3Get calling users without devices
4"""
5import asyncio
6import logging
7import os
8from itertools import chain
9from typing import Optional
10
11from dotenv import load_dotenv
12
13from wxc_sdk import Tokens
14from wxc_sdk.as_api import AsWebexSimpleApi
15from wxc_sdk.common import UserType
16from wxc_sdk.integration import Integration
17from wxc_sdk.person_settings import DeviceList
18from wxc_sdk.scopes import parse_scopes
19
20
def env_path() -> str:
    """
    determine path for .env to load environment variables from; based on name of this file

    :return: .env file path in the current working directory
    :rtype: str
    """
    # name of this script without directory and extension
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    return os.path.join(os.getcwd(), f'{script_name}.env')
27
28
def yml_path() -> str:
    """
    determine path of YML file to persist tokens; based on name of this file

    :return: path to YML file in the current working directory
    :rtype: str
    """
    # name of this script without directory and extension
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    return os.path.join(os.getcwd(), f'{script_name}.yml')
36
37
def build_integration() -> Integration:
    """
    read integration parameters from environment variables and create an integration

    The parameters are read from INTEGRATION_CLIENT_ID, INTEGRATION_CLIENT_SECRET, and INTEGRATION_SCOPES.

    :return: :class:`wxc_sdk.integration.Integration` instance
    :raises ValueError: if one of the integration parameters is missing or empty
    """
    client_id = os.getenv('INTEGRATION_CLIENT_ID')
    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
    raw_scopes = os.getenv('INTEGRATION_SCOPES')
    # validate the raw values first so that a missing variable is never passed to parse_scopes()
    if not all((client_id, client_secret, raw_scopes)):
        raise ValueError('failed to get integration parameters from environment')
    scopes = parse_scopes(raw_scopes)
    if not scopes:
        raise ValueError('failed to get integration parameters from environment')
    redirect_url = 'http://localhost:6001/redirect'
    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
                       redirect_url=redirect_url)
51
52
def get_tokens() -> Optional[Tokens]:
    """
    Tokens are read from a YML file. If needed an OAuth flow is initiated.

    The integration is built from environment variables; the YML file path is determined by yml_path().

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """
    integration = build_integration()
    return integration.get_cached_tokens_from_yml(yml_path=yml_path())
64
65
async def main():
    """
    Print calling users without devices.

    Two lists are determined and printed: users for which the device list is empty and users who are not
    the owner of any device returned for the calling users.
    """
    # get environment variables from .env; required for integration parameters
    load_dotenv(env_path())

    # get tokens from cache or create a new set of tokens using the integration defined in .env
    tokens = get_tokens()

    async with AsWebexSimpleApi(tokens=tokens) as api:
        # get calling users; calling users have the attribute location_id set
        calling_users = [user for user in await api.people.list(calling_data=True)
                         if user.location_id]

        # get device info for all users; results are in the same order as calling_users
        user_device_infos: list[DeviceList] = await asyncio.gather(
            *[api.person_settings.devices(person_id=user.person_id)
              for user in calling_users])
        users_wo_devices = [user for user, device_info in zip(calling_users, user_device_infos)
                            if not device_info.devices]

        # alternatively we can collect all device owners
        device_owner_ids = {owner.owner_id
                            for device in chain.from_iterable(udi.devices for udi in user_device_infos)
                            if (owner := device.owner) and owner.owner_type == UserType.people}

        # ... and collect all other users (the ones not owning a device)
        users_not_owning_a_device = [user for user in calling_users
                                     if user.person_id not in device_owner_ids]

        users_wo_devices.sort(key=lambda u: u.display_name)
        print(f'{len(users_wo_devices)} users w/o devices:')
        print('\n'.join(f'{user.display_name} ({user.emails[0]})'
                        for user in users_wo_devices))

        print()
        users_not_owning_a_device.sort(key=lambda u: u.display_name)
        print(f'{len(users_not_owning_a_device)} users not owning a device:')
        print('\n'.join(f'{user.display_name} ({user.emails[0]})'
                        for user in users_not_owning_a_device))
104
105
if __name__ == '__main__':
    # enable DEBUG logging to a file; REST log shows all requests
    # the log file is named after this script and is created in the current working directory
    logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
                        filemode='w', level=logging.DEBUG, format='%(asctime)s %(threadName)s %(message)s')
    asyncio.run(main())
Default call forwarding settings for all users
This example starts with the list of all calling users and then calls
wxc_sdk.person_settings.forwarding.PersonForwardingApi.configure() for each user. To speed things up, the async API is
used and all update operations are scheduled concurrently using asyncio.gather().
Source: reset_call_forwarding.py
1#!/usr/bin/env python
2"""
3Example script
4Reset call forwarding to default for all users in the org
5"""
6import asyncio
7import logging
8import time
9
10from dotenv import load_dotenv
11
12from wxc_sdk.all_types import PersonForwardingSetting
13from wxc_sdk.as_api import AsWebexSimpleApi
14
15
async def main():
    """
    Reset call forwarding to default for all calling users in the org.

    The updates for all users are scheduled at once and awaited together; the time needed for listing the
    users and for the updates is printed.
    """
    # load environment. SDK fetches the access token from WEBEX_ACCESS_TOKEN environment variable
    load_dotenv(override=True)

    logging.basicConfig(level=logging.INFO)

    # set to DEBUG to see the actual requests
    logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)

    async with AsWebexSimpleApi() as api:

        # get all calling users; calling users have the attribute location_id set
        start = time.perf_counter_ns()
        calling_users = [user for user in await api.people.list(calling_data=True)
                         if user.location_id]
        print(f'Got {len(calling_users)} calling users in '
              f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')

        # set call forwarding to default for all users
        # default call forwarding settings
        forwarding = PersonForwardingSetting.default()

        # schedule update for each user and wait for completion
        start = time.perf_counter_ns()
        await asyncio.gather(*[api.person_settings.forwarding.configure(entity_id=user.person_id,
                                                                        forwarding=forwarding)
                               for user in calling_users])
        print(f'Reset call forwarding to default for {len(calling_users)} users in '
              f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
45
if __name__ == '__main__':
    # run the coroutine to completion on a fresh event loop
    asyncio.run(main())
Modify number of rings configuration for users read from CSV
Here we read a bunch of user email addresses from a CSV. The CSV has an ERROR column and we only want to consider users without errors.
For all these users a VM setting is updated and the results are written to a CSV for further processing.
Source: modify_voicemail.py
1#!/usr/bin/env python
2"""
3Example Script
4Modifying number of rings configuration in voicemail settings
5Run -> python3 modify_voicemail.py modify_voicemail.csv
6"""
7
8import csv
9import sys
10import traceback
11from concurrent.futures import ThreadPoolExecutor
12
13from dotenv import load_dotenv
14
15from wxc_sdk import WebexSimpleApi
16from wxc_sdk.all_types import *
17
18VOICEMAIL_SETTINGS_NUMBER_OF_RINGS = 6
19
20# loading environment variables - use .env file for development
21load_dotenv(override=True)
22
23
def update_vm_settings():
    """
    actually update VM settings for all users present in input CSV

    The name of the input CSV is taken from the first command line argument. Records with a non-empty ERROR
    column are not processed and are reported as failed. For all other records the number of rings before
    unanswered calls are sent to voicemail is set to VOICEMAIL_SETTINGS_NUMBER_OF_RINGS for the calling user
    with the email address in the USERNAME column. The results are printed and written to output.csv.
    """
    api = WebexSimpleApi()
    final_report = []
    mail_ids = []
    # using wxc_sdk.people.PeopleApi.list to iterate over persons
    # Parameter calling_data needs to be set to true to get calling specific information
    # calling users have the attribute location_id set
    calling_users = [user for user in api.people.list(calling_data=True) if user.location_id]
    print(f'{len(calling_users)} users:')
    print('\n'.join(user.display_name for user in calling_users))

    # get CSV file name from command line
    # newline='' is what the csv module requires for files passed to a reader
    with open(str(sys.argv[1]), newline='') as csv_file:
        reader = csv.DictReader(csv_file)
        # read all records from CSV. Only consider records w/o error
        for row in reader:
            if not row['ERROR']:
                # collect email address from USERNAME column for further processing
                mail_ids.append(row['USERNAME'])
            else:
                final_report.append((row['USERNAME'], 'FAILED DUE TO ERROR REASON IN INPUT FILE'))

    # work on calling users that have an email address that we read from the CSV
    # a set is used for the lookup to avoid scanning the list of email addresses for each user
    mail_id_lookup = set(mail_ids)
    filtered_users = [user for user in calling_users if user.emails[0] in mail_id_lookup]

    print('\nCalling Users in CI - Count ', len(calling_users))
    print('Mail IDs from input file after removing error columns - Count ', len(mail_ids))
    print('FilteredUsers Users - Count', len(filtered_users))

    def set_number_of_rings(user: Person):
        """
        Update the number of rings in the VM config of a user and record the result in the final report

        :param user: user to update
        """
        try:
            # shortcut
            vm = api.person_settings.voicemail

            # Read Current configuration
            vm_settings = vm.read(person_id=user.person_id)
            print(f'\n Existing Configuration: {vm_settings} ')

            # Modify number of rings value
            vm_settings.send_unanswered_calls.number_of_rings = VOICEMAIL_SETTINGS_NUMBER_OF_RINGS
            vm.configure(user.person_id, settings=vm_settings)
            # Read configuration after changes
            vm_settings = vm.read(user.person_id)
            print(f'\n New Configuration: {vm_settings} ')
            final_report.append((user.display_name, 'SUCCESS'))
        except Exception as e:
            # best effort: a failure for one user is reported and must not stop the updates for other users
            final_report.append((user.display_name, 'FAILURE'))
            print('type error: ' + str(e))
            print(traceback.format_exc())

    # Modify settings for the filtered users; list() waits for all updates to complete
    with ThreadPoolExecutor() as pool:
        list(pool.map(set_number_of_rings, filtered_users))

    print(final_report)
    # newline='' prevents the csv writer from emitting extra blank lines on platforms using \r\n
    with open('output.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['USERNAME', 'STATUS'])
        writer.writerows(final_report)
91
92
if __name__ == '__main__':
    # expects the name of the input CSV as first command line argument
    update_vm_settings()
Holiday schedule w/ US national holidays for all US locations
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY.
Source: us_holidays.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5"""
6
7import logging
8from collections import defaultdict
9from concurrent.futures import ThreadPoolExecutor
10from datetime import date
11from threading import Lock
12from typing import List
13
14from calendarific import CalendarifiyApi, Holiday
15from dotenv import load_dotenv
16
17from wxc_sdk import WebexSimpleApi
18from wxc_sdk.all_types import Event, Schedule, ScheduleType
19from wxc_sdk.locations import Location
20
log = logging.getLogger(__name__)

# a lock per location to protect observe_in_location()
location_locks: dict[str, Lock] = defaultdict(Lock)

# Use parallel threads for provisioning?
USE_THREADING = True

# True: delete holiday schedule instead of creating one
CLEAN_UP = False

# first and last year for which to create public holiday events
FIRST_YEAR = 2022
LAST_YEAR = 2024

# for clean up a single pass is enough: limit the range of years to the first year
LAST_YEAR = FIRST_YEAR if CLEAN_UP else LAST_YEAR
37
38
def observe_in_location(*, api: WebexSimpleApi, location: Location, holidays: List[Holiday]):
    """
    create/update a "National Holidays" schedule in one location

    If CLEAN_UP is set then an existing schedule is deleted instead. Else a missing schedule is created with
    all upcoming holidays. For an existing schedule events in the past are deleted and upcoming holidays
    not yet present in the schedule are added.

    :param api: Webex api
    :type api: WebexSimpleApi
    :param location: location to work on
    :type location: Location
    :param holidays: list of holidays to observe
    :type holidays: List[Holiday]
    """

    def for_each(operation, items):
        """
        apply operation to each item; a thread pool is only created if USE_THREADING is set
        """
        if USE_THREADING:
            with ThreadPoolExecutor() as pool:
                list(pool.map(operation, items))
        else:
            for item in items:
                operation(item)

    # there should always be only one thread messing with the holiday schedule of a location
    with location_locks[location.location_id]:
        # the year is only used in log messages; don't fail on an empty list of holidays
        year = holidays[0].date.year if holidays else None
        schedule_name = 'National Holidays'

        # shortcut
        ats = api.telephony.schedules

        # existing "National Holidays" schedule or None
        schedule = next((schedule
                         for schedule in ats.list(obj_id=location.location_id,
                                                  schedule_type=ScheduleType.holidays,
                                                  name=schedule_name)
                         if schedule.name == schedule_name),
                        None)
        if CLEAN_UP:
            if schedule:
                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
                ats.delete_schedule(obj_id=location.location_id,
                                    schedule_type=ScheduleType.holidays,
                                    schedule_id=schedule.schedule_id)
            return
        if schedule:
            # we need the details: list response doesn't have events
            schedule = ats.details(obj_id=location.location_id,
                                   schedule_type=ScheduleType.holidays,
                                   schedule_id=schedule.schedule_id)
        # create list of desired schedule entries
        # * one per holiday
        # * only future holidays
        # * not on a Sunday
        today = date.today()
        events = [Event(name=f'{holiday.name} {holiday.date.year}',
                        start_date=holiday.date,
                        end_date=holiday.date,
                        all_day_enabled=True)
                  for holiday in holidays
                  if holiday.date >= today and holiday.date.weekday() != 6]

        if not schedule:
            # create new schedule
            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
            if not events:
                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
                return
            schedule = Schedule(name=schedule_name,
                                schedule_type=ScheduleType.holidays,
                                events=events)
            log.debug(
                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
                f'events')
            schedule_id = ats.create(obj_id=location.location_id, schedule=schedule)
            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
            return

        # update existing schedule
        # delete existing events in the past
        to_delete = [event
                     for event in schedule.events
                     if event.start_date < today]
        if to_delete:
            log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
            for_each(lambda event: ats.event_delete(obj_id=location.location_id,
                                                    schedule_type=ScheduleType.holidays,
                                                    schedule_id=schedule.schedule_id,
                                                    event_id=event.event_id),
                     to_delete)

        # add events which don't exist yet
        existing_dates = {event.start_date
                          for event in schedule.events}
        to_add = [event
                  for event in events
                  if event.start_date not in existing_dates]
        if not to_add:
            log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
            return
        log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
        for_each(lambda event: ats.event_create(obj_id=location.location_id,
                                                schedule_type=ScheduleType.holidays,
                                                schedule_id=schedule.schedule_id,
                                                event=event),
                 to_add)
        log.info(f'observe_in_location({location.name}, {year}): done.')
154
155
def observe_national_holidays(*, api: WebexSimpleApi, locations: List[Location],
                              year: Optional[int] = None):
    """
    US national holidays for given locations

    Gets the national US holidays for the given year from Calendarific and updates the holiday schedule
    of each location.

    :param api: Webex api
    :type api: WebexSimpleApi
    :param locations: list of locations in which US national holidays should be observed
    :type locations: List[Location]
    :param year: year for national holidays. Default: current year
    :type year: int
    """
    from typing import Optional  # noqa: F401; keeps the annotation above resolvable for type checkers

    # default: this year
    year = year or date.today().year

    # get national holidays for specified year
    holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')

    # update holiday schedule for each location
    if USE_THREADING:
        # the thread pool is only created if it's actually used
        with ThreadPoolExecutor() as pool:
            list(pool.map(
                lambda location: observe_in_location(api=api, location=location, holidays=holidays),
                locations))
    else:
        for location in locations:
            observe_in_location(api=api, location=location, holidays=holidays)
184
185
if __name__ == '__main__':
    # read dotenv which has some environment variables like Webex API token and Calendarific
    # API key.
    load_dotenv(override=True)

    # enable logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
    logging.getLogger('urllib3').setLevel(logging.INFO)
    logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)

    # the actual action
    with WebexSimpleApi(concurrent_requests=5) as wx_api:
        # get all US locations
        log.info('Getting locations...')
        us_locations = [location
                        for location in wx_api.locations.list()
                        if location.address.country == 'US']

        # set up location locks
        # location_locks is a defaultdict -> accessing with all potential keys creates the locks
        # creating the locks up front avoids that worker threads add keys to the dict concurrently
        for loc in us_locations:
            location_locks[loc.location_id]

        # create national holiday schedule for given year(s) and locations
        if USE_THREADING:
            with ThreadPoolExecutor() as pool:
                list(pool.map(
                    lambda year: observe_national_holidays(api=wx_api, year=year, locations=us_locations),
                    range(FIRST_YEAR, LAST_YEAR + 1)))
        else:
            for year in range(FIRST_YEAR, LAST_YEAR + 1):
                observe_national_holidays(api=wx_api, year=year, locations=us_locations)
Holiday schedule w/ US national holidays for all US locations (async variant)
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY.
Source: us_holidays_async.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5
6Using the asyc SDK variant
7"""
8import asyncio
9import functools
10import logging
11from collections import defaultdict
12from datetime import date
13from typing import List
14
15from calendarific import CalendarifiyApi, Holiday
16from dotenv import load_dotenv
17
18from wxc_sdk.all_types import Event, Schedule, ScheduleType
19from wxc_sdk.as_api import AsWebexSimpleApi
20from wxc_sdk.locations import Location
21
log = logging.getLogger(__name__)

# a lock per location to protect observe_in_location()
location_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

# Use parallel tasks for provisioning?
USE_TASKS = True

# True: delete holiday schedule instead of creating one
CLEAN_UP = False

# first and last year for which to create public holiday events
FIRST_YEAR = 2024
LAST_YEAR = 2026

# for clean up a single pass is enough: limit the range of years to the first year
LAST_YEAR = FIRST_YEAR if CLEAN_UP else LAST_YEAR
38
39
async def observe_in_location(*, api: AsWebexSimpleApi, location: Location, holidays: List[Holiday]):
    """
    create/update a "National Holidays" schedule in one location

    If CLEAN_UP is set then an existing schedule is deleted instead. Else a missing schedule is created with
    all upcoming holidays. For an existing schedule events in the past are deleted and upcoming holidays
    not yet present in the schedule are added.

    :param api: Webex api
    :type api: AsWebexSimpleApi
    :param location: location to work on
    :type location: Location
    :param holidays: list of holidays to observe
    :type holidays: List[Holiday]
    """

    async def for_each(operation, items):
        """
        await operation for each item; concurrently if USE_TASKS is set, else one after the other
        """
        if USE_TASKS:
            await asyncio.gather(*[operation(item) for item in items])
        else:
            for item in items:
                await operation(item)

    # there should always be only one task messing with the holiday schedule of a location
    async with location_locks[location.location_id]:
        # the year is only used in log messages; don't fail on an empty list of holidays
        year = holidays[0].date.year if holidays else None
        schedule_name = 'National Holidays'

        # shortcut
        ats = api.telephony.schedules

        # existing "National Holidays" schedule or None
        schedule = next((schedule
                         for schedule in await ats.list(obj_id=location.location_id,
                                                        schedule_type=ScheduleType.holidays,
                                                        name=schedule_name)
                         if schedule.name == schedule_name),
                        None)
        if CLEAN_UP:
            if schedule:
                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
                await ats.delete_schedule(obj_id=location.location_id,
                                          schedule_type=ScheduleType.holidays,
                                          schedule_id=schedule.schedule_id)
            return
        if schedule:
            # we need the details: list response doesn't have events
            schedule = await ats.details(obj_id=location.location_id,
                                         schedule_type=ScheduleType.holidays,
                                         schedule_id=schedule.schedule_id)
        # create list of desired schedule entries
        # * one per holiday
        # * only future holidays
        # * not on a Sunday
        today = date.today()
        events = [Event(name=f'{holiday.name} {holiday.date.year}',
                        start_date=holiday.date,
                        end_date=holiday.date,
                        all_day_enabled=True)
                  for holiday in holidays
                  if holiday.date >= today and holiday.date.weekday() != 6]

        if not schedule:
            # create new schedule
            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
            if not events:
                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
                return
            schedule = Schedule(name=schedule_name,
                                schedule_type=ScheduleType.holidays,
                                events=events)
            log.debug(
                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
                f'events')
            schedule_id = await ats.create(obj_id=location.location_id, schedule=schedule)
            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
            return

        # update existing schedule
        # delete existing events in the past
        to_delete = [event
                     for event in schedule.events
                     if event.start_date < today]
        if to_delete:
            log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
            await for_each(lambda event: ats.event_delete(obj_id=location.location_id,
                                                          schedule_type=ScheduleType.holidays,
                                                          schedule_id=schedule.schedule_id,
                                                          event_id=event.event_id),
                           to_delete)

        # add events which don't exist yet
        existing_dates = {event.start_date
                          for event in schedule.events}
        to_add = [event
                  for event in events
                  if event.start_date not in existing_dates]
        if not to_add:
            log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
            return
        log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
        await for_each(lambda event: ats.event_create(obj_id=location.location_id,
                                                      schedule_type=ScheduleType.holidays,
                                                      schedule_id=schedule.schedule_id,
                                                      event=event),
                       to_add)
        log.info(f'observe_in_location({location.name}, {year}): done.')
150
151
async def observe_national_holidays(*, api: AsWebexSimpleApi, locations: List[Location],
                                    year: 'Optional[int]' = None):
    """
    US national holidays for given locations

    Gets the national US holidays for the given year from Calendarific and updates the holiday schedule
    of each location.

    :param api: Webex api
    :type api: AsWebexSimpleApi
    :param locations: list of locations in which US national holidays should be observed
    :type locations: List[Location]
    :param year: year for national holidays. Default: current year
    :type year: int
    """
    # default: this year
    year = year or date.today().year

    # get national holidays for specified year
    loop = asyncio.get_running_loop()
    # avoid sync call: the blocking request is run in the default executor so that the event loop isn't blocked
    # holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
    holidays = await loop.run_in_executor(None, functools.partial(CalendarifiyApi().holidays,
                                                                  country='US', year=year, holiday_type='national'))

    # update holiday schedule for each location
    if USE_TASKS:
        await asyncio.gather(*[observe_in_location(api=api, location=location, holidays=holidays)
                               for location in locations])
    else:
        for location in locations:
            await observe_in_location(api=api, location=location, holidays=holidays)
182
183
if __name__ == '__main__':
    # read dotenv which has some environment variables like Webex API token and Calendarific
    # API key.
    load_dotenv(override=True)

    # enable logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
    logging.getLogger('urllib3').setLevel(logging.INFO)
    logging.getLogger('wxc_sdk.as_rest').setLevel(logging.INFO)

    # the actual action
    async def do_provision():
        """
        create (or clean up) the national holiday schedules in all US locations
        """
        async with AsWebexSimpleApi(concurrent_requests=5) as wx_api:
            # get all US locations
            log.info('Getting locations...')
            us_locations = [location
                            for location in await wx_api.locations.list()
                            if location.address.country == 'US']

            # set up location locks
            # location_locks is a defaultdict -> accessing with all potential keys creates the locks
            for loc in us_locations:
                location_locks[loc.location_id]

            # create national holiday schedule for given year(s) and locations
            if USE_TASKS:
                await asyncio.gather(*[observe_national_holidays(api=wx_api, year=year, locations=us_locations)
                                       for year in range(FIRST_YEAR, LAST_YEAR + 1)])
            else:
                for year in range(FIRST_YEAR, LAST_YEAR + 1):
                    await observe_national_holidays(api=wx_api, year=year, locations=us_locations)

    asyncio.run(do_provision())
Persist tokens and obtain new tokens interactively
A typical problem specifically when creating CLI scripts is how to obtain valid access tokens for the API operations. If your code wants to invoke Webex REST APIs on behalf of a user then an integration is needed. The concepts of integrations are explained at the “Integrations” page on developer.cisco.com.
This example code shows how an OAuth grant flow for an integration can be initiated from a script by using the Python
webbrowser module and calling the open() method with the authorization URL of a given integration to open that
URL in the system web browser. The user can then authenticate and grant access to the integration. In the last
step of a successful authorization flow the web browser is redirected to the redirect_url of the
integration.
The example code starts a primitive web server serving GET requests to http://localhost:6001/redirect.
This URL has to be the redirect URL of the integration you create under My Webex Apps on developer.webex.com.
The sample script reads the integration parameters from environment variables (TOKEN_INTEGRATION_CLIENT_ID,
TOKEN_INTEGRATION_CLIENT_SECRET, TOKEN_INTEGRATION_CLIENT_SCOPES). These variables can also be defined in
get_tokens.env in the current directory:
# rename this to get_tokens.env and set values
# client ID for integration to be used.
TOKEN_INTEGRATION_CLIENT_ID=
# client secret of integration
TOKEN_INTEGRATION_CLIENT_SECRET=
# scopes to request. Use these scopes when creating the integration at developer.webex.com
# scopes can be in any form supported by wxc_sdk.scopes.parse_scopes()
TOKEN_INTEGRATION_CLIENT_SCOPES="spark:calls_write spark:kms spark:calls_read spark-admin:telephony_config_read spark:people_read"
The sample code persists the tokens in get_tokens.yml in the current directory. On startup the sample code tries
to read tokens from that file. If needed a new access token is obtained using the refresh token.
An OAuth flow is only initiated if no (valid) tokens could be read from get_tokens.yml
Source: get_tokens.py
1#!/usr/bin/env python
2"""
3Example script
4read tokens from file or interactively obtain token by starting a local web server and open the authorization URL in
5the local web browser
6"""
7import logging
8import os
9from typing import Optional
10
11from dotenv import load_dotenv
12
13from wxc_sdk import WebexSimpleApi
14from wxc_sdk.integration import Integration
15from wxc_sdk.scopes import parse_scopes
16from wxc_sdk.tokens import Tokens
17
18log = logging.getLogger(__name__)
19
20
def env_path() -> str:
    """
    determine path for .env to load environment variables from; based on name of this file

    :return: .env file path in the current working directory
    :rtype: str
    """
    # name of this script without directory and extension
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    return os.path.join(os.getcwd(), f'{script_name}.env')
28
29
def yml_path() -> str:
    """
    determine path of YML file to persist tokens; based on name of this file

    :return: path to YML file in the current working directory
    :rtype: str
    """
    # name of this script without directory and extension
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    return os.path.join(os.getcwd(), f'{script_name}.yml')
38
39
def build_integration() -> Integration:
    """
    read integration parameters from environment variables and create an integration

    The parameters are read from TOKEN_INTEGRATION_CLIENT_ID, TOKEN_INTEGRATION_CLIENT_SECRET, and
    TOKEN_INTEGRATION_CLIENT_SCOPES.

    :return: :class:`wxc_sdk.integration.Integration` instance
    :raises ValueError: if one of the integration parameters is missing or empty
    """
    client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
    client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
    raw_scopes = os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES')
    # validate the raw values first so that a missing variable is never passed to parse_scopes()
    if not all((client_id, client_secret, raw_scopes)):
        raise ValueError('failed to get integration parameters from environment')
    scopes = parse_scopes(raw_scopes)
    if not scopes:
        raise ValueError('failed to get integration parameters from environment')
    redirect_url = 'http://localhost:6001/redirect'
    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
                       redirect_url=redirect_url)
54
55
def get_tokens() -> Optional[Tokens]:
    """
    Tokens are read from a YML file. If needed an OAuth flow is initiated.

    The integration is built from environment variables; the YML file path is determined by yml_path().

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """
    integration = build_integration()
    return integration.get_cached_tokens_from_yml(yml_path=yml_path())
68
69
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    # load environment variables from .env
    path = env_path()
    log.info(f'reading {path}')
    # load exactly the file that was logged above
    load_dotenv(path)

    tokens = get_tokens()

    # use the tokens to get identity of authenticated user
    api = WebexSimpleApi(tokens=tokens)
    me = api.people.me()
    print(f'authenticated as {me.display_name} ({me.emails[0]}) ')
Read/update call intercept settings of a user
Usage: call_intercept.py [OPTIONS] USER_EMAIL [ON_OFF]:[on|off]
read/update call intercept settings of a calling user.
╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
│ * user_email EMAIL_TYPE email address of user [required] │
│ on_off [ON_OFF]:[on|off] operation to apply. │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
│ --token TEXT Access token can be provided using --token argument, set │
│ in WEBEX_ACCESS_TOKEN environment variable or can be a │
│ service app token. For the latter set environment │
│ variables ('SERVICE_APP_REFRESH_TOKEN', │
│ 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET'). │
│ Environment variables can also be set in service_app.env │
│ --install-completion Install completion for the current shell. │
│ --show-completion Show completion for the current shell, to copy it or │
│ customize the installation. │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
Example: ./call_intercept.py bob@example.com on
Source: call_intercept.py
1#!/usr/bin/env -S uv run --script
2# /// script
3# requires-python = ">=3.11,<3.14"
4# dependencies = [
5# "email-validator",
6# "python-dotenv",
7# "typer",
8# "wxc-sdk",
9# ]
10# ///
11"""
12Script to read/update call intercept settings of a calling user.
13
14 Usage: call_intercept.py [OPTIONS] USER_EMAIL [ON_OFF]:[on|off]
15
16 read/update call intercept settings of a calling user.
17
18╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
19│ * user_email EMAIL_TYPE email address of user [required] │
20│ on_off [ON_OFF]:[on|off] operation to apply. │
21╰────────────────────────────────────────────────────────────────────────────────────────────╯
22╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
23│ --token TEXT Access token can be provided using --token argument, set │
24│ in WEBEX_ACCESS_TOKEN environment variable or can be a │
25│ service app token. For the latter set environment │
26│ variables ('SERVICE_APP_REFRESH_TOKEN', │
27│ 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET'). │
28│ Environment variables can also be set in service_app.env │
29│ --install-completion Install completion for the current shell. │
30│ --show-completion Show completion for the current shell, to copy it or │
31│ customize the installation. │
32│ --help Show this message and exit. │
33╰────────────────────────────────────────────────────────────────────────────────────────────╯
34
35 Example: ./call_intercept.py bob@example.com on
36"""
37import logging
38import os
39import sys
40from enum import Enum
41from typing import Optional
42
43import typer
44from dotenv import load_dotenv
45from email_validator import validate_email
46from service_app import SERVICE_APP_ENVS, env_path, get_tokens
47
48from wxc_sdk import WebexSimpleApi
49from wxc_sdk.person_settings.call_intercept import InterceptSetting
50from wxc_sdk.rest import RestError
51from wxc_sdk.tokens import Tokens
52
53log = logging.getLogger(__name__)
54
55
def email_type(value: str):
    """
    Validate an email address given on the command line

    Deliverability is not checked.

    :param value: email address as entered by the user
    :return: normalized form of the email address
    """
    return validate_email(value, check_deliverability=False).normalized
62
63
class OnOff(str, Enum):
    """
    Operation to apply to the call intercept setting; the values are the strings accepted on the command line
    """
    # enable call intercept
    ON = 'on'
    # disable call intercept
    OFF = 'off'
67
68
app = typer.Typer()


@app.command(name=os.path.basename(__file__),
             help='read/update call intercept settings of a calling user.',
             epilog=f'Example: {sys.argv[0]} bob@example.com on')
def main(user_email: str = typer.Argument(...,
                                          help='email address of user',
                                          parser=email_type),
         on_off: Optional[OnOff] = typer.Argument(None,
                                                  help='operation to apply.'),
         token: Optional[str] = typer.Option(None, '--token',
                                             help=f'Access token can be provided using --token argument, '
                                                  f'set in WEBEX_ACCESS_TOKEN environment variable or '
                                                  f'can be a service app token. For the latter set '
                                                  f'environment variables {SERVICE_APP_ENVS}. '
                                                  f'Environment variables can also be set in '
                                                  f'{env_path()}')):
    """
    Read and optionally update the call intercept setting of a calling user

    The current state ("on" or "off") is always printed to stdout. If an operation is given the setting is updated,
    read again, and the new state is printed. Error messages are printed to stderr and lead to exit code 1.

    :param user_email: email address of the user to work on
    :param on_off: operation to apply; the setting is only read if no operation is given
    :param token: access token; if missing then tokens are obtained via get_tokens()
    """
    # get tokens
    load_dotenv(env_path(), override=True)

    tokens = get_tokens() if token is None else Tokens(access_token=token)
    if tokens is None:
        print(f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable or '
              f'can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
              f'variables can '
              f'also be set in {env_path()}', file=sys.stderr)
        # nothing can be done without tokens
        sys.exit(1)

    # set level to DEBUG to see debug of REST requests; DEBUG is only used if a trace function is active
    gettrace = getattr(sys, 'gettrace', None)
    tracing = bool(gettrace and gettrace())
    logging.basicConfig(level=logging.DEBUG if tracing else logging.INFO)

    with WebexSimpleApi(tokens=tokens) as api:
        # get user; only accept a user with the given address as 1st email address
        email = user_email.lower()
        user = next((user
                     for user in api.people.list(email=email)
                     if user.emails[0] == email), None)
        if user is None:
            print(f'User "{email}" not found', file=sys.stderr)
            sys.exit(1)

        # display call intercept status
        try:
            intercept = api.person_settings.call_intercept.read(user.person_id)
        except RestError as e:
            print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}',
                  file=sys.stderr)
            sys.exit(1)

        print('on' if intercept.enabled else 'off')
        if on_off:
            # action: turn on/off; start from default settings and only set the enabled state
            intercept = InterceptSetting.default()
            intercept.enabled = on_off == OnOff.ON
            try:
                api.person_settings.call_intercept.configure(user.person_id,
                                                             intercept=intercept)
            except RestError as e:
                print(f'Failed to update call intercept settings: {e.response.status_code}, {e.description}',
                      file=sys.stderr)
                sys.exit(1)

            # read call intercept again
            try:
                intercept = api.person_settings.call_intercept.read(user.person_id)
            except RestError as e:
                print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}',
                      file=sys.stderr)
                sys.exit(1)

            # display state after update
            print(f"set to {'on' if intercept.enabled else 'off'}")

    sys.exit(0)


if __name__ == '__main__':
    app()
Read/update call queue agent join states
usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]]
[--queue QUEUE [QUEUE ...]]
[--join JOIN_AGENT [JOIN_AGENT ...]]
[--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
[--remove REMOVE_USER [REMOVE_USER ...]]
[--add ADD_USER [ADD_USER ...]] [--dryrun]
[--token TOKEN]
Modify call queue settings from the CLI
optional arguments:
-h, --help show this help message and exit
--location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
name of location to work on. If missing then work on
all locations.
--queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
name(s) of queue(s) to operate on. If missing then
work on all queues in location.
--join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
Join given user(s) on given queue(s). Can be "all" to
act on all agents.
--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
Unjoin given agent(s) from given queue(s). Can be
"all" to act on all agents.
--remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
Remove given agent from given queue(s). Can be "all"
to act on all agents.
--add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
Add given users to given queue(s).
--dryrun, -d Dry run; don't apply any changes
--token TOKEN admin access token to use
The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables
which can be set in a queue_helper.env file in the current directory.
Source: queue_helper.py
1#!/usr/bin/env python
2"""
3usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]] [--queue QUEUE [QUEUE ...]]
4 [--join JOIN_AGENT [JOIN_AGENT ...]] [--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
5 [--remove REMOVE_USER [REMOVE_USER ...]] [--add ADD_USER [ADD_USER ...]] [--dryrun]
6 [--token TOKEN]
7
8Modify call queue settings from the CLI
9
10optional arguments:
11 -h, --help show this help message and exit
12 --location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
13 name of location to work on. If missing then work on all locations.
14 --queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
15 name(s) of queue(s) to operate on. If missing then work on all queues in location.
16 --join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
17 Join given user(s) on given queue(s). Can be "all" to act on all agents.
18 --unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
19 Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.
20 --remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
21 Remove given agent from given queue(s). Can be "all" to act on all agents.
22 --add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
23 Add given users to given queue(s).
24 --dryrun, -d Dry run; don't apply any changes
25 --token TOKEN admin access token to use
26"""
27import asyncio
28import logging
29import os
30import sys
31from argparse import ArgumentParser
32from collections.abc import AsyncGenerator
33from typing import Optional
34
35from dotenv import load_dotenv
36
37from wxc_sdk.as_api import AsWebexSimpleApi
38from wxc_sdk.integration import Integration
39from wxc_sdk.people import Person
40from wxc_sdk.scopes import parse_scopes
41from wxc_sdk.telephony.callqueue import CallQueue
42from wxc_sdk.telephony.hg_and_cq import Agent
43from wxc_sdk.tokens import Tokens
44
45
def agent_name(agent: Agent) -> str:
    """
    Get the name used to identify an agent: first name and last name separated by a space

    Only the ``first_name`` and ``last_name`` attributes of the given object are read.

    :param agent: agent to get the name for
    :return: "<first name> <last name>"
    """
    return '{} {}'.format(agent.first_name, agent.last_name)
48
49
def env_path() -> str:
    """
    Build the path of the .env file to load environment variables from

    The path is the path of this file with the extension replaced by ".env".

    :return: .env file path
    """
    base, _ = os.path.splitext(__file__)
    return base + '.env'
56
57
def yml_path() -> str:
    """
    Build the path of the YML file used to persist tokens

    The path is the path of this file with the extension replaced by ".yml".

    :return: path to YML file
    :rtype: str
    """
    base, _ = os.path.splitext(__file__)
    return base + '.yml'
65
66
def build_integration() -> Integration:
    """
    Create an integration from parameters found in environment variables

    Client id, client secret, and scopes are taken from the ``INTEGRATION_CLIENT_ID``,
    ``INTEGRATION_CLIENT_SECRET``, and ``INTEGRATION_SCOPES`` environment variables. The redirect URL is fixed.

    :return: :class:`wxc_sdk.integration.Integration` instance
    :raises ValueError: if one of the three parameters is missing or empty
    """
    params = {
        'client_id': os.getenv('INTEGRATION_CLIENT_ID'),
        'client_secret': os.getenv('INTEGRATION_CLIENT_SECRET'),
        'scopes': parse_scopes(os.getenv('INTEGRATION_SCOPES')),
    }
    # every single parameter is required
    if not all(params.values()):
        raise ValueError('failed to get integration parameters from environment')
    return Integration(redirect_url='http://localhost:6001/redirect', **params)
80
81
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens for the integration defined in the environment

    Tokens are read from a YML file. If needed an OAuth flow is initiated. Both is delegated to
    :meth:`wxc_sdk.integration.Integration.get_cached_tokens_from_yml`.

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """
    return build_integration().get_cached_tokens_from_yml(yml_path=yml_path())
93
94
async def main():
    """
    Modify call queue settings from the CLI

    Parses the command line, obtains tokens (``--token`` argument or tokens of the integration defined in the
    environment), determines the queues to work on based on the ``--location`` and ``--queue`` arguments, and then
    joins, unjoins, removes, or adds agents on each of these queues. With ``--dryrun`` the planned actions are
    printed but no queue is updated. Exits with code 1 if no location or no queue matches the arguments.
    """

    async def act_on_queue(queue: CallQueue):
        """
        Act on a single queue: apply join, unjoin, remove, and add operations and print a summary of the queue

        :param queue: queue to work on
        """
        # we need the queue details b/c the queue instance passed as parameter is from a list() call
        # ... and thus is missing all the details like agents
        details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
        agent_names = set(map(agent_name, details.agents))

        def notify(message: str) -> str:
            """
            an action notification with queue information
            """
            return f'queue "{details.name:{queue_len}}" in "{queue.location_name:{location_len}}": {message}'

        def validate_agents(names: list[str], operation: str) -> set[str]:
            """
            check if all names in given list exist as agents on current queue

            :param names: agent names from the command line; "all" selects all agents of the queue
            :param operation: name of the operation; only used in error messages
            :return: set of names that exist as agents on the queue
            """
            if 'all' in names:
                return set(agent_names)

            # report names (in the order given) which don't exist as agents on this queue
            not_found = [name for name in names if name not in agent_names]
            if not_found:
                print('\n'.join(notify(f'{name} not found for {operation}')
                                for name in not_found),
                      file=sys.stderr)
            return set(names) & agent_names

        # validate list of names for join, unjoin, and remove against actual list of agents
        to_join = validate_agents(join_agents, 'join')
        to_unjoin = validate_agents(unjoin_agents, 'unjoin')
        to_remove = validate_agents(remove_users, 'remove')

        # check for agents we are asked to add but which already exist as agents on the queue
        existing_agent_ids = set(agent.agent_id for agent in details.agents)
        agent_exists = [agent_name(user) for user in add_users
                        if user.person_id in existing_agent_ids]
        if agent_exists:
            print('\n'.join(notify(f'{name} already is agent')
                            for name in agent_exists),
                  file=sys.stderr)
        # only add users who are not yet agents on the queue; the decision is based on the id b/c names are
        # not guaranteed to be unique
        to_add = [user for user in add_users
                  if user.person_id not in existing_agent_ids]

        # the updated list of agents for the current queue
        new_agents = []

        # do we actually need an update?
        update_needed = False

        # create copy of each agent instance; we don't want to update the original agent objects
        # to make sure that details still holds the state before any update
        agents = [agent.copy(deep=True) for agent in details.agents]

        # iterate through the existing agents and see if we have to apply any change
        for agent in agents:
            name = agent_name(agent)
            # do we have to take action to join this agent?
            if name in to_join and not agent.join_enabled:
                print(notify(f'{name}, join'))
                update_needed = True
                agent.join_enabled = True
            # do we have to take action to unjoin this agent?
            if name in to_unjoin and agent.join_enabled:
                print(notify(f'{name}, unjoin'))
                update_needed = True
                agent.join_enabled = False
            # do we have to remove this agent?
            if name in to_remove:
                print(notify(f'{name}, remove'))
                update_needed = True
                # skip to next agent; so that we don't add this agent to the updated list of agents
                continue
            new_agents.append(agent)

        # add new agents
        new_agents.extend(Agent(id=user.person_id)
                          for user in to_add)

        # update the queue
        if (update_needed or to_add) and not args.dryrun:
            # simplified update: we only messed with the agents
            update = CallQueue(agents=new_agents)
            await api.telephony.callqueue.update(location_id=queue.location_id, queue_id=queue.id,
                                                 update=update)
            print(notify('queue updated'))
            # and get details after the update
            details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
            print(notify('got details after update'))

        # print summary
        print(f'queue "{queue.name:{queue_len}}" in "{queue.location_name}"')
        print(f'  phone number: {details.phone_number}')
        print(f'  extension: {details.extension}')
        print('  agents')
        if details.agents:
            name_len = max(map(len, map(agent_name, details.agents)))
            for agent in details.agents:
                print(f'    {agent_name(agent):{name_len}}: {"not " if not agent.join_enabled else ""}joined')

    async def validate_users(user_names: list[str]) -> AsyncGenerator[Person, None]:
        """
        Validate list of names of users to be added and yield a Person instance for each one

        A message is printed to stderr for each name no user can be determined for.

        :param user_names: names ("<first name> <last name>") of users
        """
        # search for all names in parallel; exceptions are returned as results instead of being raised
        lists = await asyncio.gather(
            *[api.people.list(display_name=name) for name in user_names], return_exceptions=True)
        for name, user_list in zip(user_names, lists):
            if isinstance(user_list, Exception):
                # a failed lookup is not the same as a user that doesn't exist
                print(f'failed to look up user "{name}": {user_list}', file=sys.stderr)
                continue
            user = next((u for u in user_list if name == agent_name(u)), None)
            if user is None:
                print(f'user "{name}" not found', file=sys.stderr)
                continue
            yield user

    # parse command line
    parser = ArgumentParser(description='Modify call queue settings from the CLI')
    parser.add_argument('--location', '-l', type=str, required=False, nargs='+',
                        help='name of location to work on. If missing then work on all locations.')

    parser.add_argument('--queue', '-q', type=str, required=False, nargs='+',
                        help='name(s) of queue(s) to operate on. If missing then work on all queues in location.')

    parser.add_argument('--join', '-j', type=str, required=False, nargs='+', dest='join_agent',
                        help='Join given user(s) on given queue(s). Can be "all" to act on all agents.')

    parser.add_argument('--unjoin', '-u', type=str, required=False, nargs='+', dest='unjoin_agent',
                        help='Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.')

    parser.add_argument('--remove', '-r', type=str, required=False, nargs='+', dest='remove_user',
                        help='Remove given agent from given queue(s). Can be "all" to act on all agents.')

    parser.add_argument('--add', '-a', type=str, required=False, nargs='+', dest='add_user',
                        help='Add given users to given queue(s).')
    parser.add_argument('--dryrun', '-d', required=False, action='store_true',
                        help='Dry run; don\'t apply any changes')
    parser.add_argument('--token', type=str, required=False, help='admin access token to use')

    args = parser.parse_args()

    # get environment variables from .env; required for integration parameters
    load_dotenv(env_path())

    tokens = args.token or None
    if tokens is None:
        # get tokens from cache or create a new set of tokens using the integration defined in .env
        tokens = get_tokens()

    async with AsWebexSimpleApi(tokens=tokens) as api:
        # validate location parameter
        location_names = args.location or []
        wanted_location_names = set(location_names)

        # list of all locations with names matching one of the provided names
        locations = [loc for loc in await api.locations.list()
                     if not location_names or loc.name in wanted_location_names]

        if not location_names:
            print(f'Considering all {len(locations)} locations')

        # set of names of matching locations
        found_location_names = set(loc.name for loc in locations)

        # Error message for each location name argument not matching an actual location
        for location_name in location_names:
            if location_name not in found_location_names:
                print(f'location "{location_name}" not found', file=sys.stderr)

        if not locations:
            print('Found no locations to work on', file=sys.stderr)
            sys.exit(1)

        # which queues do we need to operate on?
        location_ids = set(loc.location_id for loc in locations)
        all_queues = args.queue is None
        queue_names = set(args.queue or [])
        # full list of queues
        queues = await api.telephony.callqueue.list()
        # filter based on queue and location parameter
        queues = [queue for queue in queues
                  if (all_queues or queue.name in queue_names) and queue.location_id in location_ids]

        if not queues:
            # the max() calls below require at least one queue
            print('Found no queues to work on', file=sys.stderr)
            sys.exit(1)

        # len of queue names for nicer output
        queue_len = max(len(queue.name) for queue in queues)

        # now we can actually go back and re-evaluate the list of locations; for the location length we only need
        # to consider locations we actually have a target queue in
        location_ids = set(queue.location_id for queue in queues)

        # max length of location names for nicely formatted output
        location_len = max(len(loc.name)
                           for loc in locations
                           if loc.location_id in location_ids)

        # get the names for join, unjoin, and remove
        join_agents = args.join_agent or []
        unjoin_agents = args.unjoin_agent or []
        remove_users = args.remove_user or []

        # validate users to add; make sure that users exist with the provided names
        add_users = [u async for u in validate_users(user_names=args.add_user or [])]

        # apply actions to all queues
        await asyncio.gather(*[act_on_queue(queue) for queue in queues])
309
310
if __name__ == '__main__':
    # enable DEBUG logging to a file; REST log shows all requests
    # the log file is created in the current directory, is named after this script, and is overwritten on each run
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    log_file = os.path.join(os.getcwd(), f'{script_name}.log')
    logging.basicConfig(filename=log_file, filemode='w', level=logging.DEBUG)
    asyncio.run(main())
Using service app tokens to access API endpoints
The script uses service app credentials to get an access token and then uses this access token to call Webex Calling APIs.
Source: service_app.py
1#!/usr/bin/env -S uv run --script
2# /// script
3# requires-python = ">=3.11,<3.14"
4# dependencies = [
5# "python-dotenv",
6# "wxc-sdk",
7# ]
8# ///
9"""
10Demo for Webex service app: using service app tokens to access API endpoints
11"""
12
13import inspect
14import logging
15import os
16import sys
17from json import dumps, loads
18from os import getenv
19from typing import Optional
20
21import yaml
22from dotenv import load_dotenv
23
24from wxc_sdk import WebexSimpleApi
25from wxc_sdk.integration import Integration
26from wxc_sdk.tokens import Tokens
27
28SERVICE_APP_ENVS = ('SERVICE_APP_REFRESH_TOKEN', 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET')
29
30
def yml_path(*, client_id: str) -> str:
    """
    Get the name of the YML file used to cache access and refresh token of a service app

    :param client_id: client id of the service app; part of the file name
    :return: file name (without directory)
    """
    return 'tokens_{}.yml'.format(client_id)
36
37
def env_path() -> str:
    """
    Get the name of the .env file to read service app settings from

    The preferred file is named after the script the call comes from. If that file does not exist then the
    .env file named after this file is used instead.

    :return: name of .env file
    """

    def env_for(script: str) -> str:
        """
        .env file name for a script: base name of the script with extension ".env"
        """
        return f'{os.path.splitext(os.path.basename(script))[0]}.env'

    # the frame of our caller tells us the file name of the calling script
    caller = inspect.currentframe().f_back
    candidate = env_for(inspect.getframeinfo(caller).filename)
    if os.path.isfile(candidate):
        return candidate
    # fallback to .env specific to this file
    return env_for(__file__)
50
51
def read_tokens_from_file(*, client_id: str) -> Optional[Tokens]:
    """
    Get service app tokens from cache file, return None if cache does not exist or read fails

    Reading the cache is best effort: any failure to read or validate the file is logged and then treated as if
    no cache existed.

    :param client_id: client id of the service app; determines the name of the cache file
    :return: cached tokens or None
    """
    path = yml_path(client_id=client_id)
    if not os.path.isfile(path):
        return None
    try:
        with open(path) as f:
            data = yaml.safe_load(f)
        tokens = Tokens.model_validate(data)
    except Exception as e:
        # don't fail on an unusable cache, but leave a trace of what went wrong
        logging.getLogger(__name__).warning('failed to read tokens from "%s": %s', path, e)
        return None
    return tokens
66
67
def write_tokens_to_file(*, client_id: str, tokens: Tokens):
    """
    Write tokens to the cache file of a service app; an existing file is overwritten

    :param client_id: client id of the service app; determines the name of the cache file
    :param tokens: tokens to write; attributes without a value are not written
    """
    cache_file = yml_path(client_id=client_id)
    with open(cache_file, mode='w') as cache:
        data = tokens.model_dump(exclude_none=True)
        yaml.safe_dump(data, cache)
74
75
def get_access_token(*, client_id: str, client_secret: str, refresh: str) -> Tokens:
    """
    Get a new access token using refresh token, service app client id, service app client secret

    The tokens are also written to the cache file of the service app.

    :param client_id: service app client id
    :param client_secret: service app client secret
    :param refresh: service app refresh token
    :return: tokens after the refresh
    """
    # a scope-less integration without redirect URL; only used to refresh the tokens
    refresher = Integration(client_id=client_id, client_secret=client_secret, scopes=[], redirect_url=None)
    new_tokens = Tokens(refresh_token=refresh)
    # NOTE(review): refresh() presumably updates the passed tokens in place -- the same object is cached and returned
    refresher.refresh(tokens=new_tokens)
    write_tokens_to_file(client_id=client_id, tokens=new_tokens)
    return new_tokens
85
86
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens from environment variable, cache or create new access token using service app credentials

    If all service app environment variables are set then service app tokens are used: read from the cache file or
    newly created, and renewed if less than a day of lifetime remains. Else the access token in the
    WEBEX_ACCESS_TOKEN environment variable is used.

    :return: tokens or None if neither service app settings nor an access token are defined in the environment
    """
    refresh, client_id, client_secret = (os.getenv(env) for env in SERVICE_APP_ENVS)
    if refresh and client_id and client_secret:
        # try to read from file
        service_tokens = read_tokens_from_file(client_id=client_id)
        # .. or create new access token using refresh token
        if service_tokens is None:
            service_tokens = get_access_token(client_id=client_id, client_secret=client_secret, refresh=refresh)
        # get a new access token if the current one is about to expire: less than 24 hours remaining
        if service_tokens.expires_in is not None and service_tokens.remaining < 24 * 60 * 60:
            service_tokens = get_access_token(client_id=client_id, client_secret=client_secret, refresh=refresh)
        return service_tokens

    # no (complete) service app settings: fall back to a plain access token
    token = os.getenv('WEBEX_ACCESS_TOKEN')
    if token is None:
        return None
    return Tokens(access_token=token)
106
107
def service_app():
    """
    Use service app access token to call Webex Calling API endpoints

    Prints the tokens and their scopes and then the number of users and call queues. Exits with code 1 if
    not all service app environment variables are defined.
    """
    load_dotenv(env_path())
    # assert that all required environment variable are set
    if any(not getenv(name) for name in SERVICE_APP_ENVS):
        print(
            f'{", ".join(SERVICE_APP_ENVS)} environment variables need to be defined in '
            f'environment or in "{env_path()}"',
            file=sys.stderr,
        )
        exit(1)

    # get tokens and dump to console
    tokens = get_tokens()

    token_data = loads(tokens.json())
    print(dumps(token_data, indent=2))
    print()
    print('scopes:')
    scopes = sorted(tokens.scope.split())
    print('\n'.join(f' * {s}' for s in scopes))

    # use tokens to access APIs
    with WebexSimpleApi(tokens=tokens) as api:
        users = list(api.people.list())
        print(f'{len(users)} users')

        queues = list(api.telephony.callqueue.list())
        print(f'{len(queues)} call queues')


if __name__ == '__main__':
    # log at DEBUG level
    logging.basicConfig(level=logging.DEBUG)
    service_app()
Pool unassigned TNs on hunt groups to catch calls to unassigned TNs
This script looks for unassigned TNs and assigns them to HGs that are forwarded to the location's main number. The idea is to catch all incoming calls to unassigned TNs and handle them accordingly.
usage: catch_tns.py [-h] [--test] [--location LOCATION] [--token TOKEN]
[--cleanup]
optional arguments:
-h, --help show this help message and exit
--test test only; don't actually apply any config
--location LOCATION Location to work on
--token TOKEN admin access token to use.
--cleanup remove all pooling HGs
Source: catch_tns.py
1#!/usr/bin/env python
2"""
3This script looks for unassigned TNs and assigns them to HGs that are forwarded to the location's main number.
4The idea is to catch all incoming calls to unassigned TNs and handle them accordingly
5
6 usage: catch_tns.py [-h] [--test] [--location LOCATION] [--token TOKEN]
7 [--cleanup]
8
9 optional arguments:
10 -h, --help show this help message and exit
11 --test test only; don't actually apply any config
12 --location LOCATION Location to work on
13 --token TOKEN admin access token to use.
14 --cleanup remove all pooling HGs
15"""
16import asyncio
17import logging
18import os
19import sys
20from argparse import ArgumentParser, Namespace
21from collections import defaultdict
22from operator import attrgetter
23from typing import Optional, Union
24
25from dotenv import load_dotenv
26
27from wxc_sdk.as_api import AsWebexSimpleApi
28from wxc_sdk.common import AlternateNumber, IdAndName, RingPattern
29from wxc_sdk.integration import Integration
30from wxc_sdk.scopes import parse_scopes
31from wxc_sdk.telephony import NumberListPhoneNumber
32from wxc_sdk.telephony.forwarding import CallForwarding
33from wxc_sdk.telephony.huntgroup import HuntGroup
34from wxc_sdk.tokens import Tokens
35
36POOL_HG_NAME = 'POOL_'
37
38
def env_path() -> str:
    """
    Build the path of the .env file to load environment variables from; based on name of this file

    The file is located in the current working directory.

    :return: .env file path
    """
    name_without_ext, _ext = os.path.splitext(os.path.basename(__file__))
    env_file = name_without_ext + '.env'
    return os.path.join(os.getcwd(), env_file)
45
46
def yml_path() -> str:
    """
    Build the path of the YML file used to persist tokens; based on name of this file

    The file is located in the current working directory.

    :return: path to YML file
    :rtype: str
    """
    name_without_ext, _ext = os.path.splitext(os.path.basename(__file__))
    yml_file = name_without_ext + '.yml'
    return os.path.join(os.getcwd(), yml_file)
54
55
def build_integration() -> Integration:
    """
    Create an integration from parameters found in environment variables

    Client id, client secret, and scopes are taken from the ``INTEGRATION_CLIENT_ID``,
    ``INTEGRATION_CLIENT_SECRET``, and ``INTEGRATION_SCOPES`` environment variables. The redirect URL is fixed.

    :return: :class:`wxc_sdk.integration.Integration` instance
    :raises ValueError: if one of the three parameters is missing or empty
    """
    params = {
        'client_id': os.getenv('INTEGRATION_CLIENT_ID'),
        'client_secret': os.getenv('INTEGRATION_CLIENT_SECRET'),
        'scopes': parse_scopes(os.getenv('INTEGRATION_SCOPES')),
    }
    # every single parameter is required
    if not all(params.values()):
        raise ValueError('failed to get integration parameters from environment')
    return Integration(redirect_url='http://localhost:6001/redirect', **params)
69
70
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens for the integration defined in the environment

    Tokens are read from a YML file. If needed an OAuth flow is initiated. Both is delegated to
    :meth:`wxc_sdk.integration.Integration.get_cached_tokens_from_yml`.

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """
    return build_integration().get_cached_tokens_from_yml(yml_path=yml_path())
82
83
async def location_id_from_args(api: AsWebexSimpleApi, args: Namespace) -> Optional[str]:
    """
    Get location id for --location parameter

    :param api: API to use to look up the location
    :param args: parsed command line arguments; only the ``location`` attribute is used
    :return: id of the location with exactly the given name; None if no location was given or no location with
        that name exists
    """
    wanted = args.location
    if not wanted:
        return None
    # the name passed to list() narrows down the search; only an exact match of the name is accepted
    for candidate in await api.locations.list(name=wanted):
        if candidate.name == wanted:
            return candidate.location_id
    return None
95
96
async def cleanup(api: AsWebexSimpleApi, args: Namespace):
    """
    clean up (delete) pooling HGs

    Pooling HGs are the hunt groups with a name starting with POOL_HG_NAME. If a location was given on the
    command line then only hunt groups in that location are considered. With ``args.test`` set no hunt group is
    actually deleted.

    :param api: API to use
    :param args: parsed command line arguments; ``location`` and ``test`` are used
    """
    location_id = await location_id_from_args(api, args)
    if args.location and location_id is None:
        # an unknown location must not silently widen the cleanup to all locations
        print(f'location "{args.location}" not found', file=sys.stderr)
        return

    pool_hg_list = [hg for hg in await api.telephony.huntgroup.list(location_id=location_id)
                    if hg.name.startswith(POOL_HG_NAME)]

    async def delete_one(hg: HuntGroup):
        """
        Delete a single hunt group (skipped in test mode) and print a notification
        """
        if not args.test:
            await api.telephony.huntgroup.delete_huntgroup(location_id=hg.location_id,
                                                           huntgroup_id=hg.id)
        print(f'Deleted HG "{hg.name}" in location "{hg.location_name}"')

    # delete all pooling HGs in parallel
    await asyncio.gather(*[delete_one(hg)
                           for hg in pool_hg_list])
113
114
115async def pool_tns_location(api: AsWebexSimpleApi, args: Namespace, location: IdAndName,
116 tns: list[NumberListPhoneNumber]):
117 """
118 Pool unassigned TNs for a given location
119 """
120
121 async def add_to_hg(hg: Union[HuntGroup, str],
122 tns: list[NumberListPhoneNumber]):
123 """
124 Add a bunch of TNs to given HG
125 :param hg: can be an existing HG or a name of a HG to be created
126 :param tns: list of TNs to be added to the HG
127 """
128 if isinstance(hg, str):
129 # create new HG
130 print(f'Creating HG "{hg}" in location "{location.name}" for: '
131 f'{", ".join(tn.phone_number for tn in tns)}')
132 if args.test:
133 return
134 settings = HuntGroup.create(name=hg,
135 phone_number=tns[0].phone_number)
136 new_id = await api.telephony.huntgroup.create(location_id=location.id,
137 settings=settings)
138 # Get details of new HG and also the forwarding settings
139 # * details are needed for the recursive call to add_to_hg .. in case we have more than one TN to add
140 # * ... and forwarding settings are needed b/c we want to set CFwdAll to location's main number
141 details, forwarding = await asyncio.gather(
142 api.telephony.huntgroup.details(location_id=location.id, huntgroup_id=new_id),
143 api.telephony.huntgroup.forwarding.settings(location_id=location.id, feature_id=new_id))
144 forwarding: CallForwarding
145 details: HuntGroup
146
147 # set call forwarding
148 print(f'HG "{hg}" in location "{location.name}": set CFwdAll to {main_number}')
149 forwarding.always.enabled = True
150 forwarding.always.destination = main_number
151 await api.telephony.huntgroup.forwarding.update(location_id=location.id, feature_id=new_id,
152 forwarding=forwarding)
153 if len(tns) > 1:
154 # add the remaining as alternate numbers
155 await add_to_hg(hg=details, tns=tns[1:])
156 return
157
158 # add tns as alternate numbers
159 print(f'HG "{hg.name}" in location "{location.name}", adding: '
160 f'{", ".join(tn.phone_number for tn in tns)}')
161 if args.test:
162 return
163 # weirdly on GET alternate numbers are returned in alternate_number_settings ...
164 alternate_numbers = hg.alternate_number_settings.alternate_numbers
165 alternate_numbers.extend(AlternateNumber(phone_number=tn.phone_number,
166 ring_pattern=RingPattern.normal)
167 for tn in tns)
168 # ... while for an update Wx expects the alternate numbers in an alternate_number attribute
169 update = HuntGroup(alternate_numbers=alternate_numbers)
170 await api.telephony.huntgroup.update(location_id=location.id,
171 huntgroup_id=hg.id,
172 update=update)
173
174 return
175
176 # if the list of available TNs includes the main number then something is wonky. The main number should be owned
177 # by something
178 main_number = next((tn for tn in tns if tn.main_number), None)
179 if main_number is not None:
180 print(f'Error: main number {main_number.phone_number} in location "{location.name}" is not assigned to '
181 f'anything!', file=sys.stderr)
182 return
183
184 # get main number of location
185 main_number = (await api.telephony.location.details(location_id=location.id)).calling_line_id.phone_number
186
187 # get list if "pool" HGs in location
188 existing_hg_list = [hg for hg in await api.telephony.huntgroup.list(location_id=location.id)
189 if hg.name.startswith(POOL_HG_NAME)]
190
191 # we need the details for all HGs: list() response is missing the alternate number list
192 # get all HG details in parallel
193 existing_hg_list = await asyncio.gather(*[api.telephony.huntgroup.details(location_id=location.id,
194 huntgroup_id=hg.id)
195 for hg in existing_hg_list])
196 existing_hg_list: list[HuntGroup]
197
198 # start with an empty list of tasks
199 tasks = []
200
201 # assign TNs to existing "pool" HGs
202
203 # these are the HGs we can still assign some TNs to
204 hgs_with_open_slots = (hg
205 for hg in existing_hg_list
206 if len(hg.alternate_number_settings.alternate_numbers) < 10)
207 tns.sort(key=attrgetter('phone_number'))
208 while tns:
209 hg_with_open_slots = next(hgs_with_open_slots, None)
210 if hg_with_open_slots is None:
211 # no more HGs we can assign TNs to --> we are done here
212 break
213
214 # add some tns to this hg; hg can have max 10 alternate numbers
215 tns_to_add = 10 - len(hg_with_open_slots.alternate_number_settings.alternate_numbers)
216 tasks.append(add_to_hg(hg=hg_with_open_slots, tns=tns[:tns_to_add]))
217
218 # continue w/ remaining TNs
219 tns = tns[tns_to_add:]
220
221 # assign remaining TNs to new "pool" HGs
222 # .. in batches of 11
223 existing_names = set(hg.name for hg in existing_hg_list)
224 new_hg_names = (name
225 for i in range(1, 1000)
226 if (name := f'{POOL_HG_NAME}{i:03d}') not in existing_names)
227 while tns:
228 tasks.append(add_to_hg(hg=next(new_hg_names), tns=tns[:11]))
229 tns = tns[11:]
230
231 # Now run all tasks
232 await asyncio.gather(*tasks)
233 return
234
235
async def pool_tns(api: AsWebexSimpleApi, args: Namespace):
    """
    Put all available (unassigned) TNs into pool HGs

    The available numbers are fetched (restricted to one location if location_id_from_args() yields a location id),
    grouped by the location they belong to, and then every location is handed to pool_tns_location(). All
    locations are processed concurrently.

    :param api: API object used for all requests
    :param args: parsed command line arguments; passed through to the per-location worker
    """
    scope_location_id = await location_id_from_args(api, args)
    available_tns = await api.telephony.phone_numbers(available=True, location_id=scope_location_id)

    # location info (needed for the name) and the TNs of each location, both keyed by location id
    location_info: dict[str, IdAndName] = {}
    grouped: dict[str, list[NumberListPhoneNumber]] = defaultdict(list)
    for number in available_tns:
        owner_location = number.location
        location_info[owner_location.id] = owner_location
        grouped[owner_location.id].append(number)

    # one worker per location; run them all in parallel
    jobs = [pool_tns_location(api=api, args=args, location=location_info[loc_id], tns=loc_tns)
            for loc_id, loc_tns in grouped.items()]
    await asyncio.gather(*jobs)
257
258
async def catch_tns():
    """
    Async entry point: parse the command line, obtain tokens and then either clean up the pooling HGs
    (--cleanup) or pool the available TNs
    """
    cli = ArgumentParser()
    cli.add_argument('--test', action='store_true', required=False,
                     help='test only; don\'t actually apply any config')
    cli.add_argument('--location', type=str, required=False, help='Location to work on')
    cli.add_argument('--token', type=str, required=False, help='admin access token to use.')
    cli.add_argument('--cleanup', action='store_true', required=False, help='remove all pooling HGs')
    args = cli.parse_args()

    load_dotenv(env_path(), override=True)

    # a token given on the command line wins; else get tokens from cache or create a new set of tokens using
    # the integration defined in .env
    tokens = args.token if args.token else get_tokens()

    async with AsWebexSimpleApi(tokens=tokens) as api:
        action = cleanup if args.cleanup else pool_tns
        await action(api, args)
283
284
if __name__ == '__main__':
    # INFO level logging for the whole run, then drive the async entry point to completion
    logging.basicConfig(level=logging.INFO)
    asyncio.run(catch_tns())
Downgrade room device workspaces from Webex Calling to free calling
This script looks for workspaces in a given location (or all workspaces) and downgrades them from Webex Calling to free calling.
usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}
CLI tool to manage room device calling entitlements
positional arguments:
{show,clear} show: show all room devices with their calling settings, clear:
remove calling license from devices
options:
-h, --help show this help message and exit
--location LOCATION work on devices in given location
--wsnames WSNAMES file name of a file with workspace names to operate on; one name per
line
--test test run only
(wxc-sdk-py3.11) ➜ examples git:(master) ✗ ./room_devices.py --help
usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}
CLI tool to manage room device calling entitlements
positional arguments:
{show,clear} show: show all room devices with their calling settings, clear: remove
calling license from devices
options:
-h, --help show this help message and exit
--location LOCATION work on devices in given location
--wsnames WSNAMES file name of a file with workspace names to operate on; one name per
line
--test test run only
Source: room_devices.py
1#!/usr/bin/env python
2"""
3usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}
4
5CLI tool to manage room device calling entitlements
6
7positional arguments:
8 {show,clear} show: show all room devices with their calling settings, clear: remove calling
9 license from devices
10
11optional arguments:
12 -h, --help show this help message and exit
13 --location LOCATION work on devices in given location
14 --wsnames WSNAMES file name of a file with workspace names to operate on; one name per line
15 --test test run only
16
17The tool reads environment variables from room_devices.env:
18 SERVICE_APP_CLIENT_ID=<client ID of a service app created on developer.webex.com>
19 SERVICE_APP_CLIENT_SECRET=<client secret of a service app created on developer.webex.com>
20 SERVICE_APP_REFRESH_TOKEN=<refresh token of the service app obtained after the service app has been
21 authorized for an org>
22
23This information is used to obtain an access token required to authorize API access
24
25This is a super-set of the scopes the service app needs:
26 * spark-admin:workspaces_write
27 * Identity:one_time_password
28 * identity:placeonetimepassword_create
29 * spark:people_read
30 * spark-admin:workspace_locations_read
31 * spark-admin:workspaces_read
32 * spark:devices_write
33 * spark:devices_read
34 * spark:kms
35 * spark-admin:devices_read
36 * spark-admin:workspace_locations_write
37 * spark-admin:licenses_read
38 * spark-admin:telephony_config_read
39 * spark-admin:telephony_config_write
40 * spark-admin:devices_write
41 * spark-admin:people_read
42
43More service app details: https://developer.webex.com/docs/service-apps
44
45Tokens get persisted in room_devices.yml.
46"""
47
48import asyncio
49import logging
50import sys
51import time
52from argparse import ArgumentParser
53from collections import defaultdict
54from functools import reduce
55from itertools import chain
56from os import getcwd, getenv
57from os.path import basename, isfile, join, splitext
58from typing import Optional
59
60from dotenv import load_dotenv
61from yaml import safe_dump, safe_load
62
63from wxc_sdk.as_api import AsWebexSimpleApi
64from wxc_sdk.base import webex_id_to_uuid
65from wxc_sdk.common import OwnerType
66from wxc_sdk.devices import Device
67from wxc_sdk.integration import Integration
68from wxc_sdk.locations import Location
69from wxc_sdk.telephony import NumberListPhoneNumber
70from wxc_sdk.tokens import Tokens
71from wxc_sdk.workspace_locations import WorkspaceLocation
72from wxc_sdk.workspaces import CallingType, Workspace, WorkspaceCalling, WorkspaceSupportedDevices
73
74
def yml_path() -> str:
    """
    File name of the YML file the access and refresh tokens are cached in: base name of this script with the
    extension replaced by ".yml"
    """
    stem, _ = splitext(basename(__file__))
    return f'{stem}.yml'
80
81
def env_path() -> str:
    """
    File name of the .env file the service app settings are read from: base name of this script with the
    extension replaced by ".env"
    """
    stem, _ = splitext(basename(__file__))
    return f'{stem}.env'
87
88
def read_tokens_from_file() -> Optional[Tokens]:
    """
    Load cached service app tokens from the YML cache file

    :return: the cached tokens; None if the cache file does not exist or cannot be read/validated
    """
    cache_file = yml_path()
    if not isfile(cache_file):
        return None
    try:
        with open(cache_file) as stream:
            raw = safe_load(stream)
        return Tokens.model_validate(raw)
    except Exception:
        # best effort: any problem with the cache is treated like a missing cache
        return None
103
104
def write_tokens_to_file(tokens: Tokens):
    """
    Persist tokens in the YML cache file (attributes that are None are left out)

    :param tokens: tokens to write
    """
    cache_file = yml_path()
    with open(cache_file, mode='w') as stream:
        payload = tokens.model_dump(exclude_none=True)
        safe_dump(payload, stream)
111
112
def get_access_token() -> Tokens:
    """
    Obtain a new access token for the service app and cache it

    Refresh token, client id and client secret are read from the environment variables
    SERVICE_APP_REFRESH_TOKEN, SERVICE_APP_CLIENT_ID and SERVICE_APP_CLIENT_SECRET.

    :return: the refreshed tokens (also written to the cache file)
    """
    refresh_token = getenv('SERVICE_APP_REFRESH_TOKEN')
    tokens = Tokens(refresh_token=refresh_token)

    # scopes and redirect URL are passed empty here; only refresh() is called on the integration
    service_app = Integration(
        client_id=getenv('SERVICE_APP_CLIENT_ID'),
        client_secret=getenv('SERVICE_APP_CLIENT_SECRET'),
        scopes=[],
        redirect_url=None,
    )
    service_app.refresh(tokens=tokens)
    write_tokens_to_file(tokens)
    return tokens
127
128
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens: prefer the cached tokens, fall back to a new access token obtained with the service app
    credentials. Tokens with less than one day of remaining lifetime are renewed.
    """
    one_day = 24 * 60 * 60

    cached = read_tokens_from_file()
    tokens = get_access_token() if cached is None else cached

    # renew tokens that are about to expire
    if tokens.remaining < one_day:
        tokens = get_access_token()
    return tokens
141
142
def main() -> int:
    """
    Main code: parse arguments, obtain tokens, print room device workspaces with their numbers and devices and,
    for operation "clear", downgrade these workspaces from Webex Calling to free calling.

    :return: exit code; 1 if no tokens could be obtained, the given location was not found, no workspace
        matched, or at least one downgrade failed; 0 otherwise
    """
    # parse args
    parser = ArgumentParser(prog=basename(__file__), description='CLI tool to manage room device calling entitlements')
    parser.add_argument(
        'operation',
        choices=['show', 'clear'],
        help='show: show all room devices with their calling settings, clear: remove calling license from devices',
    )
    parser.add_argument('--location', type=str, help='work on devices in given location')
    parser.add_argument(
        '--wsnames', type=str, help='file name of a file with workspace names to operate on; one name per line'
    )
    parser.add_argument('--test', action='store_true', help='test run only')
    args = parser.parse_args()
    operation = args.operation
    test_run = args.test
    location = args.location
    ws_names = args.wsnames

    # get tokens; as an alternative you can just get a developer token from developer.webex.com and use:
    # tokens = '<developer token from developer.webex.com>'
    load_dotenv(dotenv_path=env_path())
    err = ''
    tokens = None
    try:
        tokens = get_tokens()
    except Exception as e:
        # top-level boundary: report any failure to obtain tokens as a CLI error below
        err = f'{e}'
    if not tokens:
        print(f'failed to obtain access tokens: {err}', file=sys.stderr)
        return 1

    async def as_main() -> int:
        """
        Async main to be able to use concurrency
        """

        async def downgrade_workspace(ws: Workspace):
            """
            Downgrade one workspace to free calling

            :raises ValueError: if the workspace is not a Webex Calling workspace
            """

            def log(s: str, file=sys.stdout):
                print(f'downgrade workspace "{ws.display_name:{ws_name_len}}": {s}', file=file)

            if ws.calling.type != CallingType.webex:
                raise ValueError(f'calling type is "{ws.calling.type}", not "{CallingType.webex.value}"')
            if test_run:
                log('skipping update, test run only')
            else:
                log('updating calling settings')
                update = ws.model_copy(deep=True)
                update.calling = WorkspaceCalling(type=CallingType.free)
                update.workspace_location_id = None
                update.location_id = None
                await api.workspaces.update(workspace_id=ws.workspace_id, settings=update)
            log('done')

        async with AsWebexSimpleApi(tokens=tokens) as api:
            # get list of locations and workspace locations
            ws_location_list, location_list = await asyncio.gather(
                api.workspace_locations.list(display_name=location), api.locations.list(name=location)
            )
            location_list: list[Location]
            ws_location_list: list[WorkspaceLocation]

            # validate location argument
            if location:
                target_location = next((loc for loc in location_list if loc.name == location), None)
                target_ws_location = next((loc for loc in ws_location_list if loc.display_name == location), None)
                if not all((target_ws_location, target_location)):
                    print(f'location "{location}" not found', file=sys.stderr)
                    return 1
            else:
                target_location = None
                target_ws_location = None

            # get workspaces, numbers, and devices (in target location)
            workspaces, numbers, devices = await asyncio.gather(
                api.workspaces.list(workspace_location_id=target_ws_location and target_ws_location.id),
                api.telephony.phone_numbers(
                    location_id=target_location and target_location.location_id, owner_type=OwnerType.place
                ),
                api.devices.list(
                    workspace_location_id=target_ws_location and target_ws_location.id, product_type='roomdesk'
                ),
            )
            workspaces: list[Workspace]
            numbers: list[NumberListPhoneNumber]
            devices: list[Device]

            # only workspaces supporting desk devices
            workspaces = [
                ws for ws in workspaces if ws.supported_devices == WorkspaceSupportedDevices.collaboration_devices
            ]

            # if a path to a file with workspace names was given, then filter based on the file contents
            if ws_names:
                with open(ws_names) as f:
                    workspace_names = {s_line for line in f if (s_line := line.strip())}
                workspaces = [ws for ws in workspaces if ws.display_name in workspace_names]

            # nothing to show or downgrade; also protects the max() calls below from an empty sequence
            if not workspaces:
                print('No workspaces', file=sys.stderr)
                return 1

            # only devices in workspaces (no personal devices)
            devices = [d for d in devices if d.workspace_id is not None]

            # prepare some lookups
            workspace_locations_by_id: dict[str, WorkspaceLocation] = {wsl.id: wsl for wsl in ws_location_list}
            numbers_by_workspace_uuid: dict[str, list[NumberListPhoneNumber]] = defaultdict(list)
            for number in numbers:
                numbers_by_workspace_uuid[webex_id_to_uuid(number.owner.owner_id)].append(number)
            devices_by_workspace_id: dict[str, list[Device]] = defaultdict(list)
            for device in devices:
                devices_by_workspace_id[device.workspace_id].append(device)

            # sort workspaces by workspace location name and workspace name; workspace location can be unset
            workspaces.sort(
                key=lambda ws: (
                    ''
                    if not ws.workspace_location_id
                    else workspace_locations_by_id[ws.workspace_location_id].display_name,
                    ws.display_name,
                )
            )

            # some field lengths for nicer output
            # ... chain([1], ...) to avoid max() on empty sequence; there might be no workspace locations at all
            wsl_name_len = max(chain([1], (len(wsl.display_name) for wsl in ws_location_list)))
            ws_name_len = max(chain([1], (len(ws.display_name) for ws in workspaces)))
            pn_len = max(chain([1], (len(n.phone_number) for n in numbers if n.phone_number)))
            ext_len = max(chain([1], (len(n.extension) for n in numbers if n.extension)))

            # print workspaces with workspace locations, numbers, and devices
            for workspace in workspaces:
                if not workspace.workspace_location_id:
                    wsl_name = ''
                else:
                    wsl_name = workspace_locations_by_id[workspace.workspace_location_id].display_name
                print(
                    f'workspace location "{wsl_name:{wsl_name_len}}", '
                    f'workspace "{workspace.display_name:{ws_name_len}}"'
                )

                # are there any numbers in that workspace?
                # .get() so that the lookup does not create empty entries in the defaultdict
                ws_numbers = numbers_by_workspace_uuid.get(webex_id_to_uuid(workspace.workspace_id), [])
                for number in ws_numbers:
                    print(
                        f' number: {number.phone_number or "-" * pn_len:{pn_len}}/'
                        f'{number.extension or "-" * ext_len:{ext_len}}'
                    )
                ws_devices = devices_by_workspace_id.get(workspace.workspace_id, [])
                for device in ws_devices:
                    print(f' device: {device.display_name}')

            if operation == 'show':
                # we are done here
                return 0

            # now we want to downgrade (disable calling) on all workspaces
            print()
            print('Starting downgrade')
            results = await asyncio.gather(*[downgrade_workspace(ws) for ws in workspaces], return_exceptions=True)

            # print errors ... if any
            failed = False
            for ws, result in zip(workspaces, results):
                if isinstance(result, Exception):
                    failed = True
                    print(f'Failed to downgrade "{ws.display_name:{ws_name_len}}": {result}', file=sys.stderr)

            return 1 if failed else 0

    return asyncio.run(as_main())
324
325
if __name__ == '__main__':
    # INFO and above of everything goes to stderr
    root_logger = logging.getLogger()
    h = logging.StreamHandler(stream=sys.stderr)
    h.setLevel(logging.INFO)
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(h)

    # log REST API interactions to file
    file_fmt = logging.Formatter(fmt='%(asctime)s %(levelname)s %(message)s')
    # timestamps in the log file in UTC
    file_fmt.converter = time.gmtime

    # log file is created in the current working directory and overwritten on each run
    rest_log_name = join(getcwd(), f'{splitext(basename(__file__))[0]}.log')
    rest_log_handler = logging.FileHandler(rest_log_name, mode='w')
    rest_log_handler.setLevel(logging.DEBUG)
    rest_log_handler.setFormatter(file_fmt)
    rest_logger = logging.getLogger('wxc_sdk.as_rest')
    rest_logger.setLevel(logging.DEBUG)
    rest_logger.addHandler(rest_log_handler)

    # sys.exit() instead of exit(): the latter is a helper added by the site module for interactive use
    sys.exit(main())
Leave spaces with no activity in the last n days
Leave spaces w/o recent activity.
usage: leave_spaces.py [-h] [--days DAYS] [--token TOKEN] [--no_test] [--no_messages]
[--keep KEEP]
leave spaces with no activity
options:
-h, --help show this help message and exit
--days DAYS, -d DAYS days since last activity; default: 1095
--token TOKEN Personal access token to use. If not provided script will try to read
token from WEBEX_ACCESS_TOKEN environment variable.
--no_test Don't test; actually leave the spaces
--no_messages Only leave spaces that have no messages
--keep KEEP, -k KEEP file with list of spaces to keep
Source: leave_spaces.py
1#!/usr/bin/env python
2import asyncio
3import logging
4import os
5import sys
6from argparse import ArgumentParser
7from collections.abc import Iterable
8from datetime import datetime, timedelta
9from typing import Optional
10
11from dateutil import tz
12from dotenv import load_dotenv
13
14from wxc_sdk.as_api import AsWebexSimpleApi
15from wxc_sdk.as_rest import AsRestError
16from wxc_sdk.common import RoomType
17from wxc_sdk.messages import Message
18from wxc_sdk.rooms import Room
19from wxc_sdk.teams import Team
20
21
async def last_n_messages(api: AsWebexSimpleApi, space: Room, n: int = 10) -> list[Message]:
    """
    Get up to n messages of a space: the first n messages yielded by api.messages.list_gen() for the space

    :param api: API object
    :param space: space to get the messages from
    :param n: number of messages to collect; nothing is requested if n is zero
    :return: list of messages
    """
    collected: list[Message] = []
    if n:
        # don't ask for more than 1000 messages per page
        async for msg in api.messages.list_gen(room_id=space.id, max=min(n, 1000)):
            collected.append(msg)
            if len(collected) == n:
                break
    return collected
35
36
async def latest_message_in_space(api: AsWebexSimpleApi, space: Room) -> Optional[Message]:
    """
    Get the latest message in a space

    :return: first message returned by last_n_messages() for n=1; None if the space has no messages
    """
    newest = await last_n_messages(api, space, 1)
    return newest[0] if newest else None
45
46
async def verify_leave_space(api: AsWebexSimpleApi, space: Room, cutoff: datetime) -> bool:
    """
    Check whether a space can be left based on its latest message

    :return: True if the space has no message at all or the latest message was created at or before cutoff
    """
    latest = await latest_message_in_space(api, space)
    return (not latest) or latest.created <= cutoff
55
56
async def leave_spaces(api: AsWebexSimpleApi, spaces: Iterable[Room]):
    """
    Leave the given spaces by deleting the membership of the calling user in each of them; all spaces are
    worked on concurrently

    :param api: API object
    :param spaces: spaces to leave
    """
    my_person_id = (await api.people.me()).person_id

    async def drop_membership(room: Room):
        """
        Leave one space; REST errors are reported on stderr and do not abort the other spaces
        """
        try:
            # get membership to delete
            own_memberships = await api.membership.list(room_id=room.id, person_id=my_person_id)
            if own_memberships:
                # delete membership
                print(f'Leaving space "{room.title}"...')
                await api.membership.delete(own_memberships[0].id)
                print(f'Left space "{room.title}"')
            else:
                print(f'No membership in space "{room.title}", skipping', file=sys.stderr)
        except AsRestError as e:
            print(f'Error leaving space "{room.title}": {e}', file=sys.stderr)

    await asyncio.gather(*[drop_membership(room) for room in spaces])
80
81
async def as_main():
    """
    Async main: find group spaces without activity since the cutoff, verify the inactivity using the latest
    message in each space, print the resulting list, and leave the spaces if --no_test was given.

    General spaces of Teams and spaces listed in the keep file are never left. The script terminates with
    exit code 1 if no token is available, the token is rejected (401), or the keep file does not exist.
    """
    # parse args
    parser = ArgumentParser(description='leave spaces with no activity')
    parser.add_argument(
        '--days', '-d', type=int, required=False, default=3 * 365, help=f'days since last activity; default: {3 * 365}'
    )
    parser.add_argument(
        '--token',
        type=str,
        required=False,
        help='Personal access token to use. If not provided script will try to read token from '
        'WEBEX_ACCESS_TOKEN environment variable.',
    )
    parser.add_argument('--no_test', action='store_true', required=False, help="Don't test; actually leave the spaces")
    parser.add_argument(
        '--no_messages', action='store_true', required=False, help='Only leave spaces that have no messages'
    )
    parser.add_argument('--keep', '-k', type=str, required=False, help='file with list of spaces to keep')
    args = parser.parse_args()

    load_dotenv(override=True)
    token = args.token or os.getenv('WEBEX_ACCESS_TOKEN')
    if not token:
        print('No token provided and WEBEX_ACCESS_TOKEN not set in environment', file=sys.stderr)
        sys.exit(1)
    cutoff = datetime.now(tz=tz.UTC) - timedelta(days=args.days)

    # names of spaces to keep; one name per line, blank lines are ignored
    keep: set[str] = set()
    if args.keep:
        try:
            with open(args.keep) as f:
                # filter on the stripped line: a raw line always contains at least the newline
                keep = {name for line in f if (name := line.strip())}
        except FileNotFoundError:
            print(f'file "{args.keep}" not found', file=sys.stderr)
            sys.exit(1)

    async with AsWebexSimpleApi(tokens=token, concurrent_requests=100) as api:
        # check token
        try:
            await api.people.me()
        except AsRestError as e:
            if e.status == 401:
                print(f'Token seems to be invalid: {e}', file=sys.stderr)
                sys.exit(1)
            raise

        print('Listing spaces and teams...')
        spaces, teams_list = await asyncio.gather(api.rooms.list(max=1000), api.teams.list())
        # we can't leave direct spaces anyway; so ignore them from the start
        spaces = [space for space in spaces if space.type != RoomType.direct]
        print(f'Found {len(spaces)} spaces')
        print(f'Found {len(teams_list)} teams')
        teams = {team.id: team for team in teams_list}

        # identify spaces to leave based on last activity
        leave = [space for space in spaces if space.last_activity and space.last_activity <= cutoff]

        # sometimes the last_activity information seems to be "off" try to get the latest message for each space to
        # verify latest activity
        print(f'Getting latest message for {len(leave)} spaces...')
        latest_messages = await asyncio.gather(*[latest_message_in_space(api, space) for space in leave])
        validated_leave: list[tuple[Room, Optional[datetime]]] = []
        for space, latest_message in zip(leave, latest_messages):
            space: Room
            latest_message: Optional[Message]
            # don't leave general spaces of Teams
            # A general space is a space that has the same name as the Team it belongs to
            team: Team
            if space.team_id and (team := teams.get(space.team_id, None)) and space.title == team.name:
                print(f'Don\'t leave general space "{space.title}" of Team "{team.name}"')
                continue

            # don't leave spaces in keep file
            if space.title.strip() in keep:
                print(f'Don\'t leave space "{space.title}", found space name in keep file')
                continue

            # if only spaces with no messages should be considered and we have a message in the space then don't leave
            if args.no_messages and latest_message is not None:
                print(
                    f'Space "{space.title}" has messages, latest is {latest_message.created:%Y.%m.%d %H:%M:%S}, '
                    f'last activity is {space.last_activity:%Y.%m.%d %H:%M:%S} - not leaving'
                )
                continue

            # if no latest message or latest message is older than cutoff, consider leaving
            if latest_message is None or latest_message.created <= cutoff:
                validated_leave.append((space, latest_message and latest_message.created))
                continue
            print(
                f'Latest message in "{space.title}" is {latest_message.created:%Y.%m.%d %H:%M:%S}, '
                f'last activity is {space.last_activity:%Y.%m.%d %H:%M:%S} - not leaving'
            )
        print()
        print(f'Found {len(validated_leave)} spaces to leave:')

        # sort spaces by latest activity or latest message
        validated_leave.sort(key=lambda x: max(x[0].last_activity, x[1]) if x[1] else x[0].last_activity, reverse=False)

        for space, latest in validated_leave:
            latest_str = f'{latest:%Y.%m.%d %H:%M:%S}' if latest else 'none'
            print(f'Leave "{space.title}"')
            print(
                f' last activity {space.last_activity:%Y.%m.%d %H:%M:%S}, latest message '
                f'{latest_str}'
            )

        if args.no_test:
            # actually try to leave the spaces
            await leave_spaces(api, (space for space, _ in validated_leave))
191
192
if __name__ == '__main__':
    # INFO level logging for the whole run, then run the async main to completion
    logging.basicConfig(level=logging.INFO)
    asyncio.run(as_main())
Provision location level access codes from a CSV file
Provision location level access codes from a CSV file
Usage: access_codes.py [OPTIONS] CSV_FILE
Provision location level access codes from a CSV file
╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
│ * csv_file PATH CSV file with access codes [required] │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
│ --dry-run Do not make any changes │
│ --token TEXT Access token can be provided using --token argument, set │
│ in WEBEX_ACCESS_TOKEN environment variable or can be a │
│ service app token. For the latter set environment │
│ variables ('SERVICE_APP_REFRESH_TOKEN', │
│ 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET'). │
│ Environment variables can also be set in │
│ service_app.env │
│ --install-completion Install completion for the current shell. │
│ --show-completion Show completion for the current shell, to copy it or │
│ customize the installation. │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
Source: access_codes.py
1#!/usr/bin/env -S uv run --script
2# /// script
3# requires-python = ">=3.11,<3.14"
4# dependencies = [
5# "typer",
6# "wxc-sdk",
7# "python-dotenv",
8# ]
9# ///
10"""
11Provision location level access codes from a CSV file
12the CSV file should have the following columns:
13- location: location name
14- code: the access code
15- description: a description of the access code
16- operation: one of 'add', 'delete'
17
18Usage: access_codes.py [OPTIONS] CSV_FILE
19
20 Provision location level access codes from a CSV file
21
22╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
23│ * csv_file PATH CSV file with access codes [required] │
24╰────────────────────────────────────────────────────────────────────────────────────────────╯
25╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
26│ --dry-run Do not make any changes │
27│ --token TEXT Access token can be provided using --token argument, set │
28│ in WEBEX_ACCESS_TOKEN environment variable or can be a │
29│ service app token. For the latter set environment │
30│ variables ('SERVICE_APP_REFRESH_TOKEN', │
31│ 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET'). │
32│ Environment variables can also be set in │
33│ access_codes.env │
34│ --install-completion Install completion for the current shell. │
35│ --show-completion Show completion for the current shell, to copy it or │
36│ customize the installation. │
37│ --help Show this message and exit. │
38╰────────────────────────────────────────────────────────────────────────────────────────────╯
39
40"""
41import asyncio
42import csv
43import logging
44import sys
45from collections import defaultdict
46from functools import reduce
47from os.path import basename
48from pathlib import Path
49from typing import Literal, Optional
50
51import typer
52from dotenv import load_dotenv
53from pydantic import BaseModel, TypeAdapter
54from service_app import SERVICE_APP_ENVS, env_path, get_tokens
55
56from wxc_sdk.as_api import AsWebexSimpleApi
57from wxc_sdk.common import AuthCode
58from wxc_sdk.telephony.location import TelephonyLocation
59from wxc_sdk.tokens import Tokens
60
61
class CSVOperation(BaseModel):
    """
    One row of the CSV file: a single operation to be performed on an access code of a location
    """
    # name of the location the access code belongs to
    location: str
    # the access code itself
    code: str
    # description of the access code
    description: str
    # what to do with the access code
    operation: Literal['add', 'delete']
70
71
async def process_operations(*, operations: list[CSVOperation], token: str, dry_run:bool):
    """
    Process the operations from the CSV file; all locations are worked on in parallel

    :param operations: validated rows of the CSV file
    :param token: access token; passed as is to AsWebexSimpleApi
    :param dry_run: if set then the planned changes are only printed and no access codes are created or deleted
    """

    # noinspection PyShadowingNames
    async def process_one_location(*, location: str, operations: list[CSVOperation]):
        """
        process operations for one location
        """
        # find the location
        loc = locations.get(location)
        if loc is None:
            print(f'Location {location} not found', file=sys.stderr)
            return

        # get existing codes in location
        ac_codes: dict[str, AuthCode] = {ac.code: ac
                                         for ac in await api.telephony.access_codes.read(location_id=loc.location_id)}

        # API calls are only created if they will actually be awaited; creating coroutines in a dry run and
        # dropping them would trigger "coroutine ... was never awaited" warnings
        tasks = []

        # codes to delete: only the ones that actually exist in the location
        delete_operations = [op for op in operations if op.operation == 'delete']
        non_existing = [op for op in delete_operations if op.code not in ac_codes]
        if non_existing:
            print(f'Location {location}: access codes not found: {", ".join(op.code for op in non_existing)}',
                  file=sys.stderr)
        to_delete = [op for op in delete_operations if op.code in ac_codes]
        print(f'Location {location}: deleting access codes: {", ".join(op.code for op in to_delete)}')
        if to_delete and not dry_run:
            tasks.append(api.telephony.access_codes.delete_codes(location_id=loc.location_id,
                                                                 access_codes=[op.code for op in to_delete]))

        # codes to add: only the ones that don't exist in the location yet
        add_operations = [op for op in operations if op.operation == 'add']
        existing = [op for op in add_operations if op.code in ac_codes]
        if existing:
            print(f'Location {location}: access codes already exist: {", ".join(op.code for op in existing)}',
                  file=sys.stderr)
        to_add = [op for op in add_operations if op.code not in ac_codes]
        print(f'Location {location}: adding access codes: {", ".join(op.code for op in to_add)}')
        if to_add and not dry_run:
            tasks.append(api.telephony.access_codes.create(location_id=loc.location_id,
                                                           access_codes=[AuthCode(code=op.code,
                                                                                  description=op.description)
                                                                         for op in to_add]))
        if tasks:
            await asyncio.gather(*tasks)

    async with AsWebexSimpleApi(tokens=token) as api:
        # get locations
        locations: dict[str, TelephonyLocation] = {loc.name: loc for loc in await api.telephony.locations.list()}

        # group operations by location
        operations_by_location: dict[str, list[CSVOperation]] = defaultdict(list)
        for op in operations:
            operations_by_location[op.location].append(op)

        # process all locations in parallel
        await asyncio.gather(*[process_one_location(location=loc, operations=ops)
                               for loc, ops in operations_by_location.items()])
    return
141
142
app = typer.Typer()


# NOTE: intentionally no docstring on main(); the help text of the command is set explicitly in the decorator
@app.command(name=basename(__file__),
             help='Provision location level access codes from a CSV file')
def main(csv_file: Path = typer.Argument(exists=True,
                                         help='CSV file with access codes'),
         dry_run: bool = typer.Option(False, '--dry-run',
                                      help='Do not make any changes'),

         token: Optional[str] = typer.Option(None, '--token',
                                             help=f'Access token can be provided using --token argument, '
                                                  f'set in WEBEX_ACCESS_TOKEN environment variable or '
                                                  f'can be a service app token. For the latter set '
                                                  f'environment variables {SERVICE_APP_ENVS}. '
                                                  f'Environment variables can also be set in '
                                                  f'{env_path()}')):
    # get access token: an explicitly passed token wins, else fall back to get_tokens()
    load_dotenv(env_path(), override=True)
    tokens = get_tokens() if token is None else Tokens(access_token=token)
    if tokens is None:
        print(f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable or '
              f'can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
              f'variables can '
              f'also be set in {env_path()}', file=sys.stderr)
        # terminate the CLI with a non-zero exit code
        raise typer.Exit(code=1)

    # read CSV file; newline='' as recommended for files passed to the csv module so that newlines embedded in
    # quoted fields are interpreted correctly
    with open(csv_file, newline='') as f:
        data = list(csv.DictReader(f))

    # validate the data from the CSV file; each row has to be a valid CSVOperation
    csv_operations = TypeAdapter(list[CSVOperation]).validate_python(data)

    asyncio.run(process_operations(operations=csv_operations,
                                   token=tokens,
                                   dry_run=dry_run))
182
183
184if __name__ == '__main__':
185 logging.basicConfig(level=logging.DEBUG)
186 app()
Bulk assign/unassign agents to/from call queues
Bulk assign/unassign agents to/from call queues
usage: queue_agents.py [-h] (--add | --remove) --queues QUEUES --agent AGENT [--token TOKEN]
[--debug] [--har] [--dry-run]
Bulk manage agents in call queues
options:
-h, --help show this help message and exit
--add Add agent(s) to specified queues
--remove Remove agent(s) from specified queues
--queues QUEUES Text file with list of call queue names (one per line). Each line should
be a location name and a queue name separated by a colon. Example:
"Location1:Queue1"
--agent AGENT Single agent email address or text file with agent email addresses (one
per line)
--token TOKEN admin access token to use. If no token is given then the script will try
to use service app tokens. The service app parameters are read from
environment variables SERVICE_APP_ID, SERVICE_APP_SECRET, and
SERVICE_APP_REFRESH. These parameters can also be defined in
"queue_agents.env" file. Service app tokens are cached in
"queue_agents.yml". If no access token is passed and no service app is
defined then the script falls back to try to read an access token from
environment variable WEBEX_ACCESS_TOKEN.
--debug Enable debug output
--har Enable HAR output
--dry-run Simulate the operation without making actual changes
Example: queue_agents.py --queues queues.txt --agent agents.txt --add
Source: queue_agents.py
1#!/usr/bin/env python3
2"""
3Bulk manage agents in call queues
4
5usage: queue_agents.py [-h] (--add | --remove) --queues QUEUES --agent AGENT [--token TOKEN]
6
7Bulk manage agents in call queues
8
9options:
10 -h, --help show this help message and exit
11 --add Add agent(s) to specified queues
12 --remove Remove agent(s) from specified queues
13 --queues QUEUES Text file with list of call queue names (one per line). Each line should be a
14 location name and a queue name separated by a colon. Example: "Location1:Queue1"
15 --agent AGENT Single agent email address or text file with agent email addresses (one per line)
16 --token TOKEN admin access token to use. If no token is given then the script will try to use
17 service app tokens. The service app parameters are read from environment variables
18 SERVICE_APP_ID, SERVICE_APP_SECRET, and SERVICE_APP_REFRESH. These parameters can
19 also be defined in "queue_agents.env" file. Service app tokens are cached in
20 "queue_agents.yml". If no access token is passed and no service app is defined then
21 the script falls back to try to read an access token from environment variable
22 WEBEX_ACCESS_TOKEN.
23
24Example: queue_agents.py --queues queues.txt --agent agents.txt --add
25"""
26
27import argparse
28import asyncio
29import logging
30import os
31import sys
32from collections.abc import Iterable
33from contextlib import contextmanager
34from typing import List, Optional
35
36import yaml
37from dotenv import load_dotenv
38
39from wxc_sdk import Tokens
40from wxc_sdk.as_api import AsWebexSimpleApi
41from wxc_sdk.har_writer import HarWriter
42from wxc_sdk.integration import Integration
43from wxc_sdk.people import Person
44from wxc_sdk.telephony.callqueue import CallQueue
45from wxc_sdk.telephony.hg_and_cq import Agent
46
47
def yml_path() -> str:
    """
    Name of the YML file used to cache access and refresh token

    :return: base name of this script with its extension replaced by ``.yml``
    """
    script_name = os.path.basename(__file__)
    stem, _ = os.path.splitext(script_name)
    return f'{stem}.yml'
53
54
def env_path() -> str:
    """
    Name of the .env file the service app settings are read from

    :return: base name of this script with its extension replaced by ``.env``
    """
    script_name = os.path.basename(__file__)
    stem, _ = os.path.splitext(script_name)
    return f'{stem}.env'
61
62
def read_tokens_from_file() -> Optional[Tokens]:
    """
    Read service app tokens from the YML cache file

    :return: cached tokens; None if the cache file does not exist or cannot be read/validated
    """
    cache_file = yml_path()
    if os.path.isfile(cache_file):
        try:
            with open(cache_file) as cache:
                cached = yaml.safe_load(cache)
            return Tokens.model_validate(cached)
        except Exception:
            # best effort: any problem with the cache is treated like a missing cache
            return None
    return None
77
78
def write_tokens_to_file(tokens: Tokens):
    """
    Persist tokens in the YML cache file; attributes that are None are not written

    :param tokens: tokens to cache
    """
    cache_file = yml_path()
    with open(cache_file, mode='w') as cache:
        serializable = tokens.model_dump(exclude_none=True)
        yaml.safe_dump(serializable, cache)
85
86
def get_access_token() -> Optional[Tokens]:
    """
    Obtain new tokens from the service app refresh token, client id and client secret

    The parameters are read from the environment variables SERVICE_APP_ID, SERVICE_APP_SECRET
    and SERVICE_APP_REFRESH. The refreshed tokens are written to the token cache.

    :return: refreshed tokens; None if any of the environment variables is missing or empty
    """
    app_id = os.getenv('SERVICE_APP_ID')
    app_secret = os.getenv('SERVICE_APP_SECRET')
    app_refresh = os.getenv('SERVICE_APP_REFRESH')
    if not (app_id and app_secret and app_refresh):
        return None

    tokens = Tokens(refresh_token=app_refresh)
    service_app = Integration(client_id=app_id, client_secret=app_secret, scopes=[], redirect_url=None)
    # refresh() updates the tokens instance that is passed in
    service_app.refresh(tokens=tokens)
    write_tokens_to_file(tokens)
    return tokens
100
101
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens from the cache, or create new tokens using the service app credentials

    Tokens with less than one day of remaining lifetime are replaced by new ones.

    :return: tokens; None if no tokens could be obtained
    """
    one_day = 24 * 60 * 60

    cached = read_tokens_from_file()
    # no usable cache: fall back to the refresh token
    tokens = get_access_token() if cached is None else cached
    if tokens is None:
        return None
    if tokens.remaining >= one_day:
        return tokens
    # about to expire: get new tokens
    return get_access_token()
116
117
def read_file_lines(filename: str) -> List[str]:
    """
    Read the non-empty lines of a text file with surrounding whitespace removed.

    If the file does not exist or cannot be read due to missing permissions, an error message
    is printed to stderr and the script is terminated with exit code 1; these two errors are
    not propagated to the caller.

    Args:
        filename (str): Path to the input file

    Returns:
        List[str]: Stripped, non-empty lines in file order
    """
    try:
        with open(filename) as f:
            return [stripped for line in f if (stripped := line.strip())]
    except FileNotFoundError:
        problem = f"Error: File '{filename}' not found."
    except PermissionError:
        problem = f"Error: Permission denied reading file '{filename}'."
    print(problem, file=sys.stderr)
    sys.exit(1)
141
142
async def process_one_queue(
    *, api: AsWebexSimpleApi, queue: CallQueue, agents: list[Person], action: str, dry_run: bool = True
):
    """
    Add agents to or remove agents from a single call queue.

    The current agent list is read from the queue details. For action 'add' the agents not yet in
    the queue are appended; for any other action the given agents that are in the queue are
    removed. The queue is only updated if there is something to change and dry_run is not set.
    """

    def describe(person_ids: set) -> str:
        """
        Sorted, comma separated "display name(email)" list of the given agents
        """
        labels = [f'{person.display_name}({person.emails[0]})' for person in agents if person.person_id in person_ids]
        labels.sort()
        return ', '.join(labels)

    # get the agents currently assigned to the queue
    details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
    current_ids = {member.agent_id for member in details.agents}
    requested_ids = {person.person_id for person in agents}

    if action == 'add':
        missing_ids = requested_ids - current_ids
        if not missing_ids:
            print(f'All agents are already in queue {queue.name} in location {queue.location_name}. Skipping.')
            return
        agent_str = describe(missing_ids)
        print(f'Adding agents to queue {queue.name} in location {queue.location_name}: {agent_str}')
        details.agents.extend([Agent(agent_id=agent_id) for agent_id in missing_ids])
    else:
        present_ids = current_ids & requested_ids
        if not present_ids:
            print(f'No agents to remove from queue {queue.name} in location {queue.location_name}. Skipping.')
            return
        agent_str = describe(present_ids)
        print(f'Removing agents from queue {queue.name} in location {queue.location_name}: {agent_str}')
        details.agents = [member for member in details.agents if member.agent_id not in present_ids]

    if dry_run:
        return
    # the update only carries the new agent list
    await api.telephony.callqueue.update(
        location_id=queue.location_id, queue_id=queue.id, update=CallQueue(agents=details.agents)
    )
184
185
async def validate_queues(api: AsWebexSimpleApi, queues: Iterable[str]) -> list[CallQueue]:
    """
    Map "location name:queue name" strings to the existing call queues

    Names without a matching call queue are reported on stderr and skipped.

    :return: call queues for the names that exist, in input order
    """
    # index the existing queues by "location:queue"
    known_queues = {}
    for call_queue in await api.telephony.callqueue.list():
        known_queues[f'{call_queue.location_name}:{call_queue.name}'] = call_queue

    found = []
    for queue in queues:
        if queue in known_queues:
            found.append(known_queues[queue])
            continue
        print(f"Queue '{queue}' does not exist. Skipping.", file=sys.stderr)
    return found
200
201
async def validate_users(api: AsWebexSimpleApi, users: Iterable[str]) -> list[Person]:
    """
    Map user email addresses to the existing persons

    The lookup uses the first email of each person and ignores case. Emails without a matching
    person are reported on stderr and skipped.

    :return: persons for the emails that exist, in input order
    """
    # index the existing users by lower case email
    known_users = {}
    for person in await api.people.list():
        known_users[person.emails[0].lower()] = person

    found = []
    for user in users:
        email = user.lower()
        if email in known_users:
            found.append(known_users[email])
            continue
        print(f"User '{user}' does not exist. Skipping.", file=sys.stderr)
    return found
215
216
def main():
    """
    Main CLI script entry point for managing call queue agents.

    Parses the command line, determines the access token (``--token`` argument first, then
    ``get_tokens()``), reads the queue and agent lists, validates both and then adds/removes
    the agents to/from all valid queues in parallel.
    """
    parser = argparse.ArgumentParser(
        description='Bulk manage agents in call queues',
        epilog='Example: %(prog)s --queues queues.txt --agent agents.txt --add',
    )

    # Mutually exclusive group for add/remove actions
    action_group = parser.add_mutually_exclusive_group(required=True)
    action_group.add_argument(
        '--add', action='store_const', const='add', dest='action', help='Add agent(s) to specified queues'
    )
    action_group.add_argument(
        '--remove', action='store_const', const='remove', dest='action', help='Remove agent(s) from specified queues'
    )

    # Input source arguments
    parser.add_argument(
        '--queues',
        required=True,
        help='Text file with list of call queue names (one per line). Each line should be a '
             'location name and a queue name separated by a colon. Example: "Location1:Queue1"',
    )
    parser.add_argument(
        '--agent',
        required=True,
        help='Single agent email address or text file with agent email addresses (one per line)',
    )
    parser.add_argument(
        '--token',
        type=str,
        required=False,
        help=f'admin access token to use. If no token is given then the script will try to use '
             f'service app tokens. The service app parameters are read from environment variables '
             f'SERVICE_APP_ID, SERVICE_APP_SECRET, and SERVICE_APP_REFRESH. These parameters can also '
             f'be defined in "{env_path()}" file. Service '
             f'app tokens are cached in "{yml_path()}". If no access token is passed '
             f'and no service app '
             f'is defined then the script falls back to try to read an access token from environment '
             f'variable WEBEX_ACCESS_TOKEN.',
    )

    # Debug and HAR output arguments
    parser.add_argument('--debug', action='store_true', help='Enable debug output')
    parser.add_argument('--har', action='store_true', help='Enable HAR output')

    # dry run option
    parser.add_argument('--dry-run', action='store_true', help='Simulate the operation without making actual changes')

    # Parse arguments
    args = parser.parse_args()

    # Get access token: service app settings can be defined in the .env file in the current directory
    load_dotenv(os.path.join(os.getcwd(), env_path()))
    token = args.token
    if not token:
        # no token on the command line: try cached tokens or the service app. If this yields
        # nothing then token stays None and is passed on to the API as is.
        tokens = get_tokens()
        token = tokens.access_token if tokens else None

    # Read queues from file
    queues = read_file_lines(args.queues)

    # --agent is either a file with email addresses or a single email address.
    # read_file_lines() handles a missing file itself (prints an error and exits).
    agents = read_file_lines(args.agent) if os.path.isfile(args.agent) else [args.agent]

    if args.debug:
        logging.basicConfig(level=logging.DEBUG)

    async def as_main():
        """
        Async main entry point
        """

        @contextmanager
        def har_writer():
            """
            optional context manager to write HAR file
            """
            if args.har:
                with HarWriter(f'{os.path.splitext(os.path.basename(__file__))[0]}.har', api):
                    yield None
            else:
                yield None

        async with AsWebexSimpleApi(tokens=token) as api:
            with har_writer():
                # validate queues and agents
                validated_queues: List[CallQueue]
                validated_agents: List[Person]
                validated_queues, validated_agents = await asyncio.gather(
                    validate_queues(api, queues), validate_users(api, agents)
                )
                if not validated_queues:
                    print('No valid queues found. Exiting.', file=sys.stderr)
                    sys.exit(1)
                if not validated_agents:
                    print('No valid agents found. Exiting.', file=sys.stderr)
                    sys.exit(1)

                # process all queues in parallel
                await asyncio.gather(
                    *[
                        process_one_queue(
                            api=api, queue=queue, agents=validated_agents, action=args.action, dry_run=args.dry_run
                        )
                        for queue in validated_queues
                    ],
                    return_exceptions=False,
                )
            # with
        return

    asyncio.run(as_main())

    print(f'Completed {args.action} operation for {len(agents)} agent(s) across {len(queues)} queue(s).')
337
338
339if __name__ == '__main__':
340 main()
Bulk add phone numbers to locations
Bulk add TNs to Webex Calling locations
Usage: add_numbers.py [OPTIONS] FILE
Add TNs to Webex Calling locations
╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
│ * file PATH CSV file with location names and TNs [required] │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
│ --dry-run Do not make any changes │
│ --verbose Print debug information │
│ --log-file PATH Log file. If extension is .har, log in HAR format │
│ --token TEXT Access token can be provided using --token argument, set │
│ in WEBEX_ACCESS_TOKEN environment variable or can be a │
│ service app token. For the latter set environment │
│ variables ('SERVICE_APP_REFRESH_TOKEN', │
│ 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET'). │
│ Environment variables can also be set in add_numbers.env │
│ --inactive Add TNs as inactive │
│ --install-completion Install completion for the current shell. │
│ --show-completion Show completion for the current shell, to copy it or │
│ customize the installation. │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
Example: ./add_numbers.py add_numbers.csv --log-file add_numbers.har --dry-run
Source: add_numbers.py
1#!/usr/bin/env -S uv run --script
2#
3# /// script
4# requires-python = ">=3.11,<3.14"
5# dependencies = [
6# "python-dotenv",
7# "typer",
8# "wxc-sdk",
9# ]
10# ///
11"""
12Add TNs to locations.
13
14 Usage: add_numbers.py [OPTIONS] FILE
15
16 Add TNs to Webex Calling locations
17
18╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
19│ * file PATH CSV file with location names and TNs [required] │
20╰────────────────────────────────────────────────────────────────────────────────────────────╯
21╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
22│ --dry-run Do not make any changes │
23│ --verbose Print debug information │
24│ --log-file PATH Log file. If extension is .har, log in HAR format │
25│ --token TEXT Access token can be provided using --token argument, set │
26│ in WEBEX_ACCESS_TOKEN environment variable or can be a │
27│ service app token. For the latter set environment │
28│ variables ('SERVICE_APP_REFRESH_TOKEN', │
29│ 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET'). │
30│ Environment variables can also be set in add_numbers.env │
31│ --inactive Add TNs as inactive │
32│ --install-completion Install completion for the current shell. │
33│ --show-completion Show completion for the current shell, to copy it or │
34│ customize the installation. │
35│ --help Show this message and exit. │
36╰────────────────────────────────────────────────────────────────────────────────────────────╯
37
38 Example: ./add_numbers.py add_numbers.csv --log-file add_numbers.har --dry-run
39
40"""
41
42import asyncio
43import csv
44import logging
45import os
46import sys
47from collections import Counter, defaultdict
48from contextlib import contextmanager
49from functools import wraps
50from itertools import chain
51from pathlib import Path
52from typing import Optional
53
54import typer
55from dotenv import load_dotenv
56from service_app import SERVICE_APP_ENVS, env_path, get_tokens
57
58from wxc_sdk.as_api import AsWebexSimpleApi
59from wxc_sdk.common import NumberState
60from wxc_sdk.har_writer import HarWriter
61from wxc_sdk.tokens import Tokens
62
63BATCH_SIZE = 10
64
65
@contextmanager
def setup_logging(*, api: AsWebexSimpleApi, verbose: bool, log_file: Optional[Path]):
    """
    Set up logging for the duration of the context

    A console handler is added to the root logger (DEBUG level if verbose, else INFO). If a log
    file is given then either a HAR file is written (extension .har) or a DEBUG level file handler
    is added to the root logger.

    :param api: API instance passed to the HAR writer
    :param verbose: show debug output on the console
    :param log_file: optional log file
    """
    root_logger = logging.getLogger()

    @contextmanager
    def file_handler():
        if not log_file:
            yield
            return
        # log to file or to HAR
        log_file_str = str(log_file)
        if os.path.splitext(log_file_str)[-1].lower() == '.har':
            with HarWriter(api=api, path=log_file_str):
                yield
            return
        f_handler = logging.FileHandler(log_file_str)
        f_handler.setLevel(logging.DEBUG)
        f_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        root_logger.addHandler(f_handler)
        try:
            yield
        finally:
            # like the HAR writer the file log ends with the context: detach the handler and
            # close the log file instead of leaving it open until interpreter shutdown
            root_logger.removeHandler(f_handler)
            f_handler.close()

    # the root logger passes everything; the handlers decide what is actually emitted
    root_logger.setLevel(logging.DEBUG)
    # create a console logging handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
    root_logger.addHandler(console_handler)
    with file_handler():
        yield
98
99
def read_csv(file: str) -> dict[str, list[str]]:
    """
    Read CSV file with location names and TNs

    Every non-empty row has to have exactly two columns: location name and TN. Invalid rows are
    logged; if the file does not exist or has at least one invalid row the script is terminated
    with exit code 1.

    :param file: path of the CSV file
    :return: dictionary mapping location names to the list of TNs for that location
    """
    if not os.path.isfile(file):
        logging.error(f'File {file} does not exist')
        # sys.exit() instead of the site-provided exit() helper, which is not guaranteed to exist
        sys.exit(1)
    err = False
    locations_and_tns: dict[str, list[str]] = defaultdict(list)
    # the csv module expects files to be opened with newline=''
    with open(file, newline='') as f:
        for row in csv.reader(f):
            if not row:
                # skip empty lines
                continue
            if len(row) != 2:
                logging.error(f'Invalid row: {row}')
                err = True
                continue
            location, tn = row
            logging.debug(f'Location: {location}, TN: {tn}')
            locations_and_tns[location].append(tn)
    if err:
        logging.error('Errors in the CSV file')
        sys.exit(1)
    return locations_and_tns
125
126
async def verify_location(api: AsWebexSimpleApi, location_name: str) -> Optional[str]:
    """
    Look up a location by name

    The locations returned for the name filter are searched for an exact name match.

    :return: location id of the matching location; None if there is no exact match or the
        lookup failed (the failure is logged)
    """
    try:
        for candidate in await api.telephony.locations.list(name=location_name):
            if candidate.name == location_name:
                return candidate.location_id
        return None
    except Exception as e:
        logging.error(f'Failed to get location {location_name}: {e}')
        return None
139
140
async def validate_tns(api: AsWebexSimpleApi, tns: list[str]) -> bool:
    """
    Validate TNs and return validity

    The TNs are first checked for duplicates within the list. If there are none then the numbers
    are validated via the API in batches of BATCH_SIZE. Every problem found is logged.

    :param api: API instance used for the validation
    :param tns: TNs to validate
    :return: True if there are no duplicates and all TNs passed validation; False otherwise, also
        if the validation request itself failed
    """
    duplicates = [tn for tn, count in Counter(tns).items() if count > 1]
    for tn in duplicates:
        logging.error(f'TN {tn} is duplicated')
    if duplicates:
        return False

    # validate numbers in batches
    try:
        validations = await asyncio.gather(
            *[api.telephony.validate_phone_numbers(tns[i : i + BATCH_SIZE]) for i in range(0, len(tns), BATCH_SIZE)]
        )
    except Exception as e:
        logging.error(f'Failed to validate TNs: {e}')
        return False

    # only the failed validations matter; log each of them
    failed = [status for status in chain.from_iterable(v.phone_numbers for v in validations) if not status.ok]
    for status in failed:
        logging.error(f'TN {status.phone_number}: {status.state} ')
    return not failed
172
173
async def add_tns(api: AsWebexSimpleApi, location_id: str, tns: list[str], inactive: bool):
    """
    Add TNs to a location in batches of BATCH_SIZE; all batches are submitted concurrently

    :param api: API instance used to add the numbers
    :param location_id: id of the location the TNs are added to
    :param tns: TNs to add
    :param inactive: add the TNs in state inactive instead of active
    :return: True on success; False if adding failed (the failure is logged)
    """
    if inactive:
        number_state = NumberState.inactive
    else:
        number_state = NumberState.active

    batches = [tns[start : start + BATCH_SIZE] for start in range(0, len(tns), BATCH_SIZE)]
    try:
        await asyncio.gather(
            *[
                api.telephony.location.number.add(location_id=location_id, phone_numbers=batch, state=number_state)
                for batch in batches
            ]
        )
    except Exception as e:
        logging.error(f'Failed to add TNs: {e}')
        return False
    else:
        return True
193
194
def async_command(f):
    """
    Decorator turning a coroutine function into a regular function

    The returned function passes its arguments to the coroutine function, runs the coroutine to
    completion using asyncio.run() and returns its result.
    """

    @wraps(f)
    def run_to_completion(*args, **kwargs):
        coroutine = f(*args, **kwargs)
        return asyncio.run(coroutine)

    return run_to_completion
201
202
203app = typer.Typer()
204
205
@app.command(
    epilog=f'Example: {sys.argv[0]} add_numbers.csv --log-file add_numbers.har --dry-run',
    help='Add TNs to Webex Calling locations',
)
@async_command
async def add_numbers(
    file: Path = typer.Argument(exists=True, help='CSV file with location names and TNs'),
    dry_run: bool = typer.Option(False, '--dry-run', help='Do not make any changes'),
    verbose: bool = typer.Option(False, '--verbose', help='Print debug information'),
    log_file: Optional[Path] = typer.Option(
        None, '--log-file', help='Log file. If extension is .har, log in HAR format'
    ),
    token: Optional[str] = typer.Option(
        None,
        '--token',
        help=f'Access token can be provided using --token argument, '
        f'set in WEBEX_ACCESS_TOKEN environment variable or '
        f'can be a service app token. For the latter set '
        f'environment variables {SERVICE_APP_ENVS}. '
        f'Environment variables can also be set in '
        f'{env_path()}',
    ),
    inactive: bool = typer.Option(False, '--inactive', help='Add TNs as inactive'),
):
    """
    Add TNs to Webex Calling locations
    """
    # Steps: get tokens, check the token, read the CSV file, validate location names and TNs and
    # finally (unless --dry-run is given) add the TNs. Any failure ends the script with exit code 1.
    # sys.exit() is used instead of the site-provided exit() helper, which is not guaranteed to exist.
    load_dotenv(env_path(), override=True)
    tokens = get_tokens() if token is None else Tokens(access_token=token)
    if tokens is None:
        print(
            f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable or '
            f'can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
            f'variables can '
            f'also be set in {env_path()}',
            file=sys.stderr,
        )
        sys.exit(1)

    async with AsWebexSimpleApi(tokens=tokens) as api:
        with setup_logging(api=api, verbose=verbose, log_file=log_file):
            # validate the access token
            try:
                await api.people.me()
            except Exception as e:
                logging.error(f'Failed to get identity: {e}')
                logging.error('Token might be invalid')
                sys.exit(1)
            # read location names and TNs from a CSV file; read_csv() takes the path as string
            logging.info(f'Reading file {file}')
            locations_and_tns = read_csv(str(file))
            location_names = list(locations_and_tns)

            # validate the location names; location_ids is in the same order as location_names
            logging.info('Validating location names...')
            location_ids = await asyncio.gather(*[verify_location(api, location) for location in location_names])
            if not all(location_ids):
                for location_name, location_id in zip(location_names, location_ids):
                    if not location_id:
                        logging.error(f'Location {location_name} does not exist')
                sys.exit(1)

            # validate the TNs
            logging.info('Validating TNs...')
            tn_list = list(chain.from_iterable(locations_and_tns.values()))
            validation = await validate_tns(api, tn_list)
            if not validation:
                sys.exit(1)

            # add TNs to the locations
            if not dry_run:
                logging.info('Adding TNs...')
                results = await asyncio.gather(
                    *[
                        add_tns(api, location_id, tns, inactive)
                        for location_id, tns in zip(location_ids, locations_and_tns.values())
                    ]
                )
                if not all(results):
                    sys.exit(1)
            #
            logging.info('Done')
        # end of logging context
    # end of API context
    return
292
293
294if __name__ == '__main__':
295 app()
Bulk add outgoing call permission patterns to locations
Bulk add outgoing call permission patterns to locations
usage: ocp_pattern.py [-h] [--token TOKEN] [--dry-run] [--verbose] [--log-file LOG_FILE]
location patterns
Provision OCP patterns for one or all locations
positional arguments:
location Location to provision OCP patterns for. Use "all" to provision for
all locations
patterns File with patterns to provision. File has one pattern per line. Use
"remove" to remove all patterns previously provisioned by the
script
optional arguments:
-h, --help show this help message and exit
--token TOKEN Access token can be provided using --token argument, set in
WEBEX_ACCESS_TOKEN environment variable or can be a service app
token. For the latter set environment variables
('SERVICE_APP_REFRESH_TOKEN', 'SERVICE_APP_CLIENT_ID',
'SERVICE_APP_CLIENT_SECRET'). Environment variables can also be set
in ocp_pattern.env
--dry-run Dry run, do not provision anything
--verbose Print debug information
--log-file LOG_FILE Log file. If extension is .har, log in HAR format
Example: ocp_pattern.py all ocp_pattern.txt --log-file ocp_pattern.har --dry-run
Source: ocp_pattern.py
1#!/usr/bin/env python3
2"""
3Provision OCP patterns for one or all locations
4
5usage: ocp_pattern.py [-h] [--token TOKEN] [--dry-run] [--verbose] [--log-file LOG_FILE]
6 location patterns
7
8Provision OCP patterns for one or all locations
9
10positional arguments:
11 location Location to provision OCP patterns for. Use "all" to provision for
12 all locations
13 patterns File with patterns to provision. File has one pattern per line. Use
14 "remove" to remove all patterns previously provisioned by the
15 script
16
17optional arguments:
18 -h, --help show this help message and exit
19 --token TOKEN Access token can be provided using --token argument, set in
20 WEBEX_ACCESS_TOKEN environment variable or can be a service app
21 token. For the latter set environment variables
22 ('SERVICE_APP_REFRESH_TOKEN', 'SERVICE_APP_CLIENT_ID',
23 'SERVICE_APP_CLIENT_SECRET'). Environment variables can also be set
24 in ocp_pattern.env
25 --dry-run Dry run, do not provision anything
26 --verbose Print debug information
27 --log-file LOG_FILE Log file. If extension is .har, log in HAR format
28
29Example: ocp_pattern.py all ocp_pattern.txt --log-file ocp_pattern.har --dry-run
30"""
31
32import argparse
33import asyncio
34import logging
35import os
36import sys
37from contextlib import contextmanager
38
39from dotenv import load_dotenv
40
41from examples.service_app import SERVICE_APP_ENVS, env_path, get_tokens
42from wxc_sdk.as_api import AsWebexSimpleApi
43from wxc_sdk.as_rest import AsRestError
44from wxc_sdk.har_writer import HarWriter
45from wxc_sdk.person_settings.permissions_out import Action, DigitPattern
46from wxc_sdk.telephony.location import TelephonyLocation
47from wxc_sdk.tokens import Tokens
48
49
async def work_on_one_location(
    api: AsWebexSimpleApi, location: TelephonyLocation, pattern_list: list[str], dry_run: bool
):
    """
    Work on one location, create, update or remove patterns

    * patterns named "ocp-..." that are not in pattern_list are removed
    * patterns in pattern_list that do not exist in the location are created
    * existing patterns in pattern_list are updated unless they already allow calls and transfers

    All planned operations are printed; they are only executed (concurrently) if dry_run is not set.

    :param api: API instance to use
    :param location: location to work on
    :param pattern_list: patterns to provision; an empty list removes all "ocp-..." patterns
    :param dry_run: only print what would be done
    """

    async def create(pattern: str):
        try:
            await dapi.create(
                location.location_id,
                DigitPattern(pattern=pattern, name=f'ocp-{pattern}', action=Action.allow, transfer_enabled=True),
            )
            print(f'{location.name}: created pattern {pattern}')
        except AsRestError as e:
            print(f'{location.name}: failed to create pattern {pattern}, error: {e}')
            raise

    async def remove(pattern: DigitPattern):
        try:
            await dapi.delete(location.location_id, pattern.id)
            print(f'{location.name}: removed pattern {pattern.pattern}')
        except AsRestError as e:
            print(f'{location.name}: failed to remove pattern {pattern.pattern}, error: {e}')
            raise

    async def update(pattern: DigitPattern):
        try:
            await dapi.update(
                location.location_id,
                # NOTE(review): the id of the existing pattern is passed along so that the update
                # can address it; presumably the update endpoint is keyed by the pattern id -
                # confirm against DigitPatternsApi.update
                DigitPattern(
                    id=pattern.id,
                    pattern=pattern.pattern,
                    name=f'ocp-{pattern.pattern}',
                    action=Action.allow,
                    transfer_enabled=True,
                ),
            )
            print(f'{location.name}: updated pattern {pattern.pattern}')
        except AsRestError as e:
            print(f'{location.name}: failed to update pattern {pattern.pattern}, error: {e}')
            raise

    pattern_set = set(pattern_list)
    dapi = api.telephony.permissions_out.digit_patterns

    # get ocp patterns for location
    location_digit_patterns = await dapi.get_digit_patterns(location.location_id)
    provisioned = location_digit_patterns.digit_patterns
    # build the lookup set once instead of once per requested pattern
    provisioned_patterns = {digit_pattern.pattern for digit_pattern in provisioned}

    existing_patterns = [digit_pattern for digit_pattern in provisioned if digit_pattern.pattern in pattern_set]
    # dict.fromkeys() drops duplicates but keeps the order: a pattern listed twice is created once
    missing_patterns = [pattern for pattern in dict.fromkeys(pattern_list) if pattern not in provisioned_patterns]
    # only patterns named "ocp-..." are candidates for removal
    to_be_removed = [
        digit_pattern
        for digit_pattern in provisioned
        if digit_pattern.name.startswith('ocp-') and digit_pattern.pattern not in pattern_set
    ]
    tasks = []

    # remove patterns
    for remove_pattern in to_be_removed:
        print(f'{location.name}: remove pattern {remove_pattern.pattern}')
        if not dry_run:
            tasks.append(remove(remove_pattern))

    # add missing patterns
    for pattern in missing_patterns:
        print(f'{location.name}: add pattern {pattern}')
        if not dry_run:
            tasks.append(create(pattern))

    # check existing patterns and update if action is not "allow" or transfer is not enabled
    for existing in existing_patterns:
        if existing.action == Action.allow and existing.transfer_enabled:
            # pattern already exists and is allowed
            continue
        print(f'{location.name}: update pattern {existing.pattern}')
        if not dry_run:
            tasks.append(update(existing))
    if tasks:
        # run tasks
        await asyncio.gather(*tasks)
    return
137
138
@contextmanager
def setup_logging(args: argparse.Namespace, api: AsWebexSimpleApi):
    """
    Set up logging for the duration of the context

    A console handler is added to the root logger (DEBUG level if args.verbose is set, else INFO).
    If args.log_file is set then either a HAR file is written (extension .har) or a DEBUG level
    file handler is added to the root logger.

    :param args: parsed command line arguments; attributes verbose and log_file are used
    :param api: API instance passed to the HAR writer
    """
    root_logger = logging.getLogger()
    log_format = '%(asctime)s - %(levelname)s - %(message)s'

    @contextmanager
    def file_handler(log_file: str):
        if not log_file:
            yield
            return
        # log to file or to HAR
        if os.path.splitext(log_file)[-1].lower() == '.har':
            with HarWriter(api=api, path=log_file):
                yield
            return
        f_handler = logging.FileHandler(log_file)
        f_handler.setLevel(logging.DEBUG)
        f_handler.setFormatter(logging.Formatter(log_format))
        root_logger.addHandler(f_handler)
        try:
            yield
        finally:
            # like the HAR writer the file log ends with the context: detach the handler and
            # close the log file instead of leaving it open until interpreter shutdown
            root_logger.removeHandler(f_handler)
            f_handler.close()

    # the root logger passes everything; the handlers decide what is actually emitted
    root_logger.setLevel(logging.DEBUG)
    # create a console logging handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG if args.verbose else logging.INFO)
    console_handler.setFormatter(logging.Formatter(log_format))
    root_logger.addHandler(console_handler)
    with file_handler(args.log_file):
        yield
170
171
async def main():
    """
    Command line entry point: provision OCP patterns for one or all locations

    Parses the command line, obtains access tokens, determines the locations to work on, reads the patterns
    from the given file (or uses an empty pattern list for "remove") and then runs ``work_on_one_location``
    concurrently for all selected locations.

    The process exits with status 1 if no tokens are available, a single requested location is not found or
    the pattern file does not exist.
    """
    parser = argparse.ArgumentParser(
        description='Provision OCP patterns for one or all locations',
        epilog='Example: %(prog)s all ocp_pattern.txt --log-file ocp_pattern.har --dry-run',
    )
    parser.add_argument(
        'location', type=str, help='Location to provision OCP patterns for. Use "all" to provision for all locations'
    )
    parser.add_argument(
        'patterns',
        type=str,
        help='File with patterns to provision. File has one pattern '
        'per line. Use "remove" to remove all patterns previously '
        'provisioned '
        'by the script',
    )
    parser.add_argument(
        '--token',
        help=f'Access token can be provided using --token argument, set in '
        f'WEBEX_ACCESS_TOKEN environment variable or can be a service app token. For '
        f'the latter set environment variables {SERVICE_APP_ENVS}. Environment '
        f'variables can also be set in {env_path()}',
    )
    parser.add_argument('--dry-run', action='store_true', help='Dry run, do not provision anything')
    parser.add_argument('--verbose', action='store_true', help='Print debug information')
    parser.add_argument('--log-file', help='Log file. If extension is .har, log in HAR format')

    args = parser.parse_args()
    location_name = args.location
    pattern_file = args.patterns
    dry_run = args.dry_run

    # get tokens
    # if token is provided use that token, else try to read from file
    load_dotenv(env_path(), override=True)
    tokens = get_tokens() if args.token is None else Tokens(access_token=args.token)
    if tokens is None:
        print(
            f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable '
            f'or can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. '
            f'Environment variables can also be set in {env_path()}',
            file=sys.stderr,
        )
        sys.exit(1)
    async with AsWebexSimpleApi(tokens=tokens, concurrent_requests=100) as api:
        with setup_logging(args, api):
            # validate location
            if location_name.lower() == 'all':
                # all locations
                location_list = await api.telephony.locations.list()
                location_list.sort(key=lambda loc: loc.name)
            else:
                # single location; only an exact name match is accepted
                location_list = [
                    loc for loc in await api.telephony.locations.list(name=location_name) if loc.name == location_name
                ]
                if not location_list:
                    print(f'Location {location_name} not found', file=sys.stderr)
                    sys.exit(1)

            # read patterns from given file
            if pattern_file.lower() == 'remove':
                # remove all patterns
                pattern_list = []
            else:
                try:
                    with open(pattern_file) as f:
                        # skip empty lines and comments; the comment check is done on the stripped line so
                        # that an indented "# ..." line is not mistaken for a pattern
                        stripped_lines = (line.strip() for line in f)
                        pattern_list = [line for line in stripped_lines if line and not line.startswith('#')]
                except FileNotFoundError:
                    print(f'File {pattern_file} not found', file=sys.stderr)
                    sys.exit(1)

            # apply changes to all locations
            print(f'Working on {len(location_list)} location(s), {len(pattern_list)} patterns')
            await asyncio.gather(*[work_on_one_location(api, loc, pattern_list, dry_run) for loc in location_list])
    return
250
251
if __name__ == '__main__':
    # main() is a coroutine function: run it to completion in a new event loop
    asyncio.run(main())
Bulk provisioning of 3rd party devices in Workspaces
Bulk provisioning of 3rd party devices in Workspaces
usage: workspace_w_3rd_party.py [-h] [--token TOKEN] [--dry-run]
[--log-file LOG_FILE] [--cleanup]
csv [output]
Provision workspaces with 3rd party devices.
positional arguments:
csv CSV with workspaces to provision. CSV has the
following columns: * workspace name: the workspace
will be created * location name: must be an
existing location * extension (optional); if
missing a new extension will be generated starting
at 2000 * MAC address; if empty a new (dummy) MAC
address will be generated as DEAD-DEAD-XXXX *
password (optional); if missing a new (random)
password will be generated
output Output CSV with the provisioning results. Not
required in dry-run mode
optional arguments:
-h, --help show this help message and exit
--token TOKEN Access token can be provided using --token
argument, set in WEBEX_ACCESS_TOKEN environment
variable or can be a service app token. For the
latter set environment variables
('SERVICE_APP_REFRESH_TOKEN',
'SERVICE_APP_CLIENT_ID',
'SERVICE_APP_CLIENT_SECRET'). Environment variables
can also be set in workspace_w_3rd_party.env
--dry-run Dry run, do not provision anything
--log-file LOG_FILE Log file. If extension is .har, log in HAR format
--cleanup remove workspaces
Example: workspace_w_3rd_party.py input.csv output.csv --log-file log.har
Source: workspace_w_3rd_party.py
1#!/usr/bin/env python
2"""
3Create workspaces with 3rd party devices.
4From a CSV read:
5 * workspace name: if workspace doesn't exist a new workspace will be created
6 * location name: must be an existing location
7 * extension (optional); if missing a new extension will be generated starting at 2000
8 * MAC address; if empty a new (dummy) MAC address will be generated as DEAD-DEAD-XXXX
9 * password (optional); if missing a new (random) password will be generated
10The output is a CSV file with the following columns:
11 * workspace name
12 * location name
13 * extension
14 * MAC address
15 * password
16 * outbound proxy
17 * SIP user name
18 * line/port
19
20usage: workspace_w_3rd_party.py [-h] [--token TOKEN] [--dry-run]
21 [--log-file LOG_FILE] [--cleanup]
22 csv [output]
23
24Provision workspaces with 3rd party devices.
25
26positional arguments:
27 csv CSV with workspaces to provision. CSV has the
28 following columns: * workspace name: the workspace
29 will be created * location name: must be an
30 existing location * extension (optional); if
31 missing a new extension will be generated starting
32 at 2000 * MAC address; if empty a new (dummy) MAC
33 address will be generated as DEAD-DEAD-XXXX *
34 password (optional); if missing a new (random)
35 password will be generated
36 output Output CSV with the provisioning results. Not
37 required in dry-run mode
38
39optional arguments:
40 -h, --help show this help message and exit
41 --token TOKEN Access token can be provided using --token
42 argument, set in WEBEX_ACCESS_TOKEN environment
43 variable or can be a service app token. For the
44 latter set environment variables
45 ('SERVICE_APP_REFRESH_TOKEN',
46 'SERVICE_APP_CLIENT_ID',
47 'SERVICE_APP_CLIENT_SECRET'). Environment variables
48 can also be set in workspace_w_3rd_party.env
49 --dry-run Dry run, do not provision anything
50 --log-file LOG_FILE Log file. If extension is .har, log in HAR format
51 --cleanup remove workspaces
52
53Example: workspace_w_3rd_party.py input.csv output.csv --log-file log.har
54"""
55
56import argparse
57import asyncio
58import csv
59import logging
60import os
61import sys
62from collections import defaultdict
63from collections.abc import Generator
64from contextlib import contextmanager
65from dataclasses import dataclass, field
66from itertools import chain, zip_longest
67
68from dotenv import load_dotenv
69
70from examples.service_app import SERVICE_APP_ENVS, env_path, get_tokens
71from open_api.generated.Shared.workspaces_auto import WorkspaceType
72from wxc_sdk.as_api import AsWebexSimpleApi
73from wxc_sdk.common import DevicePlatform
74from wxc_sdk.har_writer import HarWriter
75from wxc_sdk.licenses import License
76from wxc_sdk.telephony.devices import MACState, MACValidationResponse
77from wxc_sdk.telephony.location import TelephonyLocation
78from wxc_sdk.tokens import Tokens
79from wxc_sdk.workspaces import (
80 CallingType,
81 Workspace,
82 WorkspaceCalling,
83 WorkspaceSupportedDevices,
84 WorkspaceWebexCalling,
85)
86
# maximum number of MAC addresses passed to a single validate_macs() call
MAC_VALIDATION_BATCH_SIZE = 100
88
89
@dataclass
class CSVRow:
    """
    One record of the input CSV plus the state collected for that record while validating and provisioning
    """

    # columns read from the CSV, in this order
    workspace_name: str
    location_name: str
    extension: str
    mac_address: str
    password: str
    # not set by the constructor; these start as None and are filled in by later processing steps
    workspace: Workspace = field(default=None, init=False)
    calling_license_id: str = field(default=None, init=False)
    location: TelephonyLocation = field(default=None, init=False)
    outbound_proxy: str = field(default=None, init=False)
    sip_user_name: str = field(default=None, init=False)
    line_port: str = field(default=None, init=False)

    def __post_init__(self):
        # names are only trimmed
        self.workspace_name = self.workspace_name.strip()
        self.location_name = self.location_name.strip()
        # the remaining columns are trimmed and become None if nothing is left
        self.extension = self.extension.strip() or None
        # MAC addresses are additionally lower-cased
        self.mac_address = self.mac_address.strip().lower() or None
        self.password = self.password.strip() or None

    @classmethod
    def from_csv(cls, csv_path: str) -> Generator['CSVRow', None, None]:
        """
        Read the CSV file and yield one CSVRow per record

        Records that cannot be turned into a CSVRow (TypeError, e.g. wrong number of columns) are reported on
        stderr and skipped. If at least one record failed, then the process is terminated with exit code 1
        after the last record has been read.
        """
        parse_failed = False
        with open(csv_path, newline='') as csv_file:
            for line_no, columns in enumerate(csv.reader(csv_file), start=1):
                try:
                    yield cls(*columns)
                except TypeError as parse_error:
                    parse_failed = True
                    print(f'Failed to parse row {line_no}: {parse_error}', file=sys.stderr)
        if parse_failed:
            print(f'Failed to parse {csv_path}', file=sys.stderr)
            exit(1)
135
136
@contextmanager
def setup_logging(args: argparse.Namespace, api: AsWebexSimpleApi):
    """
    Set up logging for the duration of the context

    An INFO level console handler is attached to the root logger. If ``args.log_file`` is set, then either a
    HAR writer is attached to the API (file extension ``.har``) or a DEBUG level file handler is attached to
    the root logger.

    On exit all handlers added here are removed again, the log file is closed and the root logger's previous
    level is restored.

    :param args: parsed command line arguments; ``log_file`` is read
    :param api: API instance passed to the HAR writer
    """
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    root_logger = logging.getLogger()

    @contextmanager
    def file_handler(log_file: str):
        if not log_file:
            # no log file requested
            yield
        elif os.path.splitext(log_file)[-1].lower() == '.har':
            # log to HAR
            with HarWriter(api=api, path=log_file):
                yield
        else:
            # log to plain file
            f_handler = logging.FileHandler(log_file)
            f_handler.setLevel(logging.DEBUG)
            f_handler.setFormatter(logging.Formatter(log_format))
            root_logger.addHandler(f_handler)
            try:
                yield
            finally:
                # detach and close so that the file is not left open after the context exits
                root_logger.removeHandler(f_handler)
                f_handler.close()

    previous_level = root_logger.level
    root_logger.setLevel(logging.DEBUG)
    # create a console logging handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(logging.Formatter(log_format))
    root_logger.addHandler(console_handler)
    try:
        with file_handler(args.log_file):
            yield
    finally:
        root_logger.removeHandler(console_handler)
        root_logger.setLevel(previous_level)
168
169
# list of validation errors; each entry is a tuple of (row index, error message)
ValidationResult = list[tuple[int, str]]
172
173
async def validate_extensions(
    *, api: AsWebexSimpleApi, location: TelephonyLocation, indexed_rows: list[tuple[int, CSVRow]]
) -> ValidationResult:
    """
    Make sure that extensions are unique; if no extension is provided, generate a new one

    Extensions already present in the location are read via the API. A row with an extension that is already
    taken (in the location or by an earlier row) is reported as an error. Rows without an extension get the
    lowest free extension in the range 2000-9999; extensions explicitly requested by any row are never handed
    out as generated extensions. The generated extension is stored on the row.

    :param api: API instance used to read the numbers of the location
    :param location: location all given rows belong to
    :param indexed_rows: tuples of (row index, row) of the rows to check
    :return: list of validation errors
    """
    extensions_in_location = {
        number.extension
        for number in await api.telephony.phone_numbers(location_id=location.location_id)
        if number.extension is not None
    }
    # extensions explicitly requested in the CSV are reserved: generating one of them for another row would
    # turn a valid explicit request further down in the CSV into a "duplicate"
    requested_extensions = {csv_row.extension for _, csv_row in indexed_rows if csv_row.extension}

    # candidates are consumed in a single pass shared by all rows; this is safe because the set of taken
    # extensions only grows, so a candidate that was skipped once never becomes available again
    candidates = (str(ext) for ext in range(2000, 10000))

    errors = []
    for row_index, csv_row in indexed_rows:
        if csv_row.extension:
            if csv_row.extension in extensions_in_location:
                errors.append((row_index, f'Duplicate extension "{csv_row.extension}" in location "{location.name}"'))
        else:
            # generate new extension
            new_extension = next(
                (ext for ext in candidates if ext not in extensions_in_location and ext not in requested_extensions),
                None,
            )
            if new_extension is None:
                # range exhausted: report instead of letting StopIteration escape from the coroutine
                errors.append((row_index, f'No free extension in range 2000-9999 in location "{location.name}"'))
                continue
            csv_row.extension = new_extension
            print(f'Row {row_index}: Generated new extension "{csv_row.extension}"')
        extensions_in_location.add(csv_row.extension)

    return errors
197
198
async def generate_passwords(
    *, api: AsWebexSimpleApi, location: TelephonyLocation, indexed_rows: list[tuple[int, CSVRow]]
) -> ValidationResult:
    """
    Assign a generated password to every row that has no password

    One password per such row is requested for the given location; all requests are issued concurrently.
    The passwords are assigned to the rows in row order.

    :return: always an empty list
    """
    rows_without_password = [(row_index, csv_row) for row_index, csv_row in indexed_rows if not csv_row.password]

    generated = await asyncio.gather(
        *(
            api.telephony.location.generate_password(location_id=location.location_id)
            for _ in rows_without_password
        )
    )

    for (row_index, csv_row), password in zip(rows_without_password, generated):
        # generate new password
        csv_row.password = password
        print(f'Row {row_index}: Generated new password "{csv_row.password}"')
    return []
219
220
async def validate_locations_and_extensions(
    *, api: AsWebexSimpleApi, csv_rows: list[CSVRow], cleanup: bool
) -> ValidationResult:
    """
    Check that the locations of all rows exist, then validate extensions and generate passwords per location

    Nothing is checked in cleanup mode. Otherwise each row gets its location assigned (None if there is no
    location with that name). For every location referenced by at least one row the extension validation and
    the password generation are run concurrently.

    :return: list of validation errors
    """
    if cleanup:
        return []

    locations = {location.name: location for location in await api.telephony.locations.list()}
    errors = []
    # rows grouped by name of an existing location; row indices are 1-based
    csv_rows_by_location: dict[str, list[tuple[int, CSVRow]]] = defaultdict(list)
    for row_index, csv_row in enumerate(csv_rows, 1):
        location = locations.get(csv_row.location_name)
        if not location:
            errors.append((row_index, f'Location "{csv_row.location_name}" does not exist'))
        csv_row.location = location
        if csv_row.location_name in locations:
            csv_rows_by_location[csv_row.location_name].append((row_index, csv_row))

    grouped = list(csv_rows_by_location.items())
    # for all existing locations validate extensions ...
    extension_checks = [
        validate_extensions(api=api, location=locations[l_name], indexed_rows=indexed_rows)
        for l_name, indexed_rows in grouped
    ]
    # ... and also generate passwords for rows without a password
    password_tasks = [
        generate_passwords(api=api, location=locations[l_name], indexed_rows=indexed_rows)
        for l_name, indexed_rows in grouped
    ]
    for location_errors in await asyncio.gather(*extension_checks, *password_tasks):
        errors.extend(location_errors)
    return errors
254
255
async def assign_new_mac_addresses(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow]) -> ValidationResult:
    """
    Get new MAC addresses for rows where no MAC address is provided

    Candidates DEADDEAD0000 .. DEADDEADFFFF are checked in batches using the MAC validation API. Candidates
    that are already used in the CSV are skipped. The first available candidates are assigned to the rows
    without MAC address in row order.

    :param api: API instance used to validate the candidate MAC addresses
    :param csv_rows: all rows of the CSV; rows without MAC address are updated in place
    :return: list of validation errors; one entry for each row for which no MAC address could be found
    """
    from itertools import islice

    def normalized(mac: str) -> str:
        # MAC addresses in the CSV are lower-cased and can have separators ("dead-dead-0001") while the
        # candidates are plain upper case hex; compare on a common representation
        return ''.join(char for char in mac if char.isalnum()).upper()

    rows_without_mac = [(row_index, csv_row) for row_index, csv_row in enumerate(csv_rows, 1) if not csv_row.mac_address]
    if not rows_without_mac:
        return []

    mac_prefix = 'DEADDEAD'
    mac_addresses_in_csv = {normalized(csv_row.mac_address) for csv_row in csv_rows if csv_row.mac_address}

    def mac_candidates() -> Generator[str, None, None]:
        for v in range(0x10000):
            candidate = f'{mac_prefix}{v:04X}'
            if candidate in mac_addresses_in_csv:
                # skip mac addresses that are already in the csv
                continue
            yield candidate

    candidates = mac_candidates()
    new_mac_addresses = []

    # test macs in batches until enough available MAC addresses are found or the candidates are exhausted
    while len(new_mac_addresses) < len(rows_without_mac):
        # islice() yields a short (or empty) final batch; no padding values end up in the API request
        batch = list(islice(candidates, MAC_VALIDATION_BATCH_SIZE))
        if not batch:
            break
        validation_result = await api.telephony.devices.validate_macs(macs=batch)
        errored_macs = {
            normalized(ms.mac) for ms in (validation_result.mac_status or []) if ms.state != MACState.available
        }
        new_mac_addresses.extend(mac for mac in batch if mac not in errored_macs)

    errors = []
    available_macs = iter(new_mac_addresses)
    for row_index, csv_row in rows_without_mac:
        new_mac = next(available_macs, None)
        if new_mac is None:
            errors.append((row_index, f'No available MAC address found in range {mac_prefix}0000-{mac_prefix}FFFF'))
            continue
        # generate new MAC address
        csv_row.mac_address = new_mac
        print(f'Row {row_index}: Generated new MAC address "{csv_row.mac_address}"')
    return errors
294
295
async def mac_addresses_available(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow]) -> ValidationResult:
    """
    Check if MAC addresses provided in CSV are available

    The distinct MAC addresses of all rows are validated in batches; the batches are validated concurrently.
    Every row with a MAC address that is reported with a state other than "available" gets an error.

    :return: list of validation errors
    """
    unique_macs = list({csv_row.mac_address for csv_row in csv_rows if csv_row.mac_address})
    if not unique_macs:
        return []

    # validate in batches
    chunks = [
        unique_macs[start : start + MAC_VALIDATION_BATCH_SIZE]
        for start in range(0, len(unique_macs), MAC_VALIDATION_BATCH_SIZE)
    ]
    responses: list[MACValidationResponse] = await asyncio.gather(
        *[api.telephony.devices.validate_macs(macs=chunk) for chunk in chunks]
    )

    # MAC address -> description of the problem
    # NOTE(review): rows are matched on the MAC exactly as returned by the API; presumably the API echoes
    # the MAC in the format it was submitted in - confirm
    problems: dict[str, str] = {}
    for response in responses:
        for status in response.mac_status or []:
            if status.state != MACState.available:
                problems[status.mac] = f'{status.state}, {status.message}'

    if not problems:
        return []
    return [
        (row_index, f'MAC address "{csv_row.mac_address}": {problems[csv_row.mac_address]}')
        for row_index, csv_row in enumerate(csv_rows, 1)
        if csv_row.mac_address and csv_row.mac_address in problems
    ]
325
326
async def validate_mac_addresses(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow], cleanup: bool) -> ValidationResult:
    """
    MAC addresses given in the CSV must be unique and available; rows without MAC address get a new one

    Nothing is checked in cleanup mode.

    :return: list of validation errors
    """
    if cleanup:
        return []

    # a MAC address that shows up again in a later row is reported for that later row
    seen_macs = set()
    errors = []
    for row_index, csv_row in enumerate(csv_rows, 1):
        mac = csv_row.mac_address
        if not mac:
            continue
        if mac in seen_macs:
            errors.append((row_index, f'Duplicate MAC address "{mac}"'))
        seen_macs.add(mac)

    # assigning new MAC addresses and checking the provided ones run concurrently
    assignment_errors, availability_errors = await asyncio.gather(
        assign_new_mac_addresses(api=api, csv_rows=csv_rows),
        mac_addresses_available(api=api, csv_rows=csv_rows),
    )
    errors.extend(assignment_errors)
    errors.extend(availability_errors)
    return errors
347
348
async def validate_workspaces_and_licenses(
    *, api: AsWebexSimpleApi, csv_rows: list[CSVRow], cleanup: bool
) -> ValidationResult:
    """
    Validate workspace names and, when provisioning, pick a calling license for each new workspace

    Each row gets the existing workspace with the row's workspace name assigned (None if there is none).
    A workspace name that is used by more than one row is an error for the second and any further row.

    * cleanup mode: the workspace has to exist
    * provisioning: the workspace must not exist yet and a license with a free unit has to be available;
      the id of that license is stored on the row

    Rows with an error do not consume a license unit.

    :param api: API instance used to list workspaces and licenses
    :param csv_rows: rows to validate; updated in place
    :param cleanup: True if the workspaces are to be deleted instead of provisioned
    :return: list of validation errors
    """
    tasks = [api.workspaces.list()]
    if not cleanup:
        # licenses are only needed to provision new workspaces
        tasks.append(api.licenses.list())
    results = await asyncio.gather(*tasks)
    workspace_list: list[Workspace] = results[0]
    licenses: list[License] = [] if cleanup else results[1]
    workspaces = {ws.display_name: ws for ws in workspace_list}

    def calling_license_id() -> Generator[str, None, None]:
        """
        calling license id for license with available entitlement
        """
        candidate_licenses = [lic for lic in licenses if lic.webex_calling_workspaces or lic.webex_calling_professional]
        # make sure to consume workspace licenses first
        # NOTE(review): this relies on the license names sorting workspace licenses ahead of professional
        # licenses in reverse alphabetical order - confirm against the actual license names
        candidate_licenses.sort(key=lambda x: x.name, reverse=True)
        for lic in candidate_licenses:
            while lic.consumed_units < lic.total_units:
                # account for the unit handed out so that a license is never over-subscribed
                lic.consumed_units += 1
                yield lic.license_id

    errors = []
    license_id_gen = calling_license_id()
    names_in_csv = set()
    for row_index, csv_row in enumerate(csv_rows, 1):
        # check if workspace exists
        ws = workspaces.get(csv_row.workspace_name)
        csv_row.workspace = ws

        # the same workspace can neither be created nor deleted twice
        if csv_row.workspace_name in names_in_csv:
            errors.append((row_index, f'Duplicate workspace name "{csv_row.workspace_name}"'))
            continue
        names_in_csv.add(csv_row.workspace_name)

        if cleanup:
            if not ws:
                errors.append((row_index, f'Workspace "{csv_row.workspace_name}" does not exist'))
            continue
        if ws:
            errors.append((row_index, f'Workspace "{csv_row.workspace_name}" already exists'))
            # no license needed for a row that is not going to be provisioned
            continue
        # get a license for the workspace
        license_id = next(license_id_gen, None)
        if license_id is None:
            errors.append((row_index, f'No more licenses available for "{csv_row.workspace_name}"'))
            continue
        csv_row.calling_license_id = license_id
    return errors
401
402
async def validate_and_prepare(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow], cleanup: bool) -> None:
    """
    Validate the CSV rows and prepare them for provisioning

    Runs these validations concurrently:
        * locations and extensions
        * MAC addresses
        * workspaces and licenses

    If any validation reports an error, then all errors are printed to stderr sorted by row and the process
    is terminated with exit code 1.
    """
    check_results = await asyncio.gather(
        validate_locations_and_extensions(api=api, csv_rows=csv_rows, cleanup=cleanup),
        validate_mac_addresses(api=api, csv_rows=csv_rows, cleanup=cleanup),
        validate_workspaces_and_licenses(api=api, csv_rows=csv_rows, cleanup=cleanup),
    )
    # stable sort by row index: errors of the same row stay in the order of the validations above
    errors: ValidationResult = sorted(chain.from_iterable(check_results), key=lambda entry: entry[0])
    if not errors:
        return

    print('Validation errors:', file=sys.stderr)
    for row_index, error in errors:
        print(f'Row {row_index}: {error}', file=sys.stderr)
    exit(1)
426
427
async def delete_workspaces(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow], dry_run: bool) -> None:
    """
    Cleanup: delete the workspaces of all rows that have a workspace assigned

    All deletions run concurrently. In dry-run mode nothing is deleted; only the workspaces that would be
    deleted are printed.
    """

    async def remove(workspace: Workspace) -> None:
        if not dry_run:
            await api.workspaces.delete_workspace(workspace_id=workspace.workspace_id)
            print(f'Deleted workspace "{workspace.display_name}"')
            return
        print(f'Delete workspace "{workspace.display_name}"')

    # rows without workspace are skipped
    targets = [csv_row.workspace for csv_row in csv_rows if csv_row.workspace]
    await asyncio.gather(*map(remove, targets))
442
443
async def provision_row(*, api: AsWebexSimpleApi, csv_row: CSVRow) -> None:
    """
    Provision one CSV row: create the workspace, then the device, then read back the device details

    The SIP user name, line/port and outbound proxy taken from the device details are stored on the row.
    """
    location_id = csv_row.location.location_id

    # calling settings of the new workspace: license, extension and location come from the row
    webex_calling = WorkspaceWebexCalling(
        licenses=[csv_row.calling_license_id],
        extension=csv_row.extension,
        location_id=location_id,
    )
    calling = WorkspaceCalling(type=CallingType.webex, webex_calling=webex_calling)

    # create workspace
    new_workspace = Workspace(
        location_id=location_id,
        display_name=csv_row.workspace_name,
        type=WorkspaceType.desk,
        capacity=1,
        supported_devices=WorkspaceSupportedDevices.phones,
        device_platform=DevicePlatform.cisco,
        calling=calling,
    )
    workspace = await api.workspaces.create(settings=new_workspace)
    print(f'Provisioned workspace "{workspace.display_name}"')

    # create device in the new workspace
    new_device = await api.devices.create_by_mac_address(
        mac=csv_row.mac_address,
        workspace_id=workspace.workspace_id,
        model='Generic IPPhone Customer Managed',
        password=csv_row.password,
    )

    # the telephony details of the device have the values needed for the output
    device_details = await api.telephony.devices.details(device_id=new_device.device_id)
    print(f'Provisioned device in workspace "{workspace.display_name}"')
    owner = device_details.owner
    csv_row.sip_user_name = owner.sip_user_name
    csv_row.line_port = owner.line_port
    csv_row.outbound_proxy = device_details.proxy.outbound_proxy
483
484
def main():
    """
    Command line entry point: provision (or clean up) workspaces with 3rd party devices from a CSV file

    Parses and checks the command line, obtains access tokens and then runs the asynchronous part:
    read the CSV, validate all rows and then either delete the workspaces (--cleanup), stop (--dry-run) or
    provision all rows concurrently and write the results to the output CSV.

    The process exits with status 1 if the CSV file does not exist or no tokens are available.
    """

    async def as_main():
        # csv_file, tokens and args are set by the enclosing function before this coroutine is run
        # read CSV file
        csv_rows = list(CSVRow.from_csv(csv_file))
        async with AsWebexSimpleApi(tokens=tokens) as api:
            with setup_logging(args, api):
                # validation and preparation for provisioning
                await validate_and_prepare(api=api, csv_rows=csv_rows, cleanup=args.cleanup)
                if args.cleanup:
                    await delete_workspaces(api=api, csv_rows=csv_rows, dry_run=args.dry_run)
                    return

                if args.dry_run:
                    print('Dry run, not provisioning anything')
                    return
                await asyncio.gather(*[provision_row(api=api, csv_row=csv_row) for csv_row in csv_rows])
                # write output
                if args.output:
                    with open(args.output, 'w', newline='') as output:
                        writer = csv.writer(output)
                        writer.writerow(
                            [
                                'workspace_name',
                                'location_name',
                                'extension',
                                'mac_address',
                                'password',
                                'outbound_proxy',
                                'sip_user_name',
                                'line_port',
                            ]
                        )
                        writer.writerows(
                            [
                                csv_row.workspace_name,
                                csv_row.location_name,
                                csv_row.extension,
                                csv_row.mac_address,
                                csv_row.password,
                                csv_row.outbound_proxy,
                                csv_row.sip_user_name,
                                csv_row.line_port,
                            ]
                            for csv_row in csv_rows
                        )
        return

    # parse arguments
    parser = argparse.ArgumentParser(
        description='Provision workspaces with 3rd party devices.',
        epilog='Example: %(prog)s input.csv output.csv --log-file log.har',
    )
    parser.add_argument(
        'csv',
        type=str,
        help="""CSV with workspaces to provision. CSV has the following columns:
            * workspace name: the workspace will be created
            * location name: must be an existing location
            * extension (optional); if missing a new extension will be generated starting at 2000
            * MAC address; if empty a new (dummy) MAC address will be generated as DEAD-DEAD-XXXX
            * password (optional); if missing a new (random) password will be generated""",
    )
    parser.add_argument(
        'output', nargs='?', type=str, help='Output CSV with the provisioning results. Not required in dry-run mode'
    )
    parser.add_argument(
        '--token',
        help=f'Access token can be provided using --token argument, set in '
        f'WEBEX_ACCESS_TOKEN environment variable or can be a service app token. For '
        f'the latter set environment variables {SERVICE_APP_ENVS}. Environment '
        f'variables can also be set in {env_path()}',
    )
    parser.add_argument('--dry-run', action='store_true', help='Dry run, do not provision anything')
    parser.add_argument('--log-file', help='Log file. If extension is .har, log in HAR format')
    parser.add_argument('--cleanup', action='store_true', help='remove workspaces')
    args = parser.parse_args()
    # an output file is mandatory for actual provisioning and not allowed otherwise
    if not any((args.output, args.dry_run, args.cleanup)):
        parser.error('Output file is required unless in dry-run or cleanup mode')
    if args.output and (args.dry_run or args.cleanup):
        parser.error('Output file is not used in dry-run or cleanup mode')
    csv_file = args.csv
    if not os.path.isfile(csv_file):
        print(f'File {csv_file} does not exist', file=sys.stderr)
        sys.exit(1)

    # read tokens
    load_dotenv(env_path())
    tokens = get_tokens() if args.token is None else Tokens(access_token=args.token)
    if tokens is None:
        print(
            f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable '
            f'or can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
            f'variables can also be set in {env_path()}',
            file=sys.stderr,
        )
        sys.exit(1)

    asyncio.run(as_main())
587
588
if __name__ == '__main__':
    # main() is synchronous; it starts the event loop itself via asyncio.run()
    main()