Examples
Code examples can be found in the examples directory on GitHub
Also take a look at the unit tests in the tests directory.
Get all calling users
Source: calling_users.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org
5"""
6
7from dotenv import load_dotenv
8
9from wxc_sdk import WebexSimpleApi
10
11load_dotenv()
12
13api = WebexSimpleApi()
14
15# using wxc_sdk.people.PeopleApi.list to iterate over persons
16# Parameter calling_data needs to be set to true to gat calling specific information
17# calling users have the attribute location_id set
18calling_users = [user for user in api.people.list(calling_data=True)
19 if user.location_id]
20print(f'{len(calling_users)} users:')
21print('\n'.join(user.display_name for user in calling_users))
Get all calling users (async variant)
Source: calling_users_async.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org using the (experimental) async API
5"""
6import asyncio
7import time
8
9from dotenv import load_dotenv
10
11from wxc_sdk.as_api import AsWebexSimpleApi
12
13load_dotenv()
14
15
16async def get_calling_users():
17 """
18 Get details of all calling enabled users by:
19 1) getting all calling users
20 2) collecting all users that have a calling license
21 3) getting details for all users
22 """
23 async with AsWebexSimpleApi(concurrent_requests=40) as api:
24 print('Collecting calling licenses')
25 calling_license_ids = set(lic.license_id for lic in await api.licenses.list()
26 if lic.webex_calling)
27
28 # get users with a calling license
29 calling_users = [user async for user in api.people.list_gen()
30 if any(lic_id in calling_license_ids for lic_id in user.licenses)]
31 print(f'{len(calling_users)} users:')
32 print('\n'.join(user.display_name for user in calling_users))
33
34 # get details for all users
35 start = time.perf_counter()
36 details = await asyncio.gather(*[api.people.details(person_id=user.person_id, calling_data=True)
37 for user in calling_users])
38 expired = time.perf_counter() - start
39 print(f'Got details for {len(details)} users in {expired * 1000:.3f} ms')
40
41
42asyncio.run(get_calling_users())
Get all users without phones
Source: users_wo_devices.py
1#!/usr/bin/env python
2"""
3Get calling users without devices
4"""
5import asyncio
6import logging
7import os
8from itertools import chain
9from typing import Optional
10
11from dotenv import load_dotenv
12
13from wxc_sdk import Tokens
14from wxc_sdk.as_api import AsWebexSimpleApi
15from wxc_sdk.common import UserType
16from wxc_sdk.integration import Integration
17from wxc_sdk.person_settings import PersonDevicesResponse
18from wxc_sdk.scopes import parse_scopes
19
20
21def env_path() -> str:
22 """
23 determine path for .env to load environment variables from; based on name of this file
24 :return: .env file path
25 """
26 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
27
28
29def yml_path() -> str:
30 """
31 determine path of YML file to persist tokens
32 :return: path to YML file
33 :rtype: str
34 """
35 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
36
37
38def build_integration() -> Integration:
39 """
40 read integration parameters from environment variables and create an integration
41 :return: :class:`wxc_sdk.integration.Integration` instance
42 """
43 client_id = os.getenv('INTEGRATION_CLIENT_ID')
44 client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
45 scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
46 redirect_url = 'http://localhost:6001/redirect'
47 if not all((client_id, client_secret, scopes)):
48 raise ValueError('failed to get integration parameters from environment')
49 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
50 redirect_url=redirect_url)
51
52
53def get_tokens() -> Optional[Tokens]:
54 """
55 Tokens are read from a YML file. If needed an OAuth flow is initiated.
56
57 :return: tokens
58 :rtype: :class:`wxc_sdk.tokens.Tokens`
59 """
60
61 integration = build_integration()
62 tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
63 return tokens
64
65
66async def main():
67 # get environment variables from .env; required for integration parameters
68 load_dotenv(env_path())
69
70 # get tokens from cache or create a new set of tokens using the integration defined in .env
71 tokens = get_tokens()
72
73 async with AsWebexSimpleApi(tokens=tokens) as api:
74 # get calling users
75 calling_users = [user for user in await api.people.list(calling_data=True)
76 if user.location_id]
77
78 # get device info for all users
79 user_device_infos = await asyncio.gather(*[api.person_settings.devices(person_id=user.person_id)
80 for user in calling_users])
81 user_device_infos: list[PersonDevicesResponse]
82 users_wo_devices = [user for user, device_info in zip(calling_users, user_device_infos)
83 if not device_info.devices]
84
85 # alternatively we can collect all device owners
86 device_owner_ids = set(owner.owner_id
87 for device in chain.from_iterable(udi.devices for udi in user_device_infos)
88 if (owner := device.owner) and owner.owner_type == UserType.people)
89
90 # ... and collect all other users (the ones not owning a device)
91 users_not_owning_a_device = [user for user in calling_users
92 if user.person_id not in device_owner_ids]
93
94 users_wo_devices.sort(key=lambda u: u.display_name)
95 print(f'{len(users_wo_devices)} users w/o devices:')
96 print('\n'.join(f'{user.display_name} ({user.emails[0]})'
97 for user in users_wo_devices))
98
99 print()
100 users_not_owning_a_device.sort(key=lambda u: u.display_name)
101 print(f'{len(users_not_owning_a_device)} users not owning a device:')
102 print('\n'.join(f'{user.display_name} ({user.emails[0]})'
103 for user in users_not_owning_a_device))
104
105
106if __name__ == '__main__':
107 # enable DEBUG logging to a file; REST log shows all requests
108 logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
109 filemode='w', level=logging.DEBUG, format='%(asctime)s %(threadName)s %(message)s')
110 asyncio.run(main())
Default call forwarding settings for all users
This example start with the list of all calling users and then calls
wxc_sdk.person_settings.forwarding.PersonForwardingApi.configure()
for each user. To speed up things a
ThreadPoolExecutor
is used and all update operations are scheduled for execution by the thread pool.
Source: reset_call_forwarding.py
1#!/usr/bin/env python
2"""
3Example script
4Reset call forwarding to default for all users in the org
5"""
6import asyncio
7import logging
8import time
9
10from dotenv import load_dotenv
11
12from wxc_sdk.all_types import PersonForwardingSetting
13from wxc_sdk.as_api import AsWebexSimpleApi
14
15
16async def main():
17 # load environment. SDk fetches the access token from WEBEX_ACCESS_TOKEN environment variable
18 load_dotenv()
19
20 logging.basicConfig(level=logging.INFO)
21
22 # set to DEBUG to see the actual requests
23 logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
24
25 async with AsWebexSimpleApi() as api:
26
27 # get all calling users
28 start = time.perf_counter_ns()
29 calling_users = [user for user in await api.people.list(calling_data=True)
30 if user.location_id]
31 print(f'Got {len(calling_users)} calling users in '
32 f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
33
34 # set call forwarding to default for all users
35 # default call forwarding settings
36 forwarding = PersonForwardingSetting.default()
37
38 # schedule update for each user and wait for completion
39 start = time.perf_counter_ns()
40 await asyncio.gather(*[api.person_settings.forwarding.configure(person_id=user.person_id,
41 forwarding=forwarding)
42 for user in calling_users])
43 print(f'Reset call forwarding to default for {len(calling_users)} users in '
44 f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
45
46if __name__ == '__main__':
47 asyncio.run(main())
Modify number of rings configuration for users read from CSV
Here we read a bunch of user email addresses from a CSV. The CSV as an ERROR column and we only want to consider users without errors.
For all these users a VM setting is updated and the results are written to a CSV for further processing.
Source: modify_voicemail.py
1#!/usr/bin/env python
2"""
3Example Script
4Modifying number of rings configuration in voicemail settings
5Run -> python3 modify_voicemail.py modify_voicemail.csv
6"""
7import csv
8import sys
9import traceback
10from concurrent.futures import ThreadPoolExecutor
11
12from dotenv import load_dotenv
13
14from wxc_sdk import WebexSimpleApi
15from wxc_sdk.all_types import *
16
17VOICEMAIL_SETTINGS_NUMBER_OF_RINGS = 6
18
19# loading environment variables - use .env file for development
20load_dotenv()
21
22
23def update_vm_settings():
24 """
25 actually update VM settings for all users present in input CSV
26 """
27 api = WebexSimpleApi()
28 final_report = []
29 mail_ids = []
30 # using wxc_sdk.people.PeopleApi.list to iterate over persons
31 # Parameter calling_data needs to be set to true to gat calling specific information
32 # calling users have the attribute location_id set
33 calling_users = [user for user in api.people.list(calling_data=True)
34 if user.location_id]
35 print(f'{len(calling_users)} users:')
36 print('\n'.join(user.display_name for user in calling_users))
37
38 # get CSV file name from command line
39 with open(str(sys.argv[1]), 'r') as csv_file:
40 reader = csv.DictReader(csv_file)
41 # read all records from CSV. Ony consider records w/o error
42 for col in reader:
43 if not col['ERROR']:
44 # collect email address from USERNAME column for further processing
45 mail_ids.append(col['USERNAME'])
46 else:
47 final_report.append((col['USERNAME'], 'FAILED DUE TO ERROR REASON IN INPUT FILE'))
48
49 # work on calling users that have an email address that we read from the CSV
50 filteredUsers = [d for d in calling_users if d.emails[0] in mail_ids]
51
52 print("\nCalling Users in CI - Count ", len(calling_users))
53 print("Mail IDs from input file after removing error columns - Count ", len(mail_ids))
54 print("FilteredUsers Users - Count", len(filteredUsers))
55
56 def set_number_of_rings(user: Person):
57 """
58 Read VM config for a user
59 :param user: user to update
60 """
61 try:
62 # shortcut
63 vm = api.person_settings.voicemail
64
65 # Read Current configuration
66 vm_settings = vm.read(person_id=user.person_id)
67 print(f'\n Existing Configuration: {vm_settings} ')
68
69 # Modify number of rings value
70 vm_settings.send_unanswered_calls.number_of_rings = VOICEMAIL_SETTINGS_NUMBER_OF_RINGS
71 vm.configure(person_id=user.person_id, settings=vm_settings)
72 # Read configuration after changes
73 vm_settings = vm.read(person_id=user.person_id)
74 print(f'\n New Configuration: {vm_settings} ')
75 final_report.append((user.display_name, 'SUCCESS'))
76 except Exception as e:
77 final_report.append((user.display_name, 'FAILURE'))
78 print("type error: " + str(e))
79 print(traceback.format_exc())
80 return
81
82 # Modify settings for the filtered users
83 with ThreadPoolExecutor() as pool:
84 list(pool.map(lambda user: set_number_of_rings(user),
85 filteredUsers))
86
87 print(final_report)
88 with open('output.csv', 'w') as f:
89 write = csv.writer(f)
90 write.writerow(["USERNAME", "STATUS"])
91 write.writerows(final_report)
92
93
94if __name__ == '__main__':
95 update_vm_settings()
Holiday schedule w/ US national holidays for all US locations
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays
” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py
is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY
.
Source: us_holidays.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5"""
6
7import logging
8from collections import defaultdict
9from concurrent.futures import ThreadPoolExecutor
10from datetime import date
11from threading import Lock
12from typing import List
13
14from dotenv import load_dotenv
15
16from calendarific import CalendarifiyApi, Holiday
17from wxc_sdk import WebexSimpleApi
18from wxc_sdk.locations import Location
19from wxc_sdk.all_types import ScheduleType, Event, Schedule
20
21log = logging.getLogger(__name__)
22
23# a lock per location to protect observe_in_location()
24location_locks: dict[str, Lock] = defaultdict(Lock)
25
26# Use parallel threads for provisioning?
27USE_THREADING = True
28
29# True: delete holiday schedule instead of creating one
30CLEAN_UP = False
31
32# first and last year for which to create public holiday events
33FIRST_YEAR = 2022
34LAST_YEAR = 2024
35
36LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
37
38
39def observe_in_location(*, api: WebexSimpleApi, location: Location, holidays: List[Holiday]):
40 """
41 create/update a "National Holiday" schedule in one location
42
43 :param api: Webex api
44 :type api: WebexSimpleApi
45 :param location: location to work on
46 :type location: Location
47 :param holidays: list of holidays to observe
48 :type holidays: List[Holiday]
49 """
50 # there should always only one thread messing with the holiday schedule of a location
51 with location_locks[location.location_id]:
52 year = holidays[0].date.year
53 schedule_name = 'National Holidays'
54
55 # shortcut
56 ats = api.telephony.schedules
57
58 # existing "National Holiday" schedule or None
59 schedule = next((schedule
60 for schedule in ats.list(obj_id=location.location_id,
61 schedule_type=ScheduleType.holidays,
62 name=schedule_name)
63 if schedule.name == schedule_name),
64 None)
65 if CLEAN_UP:
66 if schedule:
67 log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
68 ats.delete_schedule(obj_id=location.location_id,
69 schedule_type=ScheduleType.holidays,
70 schedule_id=schedule.schedule_id)
71 return
72 if schedule:
73 # we need the details: list response doesn't have events
74 schedule = ats.details(obj_id=location.location_id,
75 schedule_type=ScheduleType.holidays,
76 schedule_id=schedule.schedule_id)
77 # create list of desired schedule entries
78 # * one per holiday
79 # * only future holidays
80 # * not on a Sunday
81 today = date.today()
82 events = [Event(name=f'{holiday.name} {holiday.date.year}',
83 start_date=holiday.date,
84 end_date=holiday.date,
85 all_day_enabled=True)
86 for holiday in holidays
87 if holiday.date >= today and holiday.date.weekday() != 6]
88
89 if not schedule:
90 # create new schedule
91 log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
92 if not events:
93 log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
94 return
95 schedule = Schedule(name=schedule_name,
96 schedule_type=ScheduleType.holidays,
97 events=events)
98 log.debug(
99 f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
100 f'events')
101 schedule_id = ats.create(obj_id=location.location_id, schedule=schedule)
102 log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
103 return
104
105 # update existing schedule
106 with ThreadPoolExecutor() as pool:
107 # delete existing events in the past
108 to_delete = [event
109 for event in schedule.events
110 if event.start_date < today]
111 if to_delete:
112 log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113 if USE_THREADING:
114 list(pool.map(
115 lambda event: ats.event_delete(obj_id=location.location_id,
116 schedule_type=ScheduleType.holidays,
117 schedule_id=schedule.schedule_id,
118 event_id=event.event_id),
119 to_delete))
120 else:
121 for event in to_delete:
122 ats.event_delete(obj_id=location.location_id,
123 schedule_type=ScheduleType.holidays,
124 schedule_id=schedule.schedule_id,
125 event_id=event.event_id)
126
127 # add events which don't exist yet
128 existing_dates = set(event.start_date
129 for event in schedule.events)
130 to_add = [event
131 for event in events
132 if event.start_date not in existing_dates]
133 if not to_add:
134 log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
135 return
136 log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
137 if USE_THREADING:
138 list(pool.map(
139 lambda event: ats.event_create(
140 obj_id=location.location_id,
141 schedule_type=ScheduleType.holidays,
142 schedule_id=schedule.schedule_id,
143 event=event),
144 to_add))
145 else:
146 for event in to_add:
147 ats.event_create(
148 obj_id=location.location_id,
149 schedule_type=ScheduleType.holidays,
150 schedule_id=schedule.schedule_id,
151 event=event)
152 log.info(f'observe_in_location({location.name}, {year}): done.')
153 return
154
155
156def observe_national_holidays(*, api: WebexSimpleApi, locations: List[Location],
157 year: int = None):
158 """
159 US national holidays for given locations
160
161 :param api: Webex api
162 :type api: WebexSimpleApi
163 :param locations: list of locations in which US national holidays should be observed
164 :type locations: List[Location]
165 :param year: year for national holidays. Default: current year
166 :type year: int
167 """
168 # default: this year
169 year = year or date.today().year
170
171 # get national holidays for specified year
172 holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
173
174 # update holiday schedule for each location
175 with ThreadPoolExecutor() as pool:
176 if USE_THREADING:
177 list(pool.map(
178 lambda location: observe_in_location(api=api, location=location, holidays=holidays),
179 locations))
180 else:
181 for location in locations:
182 observe_in_location(api=api, location=location, holidays=holidays)
183 return
184
185
186if __name__ == '__main__':
187 # read dotenv which has some environment variables like Webex API token and Calendarify
188 # API key.
189 load_dotenv()
190
191 # enable logging
192 logging.basicConfig(level=logging.DEBUG,
193 format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
194 logging.getLogger('urllib3').setLevel(logging.INFO)
195 logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
196
197 # the actual action
198 with WebexSimpleApi(concurrent_requests=5) as wx_api:
199 # get all US locations
200 log.info('Getting locations...')
201 us_locations = [location
202 for location in wx_api.locations.list()
203 if location.address.country == 'US']
204
205 # set up location locks
206 # location_locks is a defaultdict -> accessing with all potential keys creates the locks
207 list(location_locks[loc.location_id] for loc in us_locations)
208
209 # create national holiday schedule for given year(s) and locations
210 if USE_THREADING:
211 with ThreadPoolExecutor() as pool:
212 list(pool.map(
213 lambda year: observe_national_holidays(api=wx_api, year=year, locations=us_locations),
214 range(FIRST_YEAR, LAST_YEAR + 1)))
215 else:
216 for year in range(FIRST_YEAR, LAST_YEAR + 1):
217 observe_national_holidays(api=wx_api, year=year, locations=us_locations)
Holiday schedule w/ US national holidays for all US locations (async variant)
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays
” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py
is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY
.
Source: us_holidays_async.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5
6Using the asyc SDK variant
7"""
8import asyncio
9import functools
10import logging
11from collections import defaultdict
12from datetime import date
13from typing import List
14
15from dotenv import load_dotenv
16
17from calendarific import CalendarifiyApi, Holiday
18from wxc_sdk.all_types import ScheduleType, Event, Schedule
19from wxc_sdk.as_api import AsWebexSimpleApi
20from wxc_sdk.locations import Location
21
22log = logging.getLogger(__name__)
23
24# a lock per location to protect observe_in_location()
25location_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
26
27# Use parallel tasks for provisioning?
28USE_TASKS = True
29
30# True: delete holiday schedule instead of creating one
31CLEAN_UP = False
32
33# first and last year for which to create public holiday events
34FIRST_YEAR = 2022
35LAST_YEAR = 2024
36
37LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
38
39
40async def observe_in_location(*, api: AsWebexSimpleApi, location: Location, holidays: List[Holiday]):
41 """
42 create/update a "National Holiday" schedule in one location
43
44 :param api: Webex api
45 :type api: WebexSimpleApi
46 :param location: location to work on
47 :type location: Location
48 :param holidays: list of holidays to observe
49 :type holidays: List[Holiday]
50 """
51 # there should always only one thread messing with the holiday schedule of a location
52 async with location_locks[location.location_id]:
53 year = holidays[0].date.year
54 schedule_name = 'National Holidays'
55
56 # shortcut
57 ats = api.telephony.schedules
58
59 # existing "National Holiday" schedule or None
60 schedule = next((schedule
61 for schedule in await ats.list(obj_id=location.location_id,
62 schedule_type=ScheduleType.holidays,
63 name=schedule_name)
64 if schedule.name == schedule_name),
65 None)
66 if CLEAN_UP:
67 if schedule:
68 log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
69 await ats.delete_schedule(obj_id=location.location_id,
70 schedule_type=ScheduleType.holidays,
71 schedule_id=schedule.schedule_id)
72 return
73 if schedule:
74 # we need the details: list response doesn't have events
75 schedule = await ats.details(obj_id=location.location_id,
76 schedule_type=ScheduleType.holidays,
77 schedule_id=schedule.schedule_id)
78 # create list of desired schedule entries
79 # * one per holiday
80 # * only future holidays
81 # * not on a Sunday
82 today = date.today()
83 events = [Event(name=f'{holiday.name} {holiday.date.year}',
84 start_date=holiday.date,
85 end_date=holiday.date,
86 all_day_enabled=True)
87 for holiday in holidays
88 if holiday.date >= today and holiday.date.weekday() != 6]
89
90 if not schedule:
91 # create new schedule
92 log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
93 if not events:
94 log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
95 return
96 schedule = Schedule(name=schedule_name,
97 schedule_type=ScheduleType.holidays,
98 events=events)
99 log.debug(
100 f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
101 f'events')
102 schedule_id = await ats.create(obj_id=location.location_id, schedule=schedule)
103 log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
104 return
105
106 # update existing schedule
107 # delete existing events in the past
108 to_delete = [event
109 for event in schedule.events
110 if event.start_date < today]
111 if to_delete:
112 log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113 if USE_TASKS:
114 await asyncio.gather(*[ats.event_delete(obj_id=location.location_id,
115 schedule_type=ScheduleType.holidays,
116 schedule_id=schedule.schedule_id,
117 event_id=event.event_id)
118 for event in to_delete])
119 else:
120 for event in to_delete:
121 await ats.event_delete(obj_id=location.location_id,
122 schedule_type=ScheduleType.holidays,
123 schedule_id=schedule.schedule_id,
124 event_id=event.event_id)
125
126 # add events which don't exist yet
127 existing_dates = set(event.start_date
128 for event in schedule.events)
129 to_add = [event
130 for event in events
131 if event.start_date not in existing_dates]
132 if not to_add:
133 log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
134 return
135 log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
136 if USE_TASKS:
137 await asyncio.gather(*[ats.event_create(obj_id=location.location_id,
138 schedule_type=ScheduleType.holidays,
139 schedule_id=schedule.schedule_id,
140 event=event)
141 for event in to_add])
142 else:
143 for event in to_add:
144 await ats.event_create(obj_id=location.location_id,
145 schedule_type=ScheduleType.holidays,
146 schedule_id=schedule.schedule_id,
147 event=event)
148 log.info(f'observe_in_location({location.name}, {year}): done.')
149 return
150
151
152async def observe_national_holidays(*, api: AsWebexSimpleApi, locations: List[Location],
153 year: int = None):
154 """
155 US national holidays for given locations
156
157 :param api: Webex api
158 :type api: WebexSimpleApi
159 :param locations: list of locations in which US national holidays should be observed
160 :type locations: List[Location]
161 :param year: year for national holidays. Default: current year
162 :type year: int
163 """
164 # default: this year
165 year = year or date.today().year
166
167 # get national holidays for specified year
168 loop = asyncio.get_running_loop()
169 # avoid sync all:
170 # holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
171 holidays = await loop.run_in_executor(None, functools.partial(CalendarifiyApi().holidays,
172 country='US', year=year, holiday_type='national'))
173
174 # update holiday schedule for each location
175 if USE_TASKS:
176 await asyncio.gather(*[observe_in_location(api=api, location=location, holidays=holidays)
177 for location in locations])
178 else:
179 for location in locations:
180 await observe_in_location(api=api, location=location, holidays=holidays)
181 return
182
183
184if __name__ == '__main__':
185 # read dotenv which has some environment variables like Webex API token and Calendarify
186 # API key.
187 load_dotenv()
188
189 # enable logging
190 logging.basicConfig(level=logging.DEBUG,
191 format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
192 logging.getLogger('urllib3').setLevel(logging.INFO)
193 logging.getLogger('wxc_sdk.as_rest').setLevel(logging.INFO)
194
195 # the actual action
196 async def do_provision():
197
198 async with AsWebexSimpleApi(concurrent_requests=5) as wx_api:
199 # get all US locations
200 log.info('Getting locations...')
201 us_locations = [location
202 for location in await wx_api.locations.list()
203 if location.address.country == 'US']
204
205 # set up location locks
206 # location_locks is a defaultdict -> accessing with all potential keys creates the locks
207 list(location_locks[loc.location_id] for loc in us_locations)
208
209 # create national holiday schedule for given year(s) and locations
210 if USE_TASKS:
211 await asyncio.gather(*[observe_national_holidays(api=wx_api, year=year, locations=us_locations)
212 for year in range(FIRST_YEAR, LAST_YEAR + 1)])
213 else:
214 for year in range(FIRST_YEAR, LAST_YEAR + 1):
215 await observe_national_holidays(api=wx_api, year=year, locations=us_locations)
216
217 asyncio.run(do_provision())
Persist tokens and obtain new tokens interactively
A typical problem specifically when creating CLI scripts is how to obtain valid access tokens for the API operations. If your code wants to invoke Webex REST APIs on behalf of a user then an integration is needed. The concepts of integrations are explained at the “Integrations” page on developer.cisco.com.
This example code shows how an OAUth Grant flow for an integration can be initiated from a script by using the Python
webbrowser module and calling the open()
method with the authorization URL of a given integration to open that
URL in the system web browser. The user can then authenticate and grant access to the integration. In the last
step of a successful authorization flow the web browser is redirected to the redirect_url
of the
integration.
The example code starts a primitive web server serving GET requests to http://localhost:6001/redirect
.
This URL has to be the redirect URL of the integration you create under My Webex Apps on developer.webex.com.
The sample script reads the integration parameters from environment variables (TOKEN_INTEGRATION_CLIENT_ID
,
TOKEN_INTEGRATION_CLIENT_SECRET
, TOKEN_INTEGRATION_CLIENT_SCOPES
). These variables can also be defined in
get_tokens.env
in the current directory:
# rename this to get_tokens.env and set values
# client ID for integration to be used.
TOKEN_INTEGRATION_CLIENT_ID=
# client secret of integration
TOKEN_INTEGRATION_CLIENT_SECRET=
# scopes to request. Use these scopes when creating the integration at developer.webex.com
# scopes can be in any form supported by wxc_sdk.scopes.parse_scopes()
TOKEN_INTEGRATION_CLIENT_SCOPES="spark:calls_write spark:kms spark:calls_read spark-admin:telephony_config_read spark:people_read"
The sample code persists the tokens in get_tokens.yml
in the current directory. On startup the sample code tries
to read tokens from that file. If needed a new access token is obtained using the refresh token.
An OAuth flow is only initiated if no (valid) tokens could be read from get_tokens.yml
Source: get_tokens.py
1#!/usr/bin/env python
2"""
3Example script
4read tokens from file or interactively obtain token by starting a local web server and open the authorization URL in
5the local web browser
6"""
7import logging
8import os
9from typing import Optional
10
11from dotenv import load_dotenv
12
13from wxc_sdk import WebexSimpleApi
14from wxc_sdk.integration import Integration
15from wxc_sdk.scopes import parse_scopes
16from wxc_sdk.tokens import Tokens
17
18log = logging.getLogger(__name__)
19
20
21def env_path() -> str:
22 """
23 determine path for .env to load environment variables from
24
25 :return: .env file path
26 """
27 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
28
29
30def yml_path() -> str:
31 """
32 determine path of YML file to persist tokens
33
34 :return: path to YML file
35 :rtype: str
36 """
37 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
38
39
40def build_integration() -> Integration:
41 """
42 read integration parameters from environment variables and create an integration
43
44 :return: :class:`wxc_sdk.integration.Integration` instance
45 """
46 client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
47 client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
48 scopes = parse_scopes(os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES'))
49 redirect_url = 'http://localhost:6001/redirect'
50 if not all((client_id, client_secret, scopes)):
51 raise ValueError('failed to get integration parameters from environment')
52 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
53 redirect_url=redirect_url)
54
55
56def get_tokens() -> Optional[Tokens]:
57 """
58
59 Tokens are read from a YML file. If needed an OAuth flow is initiated.
60
61 :return: tokens
62 :rtype: :class:`wxc_sdk.tokens.Tokens`
63 """
64
65 integration = build_integration()
66 tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
67 return tokens
68
69
70logging.basicConfig(level=logging.DEBUG)
71
72# load environment variables from .env
73path = env_path()
74log.info(f'reading {path}')
75load_dotenv(env_path())
76
77tokens = get_tokens()
78
79# use the tokens to get identity of authenticated user
80api = WebexSimpleApi(tokens=tokens)
81me = api.people.me()
82print(f'authenticated as {me.display_name} ({me.emails[0]}) ')
Read/update call intercept settings of a user
usage: call_intercept.py [-h] [–token TOKEN] user_email [{on,off}]positional arguments:user_email email address of user{on,off} operation to applyoptions:-h, –help show this help message and exit–token TOKEN admin access token to use
The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables which can be set in a .env
file
Source: call_intercept.py
1#!/usr/bin/env python
2"""
3Script to read/update call intercept settings of a calling user.
4
5The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
6obtains tokens via an OAuth flow.
7
8 usage: call_intercept.py [-h] [--token TOKEN] user_email [{on,off}]
9
10 positional arguments:
11 user_email email address of user
12 {on,off} operation to apply
13
14 options:
15 -h, --help show this help message and exit
16 --token TOKEN admin access token to use
17"""
18import argparse
19import logging
20import os
21import re
22import sys
23from json import loads
24from typing import Optional
25
26from dotenv import load_dotenv
27from wxc_sdk import WebexSimpleApi
28from wxc_sdk.integration import Integration
29from wxc_sdk.person_settings.call_intercept import InterceptSetting
30from wxc_sdk.rest import RestError
31from wxc_sdk.scopes import parse_scopes
32from wxc_sdk.tokens import Tokens
33from yaml import safe_dump, safe_load
34
35log = logging.getLogger(__name__)
36
37
38def env_path() -> str:
39 """
40 determine path for .env to load environment variables from
41
42 :return: .env file path
43 """
44 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
45
46
47def yml_path() -> str:
48 """
49 determine path of YML file to persist tokens
50
51 :return: path to YML file
52 :rtype: str
53 """
54 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
55
56
57def build_integration() -> Integration:
58 """
59 read integration parameters from environment variables and create an integration
60
61 :return: :class:`wxc_sdk.integration.Integration` instance
62 """
63 client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
64 client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
65 scopes = os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES')
66 if scopes:
67 scopes = parse_scopes(scopes)
68 if not all((client_id, client_secret, scopes)):
69 raise ValueError('failed to get integration parameters from environment')
70 redirect_url = 'http://localhost:6001/redirect'
71 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
72 redirect_url=redirect_url)
73
74
75def get_tokens() -> Optional[Tokens]:
76 """
77
78 Tokens are read from a YML file. If needed an OAuth flow is initiated.
79
80 :return: tokens
81 :rtype: :class:`wxc_sdk.tokens.Tokens`
82 """
83
84 def write_tokens(tokens_to_cache: Tokens):
85 with open(yml_path(), mode='w') as f:
86 safe_dump(loads(tokens_to_cache.json()), f)
87 return
88
89 def read_tokens() -> Optional[Tokens]:
90 try:
91 with open(yml_path(), mode='r') as f:
92 data = safe_load(f)
93 tokens_read = Tokens.parse_obj(data)
94 except Exception as e:
95 log.info(f'failed to read tokens from file: {e}')
96 tokens_read = None
97 return tokens_read
98
99 integration = build_integration()
100 tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
101 write_to_cache=write_tokens)
102 return tokens
103
104
105RE_EMAIL = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
106
107
108def email_type(value):
109 if not RE_EMAIL.match(value):
110 raise argparse.ArgumentTypeError(f"'{value}' is not a valid email")
111 return value
112
113
114def main():
115 """
116 where the magic happens
117 """
118 # read .env file with the settings for the integration to be used to obtain tokens
119 load_dotenv(env_path())
120
121 parser = argparse.ArgumentParser()
122 parser.add_argument('user_email', type=email_type, help='email address of user')
123 parser.add_argument('on_off', choices=['on', 'off'], nargs='?', help='operation to apply')
124 parser.add_argument('--token', type=str, required=False, help='admin access token to use')
125 args = parser.parse_args()
126
127 if args.token:
128 tokens = args.token
129 elif (tokens := os.getenv('WEBEX_ACCESS_TOKEN')) is None:
130 tokens = get_tokens()
131
132 if not tokens:
133 print('Failed to get tokens', file=sys.stderr)
134 exit(1)
135
136 # set level to DEBUG to see debug of REST requests
137 logging.basicConfig(level=(gt := getattr(sys, 'gettrace', None)) and gt() and logging.DEBUG or logging.INFO)
138
139 with WebexSimpleApi(tokens=tokens) as api:
140 # get user
141 email = args.user_email.lower()
142 user = next((user
143 for user in api.people.list(email=email)
144 if user.emails[0] == email), None)
145 if user is None:
146 print(f'User "{email}" not found', file=sys.stderr)
147 exit(1)
148
149 # display call intercept status
150 try:
151 intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
152 except RestError as e:
153 print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
154 exit(1)
155
156 print('on' if intercept.enabled else 'off')
157 if args.on_off:
158 # action: turn on/off
159 intercept = InterceptSetting.default()
160 intercept.enabled = args.on_off == 'on'
161 try:
162 api.person_settings.call_intercept.configure(person_id=user.person_id,
163 intercept=intercept)
164 except RestError as e:
165 print(f'Failed to update call intercept settings: {e.response.status_code}, {e.description}')
166 exit(1)
167
168 # read call intercept again
169 try:
170 intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
171 except RestError as e:
172 print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
173 exit(1)
174
175 # display state after update
176 print(f"set to {'on' if intercept.enabled else 'off'}")
177
178 exit(0)
179
180
181if __name__ == '__main__':
182 main()
Read/update call queue agent join states
usage: queue_helper.py [-h] [–location LOCATION [LOCATION …]][–queue QUEUE [QUEUE …]][–join JOIN_AGENT [JOIN_AGENT …]][–unjoin UNJOIN_AGENT [UNJOIN_AGENT …]][–remove REMOVE_USER [REMOVE_USER …]][–add ADD_USER [ADD_USER …]] [–dryrun][–token TOKEN]Modify call queue settings from the CLIoptional arguments:-h, –help show this help message and exit–location LOCATION [LOCATION …], -l LOCATION [LOCATION …]name of location to work on. If missing then work onall locations.–queue QUEUE [QUEUE …], -q QUEUE [QUEUE …]name(s) of queue(s) to operate on. If missing thenwork on all queues in location.–join JOIN_AGENT [JOIN_AGENT …], -j JOIN_AGENT [JOIN_AGENT …]Join given user(s) on given queue(s). Can be “all” toact on all agents.–unjoin UNJOIN_AGENT [UNJOIN_AGENT …], -u UNJOIN_AGENT [UNJOIN_AGENT …]Unjoin given agent(s) from given queue(s). Can be“all” to act on all agents.–remove REMOVE_USER [REMOVE_USER …], -r REMOVE_USER [REMOVE_USER …]Remove given agent from given queue(s). Can be “all”to act on all agents.–add ADD_USER [ADD_USER …], -a ADD_USER [ADD_USER …]Add given users to given queue(s).–dryrun, -d Dry run; don’t apply any changes–token TOKEN admin access token to use
The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables
which can be set in a queue_helper.env
file in the current directory.
Source: queue_helper.py
1#!/usr/bin/env python
2"""
3usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]] [--queue QUEUE [QUEUE ...]]
4 [--join JOIN_AGENT [JOIN_AGENT ...]] [--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
5 [--remove REMOVE_USER [REMOVE_USER ...]] [--add ADD_USER [ADD_USER ...]] [--dryrun]
6 [--token TOKEN]
7
8Modify call queue settings from the CLI
9
10optional arguments:
11 -h, --help show this help message and exit
12 --location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
13 name of location to work on. If missing then work on all locations.
14 --queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
15 name(s) of queue(s) to operate on. If missing then work on all queues in location.
16 --join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
17 Join given user(s) on given queue(s). Can be "all" to act on all agents.
18 --unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
19 Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.
20 --remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
21 Remove given agent from given queue(s). Can be "all" to act on all agents.
22 --add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
23 Add given users to given queue(s).
24 --dryrun, -d Dry run; don't apply any changes
25 --token TOKEN admin access token to use
26"""
27import asyncio
28import logging
29import os
30import sys
31from argparse import ArgumentParser
32from collections.abc import AsyncGenerator
33from typing import Optional
34
35from dotenv import load_dotenv
36
37from wxc_sdk import Tokens
38from wxc_sdk.as_api import AsWebexSimpleApi
39from wxc_sdk.common import UserType
40from wxc_sdk.integration import Integration
41from wxc_sdk.people import Person
42from wxc_sdk.scopes import parse_scopes
43from wxc_sdk.telephony.callqueue import CallQueue
44from wxc_sdk.telephony.hg_and_cq import Agent
45
46
47def agent_name(agent: Agent) -> str:
48 return f'{agent.first_name} {agent.last_name}'
49
50
51def env_path() -> str:
52 """
53 determine path for .env to load environment variables from; based on name of this file
54 :return: .env file path
55 """
56 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
57
58
59def yml_path() -> str:
60 """
61 determine path of YML file to persist tokens
62 :return: path to YML file
63 :rtype: str
64 """
65 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
66
67
68def build_integration() -> Integration:
69 """
70 read integration parameters from environment variables and create an integration
71 :return: :class:`wxc_sdk.integration.Integration` instance
72 """
73 client_id = os.getenv('INTEGRATION_CLIENT_ID')
74 client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
75 scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
76 redirect_url = 'http://localhost:6001/redirect'
77 if not all((client_id, client_secret, scopes)):
78 raise ValueError('failed to get integration parameters from environment')
79 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
80 redirect_url=redirect_url)
81
82
83def get_tokens() -> Optional[Tokens]:
84 """
85 Tokens are read from a YML file. If needed an OAuth flow is initiated.
86
87 :return: tokens
88 :rtype: :class:`wxc_sdk.tokens.Tokens`
89 """
90
91 integration = build_integration()
92 tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
93 return tokens
94
95
96async def main():
97 async def act_on_queue(queue: CallQueue):
98 """
99 Act on a single queue
100 """
101 # we need the queue details b/c the queue instance passed as parameter is from a list() call
102 # ... and thus is missing all the details like agents
103 details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
104 agent_names = set(map(agent_name, details.agents))
105
106 def notify(message: str) -> str:
107 """
108 an action notification with queue information
109 """
110 return f'queue "{details.name:{queue_len}}" in "{queue.location_name:{location_len}}": {message}'
111
112 def validate_agents(names: list[str], operation: str) -> list[str]:
113 """
114 check if all names in given list exist as agents on current queue
115 """
116 if 'all' in names:
117 return set(agent_names)
118
119 not_found = [name for name in names if name not in agent_names]
120 if not_found:
121 print('\n'.join(notify(f'{name} not found for {operation}"')
122 for name in not_found),
123 file=sys.stderr)
124 return set(name for name in names if name not in set(not_found))
125
126 # validate list of names or join, unjoin, and remove against actual list of agents
127 to_join = validate_agents(join_agents, 'join')
128 to_unjoin = validate_agents(unjoin_agents, 'unjoin')
129 to_remove = validate_agents(remove_users, 'remove')
130
131 # check for agents we are asked to add but which already exist as agents on the queue
132 existing_agent_ids = set(agent.agent_id for agent in details.agents)
133 agent_exists = [agent_name(user) for user in add_users
134 if user.person_id in existing_agent_ids]
135 if agent_exists:
136 print('\n'.join(notify(f'{name} already is agent')
137 for name in agent_exists),
138 file=sys.stderr)
139 # reduced set of users to add
140 to_add = [user for user in add_users
141 if agent_name(user) not in set(agent_exists)]
142 else:
143 # ... or add all users
144 to_add = add_users
145
146 # the updated list of agents for the current queue
147 new_agents = []
148
149 # do we actually need an update?
150 update_needed = False
151
152 # create copy of each agent instance; we don't want to update the original agent objects
153 # to make sure that details still holds the state before any update
154 agents = [agent.copy(deep=True) for agent in details.agents]
155
156 # iterate through the existing agents and see if we have to apply any change
157 for agent in agents:
158 name = agent_name(agent)
159 # do we have to take action to join this agent?
160 if name in to_join and not agent.join_enabled:
161 print(notify(f'{name}, join'))
162 update_needed = True
163 agent.join_enabled = True
164 # do we have to take action to unjoin this agent?
165 if name in to_unjoin and agent.join_enabled:
166 print(notify(f'{name}, unjoin'))
167 update_needed = True
168 agent.join_enabled = False
169 # do we have to remove this agent?
170 if name in to_remove:
171 print(notify(f'{name}, remove'))
172 update_needed = True
173 # skip to next agent; so that we don't add this agent to the updated list of agents
174 continue
175 new_agents.append(agent)
176
177 # add new agents
178 new_agents.extend(Agent(agent_id=user.person_id, user_type=UserType.people)
179 for user in to_add)
180
181 # update the queue
182 if (update_needed or to_add) and not args.dryrun:
183 # simplified update: we only messed with the agents
184 update = CallQueue(agents=new_agents)
185 await api.telephony.callqueue.update(location_id=queue.location_id, queue_id=queue.id,
186 update=update)
187 print(notify('queue updated'))
188 # and get details after the update
189 details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
190 print(notify('got details after update'))
191
192 # print summary
193 print(f'queue "{queue.name:{queue_len}}" in "{queue.location_name}"')
194 print(f' phone number: {details.phone_number}')
195 print(f' extension: {details.extension}')
196 print(' agents')
197 if details.agents:
198 name_len = max(map(len, map(agent_name, details.agents)))
199 for agent in details.agents:
200 print(f' {agent_name(agent):{name_len}}: {"not " if not agent.join_enabled else ""}joined')
201 return
202
203 async def validate_users(user_names: list[str]) -> AsyncGenerator[Person, None, None]:
204 """
205 Validate list of names of users to be added and yield a Person instance for each one
206 """
207 # search for all names in parallel
208 lists: list[list[Person]] = await asyncio.gather(
209 *[api.people.list(display_name=name) for name in user_names], return_exceptions=True)
210 for name, user_list in zip(user_names, lists):
211 if isinstance(user_list, Exception):
212 user = None
213 else:
214 user = next((u for u in user_list if name == agent_name(u)), None)
215 if user is None:
216 print(f'user "{name}" not found', file=sys.stderr)
217 continue
218 yield user
219 return
220
221 # parse command line
222 parser = ArgumentParser(description='Modify call queue settings from the CLI')
223 parser.add_argument('--location', '-l', type=str, required=False, nargs='+',
224 help='name of location to work on. If missing then work on all locations.')
225
226 parser.add_argument('--queue', '-q', type=str, required=False, nargs='+',
227 help='name(s) of queue(s) to operate on. If missing then work on all queues in location.')
228
229 parser.add_argument('--join', '-j', type=str, required=False, nargs='+', dest='join_agent',
230 help='Join given user(s) on given queue(s). Can be "all" to act on all agents.')
231
232 parser.add_argument('--unjoin', '-u', type=str, required=False, nargs='+', dest='unjoin_agent',
233 help='Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.')
234
235 parser.add_argument('--remove', '-r', type=str, required=False, nargs='+', dest='remove_user',
236 help='Remove given agent from given queue(s). Can be "all" to act on all agents.')
237
238 parser.add_argument('--add', '-a', type=str, required=False, nargs='+', dest='add_user',
239 help='Add given users to given queue(s).')
240 parser.add_argument('--dryrun', '-d', required=False, action='store_true',
241 help='Dry run; don\'t apply any changes')
242 parser.add_argument('--token', type=str, required=False, help='admin access token to use')
243
244 args = parser.parse_args()
245
246 # get environment variables from .env; required for integration parameters
247 load_dotenv(env_path())
248
249 tokens = args.token or None
250 if tokens is None:
251 # get tokens from cache or create a new set of tokens using the integration defined in .env
252 tokens = get_tokens()
253
254 async with AsWebexSimpleApi(tokens=tokens) as api:
255 # validate location parameter
256 location_names = args.location or []
257
258 # list of all locations with names matching one of the provided names
259 locations = [loc for loc in await api.locations.list()
260 if not location_names or loc.name in set(location_names)]
261
262 if not location_names:
263 print(f'Considering all {len(locations)} locations')
264
265 # set of names of matching locations
266 found_location_names = set(loc.name for loc in locations)
267
268 # Error message for each location name argument not matching an actual location
269 for location_name in location_names:
270 if location_name not in found_location_names:
271 print(f'location "{location_name}" not found', file=sys.stderr)
272
273 if not locations:
274 print('Found no locations to work on', file=sys.stderr)
275 exit(1)
276
277 # which queues do we need to operate on?
278 location_ids = set(loc.location_id for loc in locations)
279 queue_names = args.queue
280 all_queues = queue_names is None
281 # full list of queues
282 queues = await api.telephony.callqueue.list()
283 # filter based on location parameter
284 queues = [queue for queue in queues
285 if (all_queues or queue.name in queue_names) and queue.location_id in location_ids]
286
287 # len of queue names for nicer output
288 queue_len = max(len(queue.name) for queue in queues)
289
290 # now we can actually go back and re-evaluate the list of locations; for the location length we only need
291 # to consider locations we actually have a target queue in
292 location_ids = set(queue.location_id for queue in queues)
293
294 # max length of location names for nicely formatted output
295 location_len = max(len(loc.name)
296 for loc in locations
297 if loc.location_id in location_ids)
298
299 # get the names for join, unjoin, remove, and add
300 join_agents = args.join_agent or []
301 unjoin_agents = args.unjoin_agent or []
302 remove_users = args.remove_user or []
303 add_users = args.add_user or []
304
305 # validate users; make sure that users exist with the provided names
306 add_users = [u async for u in validate_users(user_names=add_users)]
307
308 # apply actions to all queues
309 await asyncio.gather(*[act_on_queue(queue) for queue in queues])
310
311
312if __name__ == '__main__':
313 # enable DEBUG logging to a file; REST log shows all requests
314 logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
315 filemode='w', level=logging.DEBUG)
316 asyncio.run(main())