Browse Source

Enhancement & bugfix for the on demand spawning strategy

Yann Weber 4 years ago
parent
commit
e2dd5a1be4
2 changed files with 46 additions and 21 deletions
  1. 10
    0
      src/pyworker.c
  2. 36
    21
      src/responder.c

+ 10
- 0
src/pyworker.c View File

@@ -75,6 +75,16 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
75 75
 		"Worker[%d] Waiting request with %s.%s()", wrk_id,
76 76
 		py_entrypoint, PYENTRY_FUNNAME);
77 77
 
78
+
79
+	sop.sem_op = 1; //indicate worker as ready & idle
80
+	if(semop(semid, &sop, 1) < 0)
81
+	{
82
+		err = errno;
83
+		syslog(LOG_ERR,
84
+		       "Worker[%d] error incrementing the semaphore : %s",
85
+		       wrk_id, strerror(err));
86
+	}
87
+
78 88
 	count = 0;
79 89
 	while ((!count || count != max_reqs) && FCGI_Accept() >= 0)
80 90
 	{

+ 36
- 21
src/responder.c View File

@@ -35,12 +35,14 @@ int responder_loop(char *py_entrypoint, unsigned int max_reqs,
35 35
 	pid_t ret;
36 36
 	struct sembuf sop;
37 37
 	struct timespec timeout;
38
+	short idle;
38 39
 
39 40
 	sop.sem_num = 0;
40 41
 	sop.sem_op = 0;
41 42
 	sop.sem_flg = 0;
42 43
 	timeout.tv_sec = 0;
43 44
 	timeout.tv_nsec = 100000000;
45
+	idle = 0;
44 46
 
45 47
 	syslog(LOG_INFO, "Preparing workers");
46 48
 
@@ -66,6 +68,8 @@ int responder_loop(char *py_entrypoint, unsigned int max_reqs,
66 68
 	{
67 69
 		wrk_pids[n_wrk] = spawn(py_entrypoint, n_wrk, semid, max_reqs);
68 70
 	}
71
+	//Wait at least for a process to be ready
72
+	while(!semtimedop(semid, &sop, 1, &timeout));
69 73
 
70 74
 	// main loop, taking care to restart terminated workers, 
71 75
 	// spawn new one if needed, etc.
@@ -95,6 +99,18 @@ int responder_loop(char *py_entrypoint, unsigned int max_reqs,
95 99
 				       ret);
96 100
 				continue;
97 101
 			}
102
+			idle=0;
103
+			sop.sem_op = -1;
104
+			ret = semop(semid, &sop, 1);
105
+			sop.sem_op = 0;
106
+			if(ret < 0)
107
+			{
108
+				err = errno;
109
+				syslog(LOG_ALERT,
110
+				       "Unable to dec sem after child exit : %s",
111
+				       strerror(err));
112
+				clean_exit(err);
113
+			}
98 114
 			if(status)
99 115
 			{
100 116
 				syslog(LOG_WARNING,
@@ -110,29 +126,34 @@ int responder_loop(char *py_entrypoint, unsigned int max_reqs,
110 126
 			// child stopped, looking for it
111 127
 			if(wanted_n < n_wrk)
112 128
 			{	// need to shift the list and dec n_wrk
129
+				syslog(LOG_DEBUG, "GC Workers");
130
+syslog( LOG_DEBUG, "GC want %d have %d", wanted_n, n_wrk);
113 131
 				memmove(wrk_pids+n, wrk_pids+n+1,
114 132
 				        sizeof(pid_t) * (n_wrk - n));
115 133
 				n_wrk--;
116 134
 			}
117 135
 			else
118 136
 			{	// respawn on same slot
137
+				syslog(LOG_INFO, "respawn #%d", n);
119 138
 				wrk_pids[n] = spawn(py_entrypoint, n,
120 139
 				                    semid, max_reqs);
140
+				continue;
121 141
 			}
122 142
 		}
123
-		if(n_wrk == max_wrk)
124
-		{
125
-			nanosleep(&timeout, NULL);
126
-			continue;
127
-		}
128 143
 		ret = semtimedop(semid, &sop, 1, &timeout);
144
+syslog( LOG_DEBUG, "semtimeop ret=%d want %d have %d", ret, wanted_n, n_wrk);
129 145
 		if(ret < 0)
130 146
 		{
131 147
 			err = errno;
132 148
 			if(err == EAGAIN)
133 149
 			{
150
+syslog(LOG_DEBUG, "IDLE want %d have %d\t min=%d", wanted_n, n_wrk, min_wrk);
134 151
 				// workers idle
135
-				if(wanted_n > min_wrk)
152
+				if(!idle)
153
+				{
154
+					idle = 1;
155
+				}
156
+				else if(wanted_n > min_wrk && n_wrk - wanted_n < 2)
136 157
 				{
137 158
 					wanted_n--;
138 159
 				}
@@ -141,8 +162,9 @@ int responder_loop(char *py_entrypoint, unsigned int max_reqs,
141 162
 			syslog(LOG_ERR, "Unable to read semaphore : %s",
142 163
 			       strerror(err));
143 164
 		}
144
-		if(!ret)
165
+		if(!ret && n_wrk < max_wrk)
145 166
 		{
167
+			idle=0;
146 168
 			syslog( LOG_DEBUG,
147 169
 				"All workers busy, spawning a new one");
148 170
 			n = n_wrk;
@@ -168,21 +190,10 @@ int responder_loop(char *py_entrypoint, unsigned int max_reqs,
168 190
 pid_t spawn(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
169 191
 {
170 192
 	pid_t res;
171
-	struct sembuf sop;
172
-	int err;
193
+	struct timespec timeout;
194
+	timeout.tv_sec = 0;
195
+	timeout.tv_nsec = 100000000;
173 196
 	
174
-	sop.sem_num = 0;
175
-	sop.sem_op = 1;
176
-	sop.sem_flg = 0;
177
-
178
-	if(semop(semid, &sop, 1) < 0)
179
-	{
180
-		err = errno;
181
-		syslog(LOG_ALERT,
182
-		       "Failed to semop before spawning a child : %s",
183
-		       strerror(errno));
184
-		clean_exit(err);
185
-	}
186 197
 
187 198
 	res = fork();
188 199
 	if(res == -1)
@@ -196,6 +207,10 @@ pid_t spawn(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
196 207
 		// Child process
197 208
 		exit(work(py_entrypoint, wrk_id, semid, max_reqs));
198 209
 	}
210
+	// Sleep to avoid spawning like hell thinking all workers are
211
+	// busy. Let some time to this one to go up...
212
+	// TODO: find a better way to avoid spawning to max_wrk
213
+	nanosleep(&timeout, NULL);
199 214
 	syslog(	LOG_INFO,
200 215
 		"Worker #%d spawned with PID %d", wrk_id, res);
201 216
 	return res;

Loading…
Cancel
Save