Examples
Code examples can be found in the examples directory on GitHub
Also take a look at the unit tests in the tests directory.
Get all calling users
Source: calling_users.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org
5"""
6
7from dotenv import load_dotenv
8
9from wxc_sdk import WebexSimpleApi
10
11load_dotenv()
12
13api = WebexSimpleApi()
14
# wxc_sdk.people.PeopleApi.list iterates over the persons of the org
# Calling specific information is only included if parameter calling_data is set to True
# calling users have the attribute location_id set
calling_users = list(filter(lambda person: person.location_id,
                            api.people.list(calling_data=True)))
print(f'{len(calling_users)} users:')
display_names = [person.display_name for person in calling_users]
print('\n'.join(display_names))
Get all calling users (async variant)
Source: calling_users_async.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org using the (experimental) async API
5"""
6import asyncio
7import time
8
9from dotenv import load_dotenv
10
11from wxc_sdk.as_api import AsWebexSimpleApi
12
13load_dotenv()
14
15
async def get_calling_users():
    """
    Print all calling enabled users and the time needed to get their details:
        1) collect the ids of all calling licenses
        2) collect all users that have at least one of these licenses
        3) get details (with calling data) for all of these users
    """
    async with AsWebexSimpleApi(concurrent_requests=40) as api:
        print('Collecting calling licenses')
        licenses = await api.licenses.list()
        calling_license_ids = {lic.license_id for lic in licenses if lic.webex_calling}

        # get users with a calling license
        calling_users = []
        async for user in api.people.list_gen():
            if any(lic_id in calling_license_ids for lic_id in user.licenses):
                calling_users.append(user)
        print(f'{len(calling_users)} users:')
        print('\n'.join(user.display_name for user in calling_users))

        # get details for all users; one request per user, all of them awaited together
        started = time.perf_counter()
        detail_requests = [api.people.details(person_id=user.person_id, calling_data=True)
                           for user in calling_users]
        details = await asyncio.gather(*detail_requests)
        elapsed = time.perf_counter() - started
        print(f'Got details for {len(details)} users in {elapsed * 1000:.3f} ms')
40
41
42if __name__ == '__main__':
43 asyncio.run(get_calling_users())
Get all users without phones
Source: users_wo_devices.py
1#!/usr/bin/env python
2"""
3Get calling users without devices
4"""
5import asyncio
6import logging
7import os
8from itertools import chain
9from typing import Optional
10
11from dotenv import load_dotenv
12
13from wxc_sdk import Tokens
14from wxc_sdk.as_api import AsWebexSimpleApi
15from wxc_sdk.common import UserType
16from wxc_sdk.integration import Integration
17from wxc_sdk.person_settings import PersonDevicesResponse
18from wxc_sdk.scopes import parse_scopes
19
20
def env_path() -> str:
    """
    Path of the .env file to load environment variables from.

    The file is located in the current working directory and has the same base name as this file.

    :return: .env file path
    """
    script_name, _ = os.path.splitext(os.path.basename(__file__))
    return os.path.join(os.getcwd(), f'{script_name}.env')
27
28
def yml_path() -> str:
    """
    Path of the YML file used to persist tokens.

    The file is located in the current working directory and has the same base name as this file.

    :return: path to YML file
    :rtype: str
    """
    script_name, _ = os.path.splitext(os.path.basename(__file__))
    return os.path.join(os.getcwd(), f'{script_name}.yml')
36
37
def build_integration() -> Integration:
    """
    read integration parameters from environment variables and create an integration

    Client id, client secret, and scopes are read from INTEGRATION_CLIENT_ID, INTEGRATION_CLIENT_SECRET, and
    INTEGRATION_SCOPES. The redirect URL is fixed: http://localhost:6001/redirect

    :return: :class:`wxc_sdk.integration.Integration` instance
    :raises ValueError: if one of the integration parameters is missing or empty
    """
    env_names = ('INTEGRATION_CLIENT_ID', 'INTEGRATION_CLIENT_SECRET', 'INTEGRATION_SCOPES')
    client_id, client_secret, raw_scopes = (os.getenv(name) for name in env_names)
    # only parse scopes which are actually set: an unset variable should be reported by the check below instead of
    # being passed to parse_scopes() as None
    scopes = parse_scopes(raw_scopes) if raw_scopes else None
    redirect_url = 'http://localhost:6001/redirect'
    values = (client_id, client_secret, scopes)
    if not all(values):
        missing = ', '.join(name for name, value in zip(env_names, values) if not value)
        raise ValueError(f'failed to get integration parameters from environment: {missing}')
    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
                       redirect_url=redirect_url)
51
52
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens for the integration defined in the environment.

    Tokens are read from a YML file. If needed an OAuth flow is initiated.

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """
    return build_integration().get_cached_tokens_from_yml(yml_path=yml_path())
64
65
async def main():
    """
    Print the calling users without devices and the calling users not owning a device
    """
    # integration parameters are read from environment variables which can be defined in .env
    load_dotenv(env_path())

    # get tokens from cache or create a new set of tokens using the integration defined in .env
    tokens = get_tokens()

    async with AsWebexSimpleApi(tokens=tokens) as api:
        # get calling users: the ones with a location id
        people = await api.people.list(calling_data=True)
        calling_users = [user for user in people if user.location_id]

        # get device info for all users; one request per user, all of them awaited together
        device_requests = [api.person_settings.devices(person_id=user.person_id)
                           for user in calling_users]
        user_device_infos: list[PersonDevicesResponse] = await asyncio.gather(*device_requests)
        users_wo_devices = [user
                            for user, device_info in zip(calling_users, user_device_infos)
                            if not device_info.devices]

        # alternatively we can collect all device owners
        device_owner_ids = set()
        for device_info in user_device_infos:
            for device in device_info.devices:
                owner = device.owner
                if owner and owner.owner_type == UserType.people:
                    device_owner_ids.add(owner.owner_id)

        # ... and collect all other users (the ones not owning a device)
        users_not_owning_a_device = [user for user in calling_users
                                     if user.person_id not in device_owner_ids]

    # both lists are printed sorted by display name
    users_wo_devices.sort(key=lambda u: u.display_name)
    print(f'{len(users_wo_devices)} users w/o devices:')
    print('\n'.join(f'{user.display_name} ({user.emails[0]})'
                    for user in users_wo_devices))

    print()
    users_not_owning_a_device.sort(key=lambda u: u.display_name)
    print(f'{len(users_not_owning_a_device)} users not owning a device:')
    print('\n'.join(f'{user.display_name} ({user.emails[0]})'
                    for user in users_not_owning_a_device))
104
105
if __name__ == '__main__':
    # enable DEBUG logging to a file; REST log shows all requests
    # the log file is created in the current directory, is named after this file, and is overwritten on each run
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    log_path = os.path.join(os.getcwd(), f'{script_name}.log')
    logging.basicConfig(filename=log_path, filemode='w', level=logging.DEBUG,
                        format='%(asctime)s %(threadName)s %(message)s')
    asyncio.run(main())
Default call forwarding settings for all users
This example starts with the list of all calling users and then calls
wxc_sdk.person_settings.forwarding.PersonForwardingApi.configure()
for each user. To speed things up, the async variant of the API is used and all update operations are scheduled
concurrently using asyncio.gather().
Source: reset_call_forwarding.py
1#!/usr/bin/env python
2"""
3Example script
4Reset call forwarding to default for all users in the org
5"""
6import asyncio
7import logging
8import time
9
10from dotenv import load_dotenv
11
12from wxc_sdk.all_types import PersonForwardingSetting
13from wxc_sdk.as_api import AsWebexSimpleApi
14
15
async def main():
    """
    Reset call forwarding to default for all calling users and print the time needed for the operations
    """
    # load environment. SDK fetches the access token from WEBEX_ACCESS_TOKEN environment variable
    load_dotenv()

    logging.basicConfig(level=logging.INFO)

    # set to DEBUG to see the actual requests
    # the async API is used here: same logger name as in the async holiday schedule example
    logging.getLogger('wxc_sdk.as_rest').setLevel(logging.INFO)

    async with AsWebexSimpleApi() as api:

        # get all calling users: the ones with a location id
        start = time.perf_counter_ns()
        people = await api.people.list(calling_data=True)
        calling_users = [user for user in people if user.location_id]
        print(f'Got {len(calling_users)} calling users in '
              f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')

        # default call forwarding settings to apply to all users
        forwarding = PersonForwardingSetting.default()

        # schedule update for each user and wait for completion
        start = time.perf_counter_ns()
        updates = [api.person_settings.forwarding.configure(person_id=user.person_id,
                                                            forwarding=forwarding)
                   for user in calling_users]
        await asyncio.gather(*updates)
        print(f'Reset call forwarding to default for {len(calling_users)} users in '
              f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
45
46if __name__ == '__main__':
47 asyncio.run(main())
Modify number of rings configuration for users read from CSV
Here we read a bunch of user email addresses from a CSV. The CSV has an ERROR column and we only want to consider users without errors.
For all these users a VM setting is updated and the results are written to a CSV for further processing.
Source: modify_voicemail.py
1#!/usr/bin/env python
2"""
3Example Script
4Modifying number of rings configuration in voicemail settings
5Run -> python3 modify_voicemail.py modify_voicemail.csv
6"""
7import csv
8import sys
9import traceback
10from concurrent.futures import ThreadPoolExecutor
11
12from dotenv import load_dotenv
13
14from wxc_sdk import WebexSimpleApi
15from wxc_sdk.all_types import *
16
17VOICEMAIL_SETTINGS_NUMBER_OF_RINGS = 6
18
19# loading environment variables - use .env file for development
20load_dotenv()
21
22
def update_vm_settings():
    """
    actually update VM settings for all users present in input CSV

    The name of the input CSV is taken from the first command line argument. Records with a non-empty ERROR column
    are not processed and are reported as failed. For all other records the USERNAME column is matched against the
    first email address of the calling users and the number of rings in the voicemail settings of each matching
    user is set to VOICEMAIL_SETTINGS_NUMBER_OF_RINGS.

    The results are printed and written to output.csv (columns USERNAME and STATUS) in the current directory.
    """
    api = WebexSimpleApi()
    final_report = []
    mail_ids = []
    # using wxc_sdk.people.PeopleApi.list to iterate over persons
    # Parameter calling_data needs to be set to true to get calling specific information
    # calling users have the attribute location_id set
    calling_users = [user for user in api.people.list(calling_data=True)
                     if user.location_id]
    print(f'{len(calling_users)} users:')
    print('\n'.join(user.display_name for user in calling_users))

    # get CSV file name from command line
    # newline='' is what the csv module expects for files passed to readers and writers
    with open(str(sys.argv[1]), 'r', newline='') as csv_file:
        # read all records from CSV. Only consider records w/o error
        for record in csv.DictReader(csv_file):
            if not record['ERROR']:
                # collect email address from USERNAME column for further processing
                mail_ids.append(record['USERNAME'])
            else:
                final_report.append((record['USERNAME'], 'FAILED DUE TO ERROR REASON IN INPUT FILE'))

    # work on calling users that have an email address that we read from the CSV
    # set for the membership test; the list is kept for the count printed below
    mail_id_set = set(mail_ids)
    filtered_users = [user for user in calling_users if user.emails[0] in mail_id_set]

    print("\nCalling Users in CI - Count ", len(calling_users))
    print("Mail IDs from input file after removing error columns - Count ", len(mail_ids))
    print("FilteredUsers Users - Count", len(filtered_users))

    def set_number_of_rings(user: Person):
        """
        Update number of rings in the VM config of a user and record the result in the final report

        :param user: user to update
        """
        try:
            # shortcut
            vm = api.person_settings.voicemail

            # Read Current configuration
            vm_settings = vm.read(person_id=user.person_id)
            print(f'\n Existing Configuration: {vm_settings} ')

            # Modify number of rings value
            vm_settings.send_unanswered_calls.number_of_rings = VOICEMAIL_SETTINGS_NUMBER_OF_RINGS
            vm.configure(person_id=user.person_id, settings=vm_settings)
            # Read configuration after changes
            vm_settings = vm.read(person_id=user.person_id)
            print(f'\n New Configuration: {vm_settings} ')
            final_report.append((user.display_name, 'SUCCESS'))
        except Exception as e:
            # best effort: a failure for one user is reported and must not stop the updates of the other users
            final_report.append((user.display_name, 'FAILURE'))
            print("type error: " + str(e))
            print(traceback.format_exc())

    # Modify settings for the filtered users
    with ThreadPoolExecutor() as pool:
        # list() to wait for completion of all updates
        list(pool.map(set_number_of_rings, filtered_users))

    print(final_report)
    with open('output.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["USERNAME", "STATUS"])
        writer.writerows(final_report)
92
93
94if __name__ == '__main__':
95 update_vm_settings()
Holiday schedule w/ US national holidays for all US locations
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY.
Source: us_holidays.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5"""
6
7import logging
8from collections import defaultdict
9from concurrent.futures import ThreadPoolExecutor
10from datetime import date
11from threading import Lock
12from typing import List
13
14from dotenv import load_dotenv
15
16from calendarific import CalendarifiyApi, Holiday
17from wxc_sdk import WebexSimpleApi
18from wxc_sdk.locations import Location
19from wxc_sdk.all_types import ScheduleType, Event, Schedule
20
21log = logging.getLogger(__name__)
22
23# a lock per location to protect observe_in_location()
24location_locks: dict[str, Lock] = defaultdict(Lock)
25
# Use parallel threads for provisioning?
USE_THREADING = True

# True: delete holiday schedule instead of creating one
CLEAN_UP = False

# first and last year for which to create public holiday events
FIRST_YEAR = 2022
LAST_YEAR = 2024

# when cleaning up only the first year is processed
LAST_YEAR = FIRST_YEAR if CLEAN_UP else LAST_YEAR
37
38
def _run_for_each(task, items):
    """
    Call task for each item; the calls are executed by a thread pool if USE_THREADING is set

    :param task: callable taking one item
    :param items: items to call task with
    """
    if USE_THREADING:
        with ThreadPoolExecutor() as pool:
            # list() to wait for completion and to surface exceptions raised by the tasks
            list(pool.map(task, items))
    else:
        for item in items:
            task(item)


def observe_in_location(*, api: WebexSimpleApi, location: Location, holidays: List[Holiday]):
    """
    create/update a "National Holidays" schedule in one location

    If CLEAN_UP is set then an existing schedule is deleted instead. Else a missing schedule is created with one
    all-day event for each holiday which is today or later and not on a Sunday. In an existing schedule events in
    the past are deleted and events are added for holidays on dates for which the schedule has no event yet.

    :param api: Webex api
    :type api: WebexSimpleApi
    :param location: location to work on
    :type location: Location
    :param holidays: list of holidays to observe
    :type holidays: List[Holiday]
    """
    # there should always be only one thread messing with the holiday schedule of a location
    with location_locks[location.location_id]:
        # the year is only used for logging; don't fail on an empty list of holidays
        year = holidays[0].date.year if holidays else None
        schedule_name = 'National Holidays'

        # shortcut
        ats = api.telephony.schedules

        # parameters identifying the holiday schedules of the location
        in_location = dict(obj_id=location.location_id,
                           schedule_type=ScheduleType.holidays)

        # existing "National Holidays" schedule or None
        schedule = next((candidate
                         for candidate in ats.list(name=schedule_name, **in_location)
                         if candidate.name == schedule_name),
                        None)
        if CLEAN_UP:
            if schedule:
                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
                ats.delete_schedule(schedule_id=schedule.schedule_id, **in_location)
            return
        if schedule:
            # we need the details: list response doesn't have events
            schedule = ats.details(schedule_id=schedule.schedule_id, **in_location)

        # create list of desired schedule entries
        #   * one per holiday
        #   * only holidays today or in the future
        #   * not on a Sunday
        today = date.today()
        events = [Event(name=f'{holiday.name} {holiday.date.year}',
                        start_date=holiday.date,
                        end_date=holiday.date,
                        all_day_enabled=True)
                  for holiday in holidays
                  if holiday.date >= today and holiday.date.weekday() != 6]

        if not schedule:
            # create new schedule
            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
            if not events:
                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
                return
            schedule = Schedule(name=schedule_name,
                                schedule_type=ScheduleType.holidays,
                                events=events)
            log.debug(
                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
                f'events')
            schedule_id = ats.create(obj_id=location.location_id, schedule=schedule)
            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
            return

        # update existing schedule
        # delete existing events in the past
        to_delete = [event
                     for event in schedule.events
                     if event.start_date < today]
        if to_delete:
            log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
            _run_for_each(lambda event: ats.event_delete(schedule_id=schedule.schedule_id,
                                                         event_id=event.event_id,
                                                         **in_location),
                          to_delete)

        # add events which don't exist yet
        existing_dates = {event.start_date for event in schedule.events}
        to_add = [event
                  for event in events
                  if event.start_date not in existing_dates]
        if not to_add:
            log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
            return
        log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
        _run_for_each(lambda event: ats.event_create(schedule_id=schedule.schedule_id,
                                                     event=event,
                                                     **in_location),
                      to_add)
        log.info(f'observe_in_location({location.name}, {year}): done.')
154
155
def observe_national_holidays(*, api: WebexSimpleApi, locations: List[Location],
                              year: int = None):
    """
    Observe the US national holidays of one year in the given locations

    :param api: Webex api
    :type api: WebexSimpleApi
    :param locations: list of locations in which US national holidays should be observed
    :type locations: List[Location]
    :param year: year for national holidays. Default: current year
    :type year: int
    """
    # default: this year
    if not year:
        year = date.today().year

    # get national holidays for specified year
    holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')

    def observe(location: Location):
        """
        update holiday schedule of one location
        """
        observe_in_location(api=api, location=location, holidays=holidays)

    # update holiday schedule for each location
    with ThreadPoolExecutor() as pool:
        if not USE_THREADING:
            for location in locations:
                observe(location)
        else:
            # list() to wait for completion of all locations
            list(pool.map(observe, locations))
184
185
if __name__ == '__main__':
    # read dotenv which has some environment variables like Webex API token and Calendarific
    # API key.
    load_dotenv()

    # enable logging; DEBUG for everything but the urllib3 and REST loggers
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
    for logger_name in ('urllib3', 'wxc_sdk.rest'):
        logging.getLogger(logger_name).setLevel(logging.INFO)

    # years to create national holiday schedules for
    years = range(FIRST_YEAR, LAST_YEAR + 1)

    # the actual action
    with WebexSimpleApi(concurrent_requests=5) as wx_api:
        # get all US locations
        log.info('Getting locations...')
        us_locations = [location
                        for location in wx_api.locations.list()
                        if location.address.country == 'US']

        # set up location locks
        # location_locks is a defaultdict -> accessing with all potential keys creates the locks
        for loc in us_locations:
            location_locks[loc.location_id]

        def observe_year(year: int):
            """
            create national holiday schedule for given year in all US locations
            """
            observe_national_holidays(api=wx_api, year=year, locations=us_locations)

        # create national holiday schedule for given year(s) and locations
        if USE_THREADING:
            with ThreadPoolExecutor() as pool:
                list(pool.map(observe_year, years))
        else:
            for year in years:
                observe_year(year)
Holiday schedule w/ US national holidays for all US locations (async variant)
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY.
Source: us_holidays_async.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5
6Using the async SDK variant
7"""
8import asyncio
9import functools
10import logging
11from collections import defaultdict
12from datetime import date
13from typing import List
14
15from dotenv import load_dotenv
16
17from calendarific import CalendarifiyApi, Holiday
18from wxc_sdk.all_types import ScheduleType, Event, Schedule
19from wxc_sdk.as_api import AsWebexSimpleApi
20from wxc_sdk.locations import Location
21
22log = logging.getLogger(__name__)
23
24# a lock per location to protect observe_in_location()
25location_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
26
# Use parallel tasks for provisioning?
USE_TASKS = True

# True: delete holiday schedule instead of creating one
CLEAN_UP = False

# first and last year for which to create public holiday events
FIRST_YEAR = 2022
LAST_YEAR = 2024

# when cleaning up only the first year is processed
LAST_YEAR = FIRST_YEAR if CLEAN_UP else LAST_YEAR
38
39
async def _await_for_each(operation, items):
    """
    Await operation for each item; the operations are awaited together if USE_TASKS is set, else one by one

    :param operation: callable taking one item and returning an awaitable
    :param items: items to call operation with
    """
    if USE_TASKS:
        await asyncio.gather(*[operation(item) for item in items])
    else:
        for item in items:
            await operation(item)


async def observe_in_location(*, api: AsWebexSimpleApi, location: Location, holidays: List[Holiday]):
    """
    create/update a "National Holidays" schedule in one location

    If CLEAN_UP is set then an existing schedule is deleted instead. Else a missing schedule is created with one
    all-day event for each holiday which is today or later and not on a Sunday. In an existing schedule events in
    the past are deleted and events are added for holidays on dates for which the schedule has no event yet.

    :param api: Webex api
    :type api: AsWebexSimpleApi
    :param location: location to work on
    :type location: Location
    :param holidays: list of holidays to observe
    :type holidays: List[Holiday]
    """
    # there should always be only one task messing with the holiday schedule of a location
    async with location_locks[location.location_id]:
        # the year is only used for logging; don't fail on an empty list of holidays
        year = holidays[0].date.year if holidays else None
        schedule_name = 'National Holidays'

        # shortcut
        ats = api.telephony.schedules

        # parameters identifying the holiday schedules of the location
        in_location = dict(obj_id=location.location_id,
                           schedule_type=ScheduleType.holidays)

        # existing "National Holidays" schedule or None
        schedule = next((candidate
                         for candidate in await ats.list(name=schedule_name, **in_location)
                         if candidate.name == schedule_name),
                        None)
        if CLEAN_UP:
            if schedule:
                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
                await ats.delete_schedule(schedule_id=schedule.schedule_id, **in_location)
            return
        if schedule:
            # we need the details: list response doesn't have events
            schedule = await ats.details(schedule_id=schedule.schedule_id, **in_location)

        # create list of desired schedule entries
        #   * one per holiday
        #   * only holidays today or in the future
        #   * not on a Sunday
        today = date.today()
        events = [Event(name=f'{holiday.name} {holiday.date.year}',
                        start_date=holiday.date,
                        end_date=holiday.date,
                        all_day_enabled=True)
                  for holiday in holidays
                  if holiday.date >= today and holiday.date.weekday() != 6]

        if not schedule:
            # create new schedule
            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
            if not events:
                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
                return
            schedule = Schedule(name=schedule_name,
                                schedule_type=ScheduleType.holidays,
                                events=events)
            log.debug(
                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
                f'events')
            schedule_id = await ats.create(obj_id=location.location_id, schedule=schedule)
            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
            return

        # update existing schedule
        # delete existing events in the past
        to_delete = [event
                     for event in schedule.events
                     if event.start_date < today]
        if to_delete:
            log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
            await _await_for_each(lambda event: ats.event_delete(schedule_id=schedule.schedule_id,
                                                                 event_id=event.event_id,
                                                                 **in_location),
                                  to_delete)

        # add events which don't exist yet
        existing_dates = {event.start_date for event in schedule.events}
        to_add = [event
                  for event in events
                  if event.start_date not in existing_dates]
        if not to_add:
            log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
            return
        log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
        await _await_for_each(lambda event: ats.event_create(schedule_id=schedule.schedule_id,
                                                             event=event,
                                                             **in_location),
                              to_add)
        log.info(f'observe_in_location({location.name}, {year}): done.')
150
151
async def observe_national_holidays(*, api: AsWebexSimpleApi, locations: List[Location],
                                    year: int = None):
    """
    Observe the US national holidays of one year in the given locations

    :param api: Webex api
    :type api: AsWebexSimpleApi
    :param locations: list of locations in which US national holidays should be observed
    :type locations: List[Location]
    :param year: year for national holidays. Default: current year
    :type year: int
    """
    # default: this year
    if not year:
        year = date.today().year

    # get national holidays for specified year
    # CalendarifiyApi().holidays() is a sync call; to avoid calling it directly it is run in the default executor
    get_holidays = functools.partial(CalendarifiyApi().holidays,
                                     country='US', year=year, holiday_type='national')
    holidays = await asyncio.get_running_loop().run_in_executor(None, get_holidays)

    # update holiday schedule for each location
    if not USE_TASKS:
        for location in locations:
            await observe_in_location(api=api, location=location, holidays=holidays)
    else:
        updates = [observe_in_location(api=api, location=location, holidays=holidays)
                   for location in locations]
        await asyncio.gather(*updates)
182
183
if __name__ == '__main__':
    # read dotenv which has some environment variables like Webex API token and Calendarific
    # API key.
    load_dotenv()

    # enable logging; DEBUG for everything but the urllib3 and REST loggers
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
    for logger_name in ('urllib3', 'wxc_sdk.as_rest'):
        logging.getLogger(logger_name).setLevel(logging.INFO)

    # the actual action
    async def do_provision():
        """
        create national holiday schedules for all US locations and all years
        """
        years = range(FIRST_YEAR, LAST_YEAR + 1)

        async with AsWebexSimpleApi(concurrent_requests=5) as wx_api:
            # get all US locations
            log.info('Getting locations...')
            all_locations = await wx_api.locations.list()
            us_locations = [location
                            for location in all_locations
                            if location.address.country == 'US']

            # set up location locks
            # location_locks is a defaultdict -> accessing with all potential keys creates the locks
            for loc in us_locations:
                location_locks[loc.location_id]

            # create national holiday schedule for given year(s) and locations
            if USE_TASKS:
                per_year = [observe_national_holidays(api=wx_api, year=year, locations=us_locations)
                            for year in years]
                await asyncio.gather(*per_year)
            else:
                for year in years:
                    await observe_national_holidays(api=wx_api, year=year, locations=us_locations)

    asyncio.run(do_provision())
Persist tokens and obtain new tokens interactively
A typical problem specifically when creating CLI scripts is how to obtain valid access tokens for the API operations. If your code wants to invoke Webex REST APIs on behalf of a user then an integration is needed. The concepts of integrations are explained at the “Integrations” page on developer.cisco.com.
This example code shows how an OAuth Grant flow for an integration can be initiated from a script by using the Python
webbrowser module and calling the open() method with the authorization URL of a given integration to open that
URL in the system web browser. The user can then authenticate and grant access to the integration. In the last
step of a successful authorization flow the web browser is redirected to the redirect_url of the integration.
The example code starts a primitive web server serving GET requests to http://localhost:6001/redirect.
This URL has to be the redirect URL of the integration you create under My Webex Apps on developer.webex.com.
The sample script reads the integration parameters from environment variables (TOKEN_INTEGRATION_CLIENT_ID,
TOKEN_INTEGRATION_CLIENT_SECRET, TOKEN_INTEGRATION_CLIENT_SCOPES). These variables can also be defined in
get_tokens.env in the current directory:
# rename this to get_tokens.env and set values
# client ID for integration to be used.
TOKEN_INTEGRATION_CLIENT_ID=
# client secret of integration
TOKEN_INTEGRATION_CLIENT_SECRET=
# scopes to request. Use these scopes when creating the integration at developer.webex.com
# scopes can be in any form supported by wxc_sdk.scopes.parse_scopes()
TOKEN_INTEGRATION_CLIENT_SCOPES="spark:calls_write spark:kms spark:calls_read spark-admin:telephony_config_read spark:people_read"
The sample code persists the tokens in get_tokens.yml in the current directory. On startup the sample code tries
to read tokens from that file. If needed a new access token is obtained using the refresh token.
An OAuth flow is only initiated if no (valid) tokens could be read from get_tokens.yml.
Source: get_tokens.py
1#!/usr/bin/env python
2"""
3Example script
4read tokens from file or interactively obtain token by starting a local web server and open the authorization URL in
5the local web browser
6"""
7import logging
8import os
9from typing import Optional
10
11from dotenv import load_dotenv
12
13from wxc_sdk import WebexSimpleApi
14from wxc_sdk.integration import Integration
15from wxc_sdk.scopes import parse_scopes
16from wxc_sdk.tokens import Tokens
17
18log = logging.getLogger(__name__)
19
20
def env_path() -> str:
    """
    Path of the .env file to load environment variables from.

    The file is located in the current working directory and has the same base name as this file.

    :return: .env file path
    """
    script_name, _ = os.path.splitext(os.path.basename(__file__))
    return os.path.join(os.getcwd(), f'{script_name}.env')
28
29
def yml_path() -> str:
    """
    Path of the YML file used to persist tokens.

    The file is located in the current working directory and has the same base name as this file.

    :return: path to YML file
    :rtype: str
    """
    script_name, _ = os.path.splitext(os.path.basename(__file__))
    return os.path.join(os.getcwd(), f'{script_name}.yml')
38
39
def build_integration() -> Integration:
    """
    read integration parameters from environment variables and create an integration

    Client id, client secret, and scopes are read from TOKEN_INTEGRATION_CLIENT_ID, TOKEN_INTEGRATION_CLIENT_SECRET,
    and TOKEN_INTEGRATION_CLIENT_SCOPES. The redirect URL is fixed: http://localhost:6001/redirect

    :return: :class:`wxc_sdk.integration.Integration` instance
    :raises ValueError: if one of the integration parameters is missing or empty
    """
    env_names = ('TOKEN_INTEGRATION_CLIENT_ID', 'TOKEN_INTEGRATION_CLIENT_SECRET',
                 'TOKEN_INTEGRATION_CLIENT_SCOPES')
    client_id, client_secret, raw_scopes = (os.getenv(name) for name in env_names)
    # only parse scopes which are actually set: an unset variable should be reported by the check below instead of
    # being passed to parse_scopes() as None
    scopes = parse_scopes(raw_scopes) if raw_scopes else None
    redirect_url = 'http://localhost:6001/redirect'
    values = (client_id, client_secret, scopes)
    if not all(values):
        missing = ', '.join(name for name, value in zip(env_names, values) if not value)
        raise ValueError(f'failed to get integration parameters from environment: {missing}')
    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
                       redirect_url=redirect_url)
54
55
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens for the integration defined in the environment.

    Tokens are read from a YML file. If needed an OAuth flow is initiated.

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """
    return build_integration().get_cached_tokens_from_yml(yml_path=yml_path())
68
69
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    # load environment variables from .env
    dotenv_file = env_path()
    log.info(f'reading {dotenv_file}')
    load_dotenv(dotenv_file)

    # use the tokens to get identity of authenticated user
    api = WebexSimpleApi(tokens=get_tokens())
    me = api.people.me()
    print(f'authenticated as {me.display_name} ({me.emails[0]}) ')
Read/update call intercept settings of a user
usage: call_intercept.py [-h] [--token TOKEN] user_email [{on,off}]

positional arguments:
  user_email     email address of user
  {on,off}       operation to apply

options:
  -h, --help     show this help message and exit
  --token TOKEN  admin access token to use
The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable, or
obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables,
which can be set in a call_intercept.env file in the current directory.
Source: call_intercept.py
1#!/usr/bin/env python
2"""
3Script to read/update call intercept settings of a calling user.
4
5The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
6obtains tokens via an OAuth flow.
7
8 usage: call_intercept.py [-h] [--token TOKEN] user_email [{on,off}]
9
10 positional arguments:
11 user_email email address of user
12 {on,off} operation to apply
13
14 options:
15 -h, --help show this help message and exit
16 --token TOKEN admin access token to use
17"""
18import argparse
19import logging
20import os
21import re
22import sys
23from json import loads
24from typing import Optional
25
26from dotenv import load_dotenv
27from wxc_sdk import WebexSimpleApi
28from wxc_sdk.integration import Integration
29from wxc_sdk.person_settings.call_intercept import InterceptSetting
30from wxc_sdk.rest import RestError
31from wxc_sdk.scopes import parse_scopes
32from wxc_sdk.tokens import Tokens
33from yaml import safe_dump, safe_load
34
35log = logging.getLogger(__name__)
36
37
def env_path() -> str:
    """
    Build the path of the .env file the environment variables are loaded from.

    The file is located in the current working directory and carries the name of this script with a
    ``.env`` extension.

    :return: .env file path
    """
    script_name = os.path.basename(__file__)
    stem, _ = os.path.splitext(script_name)
    return os.path.join(os.getcwd(), stem + '.env')
45
46
def yml_path() -> str:
    """
    Build the path of the YML file used to persist tokens.

    The file is located in the current working directory and carries the name of this script with a
    ``.yml`` extension.

    :return: path to YML file
    :rtype: str
    """
    script_name = os.path.basename(__file__)
    stem, _ = os.path.splitext(script_name)
    return os.path.join(os.getcwd(), stem + '.yml')
55
56
def build_integration() -> Integration:
    """
    Create an integration from the parameters found in the environment.

    Client id, client secret, and scopes are read from environment variables; the redirect URL is fixed.

    :return: :class:`wxc_sdk.integration.Integration` instance
    """
    client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
    client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
    raw_scopes = os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES')
    # leave an unset/empty value untouched so that it is caught by the check below
    scopes = parse_scopes(raw_scopes) if raw_scopes else raw_scopes
    if not (client_id and client_secret and scopes):
        raise ValueError('failed to get integration parameters from environment')
    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
                       redirect_url='http://localhost:6001/redirect')
73
74
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens for the integration defined in the environment.

    Tokens are read from a YML file. If needed an OAuth flow is initiated.

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """

    def write_tokens(tokens_to_cache: Tokens):
        """
        Persist tokens in the YML file
        """
        with open(yml_path(), mode='w') as f:
            # model_dump_json() is the pydantic v2 counterpart of the deprecated .json(); this matches the
            # v2 model_validate() call used for reading. The JSON round trip yields plain types for safe_dump()
            safe_dump(loads(tokens_to_cache.model_dump_json()), f)

    def read_tokens() -> Optional[Tokens]:
        """
        Read tokens from the YML file; None if the file is missing or can't be parsed/validated
        """
        try:
            with open(yml_path(), mode='r') as f:
                data = safe_load(f)
                tokens_read = Tokens.model_validate(data)
        except Exception as e:
            # best effort: a missing or broken cache file simply means that there are no cached tokens
            log.info(f'failed to read tokens from file: {e}')
            tokens_read = None
        return tokens_read

    integration = build_integration()
    tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
                                           write_to_cache=write_tokens)
    return tokens
103
104
RE_EMAIL = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")


def email_type(value: str) -> str:
    """
    argparse ``type`` callable validating an email address argument

    :param value: command line argument value
    :return: the unchanged value if it looks like an email address
    :raises argparse.ArgumentTypeError: if the value is not a valid email address
    """
    # fullmatch() instead of match(): with match() the trailing "$" also matches right before a final
    # newline so that a value like "user@example.com\n" would be accepted
    if not RE_EMAIL.fullmatch(value):
        raise argparse.ArgumentTypeError(f"'{value}' is not a valid email")
    return value
112
113
def main():
    """
    Read and optionally update the call intercept setting of a user.

    The user is identified by the email address passed on the command line. The current state is printed
    as "on" or "off". If an operation ("on" or "off") is given then the setting is updated, read again, and
    the resulting state is printed. The process exits with code 0 on success and 1 on any failure.
    """
    # read .env file with the settings for the integration to be used to obtain tokens
    load_dotenv(env_path())

    parser = argparse.ArgumentParser()
    parser.add_argument('user_email', type=email_type, help='email address of user')
    parser.add_argument('on_off', choices=['on', 'off'], nargs='?', help='operation to apply')
    parser.add_argument('--token', type=str, required=False, help='admin access token to use')
    args = parser.parse_args()

    # token precedence: --token argument, WEBEX_ACCESS_TOKEN environment variable, cached tokens/OAuth flow
    if args.token:
        tokens = args.token
    elif (tokens := os.getenv('WEBEX_ACCESS_TOKEN')) is None:
        tokens = get_tokens()

    if not tokens:
        print('Failed to get tokens', file=sys.stderr)
        sys.exit(1)

    # set level to DEBUG to see debug of REST requests; DEBUG is only used if a trace function is active
    # (e.g. when running in a debugger)
    gettrace = getattr(sys, 'gettrace', None)
    debugging = bool(gettrace and gettrace())
    logging.basicConfig(level=logging.DEBUG if debugging else logging.INFO)

    with WebexSimpleApi(tokens=tokens) as api:
        # get user
        email = args.user_email.lower()
        user = next((user
                     for user in api.people.list(email=email)
                     if user.emails[0] == email), None)
        if user is None:
            print(f'User "{email}" not found', file=sys.stderr)
            sys.exit(1)

        # display call intercept status
        try:
            intercept = api.person_settings.call_intercept.read(user.person_id)
        except RestError as e:
            print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}',
                  file=sys.stderr)
            sys.exit(1)

        print('on' if intercept.enabled else 'off')
        if args.on_off:
            # action: turn on/off
            intercept = InterceptSetting.default()
            intercept.enabled = args.on_off == 'on'
            try:
                api.person_settings.call_intercept.configure(user.person_id,
                                                             intercept=intercept)
            except RestError as e:
                print(f'Failed to update call intercept settings: {e.response.status_code}, {e.description}',
                      file=sys.stderr)
                sys.exit(1)

            # read call intercept again
            try:
                intercept = api.person_settings.call_intercept.read(user.person_id)
            except RestError as e:
                print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}',
                      file=sys.stderr)
                sys.exit(1)

            # display state after update
            print(f"set to {'on' if intercept.enabled else 'off'}")

    sys.exit(0)


if __name__ == '__main__':
    main()
Read/update call queue agent join states
usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]] [--queue QUEUE [QUEUE ...]]
                       [--join JOIN_AGENT [JOIN_AGENT ...]] [--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
                       [--remove REMOVE_USER [REMOVE_USER ...]] [--add ADD_USER [ADD_USER ...]] [--dryrun]
                       [--token TOKEN]

Modify call queue settings from the CLI

optional arguments:
  -h, --help            show this help message and exit
  --location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
                        name of location to work on. If missing then work on all locations.
  --queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
                        name(s) of queue(s) to operate on. If missing then work on all queues in location.
  --join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
                        Join given user(s) on given queue(s). Can be "all" to act on all agents.
  --unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
                        Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.
  --remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
                        Remove given agent from given queue(s). Can be "all" to act on all agents.
  --add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
                        Add given users to given queue(s).
  --dryrun, -d          Dry run; don't apply any changes
  --token TOKEN         admin access token to use
The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables
which can be set in a queue_helper.env
file in the current directory.
Source: queue_helper.py
1#!/usr/bin/env python
2"""
3usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]] [--queue QUEUE [QUEUE ...]]
4 [--join JOIN_AGENT [JOIN_AGENT ...]] [--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
5 [--remove REMOVE_USER [REMOVE_USER ...]] [--add ADD_USER [ADD_USER ...]] [--dryrun]
6 [--token TOKEN]
7
8Modify call queue settings from the CLI
9
10optional arguments:
11 -h, --help show this help message and exit
12 --location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
13 name of location to work on. If missing then work on all locations.
14 --queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
15 name(s) of queue(s) to operate on. If missing then work on all queues in location.
16 --join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
17 Join given user(s) on given queue(s). Can be "all" to act on all agents.
18 --unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
19 Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.
20 --remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
21 Remove given agent from given queue(s). Can be "all" to act on all agents.
22 --add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
23 Add given users to given queue(s).
24 --dryrun, -d Dry run; don't apply any changes
25 --token TOKEN admin access token to use
26"""
27import asyncio
28import logging
29import os
30import sys
31from argparse import ArgumentParser
32from collections.abc import AsyncGenerator
33from typing import Optional
34
35from dotenv import load_dotenv
36
37from wxc_sdk import Tokens
38from wxc_sdk.as_api import AsWebexSimpleApi
39from wxc_sdk.integration import Integration
40from wxc_sdk.people import Person
41from wxc_sdk.scopes import parse_scopes
42from wxc_sdk.telephony.callqueue import CallQueue
43from wxc_sdk.telephony.hg_and_cq import Agent
44
45
def agent_name(agent: Agent) -> str:
    """
    Display name of an agent: first name and last name separated by a single space
    """
    return '{} {}'.format(agent.first_name, agent.last_name)
48
49
def env_path() -> str:
    """
    Build the path of the .env file the environment variables are loaded from; the file is located in
    the current working directory and is named after this file
    :return: .env file path
    """
    script_name = os.path.basename(__file__)
    stem, _ = os.path.splitext(script_name)
    return os.path.join(os.getcwd(), stem + '.env')
56
57
def yml_path() -> str:
    """
    Build the path of the YML file used to persist tokens; the file is located in the current working
    directory and is named after this file
    :return: path to YML file
    :rtype: str
    """
    script_name = os.path.basename(__file__)
    stem, _ = os.path.splitext(script_name)
    return os.path.join(os.getcwd(), stem + '.yml')
65
66
def build_integration() -> Integration:
    """
    read integration parameters from environment variables and create an integration
    :return: :class:`wxc_sdk.integration.Integration` instance
    :raises ValueError: if client id, client secret, or scopes are missing from the environment
    """
    client_id = os.getenv('INTEGRATION_CLIENT_ID')
    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
    scopes = os.getenv('INTEGRATION_SCOPES')
    # only parse scopes that are actually set; an unset variable is None and has to be reported as a
    # missing parameter below instead of being handed to parse_scopes()
    if scopes:
        scopes = parse_scopes(scopes)
    if not all((client_id, client_secret, scopes)):
        raise ValueError('failed to get integration parameters from environment')
    redirect_url = 'http://localhost:6001/redirect'
    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
                       redirect_url=redirect_url)
80
81
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens for the integration defined in the environment.
    Tokens are read from a YML file. If needed an OAuth flow is initiated.

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """
    # the integration takes care of reading/writing the YML cache file
    return build_integration().get_cached_tokens_from_yml(yml_path=yml_path())
93
94
async def main():
    """
    Parse the command line, determine the call queues to work on, and apply the requested join, unjoin,
    remove, and add operations to all matching queues in parallel. A summary is printed for each queue.

    The access token is taken from --token, from the WEBEX_ACCESS_TOKEN environment variable, or is obtained
    using the integration defined in the environment (in that order).
    """

    async def act_on_queue(queue: CallQueue):
        """
        Act on a single queue
        """
        # we need the queue details b/c the queue instance passed as parameter is from a list() call
        # ... and thus is missing all the details like agents
        details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
        agent_names = set(map(agent_name, details.agents))

        def notify(message: str) -> str:
            """
            an action notification with queue information
            """
            return f'queue "{details.name:{queue_len}}" in "{queue.location_name:{location_len}}": {message}'

        def validate_agents(names: list[str], operation: str) -> set[str]:
            """
            check if all names in given list exist as agents on current queue

            :return: set of names that exist as agents on the queue; all agent names if "all" was given
            """
            if 'all' in names:
                return set(agent_names)

            not_found = [name for name in names if name not in agent_names]
            if not_found:
                print('\n'.join(notify(f'{name} not found for {operation}')
                                for name in not_found),
                      file=sys.stderr)
            return {name for name in names if name in agent_names}

        # validate list of names for join, unjoin, and remove against actual list of agents
        to_join = validate_agents(join_agents, 'join')
        to_unjoin = validate_agents(unjoin_agents, 'unjoin')
        to_remove = validate_agents(remove_users, 'remove')

        # check for agents we are asked to add but which already exist as agents on the queue
        existing_agent_ids = set(agent.agent_id for agent in details.agents)
        agent_exists = [agent_name(user) for user in add_users
                        if user.person_id in existing_agent_ids]
        if agent_exists:
            print('\n'.join(notify(f'{name} already is agent')
                            for name in agent_exists),
                  file=sys.stderr)
        # only add users who are not agents yet; decided by id so that a different user with the same
        # name as an existing agent is still added
        to_add = [user for user in add_users
                  if user.person_id not in existing_agent_ids]

        # the updated list of agents for the current queue
        new_agents = []

        # do we actually need an update?
        update_needed = False

        # create copy of each agent instance; we don't want to update the original agent objects
        # to make sure that details still holds the state before any update
        agents = [agent.copy(deep=True) for agent in details.agents]

        # iterate through the existing agents and see if we have to apply any change
        for agent in agents:
            name = agent_name(agent)
            # do we have to take action to join this agent?
            if name in to_join and not agent.join_enabled:
                print(notify(f'{name}, join'))
                update_needed = True
                agent.join_enabled = True
            # do we have to take action to unjoin this agent?
            if name in to_unjoin and agent.join_enabled:
                print(notify(f'{name}, unjoin'))
                update_needed = True
                agent.join_enabled = False
            # do we have to remove this agent?
            if name in to_remove:
                print(notify(f'{name}, remove'))
                update_needed = True
                # skip to next agent; so that we don't add this agent to the updated list of agents
                continue
            new_agents.append(agent)

        # add new agents
        new_agents.extend(Agent(agent_id=user.person_id)
                          for user in to_add)

        # update the queue
        if (update_needed or to_add) and not args.dryrun:
            # simplified update: we only messed with the agents
            update = CallQueue(agents=new_agents)
            await api.telephony.callqueue.update(location_id=queue.location_id, queue_id=queue.id,
                                                 update=update)
            print(notify('queue updated'))
            # and get details after the update
            details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
            print(notify('got details after update'))

        # print summary
        print(f'queue "{queue.name:{queue_len}}" in "{queue.location_name}"')
        print(f'  phone number: {details.phone_number}')
        print(f'  extension: {details.extension}')
        print('  agents')
        if details.agents:
            name_len = max(map(len, map(agent_name, details.agents)))
            for agent in details.agents:
                print(f'    {agent_name(agent):{name_len}}: {"not " if not agent.join_enabled else ""}joined')
        return

    async def validate_users(user_names: list[str]) -> AsyncGenerator[Person, None]:
        """
        Validate list of names of users to be added and yield a Person instance for each one
        """
        # search for all names in parallel; with return_exceptions=True a failed search shows up as an
        # exception instance in the result list instead of aborting all searches
        lists = await asyncio.gather(
            *[api.people.list(display_name=name) for name in user_names], return_exceptions=True)
        for name, user_list in zip(user_names, lists):
            if isinstance(user_list, Exception):
                user = None
            else:
                user = next((u for u in user_list if name == agent_name(u)), None)
            if user is None:
                print(f'user "{name}" not found', file=sys.stderr)
                continue
            yield user
        return

    # parse command line
    parser = ArgumentParser(description='Modify call queue settings from the CLI')
    parser.add_argument('--location', '-l', type=str, required=False, nargs='+',
                        help='name of location to work on. If missing then work on all locations.')

    parser.add_argument('--queue', '-q', type=str, required=False, nargs='+',
                        help='name(s) of queue(s) to operate on. If missing then work on all queues in location.')

    parser.add_argument('--join', '-j', type=str, required=False, nargs='+', dest='join_agent',
                        help='Join given user(s) on given queue(s). Can be "all" to act on all agents.')

    parser.add_argument('--unjoin', '-u', type=str, required=False, nargs='+', dest='unjoin_agent',
                        help='Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.')

    parser.add_argument('--remove', '-r', type=str, required=False, nargs='+', dest='remove_user',
                        help='Remove given agent from given queue(s). Can be "all" to act on all agents.')

    parser.add_argument('--add', '-a', type=str, required=False, nargs='+', dest='add_user',
                        help='Add given users to given queue(s).')
    parser.add_argument('--dryrun', '-d', required=False, action='store_true',
                        help='Dry run; don\'t apply any changes')
    parser.add_argument('--token', type=str, required=False, help='admin access token to use')

    args = parser.parse_args()

    # get environment variables from .env; required for integration parameters
    load_dotenv(env_path())

    # token precedence: --token argument, WEBEX_ACCESS_TOKEN environment variable, cached tokens/OAuth flow
    tokens = args.token or os.getenv('WEBEX_ACCESS_TOKEN') or None
    if tokens is None:
        # get tokens from cache or create a new set of tokens using the integration defined in .env
        tokens = get_tokens()

    if not tokens:
        print('Failed to get tokens', file=sys.stderr)
        sys.exit(1)

    async with AsWebexSimpleApi(tokens=tokens) as api:
        # validate location parameter
        location_names = args.location or []

        # list of all locations with names matching one of the provided names
        requested_location_names = set(location_names)
        locations = [loc for loc in await api.locations.list()
                     if not location_names or loc.name in requested_location_names]

        if not location_names:
            print(f'Considering all {len(locations)} locations')

        # set of names of matching locations
        found_location_names = set(loc.name for loc in locations)

        # Error message for each location name argument not matching an actual location
        for location_name in location_names:
            if location_name not in found_location_names:
                print(f'location "{location_name}" not found', file=sys.stderr)

        if not locations:
            print('Found no locations to work on', file=sys.stderr)
            sys.exit(1)

        # which queues do we need to operate on?
        location_ids = set(loc.location_id for loc in locations)
        queue_names = args.queue
        all_queues = queue_names is None
        # full list of queues
        queues = await api.telephony.callqueue.list()
        # filter based on location parameter
        queues = [queue for queue in queues
                  if (all_queues or queue.name in queue_names) and queue.location_id in location_ids]

        if not queues:
            # nothing to do; also avoids max() on an empty sequence below
            print('Found no queues to work on', file=sys.stderr)
            sys.exit(1)

        # len of queue names for nicer output
        queue_len = max(len(queue.name) for queue in queues)

        # now we can actually go back and re-evaluate the list of locations; for the location length we only need
        # to consider locations we actually have a target queue in
        location_ids = set(queue.location_id for queue in queues)

        # max length of location names for nicely formatted output
        location_len = max(len(loc.name)
                           for loc in locations
                           if loc.location_id in location_ids)

        # get the names for join, unjoin, remove, and add
        join_agents = args.join_agent or []
        unjoin_agents = args.unjoin_agent or []
        remove_users = args.remove_user or []
        add_users = args.add_user or []

        # validate users; make sure that users exist with the provided names
        add_users = [u async for u in validate_users(user_names=add_users)]

        # apply actions to all queues
        await asyncio.gather(*[act_on_queue(queue) for queue in queues])


if __name__ == '__main__':
    # enable DEBUG logging to a file; REST log shows all requests
    logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
                        filemode='w', level=logging.DEBUG)
    asyncio.run(main())
Using service app tokens to access API endpoints
The script uses service app credentials to get an access token and then uses this access token to call Webex Calling APIs.
Source: service_app.py
1#!/usr/bin/env python
2"""
3Demo for Webex service app: using service APP tokens to access a API endpoints
4"""
5import logging
6import sys
7from json import dumps, loads
8from os import getenv
9from os.path import basename, splitext, isfile
10from typing import Optional
11
12from dotenv import load_dotenv
13from yaml import safe_load, safe_dump
14
15from wxc_sdk import WebexSimpleApi
16from wxc_sdk.integration import Integration
17from wxc_sdk.tokens import Tokens
18
19
def yml_path() -> str:
    """
    File name of the YML file used to cache access and refresh token: name of this script with a
    ``.yml`` extension
    """
    stem, _ = splitext(basename(__file__))
    return stem + '.yml'
25
26
def env_path() -> str:
    """
    File name of the .env file to read service app settings from: name of this script with a
    ``.env`` extension
    :return: .env file name
    """
    stem, _ = splitext(basename(__file__))
    return stem + '.env'
33
34
def read_tokens_from_file() -> Optional[Tokens]:
    """
    Load service app tokens from the cache file.

    :return: cached tokens; None if the cache file does not exist or can't be read or validated
    """
    cache_file = yml_path()
    if isfile(cache_file):
        try:
            with open(cache_file, mode='r') as yml_file:
                return Tokens.model_validate(safe_load(yml_file))
        except Exception:
            # an unreadable or invalid cache is treated like a missing cache
            return None
    return None
49
50
def write_tokens_to_file(tokens: Tokens):
    """
    Persist tokens in the YML cache file; attributes without a value are not written
    """
    cache_file = yml_path()
    with open(cache_file, mode='w') as yml_file:
        payload = tokens.model_dump(exclude_none=True)
        safe_dump(payload, yml_file)
57
58
def get_access_token() -> Tokens:
    """
    Obtain a new access token from the service app refresh token, client id, and client secret found in
    the environment and write the resulting tokens to the cache file

    :return: refreshed tokens
    """
    service_app_integration = Integration(client_id=getenv('SERVICE_APP_CLIENT_ID'),
                                          client_secret=getenv('SERVICE_APP_CLIENT_SECRET'),
                                          scopes=[], redirect_url=None)
    # start with a token set that only has the refresh token; refresh() updates it in place
    new_tokens = Tokens(refresh_token=getenv('SERVICE_APP_REFRESH_TOKEN'))
    service_app_integration.refresh(tokens=new_tokens)
    write_tokens_to_file(new_tokens)
    return new_tokens
70
71
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens from the cache file; a new access token is created using the service app credentials if
    there are no cached tokens or if the tokens have less than one day of lifetime left
    """
    one_day = 24 * 60 * 60
    cached = read_tokens_from_file()
    # no cached tokens: create new access token using refresh token
    tokens = get_access_token() if cached is None else cached
    if tokens.remaining < one_day:
        return get_access_token()
    return tokens
84
85
def service_app():
    """
    Use service app access token to call Webex Calling API endpoints.

    Loads the service app settings from the .env file, gets tokens, prints the tokens and their scopes, and
    then prints the number of users and call queues. Exits with code 1 if a required setting is missing.
    :return:
    """
    load_dotenv(env_path())
    # assert that all required environment variable are set
    if not all(getenv(s) for s in ('SERVICE_APP_REFRESH_TOKEN', 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET')):
        print(
            f'SERVICE_APP_REFRESH_TOKEN, SERVICE_APP_CLIENT_ID, and SERVICE_APP_CLIENT_SECRET need to be defined in '
            f'environment or in "{env_path()}"',
            file=sys.stderr)
        # sys.exit() instead of exit(): the latter is only provided by the site module
        sys.exit(1)

    # get tokens and dump to console
    tokens = get_tokens()
    # model_dump_json() is the pydantic v2 counterpart of the deprecated .json()
    print(dumps(loads(tokens.model_dump_json()), indent=2))
    print()
    print('scopes:')
    # tolerate tokens without scope information
    print('\n'.join(f' * {s}' for s in sorted((tokens.scope or '').split())))

    # use tokens to access APIs
    api = WebexSimpleApi(tokens=tokens)

    users = list(api.people.list())
    print(f'{len(users)} users')

    queues = list(api.telephony.callqueue.list())
    print(f'{len(queues)} call queues')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    service_app()
Pool unassigned TNs on hunt groups to catch calls to unassigned TNs
This script looks for unassigned TNs and assigns them to HGs that are forwarded to the location's main number. The idea is to catch all incoming calls to unassigned TNs and handle them accordingly.
usage: catch_tns.py [-h] [--test] [--location LOCATION] [--token TOKEN]
                    [--cleanup]

optional arguments:
  -h, --help           show this help message and exit
  --test               test only; don't actually apply any config
  --location LOCATION  Location to work on
  --token TOKEN        admin access token to use.
  --cleanup            remove all pooling HGs
Source: catch_tns.py
1#!/usr/bin/env python
2"""
3This script looks for unassigned TNs and assigns them to HGs that are forwarded to the locations main number.
4The idea is to catch all incoming calls to unassigned TNs and handle them accordingly
5
6 usage: catch_tns.py [-h] [--test] [--location LOCATION] [--token TOKEN]
7 [--cleanup]
8
9 optional arguments:
10 -h, --help show this help message and exit
11 --test test only; don't actually apply any config
12 --location LOCATION Location to work on
13 --token TOKEN admin access token to use.
14 --cleanup remove all pooling HGs
15"""
16import asyncio
17import logging
18import os
19import sys
20from argparse import ArgumentParser, Namespace
21from collections import defaultdict
22from operator import attrgetter
23from typing import Optional, Union
24
25from dotenv import load_dotenv
26
27from wxc_sdk.as_api import AsWebexSimpleApi
28from wxc_sdk.common import IdAndName, AlternateNumber, RingPattern
29from wxc_sdk.integration import Integration
30from wxc_sdk.scopes import parse_scopes
31from wxc_sdk.telephony import NumberListPhoneNumber
32from wxc_sdk.telephony.forwarding import CallForwarding
33from wxc_sdk.telephony.huntgroup import HuntGroup
34from wxc_sdk.tokens import Tokens
35
36POOL_HG_NAME = 'POOL_'
37
38
def env_path() -> str:
    """
    Build the path of the .env file the environment variables are loaded from; the file is located in
    the current working directory and is named after this file
    :return: .env file path
    """
    script_name = os.path.basename(__file__)
    stem, _ = os.path.splitext(script_name)
    return os.path.join(os.getcwd(), stem + '.env')
45
46
def yml_path() -> str:
    """
    Build the path of the YML file used to persist tokens; the file is located in the current working
    directory and is named after this file
    :return: path to YML file
    :rtype: str
    """
    script_name = os.path.basename(__file__)
    stem, _ = os.path.splitext(script_name)
    return os.path.join(os.getcwd(), stem + '.yml')
54
55
def build_integration() -> Integration:
    """
    read integration parameters from environment variables and create an integration
    :return: :class:`wxc_sdk.integration.Integration` instance
    :raises ValueError: if client id, client secret, or scopes are missing from the environment
    """
    client_id = os.getenv('INTEGRATION_CLIENT_ID')
    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
    scopes = os.getenv('INTEGRATION_SCOPES')
    # only parse scopes that are actually set; an unset variable is None and has to be reported as a
    # missing parameter below instead of being handed to parse_scopes()
    if scopes:
        scopes = parse_scopes(scopes)
    if not all((client_id, client_secret, scopes)):
        raise ValueError('failed to get integration parameters from environment')
    redirect_url = 'http://localhost:6001/redirect'
    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
                       redirect_url=redirect_url)
69
70
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens for the integration defined in the environment.
    Tokens are read from a YML file. If needed an OAuth flow is initiated.

    :return: tokens
    :rtype: :class:`wxc_sdk.tokens.Tokens`
    """
    # the integration takes care of reading/writing the YML cache file
    return build_integration().get_cached_tokens_from_yml(yml_path=yml_path())
82
83
async def location_id_from_args(api: AsWebexSimpleApi, args: Namespace) -> Optional[str]:
    """
    Get location id for --location parameter

    :param api: API instance used to look up the location
    :param args: parsed command line arguments; only the ``location`` attribute is read
    :return: id of the location with exactly the given name; None if no --location parameter was given
    :raises ValueError: if a location name was given but no location with that name exists
    """
    if not args.location:
        return None
    location = next((loc
                     for loc in await api.locations.list(name=args.location)
                     if loc.name == args.location),
                    None)
    if location is None:
        # Returning None here would be indistinguishable from "no --location given". The callers in this
        # script pass the result on as a location filter, so a typo in the location name would make them
        # operate on all locations
        raise ValueError(f'location "{args.location}" not found')
    return location.location_id
95
96
async def cleanup(api: AsWebexSimpleApi, args: Namespace):
    """
    Delete all pooling HGs, i.e. all hunt groups with a name starting with the pool HG prefix. The location
    filter is taken from the command line arguments. With args.test set nothing is deleted; the messages are
    still printed.
    """
    location_id = await location_id_from_args(api, args)
    all_hgs = await api.telephony.huntgroup.list(location_id=location_id)
    pool_hgs = [hg for hg in all_hgs if hg.name.startswith(POOL_HG_NAME)]

    async def delete_one(hg: HuntGroup):
        """
        Delete a single hunt group (skipped in test mode) and report it
        """
        if not args.test:
            await api.telephony.huntgroup.delete_huntgroup(location_id=hg.location_id,
                                                           huntgroup_id=hg.id)
        print(f'Deleted HG "{hg.name}" in location "{hg.location_name}"')

    # delete all pooling HGs in parallel
    deletions = [delete_one(hg) for hg in pool_hgs]
    await asyncio.gather(*deletions)
113
114
async def pool_tns_location(api: AsWebexSimpleApi, args: Namespace, location: IdAndName,
                            tns: list[NumberListPhoneNumber]):
    """
    Pool unassigned TNs for a given location

    The TNs are first used to fill up existing pool HGs (names starting with POOL_HG_NAME) to ten alternate
    numbers each. The remaining TNs are assigned to new pool HGs in batches of eleven: the first TN of a batch
    becomes the phone number of the new HG, the others are added as alternate numbers. New HGs are forwarded
    (call forward always) to the location's main number. With args.test set the planned actions are only printed.

    :param api: API instance to use
    :param args: parsed command line arguments; the ``test`` attribute is read
    :param location: location to work on
    :param tns: unassigned TNs in the location; note that this list is sorted in place
    """

    async def add_to_hg(hg: Union[HuntGroup, str],
                        tns: list[NumberListPhoneNumber]):
        """
        Add a bunch of TNs to given HG
        :param hg: can be an existing HG or a name of a HG to be created
        :param tns: list of TNs to be added to the HG
        """
        if isinstance(hg, str):
            # create new HG
            print(f'Creating HG "{hg}" in location "{location.name}" for: '
                  f'{", ".join(tn.phone_number for tn in tns)}')
            if args.test:
                return
            # the first TN becomes the phone number of the new HG
            settings = HuntGroup.create(name=hg,
                                        phone_number=tns[0].phone_number)
            new_id = await api.telephony.huntgroup.create(location_id=location.id,
                                                          settings=settings)
            # Get details of new HG and also the forwarding settings
            # * details are needed for the recursive call to add_to_hg .. in case we have more than one TN to add
            # * ... and forwarding settings are needed b/c we want to set CFwdAll to location's main number
            details, forwarding = await asyncio.gather(
                api.telephony.huntgroup.details(location_id=location.id, huntgroup_id=new_id),
                api.telephony.huntgroup.forwarding.settings(location_id=location.id, feature_id=new_id))
            forwarding: CallForwarding
            details: HuntGroup

            # set call forwarding
            print(f'HG "{hg}" in location "{location.name}": set CFwdAll to {main_number}')
            forwarding.always.enabled = True
            forwarding.always.destination = main_number
            await api.telephony.huntgroup.forwarding.update(location_id=location.id, feature_id=new_id,
                                                            forwarding=forwarding)
            if len(tns) > 1:
                # add the remaining as alternate numbers
                await add_to_hg(hg=details, tns=tns[1:])
            return

        # add tns as alternate numbers
        print(f'HG "{hg.name}" in location "{location.name}", adding: '
              f'{", ".join(tn.phone_number for tn in tns)}')
        if args.test:
            return
        # weirdly on GET alternate numbers are returned in alternate_number_settings ...
        alternate_numbers = hg.alternate_number_settings.alternate_numbers
        alternate_numbers.extend(AlternateNumber(phone_number=tn.phone_number,
                                                 ring_pattern=RingPattern.normal)
                                 for tn in tns)
        # ... while for an update Wx expects the alternate numbers in an alternate_number attribute
        update = HuntGroup(alternate_numbers=alternate_numbers)
        await api.telephony.huntgroup.update(location_id=location.id,
                                             huntgroup_id=hg.id,
                                             update=update)

        return

    # if the list of available TNs includes the main number then something is wonky. The main number should be owned
    # by something
    main_number = next((tn for tn in tns if tn.main_number), None)
    if main_number is not None:
        print(f'Error: main number {main_number.phone_number} in location "{location.name}" is not assigned to '
              f'anything!', file=sys.stderr)
        return

    # get main number of location
    # from here on main_number is the phone number read from the location's calling line id; add_to_hg() uses it
    # as the call forwarding destination
    main_number = (await api.telephony.location.details(location_id=location.id)).calling_line_id.phone_number

    # get list if "pool" HGs in location
    existing_hg_list = [hg for hg in await api.telephony.huntgroup.list(location_id=location.id)
                        if hg.name.startswith(POOL_HG_NAME)]

    # we need the details for all HGs: list() response is missing the alternate number list
    # get all HG details in parallel
    existing_hg_list = await asyncio.gather(*[api.telephony.huntgroup.details(location_id=location.id,
                                                                              huntgroup_id=hg.id)
                                              for hg in existing_hg_list])
    existing_hg_list: list[HuntGroup]

    # start with an empty list of tasks
    # the add_to_hg() coroutines are only collected here; they run when gathered at the end
    tasks = []

    # assign TNs to existing "pool" HGs

    # these are the HGs we can still assign some TNs to
    hgs_with_open_slots = (hg
                           for hg in existing_hg_list
                           if len(hg.alternate_number_settings.alternate_numbers) < 10)
    # sorts the list passed by the caller in place
    tns.sort(key=attrgetter('phone_number'))
    while tns:
        hg_with_open_slots = next(hgs_with_open_slots, None)
        if hg_with_open_slots is None:
            # no more HGs we can assign TNs to --> we are done here
            break

        # add some tns to this hg; hg can have max 10 alternate numbers
        tns_to_add = 10 - len(hg_with_open_slots.alternate_number_settings.alternate_numbers)
        tasks.append(add_to_hg(hg=hg_with_open_slots, tns=tns[:tns_to_add]))

        # continue w/ remaining TNs
        tns = tns[tns_to_add:]

    # assign remaining TNs to new "pool" HGs
    # .. in batches of 11
    # candidate names are POOL_001 .. POOL_999, skipping names of existing pool HGs
    existing_names = set(hg.name for hg in existing_hg_list)
    new_hg_names = (name
                    for i in range(1, 1000)
                    if (name := f'{POOL_HG_NAME}{i:03d}') not in existing_names)
    while tns:
        tasks.append(add_to_hg(hg=next(new_hg_names), tns=tns[:11]))
        tns = tns[11:]

    # Now run all tasks
    await asyncio.gather(*tasks)
    return
234
235
async def pool_tns(api: AsWebexSimpleApi, args: Namespace):
    """
    Assign unassigned numbers to pool HGs

    Available TNs are fetched (restricted to one location if a location argument was given), grouped by
    the location they belong to, and then each location is processed concurrently.
    """
    scope_location_id = await location_id_from_args(api, args)

    # available TNs; a location id of None means "all locations"
    available_tns = await api.telephony.phone_numbers(available=True, location_id=scope_location_id)

    # location info (needed for the name) and TNs, both keyed by location id
    location_info: dict[str, IdAndName] = {}
    grouped_tns: dict[str, list[NumberListPhoneNumber]] = defaultdict(list)
    for number in available_tns:
        tn_location = number.location
        location_info[tn_location.id] = tn_location
        grouped_tns[tn_location.id].append(number)

    # one job per location; all jobs run in parallel
    jobs = [pool_tns_location(api=api, args=args,
                              location=location_info[loc_id], tns=loc_tns)
            for loc_id, loc_tns in grouped_tns.items()]
    await asyncio.gather(*jobs)
257
258
async def catch_tns():
    """
    Main async logic

    Parses the command line, determines the tokens to use, and then either removes the pooling HGs
    (--cleanup) or assigns available TNs to pooling HGs.
    """
    cli = ArgumentParser()
    cli.add_argument('--test', action='store_true', required=False,
                     help="test only; don't actually apply any config")
    cli.add_argument('--location', type=str, required=False, help='Location to work on')
    cli.add_argument('--token', type=str, required=False, help='admin access token to use.')
    cli.add_argument('--cleanup', action='store_true', required=False, help='remove all pooling HGs')
    args = cli.parse_args()

    load_dotenv(env_path())

    # use the token from the command line if one was given (an empty value counts as "not given");
    # else get tokens from cache or create a new set of tokens using the integration defined in .env
    tokens = args.token or get_tokens()

    # pick the operation to run
    operation = cleanup if args.cleanup else pool_tns
    async with AsWebexSimpleApi(tokens=tokens) as api:
        await operation(api, args)
283
284
if __name__ == '__main__':
    # script entry point: INFO level logging, then run the async main logic to completion
    logging.basicConfig(level=logging.INFO)
    main_coro = catch_tns()
    asyncio.run(main_coro)
Downgrade room device workspaces from Webex Calling to free calling
This script looks for workspaces in a given location (or all workspaces) and downgrades them from Webex Calling to free calling.
usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}

CLI tool to manage room device calling entitlements

positional arguments:
  {show,clear}         show: show all room devices with their calling settings, clear: remove calling license from devices

optional arguments:
  -h, --help           show this help message and exit
  --location LOCATION  work on devices in given location
  --wsnames WSNAMES    file name of a file with workspace names to operate on; one name per line
  --test               test run only
Source: room_devices.py
1#!/usr/bin/env python
2"""
3 usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}
4
5 CLI tool to manage room device calling entitlements
6
7 positional arguments:
8 {show,clear} show: show all room devices with their calling settings, clear: remove calling
9 license from devices
10
11 optional arguments:
12 -h, --help show this help message and exit
13 --location LOCATION work on devices in given location
14 --wsnames WSNAMES file name of a file with workspace names to operate on; one name per line
15 --test test run only
16
17 The tool reads environment variables from room_devices.env:
18 SERVICE_APP_CLIENT_ID=<client id of a service app created on developer.webex.com>
19 SERVICE_APP_CLIENT_SECRET=<client secret of a service app created on developer.webex.com>
20 SERVICE_APP_REFRESH_TOKEN=<refresh token of the service app obtained after the service app has been
21 authorized for an org>
22
23 This information is used to obtain an access token required to authorize API access
24
25 This is a super-set of the scopes the service app needs:
26 * spark-admin:workspaces_write
27 * Identity:one_time_password
28 * identity:placeonetimepassword_create
29 * spark:people_read
30 * spark-admin:workspace_locations_read
31 * spark-admin:workspaces_read
32 * spark:devices_write
33 * spark:devices_read
34 * spark:kms
35 * spark-admin:devices_read
36 * spark-admin:workspace_locations_write
37 * spark-admin:licenses_read
38 * spark-admin:telephony_config_read
39 * spark-admin:telephony_config_write
40 * spark-admin:devices_write
41 * spark-admin:people_read
42
43 More service app details: https://developer.webex.com/docs/service-apps
44
45 Tokens get persisted in room_devices.yml.
46"""
47import asyncio
48import logging
49import sys
50import time
51from argparse import ArgumentParser
52from collections import defaultdict
53from functools import reduce
54from itertools import chain
55from os import getenv, getcwd
56from os.path import splitext, basename, isfile, join
57from typing import Optional
58
59from dotenv import load_dotenv
60from yaml import safe_load, safe_dump
61
62from wxc_sdk.as_api import AsWebexSimpleApi
63from wxc_sdk.base import webex_id_to_uuid
64from wxc_sdk.common import OwnerType
65from wxc_sdk.devices import Device
66from wxc_sdk.integration import Integration
67from wxc_sdk.locations import Location
68from wxc_sdk.telephony import NumberListPhoneNumber
69from wxc_sdk.tokens import Tokens
70from wxc_sdk.workspace_locations import WorkspaceLocation
71from wxc_sdk.workspaces import Workspace, WorkspaceSupportedDevices, CallingType, WorkspaceCalling
72
73
def yml_path() -> str:
    """
    Name of the YML file used to cache access and refresh token: base name of this script with
    the extension replaced by ".yml"
    """
    stem, _ = splitext(basename(__file__))
    return stem + '.yml'
79
80
def env_path() -> str:
    """
    Name of the .env file the service app settings are read from: base name of this script with
    the extension replaced by ".env"
    """
    stem, _ = splitext(basename(__file__))
    return stem + '.env'
86
87
def read_tokens_from_file() -> Optional[Tokens]:
    """
    Read cached service app tokens from the YML cache file

    :return: tokens from the cache; None if the cache file does not exist or cannot be read or parsed
    """
    cache_file = yml_path()
    if isfile(cache_file):
        try:
            with open(cache_file, mode='r') as cache:
                return Tokens.model_validate(safe_load(cache))
        except Exception:
            # deliberate best effort: an unusable cache is treated like a missing cache
            return None
    return None
102
103
def write_tokens_to_file(tokens: Tokens):
    """
    Persist tokens in the YML cache file; attributes that are not set (None) are not written

    :param tokens: tokens to write to the cache
    """
    with open(yml_path(), mode='w') as cache:
        serializable = tokens.model_dump(exclude_none=True)
        safe_dump(serializable, cache)
110
111
def get_access_token() -> Tokens:
    """
    Obtain a new access token for the service app and store it in the cache file

    Refresh token, client id, and client secret are read from the environment variables
    SERVICE_APP_REFRESH_TOKEN, SERVICE_APP_CLIENT_ID, and SERVICE_APP_CLIENT_SECRET.

    :return: the refreshed tokens
    """
    refreshed = Tokens(refresh_token=getenv('SERVICE_APP_REFRESH_TOKEN'))
    client_id = getenv('SERVICE_APP_CLIENT_ID')
    client_secret = getenv('SERVICE_APP_CLIENT_SECRET')
    # scopes and redirect URL are passed empty here; only refresh() is used
    service_app = Integration(client_id=client_id, client_secret=client_secret,
                              scopes=[], redirect_url=None)
    service_app.refresh(tokens=refreshed)
    write_tokens_to_file(refreshed)
    return refreshed
123
124
def get_tokens() -> Optional[Tokens]:
    """
    Get tokens from the cache file; create a new access token using the service app credentials if
    there is no usable cache or the cached access token has less than a day of lifetime left
    """
    min_remaining_seconds = 24 * 60 * 60

    cached = read_tokens_from_file()
    # no cache --> create new access token using the refresh token
    tokens = get_access_token() if cached is None else cached
    # lifetime check
    if tokens.remaining < min_remaining_seconds:
        tokens = get_access_token()
    return tokens
137
138
def main() -> int:
    """
    Main code

    Parses the command line, obtains access tokens, prints all room device workspaces (optionally limited
    to one location and/or a list of workspace names) with their numbers and devices and, for operation
    "clear", downgrades these workspaces from Webex Calling to free calling.

    :return: exit code; 0 on success, 1 if no tokens could be obtained, the location was not found,
        no workspaces were found, or at least one downgrade failed
    """
    # parse args
    parser = ArgumentParser(prog=basename(__file__), description='CLI tool to manage room device calling entitlements')
    parser.add_argument('operation', choices=['show', 'clear'], help='show: show all room devices with their calling '
                                                                     'settings, clear: remove calling license from '
                                                                     'devices')
    parser.add_argument('--location', type=str, help='work on devices in given location')
    parser.add_argument('--wsnames', type=str, help='file name of a file with workspace names to operate on; '
                                                    'one name per line')
    parser.add_argument('--test', action='store_true', help='test run only')
    args = parser.parse_args()
    operation = args.operation
    test_run = args.test
    location = args.location
    ws_names = args.wsnames

    # get tokens; as an alternative you can just get a developer token from developer.webex.com and use:
    #   tokens = '<developer token from developer.webex.com>'
    load_dotenv(dotenv_path=env_path())
    err = ''
    tokens = None
    try:
        tokens = get_tokens()
    except Exception as e:
        # top-level boundary: any failure to get tokens is reported below and ends the script
        err = f'{e}'
    if not tokens:
        print(f'failed to obtain access tokens: {err}', file=sys.stderr)
        return 1

    async def as_main() -> int:
        """
        Async main to be able to use concurrency
        """

        async def downgrade_workspace(ws: Workspace):
            """
            Downgrade one workspace to free calling

            :raises ValueError: if the calling type of the workspace is not Webex Calling
            """

            def log(s: str, file=sys.stdout):
                print(f'downgrade workspace "{ws.display_name:{ws_name_len}}": {s}', file=file)

            if ws.calling.type != CallingType.webex:
                raise ValueError(f'calling type is "{ws.calling.type}", not "{CallingType.webex.value}"')
            if test_run:
                log('skipping update, test run only')
            else:
                log('updating calling settings')
                # update a copy: calling type set to free calling, location references cleared
                update = ws.model_copy(deep=True)
                update.calling = WorkspaceCalling(type=CallingType.free)
                update.workspace_location_id = None
                update.location_id = None
                await api.workspaces.update(workspace_id=ws.workspace_id, settings=update)
            log('done')

        async with AsWebexSimpleApi(tokens=tokens) as api:

            # get list of locations and workspace locations
            ws_location_list, location_list = await asyncio.gather(
                api.workspace_locations.list(display_name=location),
                api.locations.list(name=location))
            location_list: list[Location]
            ws_location_list: list[WorkspaceLocation]

            # validate location argument
            if location:
                target_location = next((loc for loc in location_list if loc.name == location), None)
                target_ws_location = next((loc for loc in ws_location_list if loc.display_name == location), None)
                if not all((target_ws_location, target_location)):
                    print(f'location "{location}" not found', file=sys.stderr)
                    return 1
            else:
                target_location = None
                target_ws_location = None

            # get workspaces, numbers, and devices (in target location)
            # "x and x.id" evaluates to None if no target location is set --> no filter
            workspaces, numbers, devices = await asyncio.gather(
                api.workspaces.list(workspace_location_id=target_ws_location and target_ws_location.id),
                api.telephony.phone_numbers(location_id=target_location and target_location.location_id,
                                            owner_type=OwnerType.place),
                api.devices.list(workspace_location_id=target_ws_location and target_ws_location.id,
                                 product_type='roomdesk')
            )
            workspaces: list[Workspace]
            numbers: list[NumberListPhoneNumber]
            devices: list[Device]

            # only workspaces supporting desk devices
            workspaces = [ws for ws in workspaces
                          if ws.supported_devices == WorkspaceSupportedDevices.collaboration_devices]

            # if a path to a file with workspace names was given, then filter based on the file contents
            if ws_names:
                with open(ws_names, mode='r') as f:
                    workspace_names = set(s_line for line in f if (s_line := line.strip()))
                workspaces = [ws for ws in workspaces
                              if ws.display_name in workspace_names]
            if not workspaces:
                print('No workspaces', file=sys.stderr)
                return 1

            # only devices in workspaces (no personal devices)
            devices = [d for d in devices if d.workspace_id is not None]

            # prepare some lookups
            workspace_locations_by_id: dict[str, WorkspaceLocation] = {wsl.id: wsl for wsl in ws_location_list}
            # numbers are grouped by the UUID of the owner (workspace)
            numbers_by_workspace_uuid: dict[str, list[NumberListPhoneNumber]] = defaultdict(list)
            for number in numbers:
                numbers_by_workspace_uuid[webex_id_to_uuid(number.owner.owner_id)].append(number)
            devices_by_workspace_id: dict[str, list[Device]] = defaultdict(list)
            for device in devices:
                devices_by_workspace_id[device.workspace_id].append(device)

            # sort workspaces by workspace location name and workspace name; workspace location can be unset
            workspaces.sort(key=lambda ws: ('' if not ws.workspace_location_id else
                                            workspace_locations_by_id[ws.workspace_location_id].display_name,
                                            ws.display_name))
            # some field lengths for nicer output
            # ... chain([1], ...) to avoid max() on empty sequence; the list of workspace locations can be empty
            wsl_name_len = max(chain([1], (len(wsl.display_name) for wsl in ws_location_list)))
            # workspaces is not empty at this point (checked above)
            ws_name_len = max(len(ws.display_name) for ws in workspaces)
            pn_len = max(chain([1], (len(n.phone_number) for n in numbers if n.phone_number)))
            ext_len = max(chain([1], (len(n.extension) for n in numbers if n.extension)))

            # print workspaces with workspace locations, numbers, and devices
            for workspace in workspaces:
                if not workspace.workspace_location_id:
                    wsl_name = ''
                else:
                    wsl_name = workspace_locations_by_id[workspace.workspace_location_id].display_name
                print(f'workspace location "{wsl_name:{wsl_name_len}}", '
                      f'workspace "{workspace.display_name:{ws_name_len}}"')

                # are there any numbers in that workspace?
                # .get() with a default so that the lookup doesn't create empty entries
                ws_numbers = numbers_by_workspace_uuid.get(webex_id_to_uuid(workspace.workspace_id), [])
                for number in ws_numbers:
                    print(f' number: {number.phone_number or "-" * pn_len:{pn_len}}/'
                          f'{number.extension or "-" * ext_len:{ext_len}}')
                ws_devices = devices_by_workspace_id.get(workspace.workspace_id, [])
                for device in ws_devices:
                    print(f' device: {device.display_name}')

            if operation == 'show':
                # we are done here
                return 0

            # now we want to downgrade (disable calling) on all workspaces
            print()
            print('Starting downgrade')
            # return_exceptions: one failing workspace must not stop the other downgrades
            results = await asyncio.gather(*[downgrade_workspace(ws) for ws in workspaces], return_exceptions=True)

            # print errors ... if any
            for ws, result in zip(workspaces, results):
                ws: Workspace
                if isinstance(result, Exception):
                    print(f'Failed to downgrade "{ws.display_name:{ws_name_len}}": {result}', file=sys.stderr)

        if any(isinstance(r, Exception) for r in results):
            return 1
        return 0

    return asyncio.run(as_main())
309
310
if __name__ == '__main__':
    # console logging: INFO and above to stderr
    root_logger = logging.getLogger()
    h = logging.StreamHandler(stream=sys.stderr)
    h.setLevel(logging.INFO)
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(h)

    # log REST API interactions to file; timestamps in UTC
    file_fmt = logging.Formatter(fmt='%(asctime)s %(levelname)s %(message)s')
    file_fmt.converter = time.gmtime

    # log file is created in the current working directory and overwritten on each run (mode 'w')
    rest_log_name = join(getcwd(), f'{splitext(basename(__file__))[0]}.log')
    rest_log_handler = logging.FileHandler(rest_log_name, mode='w')
    rest_log_handler.setLevel(logging.DEBUG)
    rest_log_handler.setFormatter(file_fmt)
    rest_logger = logging.getLogger('wxc_sdk.as_rest')
    rest_logger.setLevel(logging.DEBUG)
    rest_logger.addHandler(rest_log_handler)

    # sys.exit() instead of exit(): exit() is a helper added by the site module for interactive use
    # and is not guaranteed to exist (for example with "python -S")
    sys.exit(main())