Examples
Code examples can be found in the examples directory on GitHub
Also take a look at the unit tests in the tests directory.
Get all calling users
Source: calling_users.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org
5"""
6
7from dotenv import load_dotenv
8
9from wxc_sdk import WebexSimpleApi
10
11load_dotenv()
12
13api = WebexSimpleApi()
14
15# using wxc_sdk.people.PeopleApi.list to iterate over persons
16# Parameter calling_data needs to be set to true to gat calling specific information
17# calling users have the attribute location_id set
18calling_users = [user for user in api.people.list(calling_data=True)
19 if user.location_id]
20print(f'{len(calling_users)} users:')
21print('\n'.join(user.display_name for user in calling_users))
Get all calling users (async variant)
Source: calling_users_async.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org using the (experimental) async API
5"""
6import asyncio
7import time
8
9from dotenv import load_dotenv
10
11from wxc_sdk.as_api import AsWebexSimpleApi
12
13load_dotenv()
14
15
16async def get_calling_users():
17 """
18 Get details of all calling enabled users by:
19 1) getting all calling users
20 2) collecting all users that have a calling license
21 3) getting details for all users
22 """
23 async with AsWebexSimpleApi(concurrent_requests=40) as api:
24 print('Collecting calling licenses')
25 calling_license_ids = set(lic.license_id for lic in await api.licenses.list()
26 if lic.webex_calling)
27
28 # get users with a calling license
29 calling_users = [user async for user in api.people.list_gen()
30 if any(lic_id in calling_license_ids for lic_id in user.licenses)]
31 print(f'{len(calling_users)} users:')
32 print('\n'.join(user.display_name for user in calling_users))
33
34 # get details for all users
35 start = time.perf_counter()
36 details = await asyncio.gather(*[api.people.details(person_id=user.person_id, calling_data=True)
37 for user in calling_users])
38 expired = time.perf_counter() - start
39 print(f'Got details for {len(details)} users in {expired * 1000:.3f} ms')
40
41
42asyncio.run(get_calling_users())
Default call forwarding settings for all users
This example start with the list of all calling users and then calls
wxc_sdk.person_settings.forwarding.PersonForwardingApi.configure()
for each user. To speed up things a
ThreadPoolExecutor
is used and all update operations are scheduled for execution by the thread pool.
Source: reset_call_forwarding.py
1#!/usr/bin/env python
2"""
3Example script
4Reset call forwarding to default for all users in the org
5"""
6
7import logging
8from concurrent.futures import ThreadPoolExecutor
9
10from dotenv import load_dotenv
11
12from wxc_sdk import WebexSimpleApi
13from wxc_sdk.all_types import PersonForwardingSetting
14
15load_dotenv()
16
17logging.basicConfig(level=logging.INFO)
18
19# set to DEBUG to see the actual requests
20logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
21
22api = WebexSimpleApi()
23
24# get all calling users
25calling_users = [user for user in api.people.list(calling_data=True)
26 if user.location_id]
27
28# set call forwarding to default for all users
29with ThreadPoolExecutor() as pool:
30 # default call forwarding settings
31 forwarding = PersonForwardingSetting.default()
32
33 # schedule update for each user and wait for completion
34 list(pool.map(
35 lambda user: api.person_settings.forwarding.configure(person_id=user.person_id,
36 forwarding=forwarding),
37 calling_users))
Modify number of rings configuration for users read from CSV
Here we read a bunch of user email addresses from a CSV. The CSV as an ERROR column and we only want to consider users without errors.
For all these users a VM setting is updated and the results are written to a CSV for further processing.
Source: modify_voicemail.py
1#!/usr/bin/env python
2"""
3Example Script
4Modifying number of rings configuration in voicemail settings
5Run -> python3 modify_voicemail.py modify_voicemail.csv
6"""
7import csv
8import sys
9import traceback
10from concurrent.futures import ThreadPoolExecutor
11
12from dotenv import load_dotenv
13
14from wxc_sdk import WebexSimpleApi
15from wxc_sdk.all_types import *
16
17VOICEMAIL_SETTINGS_NUMBER_OF_RINGS = 6
18
19# loading environment variables - use .env file for development
20load_dotenv()
21
22
23def update_vm_settings():
24 """
25 actually update VM settings for all users present in input CSV
26 """
27 api = WebexSimpleApi()
28 final_report = []
29 mail_ids = []
30 # using wxc_sdk.people.PeopleApi.list to iterate over persons
31 # Parameter calling_data needs to be set to true to gat calling specific information
32 # calling users have the attribute location_id set
33 calling_users = [user for user in api.people.list(calling_data=True)
34 if user.location_id]
35 print(f'{len(calling_users)} users:')
36 print('\n'.join(user.display_name for user in calling_users))
37
38 # get CSV file name from command line
39 with open(str(sys.argv[1]), 'r') as csv_file:
40 reader = csv.DictReader(csv_file)
41 # read all records from CSV. Ony consider records w/o error
42 for col in reader:
43 if not col['ERROR']:
44 # collect email address from USERNAME column for further processing
45 mail_ids.append(col['USERNAME'])
46 else:
47 final_report.append((col['USERNAME'], 'FAILED DUE TO ERROR REASON IN INPUT FILE'))
48
49 # work on calling users that have an email address that we read from the CSV
50 filteredUsers = [d for d in calling_users if d.emails[0] in mail_ids]
51
52 print("\nCalling Users in CI - Count ", len(calling_users))
53 print("Mail IDs from input file after removing error columns - Count ", len(mail_ids))
54 print("FilteredUsers Users - Count", len(filteredUsers))
55
56 def set_number_of_rings(user: Person):
57 """
58 Read VM config for a user
59 :param user: user to update
60 """
61 try:
62 # shortcut
63 vm = api.person_settings.voicemail
64
65 # Read Current configuration
66 vm_settings = vm.read(person_id=user.person_id)
67 print(f'\n Existing Configuration: {vm_settings} ')
68
69 # Modify number of rings value
70 vm_settings.send_unanswered_calls.number_of_rings = VOICEMAIL_SETTINGS_NUMBER_OF_RINGS
71 vm.configure(person_id=user.person_id, settings=vm_settings)
72 # Read configuration after changes
73 vm_settings = vm.read(person_id=user.person_id)
74 print(f'\n New Configuration: {vm_settings} ')
75 final_report.append((user.display_name, 'SUCCESS'))
76 except Exception as e:
77 final_report.append((user.display_name, 'FAILURE'))
78 print("type error: " + str(e))
79 print(traceback.format_exc())
80 return
81
82 # Modify settings for the filtered users
83 with ThreadPoolExecutor() as pool:
84 list(pool.map(lambda user: set_number_of_rings(user),
85 filteredUsers))
86
87 print(final_report)
88 with open('output.csv', 'w') as f:
89 write = csv.writer(f)
90 write.writerow(["USERNAME", "STATUS"])
91 write.writerows(final_report)
92
93
94if __name__ == '__main__':
95 update_vm_settings()
Holiday schedule w/ US national holidays for all US locations
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays
” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py
is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY
.
Source: us_holidays.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5"""
6
7import logging
8from collections import defaultdict
9from concurrent.futures import ThreadPoolExecutor
10from datetime import date
11from threading import Lock
12from typing import List
13
14from dotenv import load_dotenv
15
16from calendarific import CalendarifiyApi, Holiday
17from wxc_sdk import WebexSimpleApi
18from wxc_sdk.locations import Location
19from wxc_sdk.all_types import ScheduleType, Event, Schedule
20
21log = logging.getLogger(__name__)
22
23# a lock per location to protect observe_in_location()
24location_locks: dict[str, Lock] = defaultdict(Lock)
25
26# Use parallel threads for provisioning?
27USE_THREADING = True
28
29# True: delete holiday schedule instead of creating one
30CLEAN_UP = False
31
32# first and last year for which to create public holiday events
33FIRST_YEAR = 2022
34LAST_YEAR = 2022
35
36LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
37
38
39def observe_in_location(*, api: WebexSimpleApi, location: Location, holidays: List[Holiday]):
40 """
41 create/update a "National Holiday" schedule in one location
42
43 :param api: Webex api
44 :type api: WebexSimpleApi
45 :param location: location to work on
46 :type location: Location
47 :param holidays: list of holidays to observe
48 :type holidays: List[Holiday]
49 """
50 # there should always only one thread messing with the holiday schedule of a location
51 with location_locks[location.location_id]:
52 year = holidays[0].date.year
53 schedule_name = 'National Holidays'
54
55 # shortcut
56 ats = api.telephony.schedules
57
58 # existing "National Holiday" schedule or None
59 schedule = next((schedule
60 for schedule in ats.list(obj_id=location.location_id,
61 schedule_type=ScheduleType.holidays,
62 name=schedule_name)
63 if schedule.name == schedule_name),
64 None)
65 if CLEAN_UP:
66 if schedule:
67 log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
68 ats.delete_schedule(obj_id=location.location_id,
69 schedule_type=ScheduleType.holidays,
70 schedule_id=schedule.schedule_id)
71 return
72 if schedule:
73 # we need the details: list response doesn't have events
74 schedule = ats.details(obj_id=location.location_id,
75 schedule_type=ScheduleType.holidays,
76 schedule_id=schedule.schedule_id)
77 # create list of desired schedule entries
78 # * one per holiday
79 # * only future holidays
80 # * not on a Sunday
81 today = date.today()
82 events = [Event(name=f'{holiday.name} {holiday.date.year}',
83 start_date=holiday.date,
84 end_date=holiday.date,
85 all_day_enabled=True)
86 for holiday in holidays
87 if holiday.date >= today and holiday.date.weekday() != 6]
88
89 if not schedule:
90 # create new schedule
91 log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
92 if not events:
93 log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
94 return
95 schedule = Schedule(name=schedule_name,
96 schedule_type=ScheduleType.holidays,
97 events=events)
98 log.debug(
99 f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
100 f'events')
101 schedule_id = ats.create(obj_id=location.location_id, schedule=schedule)
102 log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
103 return
104
105 # update existing schedule
106 with ThreadPoolExecutor() as pool:
107 # delete existing events in the past
108 to_delete = [event
109 for event in schedule.events
110 if event.start_date < today]
111 if to_delete:
112 log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113 if USE_THREADING:
114 list(pool.map(
115 lambda event: ats.event_delete(obj_id=location.location_id,
116 schedule_type=ScheduleType.holidays,
117 event_id=event.event_id),
118 to_delete))
119 else:
120 for event in to_delete:
121 ats.event_delete(obj_id=location.location_id,
122 schedule_type=ScheduleType.holidays,
123 event_id=event.event_id)
124
125 # add events which don't exist yet
126 existing_dates = set(event.start_date
127 for event in schedule.events)
128 to_add = [event
129 for event in events
130 if event.start_date not in existing_dates]
131 if not to_add:
132 log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
133 return
134 log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
135 if USE_THREADING:
136 list(pool.map(
137 lambda event: ats.event_create(
138 obj_id=location.location_id,
139 schedule_type=ScheduleType.holidays,
140 schedule_id=schedule.schedule_id,
141 event=event),
142 to_add))
143 else:
144 for event in to_add:
145 ats.event_create(
146 obj_id=location.location_id,
147 schedule_type=ScheduleType.holidays,
148 schedule_id=schedule.schedule_id,
149 event=event)
150 log.info(f'observe_in_location({location.name}, {year}): done.')
151 return
152
153
154def observe_national_holidays(*, api: WebexSimpleApi, locations: List[Location],
155 year: int = None):
156 """
157 US national holidays for given locations
158
159 :param api: Webex api
160 :type api: WebexSimpleApi
161 :param locations: list of locations in which US national holidays should be observed
162 :type locations: List[Location]
163 :param year: year for national holidays. Default: current year
164 :type year: int
165 """
166 # default: this year
167 year = year or date.today().year
168
169 # get national holidays for specified year
170 holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
171
172 # update holiday schedule for each location
173 with ThreadPoolExecutor() as pool:
174 if USE_THREADING:
175 list(pool.map(
176 lambda location: observe_in_location(api=api, location=location, holidays=holidays),
177 locations))
178 else:
179 for location in locations:
180 observe_in_location(api=api, location=location, holidays=holidays)
181 return
182
183
184if __name__ == '__main__':
185 # read dotenv which has some environment variables like Webex API token and Calendarify
186 # API key.
187 load_dotenv()
188
189 # enable logging
190 logging.basicConfig(level=logging.DEBUG,
191 format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
192 logging.getLogger('urllib3').setLevel(logging.INFO)
193 logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
194
195 # the actual action
196 with WebexSimpleApi(concurrent_requests=5) as wx_api:
197 # get all US locations
198 log.info('Getting locations...')
199 us_locations = [location
200 for location in wx_api.locations.list()
201 if location.address.country == 'US']
202
203 # create national holiday schedule for given year(s) and locations
204 start_year = 2022
205 last_year = 2023
206 if USE_THREADING:
207 with ThreadPoolExecutor() as pool:
208 list(pool.map(
209 lambda year: observe_national_holidays(api=wx_api, year=year, locations=us_locations),
210 range(FIRST_YEAR, LAST_YEAR + 1)))
211 else:
212 for year in range(FIRST_YEAR, LAST_YEAR + 1):
213 observe_national_holidays(api=wx_api, year=year, locations=us_locations)
Holiday schedule w/ US national holidays for all US locations (async variant)
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays
” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py
is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY
.
Source: us_holidays_async.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5
6Using the asyc SDK variant
7"""
8import asyncio
9import functools
10import logging
11from collections import defaultdict
12from datetime import date
13from typing import List
14
15from dotenv import load_dotenv
16
17from calendarific import CalendarifiyApi, Holiday
18from wxc_sdk.all_types import ScheduleType, Event, Schedule
19from wxc_sdk.as_api import AsWebexSimpleApi
20from wxc_sdk.locations import Location
21
22log = logging.getLogger(__name__)
23
24# a lock per location to protect observe_in_location()
25location_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
26
27# Use parallel tasks for provisioning?
28USE_TASKS = True
29
30# True: delete holiday schedule instead of creating one
31CLEAN_UP = False
32
33# first and last year for which to create public holiday events
34FIRST_YEAR = 2022
35LAST_YEAR = 2023
36
37LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
38
39
40async def observe_in_location(*, api: AsWebexSimpleApi, location: Location, holidays: List[Holiday]):
41 """
42 create/update a "National Holiday" schedule in one location
43
44 :param api: Webex api
45 :type api: WebexSimpleApi
46 :param location: location to work on
47 :type location: Location
48 :param holidays: list of holidays to observe
49 :type holidays: List[Holiday]
50 """
51 # there should always only one thread messing with the holiday schedule of a location
52 async with location_locks[location.location_id]:
53 year = holidays[0].date.year
54 schedule_name = 'National Holidays'
55
56 # shortcut
57 ats = api.telephony.schedules
58
59 # existing "National Holiday" schedule or None
60 schedule = next((schedule
61 for schedule in await ats.list(obj_id=location.location_id,
62 schedule_type=ScheduleType.holidays,
63 name=schedule_name)
64 if schedule.name == schedule_name),
65 None)
66 if CLEAN_UP:
67 if schedule:
68 log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
69 await ats.delete_schedule(obj_id=location.location_id,
70 schedule_type=ScheduleType.holidays,
71 schedule_id=schedule.schedule_id)
72 return
73 if schedule:
74 # we need the details: list response doesn't have events
75 schedule = await ats.details(obj_id=location.location_id,
76 schedule_type=ScheduleType.holidays,
77 schedule_id=schedule.schedule_id)
78 # create list of desired schedule entries
79 # * one per holiday
80 # * only future holidays
81 # * not on a Sunday
82 today = date.today()
83 events = [Event(name=f'{holiday.name} {holiday.date.year}',
84 start_date=holiday.date,
85 end_date=holiday.date,
86 all_day_enabled=True)
87 for holiday in holidays
88 if holiday.date >= today and holiday.date.weekday() != 6]
89
90 if not schedule:
91 # create new schedule
92 log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
93 if not events:
94 log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
95 return
96 schedule = Schedule(name=schedule_name,
97 schedule_type=ScheduleType.holidays,
98 events=events)
99 log.debug(
100 f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
101 f'events')
102 schedule_id = await ats.create(obj_id=location.location_id, schedule=schedule)
103 log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
104 return
105
106 # update existing schedule
107 # delete existing events in the past
108 to_delete = [event
109 for event in schedule.events
110 if event.start_date < today]
111 if to_delete:
112 log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113 if USE_TASKS:
114 await asyncio.gather(*[ats.event_delete(obj_id=location.location_id,
115 schedule_type=ScheduleType.holidays,
116 event_id=event.event_id)
117 for event in to_delete])
118 else:
119 for event in to_delete:
120 await ats.event_delete(obj_id=location.location_id,
121 schedule_type=ScheduleType.holidays,
122 event_id=event.event_id)
123
124 # add events which don't exist yet
125 existing_dates = set(event.start_date
126 for event in schedule.events)
127 to_add = [event
128 for event in events
129 if event.start_date not in existing_dates]
130 if not to_add:
131 log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
132 return
133 log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
134 if USE_TASKS:
135 await asyncio.gather(*[ats.event_create(obj_id=location.location_id,
136 schedule_type=ScheduleType.holidays,
137 schedule_id=schedule.schedule_id,
138 event=event)
139 for event in to_add])
140 else:
141 for event in to_add:
142 await ats.event_create(obj_id=location.location_id,
143 schedule_type=ScheduleType.holidays,
144 schedule_id=schedule.schedule_id,
145 event=event)
146 log.info(f'observe_in_location({location.name}, {year}): done.')
147 return
148
149
150async def observe_national_holidays(*, api: AsWebexSimpleApi, locations: List[Location],
151 year: int = None):
152 """
153 US national holidays for given locations
154
155 :param api: Webex api
156 :type api: WebexSimpleApi
157 :param locations: list of locations in which US national holidays should be observed
158 :type locations: List[Location]
159 :param year: year for national holidays. Default: current year
160 :type year: int
161 """
162 # default: this year
163 year = year or date.today().year
164
165 # get national holidays for specified year
166 loop = asyncio.get_running_loop()
167 # avoid sync all:
168 # holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
169 holidays = await loop.run_in_executor(None, functools.partial(CalendarifiyApi().holidays,
170 country='US', year=year, holiday_type='national'))
171
172 # update holiday schedule for each location
173 if USE_TASKS:
174 await asyncio.gather(*[observe_in_location(api=api, location=location, holidays=holidays)
175 for location in locations])
176 else:
177 for location in locations:
178 await observe_in_location(api=api, location=location, holidays=holidays)
179 return
180
181
182if __name__ == '__main__':
183 # read dotenv which has some environment variables like Webex API token and Calendarify
184 # API key.
185 load_dotenv()
186
187 # enable logging
188 logging.basicConfig(level=logging.DEBUG,
189 format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
190 logging.getLogger('urllib3').setLevel(logging.INFO)
191 logging.getLogger('wxc_sdk.as_rest').setLevel(logging.INFO)
192
193 # the actual action
194 async def do_provision():
195
196 async with AsWebexSimpleApi(concurrent_requests=5) as wx_api:
197 # get all US locations
198 log.info('Getting locations...')
199 us_locations = [location
200 for location in await wx_api.locations.list()
201 if location.address.country == 'US']
202
203 # create national holiday schedule for given year(s) and locations
204 if USE_TASKS:
205 await asyncio.gather(*[observe_national_holidays(api=wx_api, year=year, locations=us_locations)
206 for year in range(FIRST_YEAR, LAST_YEAR + 1)])
207 else:
208 for year in range(FIRST_YEAR, LAST_YEAR + 1):
209 await observe_national_holidays(api=wx_api, year=year, locations=us_locations)
210
211 asyncio.run(do_provision())
Persist tokens and obtain new tokens interactively
A typical problem specifically when creating CLI scripts is how to obtain valid access tokens for the API operations. If your code wants to invoke Webex REST APIs on behalf of a user then an integration is needed. The concepts of integrations are explained at the “Integrations” page on developer.cisco.com.
This example code shows how an OAUth Grant flow for an integration can be initiated from a script by using the Python
webbrowser module and calling the open()
method with the authorization URL of a given integration to open that
URL in the system web browser. The user can then authenticate and grant access to the integration. In the last
step of a successful authorization flow the web browser is redirected to the redirect_url
of the
integration.
The example code starts a primitive web server serving GET requests to http://localhost:6001/redirect
.
This URL has to be the redirect URL of the integration you create under My Webex Apps on developer.webex.com.
The sample script reads the integration parameters from environment variables (TOKEN_INTEGRATION_CLIENT_ID
,
TOKEN_INTEGRATION_CLIENT_SECRET
, TOKEN_INTEGRATION_CLIENT_SCOPES
). These variables can also be defined in
get_tokens.env
in the current directory:
# rename this to get_tokens.env and set values
# client ID for integration to be used.
TOKEN_INTEGRATION_CLIENT_ID=
# client secret of integration
TOKEN_INTEGRATION_CLIENT_SECRET=
# scopes to request. Use these scopes when creating the integration at developer.webex.com
# scopes can be in any form supported by wxc_sdk.scopes.parse_scopes()
TOKEN_INTEGRATION_CLIENT_SCOPES="spark:calls_write spark:kms spark:calls_read spark-admin:telephony_config_read spark:people_read"
The sample code persists the tokens in get_tokens.yml
in the current directory. On startup the sample code tries
to read tokens from that file. If needed a new access token is obtained using the refresh token.
An OAuth flow is only initiated if no (valid) tokens could be read from get_tokens.yml
Source: get_tokens.py
1#!/usr/bin/env python
2"""
3Example script
4read tokens from file or interactively obtain token by starting a local web server and open the authorization URL in
5the local web browser
6"""
7import json
8import logging
9import os
10from typing import Optional
11
12from dotenv import load_dotenv
13from yaml import safe_load, safe_dump
14
15from wxc_sdk import WebexSimpleApi
16from wxc_sdk.integration import Integration
17from wxc_sdk.scopes import parse_scopes
18from wxc_sdk.tokens import Tokens
19
20log = logging.getLogger(__name__)
21
22
23def env_path() -> str:
24 """
25 determine path for .env to load environment variables from
26
27 :return: .env file path
28 """
29 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
30
31
32def yml_path() -> str:
33 """
34 determine path of YML file to persist tokens
35
36 :return: path to YML file
37 :rtype: str
38 """
39 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
40
41
42def build_integration() -> Integration:
43 """
44 read integration parameters from environment variables and create an integration
45
46 :return: :class:`wxc_sdk.integration.Integration` instance
47 """
48 client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
49 client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
50 scopes = parse_scopes(os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES'))
51 redirect_url = 'http://localhost:6001/redirect'
52 if not all((client_id, client_secret, scopes)):
53 raise ValueError('failed to get integration parameters from environment')
54 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
55 redirect_url=redirect_url)
56
57
58def get_tokens() -> Optional[Tokens]:
59 """
60
61 Tokens are read from a YML file. If needed an OAuth flow is initiated.
62
63 :return: tokens
64 :rtype: :class:`wxc_sdk.tokens.Tokens`
65 """
66
67 def write_tokens(tokens_to_cache: Tokens):
68 with open(yml_path(), mode='w') as f:
69 safe_dump(json.loads(tokens_to_cache.json()), f)
70 return
71
72 def read_tokens() -> Optional[Tokens]:
73 try:
74 with open(yml_path(), mode='r') as f:
75 data = safe_load(f)
76 tokens_read = Tokens.parse_obj(data)
77 except Exception as e:
78 log.info(f'failed to read tokens from file: {e}')
79 tokens_read = None
80 return tokens_read
81
82 integration = build_integration()
83 tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
84 write_to_cache=write_tokens)
85 return tokens
86
87
88logging.basicConfig(level=logging.DEBUG)
89
90# load environment variables from .env
91path = env_path()
92log.info(f'reading {path}')
93load_dotenv(env_path())
94
95tokens = get_tokens()
96
97# use the tokens to get identity of authenticated user
98api = WebexSimpleApi(tokens=tokens)
99me = api.people.me()
100print(f'authenticated as {me.display_name} ({me.emails[0]}) ')
Read/update call intercept settings of a user
usage: call_intercept.py [-h] [–token TOKEN] user_email [{on,off}]positional arguments:user_email email address of user{on,off} operation to applyoptions:-h, –help show this help message and exit–token TOKEN admin access token to use
The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables which can be set in a .env
file
Source: call_intercept.py
1#!/usr/bin/env python
2"""
3Script to read/update call intercept settings of a calling user.
4
5The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
6obtains tokens via an OAuth flow.
7
8 usage: call_intercept.py [-h] [--token TOKEN] user_email [{on,off}]
9
10 positional arguments:
11 user_email email address of user
12 {on,off} operation to apply
13
14 options:
15 -h, --help show this help message and exit
16 --token TOKEN admin access token to use
17"""
18import argparse
19import logging
20import os
21import re
22import sys
23from json import loads
24from typing import Optional
25
26from dotenv import load_dotenv
27from wxc_sdk import WebexSimpleApi
28from wxc_sdk.integration import Integration
29from wxc_sdk.person_settings.call_intercept import InterceptSetting
30from wxc_sdk.rest import RestError
31from wxc_sdk.scopes import parse_scopes
32from wxc_sdk.tokens import Tokens
33from yaml import safe_dump, safe_load
34
35log = logging.getLogger(__name__)
36
37
38def env_path() -> str:
39 """
40 determine path for .env to load environment variables from
41
42 :return: .env file path
43 """
44 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
45
46
47def yml_path() -> str:
48 """
49 determine path of YML file to persist tokens
50
51 :return: path to YML file
52 :rtype: str
53 """
54 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
55
56
57def build_integration() -> Integration:
58 """
59 read integration parameters from environment variables and create an integration
60
61 :return: :class:`wxc_sdk.integration.Integration` instance
62 """
63 client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
64 client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
65 scopes = os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES')
66 if scopes:
67 scopes = parse_scopes(scopes)
68 if not all((client_id, client_secret, scopes)):
69 raise ValueError('failed to get integration parameters from environment')
70 redirect_url = 'http://localhost:6001/redirect'
71 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
72 redirect_url=redirect_url)
73
74
75def get_tokens() -> Optional[Tokens]:
76 """
77
78 Tokens are read from a YML file. If needed an OAuth flow is initiated.
79
80 :return: tokens
81 :rtype: :class:`wxc_sdk.tokens.Tokens`
82 """
83
84 def write_tokens(tokens_to_cache: Tokens):
85 with open(yml_path(), mode='w') as f:
86 safe_dump(loads(tokens_to_cache.json()), f)
87 return
88
89 def read_tokens() -> Optional[Tokens]:
90 try:
91 with open(yml_path(), mode='r') as f:
92 data = safe_load(f)
93 tokens_read = Tokens.parse_obj(data)
94 except Exception as e:
95 log.info(f'failed to read tokens from file: {e}')
96 tokens_read = None
97 return tokens_read
98
99 integration = build_integration()
100 tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
101 write_to_cache=write_tokens)
102 return tokens
103
104
105RE_EMAIL = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
106
107
108def email_type(value):
109 if not RE_EMAIL.match(value):
110 raise argparse.ArgumentTypeError(f"'{value}' is not a valid email")
111 return value
112
113
114def main():
115 """
116 where the magic happens
117 """
118 # read .env file with the settings for the integration to be used to obtain tokens
119 load_dotenv(env_path())
120
121 parser = argparse.ArgumentParser()
122 parser.add_argument('user_email', type=email_type, help='email address of user')
123 parser.add_argument('on_off', choices=['on', 'off'], nargs='?', help='operation to apply')
124 parser.add_argument('--token', type=str, required=False, help='admin access token to use')
125 args = parser.parse_args()
126
127 if args.token:
128 tokens = args.token
129 elif (tokens := os.getenv('WEBEX_ACCESS_TOKEN')) is None:
130 tokens = get_tokens()
131
132 if not tokens:
133 print('Failed to get tokens', file=sys.stderr)
134 exit(1)
135
136 # set level to DEBUG to see debug of REST requests
137 logging.basicConfig(level=(gt := getattr(sys, 'gettrace', None)) and gt() and logging.DEBUG or logging.INFO)
138
139 with WebexSimpleApi(tokens=tokens) as api:
140 # get user
141 email = args.user_email.lower()
142 user = next((user
143 for user in api.people.list(email=email)
144 if user.emails[0] == email), None)
145 if user is None:
146 print(f'User "{email}" not found', file=sys.stderr)
147 exit(1)
148
149 # display call intercept status
150 try:
151 intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
152 except RestError as e:
153 print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
154 exit(1)
155
156 print('on' if intercept.enabled else 'off')
157 if args.on_off:
158 # action: turn on/off
159 intercept = InterceptSetting.default()
160 intercept.enabled = args.on_off == 'on'
161 try:
162 api.person_settings.call_intercept.configure(person_id=user.person_id,
163 intercept=intercept)
164 except RestError as e:
165 print(f'Failed to update call intercept settings: {e.response.status_code}, {e.description}')
166 exit(1)
167
168 # read call intercept again
169 try:
170 intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
171 except RestError as e:
172 print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
173 exit(1)
174
175 # display state after update
176 print(f"set to {'on' if intercept.enabled else 'off'}")
177
178 exit(0)
179
180
181if __name__ == '__main__':
182 main()